package com.google.cloud.spanner.watcher.it;

import com.google.cloud.ByteArray;
import com.google.cloud.Date;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Database;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.KeySet;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.Value;
import com.google.cloud.spanner.watcher.FixedShardProvider;
import com.google.cloud.spanner.watcher.SpannerCommitTimestampRepository;
import com.google.cloud.spanner.watcher.SpannerDatabaseChangeWatcher;
import com.google.cloud.spanner.watcher.SpannerDatabaseTailer;
import com.google.cloud.spanner.watcher.SpannerTableChangeWatcher;
import com.google.cloud.spanner.watcher.TableId;
import com.google.cloud.spanner.watcher.it.SpannerTestHelper;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.UnmodifiableIterator;
import com.google.common.truth.Truth;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.threeten.bp.Duration;

@RunWith(Parameterized.class)
@Category({StressIntegrationTest.class})
/* loaded from: input_file:com/google/cloud/spanner/watcher/it/ITSpannerDatabaseTailerStressTest.class */
public class ITSpannerDatabaseTailerStressTest {
    /** DDL template for a shard-column index; arguments are index suffix, table name, column name. */
    private static final String CREATE_INDEX = "CREATE INDEX IDX_SHARD_%s ON %s (%s)";

    /** First test parameter: number of write rounds, split evenly over the runners. */
    @Parameterized.Parameter(0)
    public int changeCount;

    /** Second test parameter: number of concurrent change generators. */
    @Parameterized.Parameter(1)
    public int changeRunners;

    /** Test database, created once for the whole class in {@code setup()}. */
    private static Database database;

    /** Guards the bookkeeping shared between the change generators and the watcher callbacks. */
    private final Object lock = new Object();

    /** Latest commit timestamp written per key; the expected end state. */
    private final ConcurrentMap<TestKey, Timestamp> lastWrittenTimestamps = new ConcurrentHashMap<>();

    /** Latest commit timestamp reported by a watcher per key; the observed end state. */
    private final ConcurrentMap<TestKey, Timestamp> lastReceivedTimestamps = new ConcurrentHashMap<>();

    /** Number of row changes written so far (one per table for every write round). */
    private final AtomicInteger sentChanges = new AtomicInteger();

    /** Maps a row id to the index into {@link #SHARD_VALUES} it was first written with. */
    private final Map<Long, Integer> idToShardValueMapping = new HashMap<>();

    /** Tables created for the test; table {@code i} is sharded on {@code FIXED_SHARD_COLS[i]}. */
    private static final String[] TABLE_NAMES = {
        "TABLE1", "TABLE2", "TABLE3", "TABLE4", "TABLE5", "TABLE6", "TABLE7"
    };

    /** Shard column per table, one for each shard column declared in the table DDL. */
    private static final String[] FIXED_SHARD_COLS = {
        "ShardInt64", "ShardFloat64", "ShardBool", "ShardString", "ShardBytes", "ShardDate", "ShardTimestamp"
    };

    /** The two shards; element {@code i} of each array is the shard value for table {@code i}. */
    private static final ImmutableList<Value[]> SHARD_VALUES = ImmutableList.of(
        new Value[] {
            Value.int64(1),
            Value.float64(3.14d),
            Value.bool(true),
            Value.string("EAST"),
            Value.bytes(ByteArray.copyFrom("EAST")),
            Value.date(Date.fromYearMonthDay(2020, 6, 5)),
            Value.timestamp(Timestamp.now())
        },
        new Value[] {
            Value.int64(-1),
            Value.float64(6.662d),
            Value.bool(false),
            Value.string("WEST"),
            Value.bytes(ByteArray.copyFrom("WEST")),
            Value.date(Date.fromYearMonthDay(2019, 6, 5)),
            Value.timestamp(Timestamp.ofTimeSecondsAndNanos(10000, 0))
        });

    // Named after this class; it previously borrowed the table-tailer test's logger name.
    private static final Logger logger = Logger.getLogger(ITSpannerDatabaseTailerStressTest.class.getName());

    /** Shared Spanner test environment, set up and torn down once per class. */
    private static final SpannerTestHelper.ITSpannerEnv env = new SpannerTestHelper.ITSpannerEnv();

    /**
     * Writes {@code numChanges} rounds of random mutations, one mutation per table in each round,
     * and records the commit timestamp of every written key in {@code lastWrittenTimestamps}.
     *
     * <p>Several instances run concurrently; all shared bookkeeping is done while holding
     * {@code lock}.
     */
    final class GenerateChangesCallable implements Callable<Void> {
        private final Random rnd = new Random();
        private final DatabaseClient client;
        private final int numChanges;

        /**
         * @param databaseClient client used to commit the mutations
         * @param numChanges number of write rounds this generator performs
         */
        GenerateChangesCallable(DatabaseClient databaseClient, int numChanges) {
            this.client = databaseClient;
            this.numChanges = numChanges;
        }

        /**
         * Runs all write rounds.
         *
         * @return always {@code null}
         */
        @Override
        public Void call() {
            final int tableCount = TABLE_NAMES.length;
            for (int round = 0; round < numChanges; round++) {
                // ArrayList gives O(1) indexed access when the two lists are read in lock-step below.
                List<Mutation> mutations = new ArrayList<>(tableCount);
                List<Integer> shardIndexes = new ArrayList<>(tableCount);
                for (int table = 0; table < tableCount; table++) {
                    long id = rnd.nextInt(changeCount / 2);
                    Integer shardIndex;
                    synchronized (lock) {
                        // A row id keeps the shard it was first written with.
                        shardIndex = idToShardValueMapping.get(id);
                        if (shardIndex == null) {
                            shardIndex = rnd.nextInt(SHARD_VALUES.size());
                            idToShardValueMapping.put(id, shardIndex);
                        }
                    }
                    shardIndexes.add(shardIndex);
                    mutations.add(
                        ITSpannerTableTailerStressTest.createRandomMutation(
                            TABLE_NAMES[table], id, FIXED_SHARD_COLS[table], SHARD_VALUES.get(shardIndex)[table]));
                }
                Timestamp commitTimestamp = client.write(mutations);
                synchronized (lock) {
                    for (int table = 0; table < tableCount; table++) {
                        TestKey key =
                            new TestKey(
                                TableId.of(database.getId(), TABLE_NAMES[table]),
                                SHARD_VALUES.get(shardIndexes.get(table))[table],
                                mutations.get(table).asMap().get("ColInt64").getInt64());
                        Timestamp previous = lastWrittenTimestamps.get(key);
                        // Concurrent generators may commit out of order; keep only the newest timestamp.
                        if (previous == null || commitTimestamp.compareTo(previous) > 0) {
                            lastWrittenTimestamps.put(key, commitTimestamp);
                        }
                    }
                    // Publish the counter only after the expected timestamps are in place and while
                    // still holding the lock. The watcher callback reads both under the same lock,
                    // so it can no longer see the final count next to a stale expected map.
                    sentChanges.addAndGet(tableCount);
                }
            }
            return null;
        }
    }

    /**
     * Identifies one row of one table within one shard. Used as the key of the written and
     * received timestamp maps, so equality and hashing cover all three components.
     */
    public static class TestKey {
        private final TableId tableId;
        private final Value shardId;
        private final Long id;

        TestKey(TableId tableId, Value shardId, Long id) {
            this.tableId = tableId;
            this.shardId = shardId;
            this.id = id;
        }

        /** Renders the key as {@code table:shard:id}. */
        @Override
        public String toString() {
            return String.format("%s:%s:%s", tableId.getTable(), shardId.toString(), id);
        }

        /** Two keys are equal when table, shard value and row id are all equal. */
        @Override
        public boolean equals(Object obj) {
            if (obj instanceof TestKey) {
                TestKey other = (TestKey) obj;
                boolean sameTable = Objects.equals(tableId, other.tableId);
                boolean sameShard = Objects.equals(shardId, other.shardId);
                boolean sameId = Objects.equals(id, other.id);
                return sameTable && sameShard && sameId;
            }
            return false;
        }

        @Override
        public int hashCode() {
            return Objects.hash(tableId, shardId, id);
        }
    }

    @Parameterized.Parameters(name = "change count= {0}, runners= {1}")
    // Builds the {changeCount, changeRunners} combinations for the parameterized runner.
    // Runners double from 8 to 256; for each runner count the change count doubles from that
    // runner count up to 1024, so changeCount is always an exact multiple of changeRunners.
    public static Collection<Object[]> parameters() {
        Collection<Object[]> combinations = new ArrayList<>();
        for (int runners = 8; runners <= 256; runners *= 2) {
            // The inner loop has an explicit upper bound; the earlier form never left it.
            for (int changes = runners; changes <= 1024; changes *= 2) {
                combinations.add(new Object[] {changes, runners});
            }
        }
        return combinations;
    }

    /**
     * Creates the test database: one table per entry in {@code TABLE_NAMES}, followed by one
     * index per table on that table's shard column.
     *
     * @throws Exception if the Spanner environment or the database cannot be created
     */
    @BeforeClass
    public static void setup() throws Exception {
        // Sanity-check that the parallel arrays line up before touching Spanner.
        Preconditions.checkState(TABLE_NAMES.length == FIXED_SHARD_COLS.length);
        for (Value[] shardRow : SHARD_VALUES) {
            Preconditions.checkState(FIXED_SHARD_COLS.length == shardRow.length);
        }
        SpannerTestHelper.setupSpanner(env);

        final String createTableTemplate = "CREATE TABLE %s (\n  ColInt64       INT64       NOT NULL,\n  ColFloat64     FLOAT64     NOT NULL,\n  ColBool        BOOL        NOT NULL,\n  ColString      STRING(100) NOT NULL,\n  ColStringMax   STRING(MAX) NOT NULL,\n  ColJson        JSON        NOT NULL,\n  ColBytes       BYTES(100)  NOT NULL,\n  ColBytesMax    BYTES(MAX)  NOT NULL,\n  ColDate        DATE        NOT NULL,\n  ColTimestamp   TIMESTAMP   NOT NULL,\n  ColShardId     STRING(MAX)         ,\n  ShardInt64     INT64               ,\n  ShardFloat64   FLOAT64             ,\n  ShardBool      BOOL                ,\n  ShardString    STRING(100)         ,\n  ShardBytes     BYTES(100)          ,\n  ShardDate      DATE                ,\n  ShardTimestamp TIMESTAMP           ,\n  ColCommitTS    TIMESTAMP   NOT NULL OPTIONS (allow_commit_timestamp=true),\n  \n  ColInt64Array     ARRAY<INT64>,\n  ColFloat64Array   ARRAY<FLOAT64>,\n  ColBoolArray      ARRAY<BOOL>,\n  ColStringArray    ARRAY<STRING(100)>,\n  ColStringMaxArray ARRAY<STRING(MAX)>,\n  ColJsonArray      ARRAY<JSON>,\n  ColBytesArray     ARRAY<BYTES(100)>,\n  ColBytesMaxArray  ARRAY<BYTES(MAX)>,\n  ColDateArray      ARRAY<DATE>,\n  ColTimestampArray ARRAY<TIMESTAMP>\n) PRIMARY KEY (ColInt64)\n";

        LinkedList<String> ddl = new LinkedList<>();
        // All tables first, then all indexes, in table order.
        for (int t = 0; t < TABLE_NAMES.length; t++) {
            ddl.add(String.format(createTableTemplate, TABLE_NAMES[t]));
        }
        int column = 0;
        for (String table : TABLE_NAMES) {
            ddl.add(String.format(CREATE_INDEX, table, table, FIXED_SHARD_COLS[column]));
            column++;
        }

        database = env.createTestDb(ddl);
        String databaseName = database.getId().toString();
        logger.info(String.format("Created database %s", databaseName));
    }

    /** Releases the shared Spanner test environment once every test in the class has run. */
    @AfterClass
    public static void teardown() {
        SpannerTestHelper.teardownSpanner(env);
    }

    /**
     * Looks up the position of a table in {@code TABLE_NAMES}.
     *
     * @param str the table name to look up
     * @return the zero-based index of the table, or {@code -1} if the name is not a test table
     */
    public static int getTableIndex(String str) {
        int position = 0;
        for (String tableName : TABLE_NAMES) {
            if (tableName.equals(str)) {
                return position;
            }
            position++;
        }
        return -1;
    }

    /**
     * Empties every test table and resets the per-test bookkeeping after each test.
     * Each table is cleared with its own write.
     */
    @After
    public void deleteTestData() {
        final DatabaseClient client = env.getSpanner().getDatabaseClient(database.getId());
        for (int t = 0; t < TABLE_NAMES.length; t++) {
            Mutation deleteAllRows = Mutation.delete(TABLE_NAMES[t], KeySet.all());
            client.write(Collections.singleton(deleteAllRows));
        }
        sentChanges.set(0);
        lastReceivedTimestamps.clear();
        lastWrittenTimestamps.clear();
    }

    /**
     * Starts one database tailer per shard, writes {@code changeCount} rounds of changes from
     * {@code changeRunners} concurrent generators, and verifies that the last commit timestamp
     * received for every key equals the last one written.
     *
     * <p>The watchers and the executor are released in a {@code finally} block, so a timeout or
     * a failed assertion does not leave pollers or pool threads running into the next
     * parameterized run.
     *
     * @throws Exception if writing the changes fails or does not finish within the timeout
     */
    @Test
    public void testStressSpannerTailer() throws Exception {
        System.out.printf("Starting test (changeCount=%d, runners=%d)\n", changeCount, changeRunners);
        Spanner spanner = env.getSpanner();
        // One thread per generator plus one for the delayed latch release scheduled by the callbacks.
        final ListeningScheduledExecutorService executor =
            MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(changeRunners + 1));
        final CountDownLatch latch = new CountDownLatch(1);
        List<SpannerDatabaseChangeWatcher> watchers = new ArrayList<>(SHARD_VALUES.size());
        boolean passed = false;
        try {
            for (final Value[] shardValues : SHARD_VALUES) {
                // NOTE(review): left raw because the value type expected by setShardProviders is
                // not visible in this file; tighten once confirmed.
                Map shardProviders = new HashMap();
                for (int table = 0; table < TABLE_NAMES.length; table++) {
                    shardProviders.put(
                        TableId.of(database.getId(), TABLE_NAMES[table]),
                        FixedShardProvider.create(FIXED_SHARD_COLS[table], shardValues[table]));
                }
                SpannerDatabaseTailer tailer =
                    SpannerDatabaseTailer.newBuilder(spanner, database.getId())
                        .allTables()
                        .setPollInterval(Duration.ofMillis(1L))
                        .setCommitTimestampRepository(
                            SpannerCommitTimestampRepository.newBuilder(spanner, database.getId())
                                .setInitialCommitTimestamp(Timestamp.MIN_VALUE)
                                .build())
                        .setShardProviders(shardProviders)
                        .build();
                tailer.addCallback(
                    new SpannerTableChangeWatcher.RowChangeCallback() {
                        @Override
                        public void rowChange(TableId tableId, SpannerTableChangeWatcher.Row row, Timestamp timestamp) {
                            TestKey key =
                                new TestKey(
                                    tableId,
                                    shardValues[getTableIndex(tableId.getTable())],
                                    row.getLong("ColInt64"));
                            synchronized (lock) {
                                lastReceivedTimestamps.put(key, timestamp);
                                boolean allSent = sentChanges.get() == changeCount * TABLE_NAMES.length;
                                if (allSent && latch.getCount() > 0) {
                                    if (lastReceivedTimestamps.equals(lastWrittenTimestamps)) {
                                        latch.countDown();
                                    } else {
                                        // Not everything has arrived yet: release the latch after a
                                        // grace period so the main thread does not wait out its
                                        // full timeout.
                                        executor.schedule(
                                            new Runnable() {
                                                @Override
                                                public void run() {
                                                    latch.countDown();
                                                }
                                            },
                                            5L,
                                            TimeUnit.SECONDS);
                                    }
                                }
                            }
                        }
                    });
                // Register before starting so the watcher is stopped in the finally block even
                // if startup fails.
                watchers.add(tailer);
                tailer.startAsync();
            }
            for (SpannerDatabaseChangeWatcher watcher : watchers) {
                watcher.awaitRunning();
            }
            System.out.printf("Change watcher started (changeCount=%d, runners=%d)\n", changeCount, changeRunners);

            Stopwatch stopwatch = Stopwatch.createStarted();
            DatabaseClient client = spanner.getDatabaseClient(database.getId());
            List<ListenableFuture<Void>> writers = new ArrayList<>(changeRunners);
            for (int runner = 0; runner < changeRunners; runner++) {
                writers.add(executor.submit(new GenerateChangesCallable(client, changeCount / changeRunners)));
            }
            Futures.allAsList(writers).get(300L, TimeUnit.SECONDS);
            System.out.printf(
                "Finished writing changes in %d seconds (changeCount=%d, runners=%d)\n",
                stopwatch.elapsed(TimeUnit.SECONDS), changeCount, changeRunners);

            // The result of await is deliberately not checked: the assertion below decides the
            // outcome whether the latch was released or the wait timed out.
            latch.await(300L, TimeUnit.SECONDS);
            System.out.printf(
                "Finished test in %d seconds (changeCount=%d, runners=%d)\n",
                stopwatch.elapsed(TimeUnit.SECONDS), changeCount, changeRunners);
            Truth.assertThat(lastReceivedTimestamps).isEqualTo(lastWrittenTimestamps);
            passed = true;
        } finally {
            try {
                for (SpannerDatabaseChangeWatcher watcher : watchers) {
                    watcher.stopAsync();
                }
                // Wait for a clean stop only on the success path. After a failure, an exception
                // from a failed watcher here would mask the original error.
                if (passed) {
                    for (SpannerDatabaseChangeWatcher watcher : watchers) {
                        watcher.awaitTerminated();
                    }
                }
            } finally {
                executor.shutdown();
            }
        }
    }
}
