package com.google.cloud.spanner.watcher.it;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.AbortedException;
import com.google.cloud.spanner.AsyncRunner;
import com.google.cloud.spanner.AsyncTransactionManager;
import com.google.cloud.spanner.Database;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.Key;
import com.google.cloud.spanner.KeySet;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Options;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.Struct;
import com.google.cloud.spanner.TransactionContext;
import com.google.cloud.spanner.TransactionRunner;
import com.google.cloud.spanner.Value;
import com.google.cloud.spanner.watcher.DatabaseClientWithChangeSets;
import com.google.cloud.spanner.watcher.SpannerCommitTimestampRepository;
import com.google.cloud.spanner.watcher.SpannerTableChangeSetPoller;
import com.google.cloud.spanner.watcher.SpannerTableChangeWatcher;
import com.google.cloud.spanner.watcher.TableId;
import com.google.cloud.spanner.watcher.it.SpannerTestHelper;
import com.google.common.collect.ImmutableList;
import com.google.common.truth.Truth;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.Arrays;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.threeten.bp.Duration;

@RunWith(JUnit4.class)
/* loaded from: input_file:com/google/cloud/spanner/watcher/it/ITSpannerTableChangeSetPollerTest.class */
public class ITSpannerTableChangeSetPollerTest {
    /** Logger used to trace database creation and every row change received by the tests. */
    private static final Logger logger = Logger.getLogger(ITSpannerTableChangeSetPollerTest.class.getName());

    /** Shared integration-test environment, prepared in {@code setup()} and released in {@code teardown()}. */
    private static final SpannerTestHelper.ITSpannerEnv env = new SpannerTestHelper.ITSpannerEnv();

    /** Test database created once for the whole class in {@code setup()}. */
    private static Database database;

    /** Commit timestamp handed to the most recent row-change callback; written by the callback, read by the test. */
    private volatile Timestamp lastCommitTimestamp;

    /** Row changes delivered by the callback that the test thread has not drained yet. */
    private final Queue<Struct> receivedChanges = new ConcurrentLinkedQueue<>();

    /** Counted down once per received row change; replaced before each write with the expected number of changes. */
    private volatile CountDownLatch latch = new CountDownLatch(0);

    /**
     * Prepares the shared Spanner environment and creates the test database containing the
     * NUMBERS and CHANGE_SETS tables plus an index on NUMBERS.CHANGE_SET_ID.
     *
     * @throws Exception if the environment or the database cannot be set up
     */
    @BeforeClass
    public static void setup() throws Exception {
        SpannerTestHelper.setupSpanner(env);

        final String createNumbers =
                "CREATE TABLE NUMBERS (ID INT64 NOT NULL, NAME STRING(100), CHANGE_SET_ID STRING(MAX)) PRIMARY KEY (ID)";
        final String createChangeSets =
                "CREATE TABLE CHANGE_SETS (CHANGE_SET_ID STRING(MAX), COMMIT_TIMESTAMP TIMESTAMP OPTIONS (allow_commit_timestamp=true)) PRIMARY KEY (CHANGE_SET_ID)";
        final String createChangeSetIndex =
                "CREATE INDEX IDX_NUMBERS_CHANGE_SET ON NUMBERS (CHANGE_SET_ID)";

        database = env.createTestDb(ImmutableList.of(createNumbers, createChangeSets, createChangeSetIndex));

        String databaseName = database.getId().toString();
        logger.info(String.format("Created database %s", databaseName));
    }

    /** Releases the shared test environment once all tests in this class have run. */
    @AfterClass
    public static void teardown() {
        SpannerTestHelper.teardownSpanner(env);
    }

    /** Empties the NUMBERS table before each test so every test starts without rows. */
    @Before
    public void deleteRowsInNumbers() {
        DatabaseClient client = env.getSpanner().getDatabaseClient(database.getId());
        Mutation deleteAllNumbers = Mutation.delete("NUMBERS", KeySet.all());
        client.writeAtLeastOnce(ImmutableList.of(deleteAllNumbers));
    }

    /**
     * Verifies that a {@link SpannerTableChangeSetPoller} on NUMBERS reports rows that are written
     * together with a CHANGE_SETS row carrying the commit timestamp.
     *
     * <p>Three change sets are written (three inserts, two inserts, two updates). After each one the
     * test expects exactly the written rows and a callback timestamp equal to the commit timestamp
     * of the write. Finally two rows are deleted and the test checks that no callback has arrived
     * after waiting 500 ms.
     *
     * @throws Exception if waiting for changes is interrupted or a Spanner call fails
     */
    @Test
    public void testSpannerChangeSetPoller() throws Exception {
        Spanner spanner = env.getSpanner();
        SpannerTableChangeSetPoller poller =
                SpannerTableChangeSetPoller.newBuilder(spanner, TableId.of(database.getId(), "NUMBERS"))
                        .setPollInterval(Duration.ofMillis(10L))
                        .setCommitTimestampRepository(
                                SpannerCommitTimestampRepository.newBuilder(spanner, database.getId())
                                        .setInitialCommitTimestamp(Timestamp.MIN_VALUE)
                                        .build())
                        .build();
        poller.addCallback(new SpannerTableChangeWatcher.RowChangeCallback() {
            @Override
            public void rowChange(TableId tableId, SpannerTableChangeWatcher.Row row, Timestamp timestamp) {
                logger.info(String.format("Received changed for table %s: %s", tableId, row.asStruct().toString()));
                receivedChanges.add(row.asStruct());
                lastCommitTimestamp = timestamp;
                // Count down last so the waiting test thread sees the row and timestamp above.
                latch.countDown();
            }
        });
        poller.startAsync().awaitRunning();
        // Stop the poller even when an assertion fails so it does not keep polling in the background.
        try {
            DatabaseClient client = spanner.getDatabaseClient(database.getId());

            // Change set 1: insert rows 1, 2 and 3.
            this.latch = new CountDownLatch(3);
            String firstChangeSet = UUID.randomUUID().toString();
            Timestamp firstCommit = client.writeAtLeastOnce(Arrays.asList(
                    Mutation.newInsertOrUpdateBuilder("CHANGE_SETS")
                            .set("CHANGE_SET_ID").to(firstChangeSet)
                            .set("COMMIT_TIMESTAMP").to(Value.COMMIT_TIMESTAMP)
                            .build(),
                    Mutation.newInsertOrUpdateBuilder("NUMBERS")
                            .set("ID").to(1L)
                            .set("NAME").to("ONE")
                            .set("CHANGE_SET_ID").to(firstChangeSet)
                            .build(),
                    Mutation.newInsertOrUpdateBuilder("NUMBERS")
                            .set("ID").to(2L)
                            .set("NAME").to("TWO")
                            .set("CHANGE_SET_ID").to(firstChangeSet)
                            .build(),
                    Mutation.newInsertOrUpdateBuilder("NUMBERS")
                            .set("ID").to(3L)
                            .set("NAME").to("THREE")
                            .set("CHANGE_SET_ID").to(firstChangeSet)
                            .build()));
            ImmutableList<Struct> firstChanges = drainChanges();
            Truth.assertThat(firstChanges).hasSize(3);
            Truth.assertThat(firstChanges).containsExactly(
                    Struct.newBuilder().set("ID").to(1L).set("NAME").to("ONE").set("CHANGE_SET_ID").to(firstChangeSet).build(),
                    Struct.newBuilder().set("ID").to(2L).set("NAME").to("TWO").set("CHANGE_SET_ID").to(firstChangeSet).build(),
                    Struct.newBuilder().set("ID").to(3L).set("NAME").to("THREE").set("CHANGE_SET_ID").to(firstChangeSet).build());
            Truth.assertThat(this.lastCommitTimestamp).isEqualTo(firstCommit);

            // Change set 2: insert rows 4 and 5.
            this.latch = new CountDownLatch(2);
            String secondChangeSet = UUID.randomUUID().toString();
            Timestamp secondCommit = client.writeAtLeastOnce(Arrays.asList(
                    Mutation.newInsertOrUpdateBuilder("CHANGE_SETS")
                            .set("CHANGE_SET_ID").to(secondChangeSet)
                            .set("COMMIT_TIMESTAMP").to(Value.COMMIT_TIMESTAMP)
                            .build(),
                    Mutation.newInsertOrUpdateBuilder("NUMBERS")
                            .set("ID").to(4L)
                            .set("NAME").to("FOUR")
                            .set("CHANGE_SET_ID").to(secondChangeSet)
                            .build(),
                    Mutation.newInsertOrUpdateBuilder("NUMBERS")
                            .set("ID").to(5L)
                            .set("NAME").to("FIVE")
                            .set("CHANGE_SET_ID").to(secondChangeSet)
                            .build()));
            ImmutableList<Struct> secondChanges = drainChanges();
            Truth.assertThat(secondChanges).hasSize(2);
            Truth.assertThat(secondChanges).containsExactly(
                    Struct.newBuilder().set("ID").to(4L).set("NAME").to("FOUR").set("CHANGE_SET_ID").to(secondChangeSet).build(),
                    Struct.newBuilder().set("ID").to(5L).set("NAME").to("FIVE").set("CHANGE_SET_ID").to(secondChangeSet).build());
            Truth.assertThat(this.lastCommitTimestamp).isEqualTo(secondCommit);

            // Change set 3: update rows 1 and 5.
            this.latch = new CountDownLatch(2);
            String thirdChangeSet = UUID.randomUUID().toString();
            Timestamp thirdCommit = client.writeAtLeastOnce(Arrays.asList(
                    Mutation.newInsertOrUpdateBuilder("CHANGE_SETS")
                            .set("CHANGE_SET_ID").to(thirdChangeSet)
                            .set("COMMIT_TIMESTAMP").to(Value.COMMIT_TIMESTAMP)
                            .build(),
                    Mutation.newUpdateBuilder("NUMBERS")
                            .set("ID").to(1L)
                            .set("NAME").to("one")
                            .set("CHANGE_SET_ID").to(thirdChangeSet)
                            .build(),
                    Mutation.newUpdateBuilder("NUMBERS")
                            .set("ID").to(5L)
                            .set("NAME").to("five")
                            .set("CHANGE_SET_ID").to(thirdChangeSet)
                            .build()));
            ImmutableList<Struct> thirdChanges = drainChanges();
            Truth.assertThat(thirdChanges).hasSize(2);
            Truth.assertThat(thirdChanges).containsExactly(
                    Struct.newBuilder().set("ID").to(1L).set("NAME").to("one").set("CHANGE_SET_ID").to(thirdChangeSet).build(),
                    Struct.newBuilder().set("ID").to(5L).set("NAME").to("five").set("CHANGE_SET_ID").to(thirdChangeSet).build());
            Truth.assertThat(this.lastCommitTimestamp).isEqualTo(thirdCommit);

            // Deletes are expected to produce no callbacks; give the poller some time to prove it.
            client.writeAtLeastOnce(Arrays.asList(
                    Mutation.delete("NUMBERS", Key.of(2L)),
                    Mutation.delete("NUMBERS", Key.of(3L))));
            Thread.sleep(500L);
            Truth.assertThat(this.receivedChanges).isEmpty();
        } finally {
            poller.stopAsync().awaitTerminated();
        }
    }

    /**
     * Same scenario as {@code testSpannerChangeSetPoller}, but all writes go through a
     * {@link DatabaseClientWithChangeSets}, which is given the change set id for each write.
     *
     * <p>After the three direct writes the test also runs the transaction-runner,
     * transaction-manager, async-runner and async-transaction-manager helpers of this class
     * against the same client. Finally one row is deleted and the test checks that no callback has
     * arrived after waiting 500 ms.
     *
     * @throws Exception if waiting for changes is interrupted or a Spanner call fails
     */
    @Test
    public void testSpannerChangeSetPollerUsingChangeSetDatabaseClient() throws Exception {
        Spanner spanner = env.getSpanner();
        SpannerTableChangeSetPoller poller =
                SpannerTableChangeSetPoller.newBuilder(spanner, TableId.of(database.getId(), "NUMBERS"))
                        .setPollInterval(Duration.ofMillis(10L))
                        .setCommitTimestampRepository(
                                SpannerCommitTimestampRepository.newBuilder(spanner, database.getId())
                                        .setInitialCommitTimestamp(Timestamp.MIN_VALUE)
                                        .build())
                        .build();
        poller.addCallback(new SpannerTableChangeWatcher.RowChangeCallback() {
            @Override
            public void rowChange(TableId tableId, SpannerTableChangeWatcher.Row row, Timestamp timestamp) {
                logger.info(String.format("Received changed for table %s: %s", tableId, row.asStruct().toString()));
                receivedChanges.add(row.asStruct());
                lastCommitTimestamp = timestamp;
                // Count down last so the waiting test thread sees the row and timestamp above.
                latch.countDown();
            }
        });
        poller.startAsync().awaitRunning();
        // Stop the poller even when an assertion fails so it does not keep polling in the background.
        try {
            DatabaseClientWithChangeSets client =
                    DatabaseClientWithChangeSets.of(spanner.getDatabaseClient(database.getId()));

            // Change set 1: insert rows 1, 2 and 3.
            this.latch = new CountDownLatch(3);
            String firstChangeSet = client.newChangeSetId();
            Timestamp firstCommit = client.writeAtLeastOnce(firstChangeSet, Arrays.asList(
                    Mutation.newInsertOrUpdateBuilder("NUMBERS")
                            .set("ID").to(1L)
                            .set("NAME").to("ONE")
                            .set("CHANGE_SET_ID").to(firstChangeSet)
                            .build(),
                    Mutation.newInsertOrUpdateBuilder("NUMBERS")
                            .set("ID").to(2L)
                            .set("NAME").to("TWO")
                            .set("CHANGE_SET_ID").to(firstChangeSet)
                            .build(),
                    Mutation.newInsertOrUpdateBuilder("NUMBERS")
                            .set("ID").to(3L)
                            .set("NAME").to("THREE")
                            .set("CHANGE_SET_ID").to(firstChangeSet)
                            .build()));
            ImmutableList<Struct> firstChanges = drainChanges();
            Truth.assertThat(firstChanges).hasSize(3);
            Truth.assertThat(firstChanges).containsExactly(
                    Struct.newBuilder().set("ID").to(1L).set("NAME").to("ONE").set("CHANGE_SET_ID").to(firstChangeSet).build(),
                    Struct.newBuilder().set("ID").to(2L).set("NAME").to("TWO").set("CHANGE_SET_ID").to(firstChangeSet).build(),
                    Struct.newBuilder().set("ID").to(3L).set("NAME").to("THREE").set("CHANGE_SET_ID").to(firstChangeSet).build());
            Truth.assertThat(this.lastCommitTimestamp).isEqualTo(firstCommit);

            // Change set 2: insert rows 4 and 5.
            this.latch = new CountDownLatch(2);
            String secondChangeSet = client.newChangeSetId();
            Timestamp secondCommit = client.writeAtLeastOnce(secondChangeSet, Arrays.asList(
                    Mutation.newInsertOrUpdateBuilder("NUMBERS")
                            .set("ID").to(4L)
                            .set("NAME").to("FOUR")
                            .set("CHANGE_SET_ID").to(secondChangeSet)
                            .build(),
                    Mutation.newInsertOrUpdateBuilder("NUMBERS")
                            .set("ID").to(5L)
                            .set("NAME").to("FIVE")
                            .set("CHANGE_SET_ID").to(secondChangeSet)
                            .build()));
            ImmutableList<Struct> secondChanges = drainChanges();
            Truth.assertThat(secondChanges).hasSize(2);
            Truth.assertThat(secondChanges).containsExactly(
                    Struct.newBuilder().set("ID").to(4L).set("NAME").to("FOUR").set("CHANGE_SET_ID").to(secondChangeSet).build(),
                    Struct.newBuilder().set("ID").to(5L).set("NAME").to("FIVE").set("CHANGE_SET_ID").to(secondChangeSet).build());
            Truth.assertThat(this.lastCommitTimestamp).isEqualTo(secondCommit);

            // Change set 3: update rows 1 and 5.
            this.latch = new CountDownLatch(2);
            String thirdChangeSet = client.newChangeSetId();
            Timestamp thirdCommit = client.writeAtLeastOnce(thirdChangeSet, Arrays.asList(
                    Mutation.newUpdateBuilder("NUMBERS")
                            .set("ID").to(1L)
                            .set("NAME").to("one")
                            .set("CHANGE_SET_ID").to(thirdChangeSet)
                            .build(),
                    Mutation.newUpdateBuilder("NUMBERS")
                            .set("ID").to(5L)
                            .set("NAME").to("five")
                            .set("CHANGE_SET_ID").to(thirdChangeSet)
                            .build()));
            ImmutableList<Struct> thirdChanges = drainChanges();
            Truth.assertThat(thirdChanges).hasSize(2);
            Truth.assertThat(thirdChanges).containsExactly(
                    Struct.newBuilder().set("ID").to(1L).set("NAME").to("one").set("CHANGE_SET_ID").to(thirdChangeSet).build(),
                    Struct.newBuilder().set("ID").to(5L).set("NAME").to("five").set("CHANGE_SET_ID").to(thirdChangeSet).build());
            Truth.assertThat(this.lastCommitTimestamp).isEqualTo(thirdCommit);

            // Exercise every transaction style offered by the change-set aware client.
            testMutationsWithTransactionRunner(client);
            testDmlWithTransactionRunner(client);
            testMutationsWithTransactionManager(client);
            testDmlWithTransactionManager(client);
            testMutationsWithAsyncRunner(client);
            testDmlWithAsyncRunner(client);
            testMutationsWithAsyncTransactionManager(client);
            testDmlWithAsyncTransactionManager(client);

            // A delete is expected to produce no callback; give the poller some time to prove it.
            client.writeAtLeastOnce(Arrays.asList(Mutation.delete("NUMBERS", Key.of(2L))));
            Thread.sleep(500L);
            Truth.assertThat(this.receivedChanges).isEmpty();
        } finally {
            poller.stopAsync().awaitTerminated();
        }
    }

    /**
     * Updates rows 1 and 5 with buffered mutations inside a change-set aware transaction runner
     * and expects both rows to be reported with the runner's change set id and commit timestamp.
     *
     * @param databaseClientWithChangeSets client used to start the read/write transaction
     * @throws Exception if the transaction fails or waiting for the changes is interrupted
     */
    private void testMutationsWithTransactionRunner(DatabaseClientWithChangeSets databaseClientWithChangeSets) throws Exception {
        this.latch = new CountDownLatch(2);
        final DatabaseClientWithChangeSets.TransactionRunnerWithChangeSet runner =
                databaseClientWithChangeSets.readWriteTransaction();
        runner.run(new TransactionRunner.TransactionCallable<Void>() {
            @Override
            public Void run(TransactionContext transaction) throws Exception {
                transaction.buffer(Arrays.asList(
                        Mutation.newUpdateBuilder("NUMBERS")
                                .set("ID").to(1L)
                                .set("NAME").to("En")
                                .set("CHANGE_SET_ID").to(runner.getChangeSetId())
                                .build(),
                        Mutation.newUpdateBuilder("NUMBERS")
                                .set("ID").to(5L)
                                .set("NAME").to("Fem")
                                .set("CHANGE_SET_ID").to(runner.getChangeSetId())
                                .build()));
                return null;
            }
        });
        Timestamp commitTimestamp = runner.getCommitTimestamp();
        ImmutableList<Struct> changes = drainChanges();
        Truth.assertThat(changes).hasSize(2);
        Truth.assertThat(changes).containsExactly(
                Struct.newBuilder().set("ID").to(1L).set("NAME").to("En").set("CHANGE_SET_ID").to(runner.getChangeSetId()).build(),
                Struct.newBuilder().set("ID").to(5L).set("NAME").to("Fem").set("CHANGE_SET_ID").to(runner.getChangeSetId()).build());
        Truth.assertThat(this.lastCommitTimestamp).isEqualTo(commitTimestamp);
    }

    /**
     * Updates rows 3 and 4 with a DML batch inside a change-set aware transaction runner and
     * expects both rows to be reported with the runner's change set id and commit timestamp.
     *
     * @param databaseClientWithChangeSets client used to start the read/write transaction
     * @throws Exception if the transaction fails or waiting for the changes is interrupted
     */
    private void testDmlWithTransactionRunner(DatabaseClientWithChangeSets databaseClientWithChangeSets) throws Exception {
        this.latch = new CountDownLatch(2);
        final DatabaseClientWithChangeSets.TransactionRunnerWithChangeSet runner =
                databaseClientWithChangeSets.readWriteTransaction();
        runner.run(new TransactionRunner.TransactionCallable<Void>() {
            @Override
            public Void run(TransactionContext transaction) throws Exception {
                transaction.batchUpdate(Arrays.asList(
                        Statement.newBuilder("UPDATE NUMBERS SET NAME=@name, CHANGE_SET_ID=@changeSet WHERE ID=@id")
                                .bind("name").to("Tre")
                                .bind("id").to(3L)
                                .bind("changeSet").to(runner.getChangeSetId())
                                .build(),
                        Statement.newBuilder("UPDATE NUMBERS SET NAME=@name, CHANGE_SET_ID=@changeSet WHERE ID=@id")
                                .bind("name").to("Fire")
                                .bind("id").to(4L)
                                .bind("changeSet").to(runner.getChangeSetId())
                                .build()));
                return null;
            }
        });
        Timestamp commitTimestamp = runner.getCommitTimestamp();
        ImmutableList<Struct> changes = drainChanges();
        Truth.assertThat(changes).hasSize(2);
        Truth.assertThat(changes).containsExactly(
                Struct.newBuilder().set("ID").to(3L).set("NAME").to("Tre").set("CHANGE_SET_ID").to(runner.getChangeSetId()).build(),
                Struct.newBuilder().set("ID").to(4L).set("NAME").to("Fire").set("CHANGE_SET_ID").to(runner.getChangeSetId()).build());
        Truth.assertThat(this.lastCommitTimestamp).isEqualTo(commitTimestamp);
    }

    /**
     * Updates rows 1 and 5 with buffered mutations using a change-set aware transaction manager,
     * retrying while the transaction is aborted, and expects both rows to be reported with the
     * manager's change set id and commit timestamp.
     *
     * @param databaseClientWithChangeSets client used to create the transaction manager
     * @throws Exception if the transaction fails or waiting for the changes is interrupted
     */
    private void testMutationsWithTransactionManager(DatabaseClientWithChangeSets databaseClientWithChangeSets) throws Exception {
        this.latch = new CountDownLatch(2);
        try (DatabaseClientWithChangeSets.TransactionManagerWithChangeSet manager =
                databaseClientWithChangeSets.transactionManager()) {
            TransactionContext transaction = manager.begin();
            while (true) {
                try {
                    transaction.buffer(Arrays.asList(
                            Mutation.newUpdateBuilder("NUMBERS")
                                    .set("ID").to(1L)
                                    .set("NAME").to("Uno")
                                    .set("CHANGE_SET_ID").to(manager.getChangeSetId())
                                    .build(),
                            Mutation.newUpdateBuilder("NUMBERS")
                                    .set("ID").to(5L)
                                    .set("NAME").to("Cinque")
                                    .set("CHANGE_SET_ID").to(manager.getChangeSetId())
                                    .build()));
                    manager.commit();
                    break;
                } catch (AbortedException e) {
                    // The delay is already in milliseconds; clamp because it may be negative
                    // when the exception carries no retry delay.
                    Thread.sleep(Math.max(0L, e.getRetryDelayInMillis()));
                    transaction = manager.resetForRetry();
                }
            }
            Timestamp commitTimestamp = manager.getCommitTimestamp();
            ImmutableList<Struct> changes = drainChanges();
            Truth.assertThat(changes).hasSize(2);
            Truth.assertThat(changes).containsExactly(
                    Struct.newBuilder().set("ID").to(1L).set("NAME").to("Uno").set("CHANGE_SET_ID").to(manager.getChangeSetId()).build(),
                    Struct.newBuilder().set("ID").to(5L).set("NAME").to("Cinque").set("CHANGE_SET_ID").to(manager.getChangeSetId()).build());
            Truth.assertThat(this.lastCommitTimestamp).isEqualTo(commitTimestamp);
        }
    }

    /**
     * Updates rows 3 and 4 with a DML batch using a change-set aware transaction manager,
     * retrying while the transaction is aborted, and expects both rows to be reported with the
     * manager's change set id and commit timestamp.
     *
     * @param databaseClientWithChangeSets client used to create the transaction manager
     * @throws Exception if the transaction fails or waiting for the changes is interrupted
     */
    private void testDmlWithTransactionManager(DatabaseClientWithChangeSets databaseClientWithChangeSets) throws Exception {
        this.latch = new CountDownLatch(2);
        try (DatabaseClientWithChangeSets.TransactionManagerWithChangeSet manager =
                databaseClientWithChangeSets.transactionManager()) {
            TransactionContext transaction = manager.begin();
            while (true) {
                try {
                    transaction.batchUpdate(Arrays.asList(
                            Statement.newBuilder("UPDATE NUMBERS SET NAME=@name, CHANGE_SET_ID=@changeSet WHERE ID=@id")
                                    .bind("name").to("Tres")
                                    .bind("id").to(3L)
                                    .bind("changeSet").to(manager.getChangeSetId())
                                    .build(),
                            Statement.newBuilder("UPDATE NUMBERS SET NAME=@name, CHANGE_SET_ID=@changeSet WHERE ID=@id")
                                    .bind("name").to("Cuatro")
                                    .bind("id").to(4L)
                                    .bind("changeSet").to(manager.getChangeSetId())
                                    .build()));
                    manager.commit();
                    break;
                } catch (AbortedException e) {
                    // The delay is already in milliseconds; clamp because it may be negative
                    // when the exception carries no retry delay.
                    Thread.sleep(Math.max(0L, e.getRetryDelayInMillis()));
                    transaction = manager.resetForRetry();
                }
            }
            Timestamp commitTimestamp = manager.getCommitTimestamp();
            ImmutableList<Struct> changes = drainChanges();
            Truth.assertThat(changes).hasSize(2);
            Truth.assertThat(changes).containsExactly(
                    Struct.newBuilder().set("ID").to(3L).set("NAME").to("Tres").set("CHANGE_SET_ID").to(manager.getChangeSetId()).build(),
                    Struct.newBuilder().set("ID").to(4L).set("NAME").to("Cuatro").set("CHANGE_SET_ID").to(manager.getChangeSetId()).build());
            Truth.assertThat(this.lastCommitTimestamp).isEqualTo(commitTimestamp);
        }
    }

    /**
     * Updates rows 1 and 5 with buffered mutations through a change-set aware async runner and
     * expects both rows to be reported with the runner's change set id and commit timestamp.
     *
     * @param databaseClientWithChangeSets client used to create the async runner
     * @throws Exception if the transaction fails or waiting for the changes is interrupted
     */
    private void testMutationsWithAsyncRunner(DatabaseClientWithChangeSets databaseClientWithChangeSets) throws Exception {
        this.latch = new CountDownLatch(2);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        // Shut the executor down even when an assertion fails so its thread is not leaked.
        try {
            final DatabaseClientWithChangeSets.AsyncRunnerWithChangeSet runner =
                    databaseClientWithChangeSets.runAsync();
            runner.runAsync(new AsyncRunner.AsyncWork<Void>() {
                @Override
                public ApiFuture<Void> doWorkAsync(TransactionContext transaction) {
                    transaction.buffer(Arrays.asList(
                            Mutation.newUpdateBuilder("NUMBERS")
                                    .set("ID").to(1L)
                                    .set("NAME").to("En")
                                    .set("CHANGE_SET_ID").to(runner.getChangeSetId())
                                    .build(),
                            Mutation.newUpdateBuilder("NUMBERS")
                                    .set("ID").to(5L)
                                    .set("NAME").to("Fem")
                                    .set("CHANGE_SET_ID").to(runner.getChangeSetId())
                                    .build()));
                    return ApiFutures.immediateFuture(null);
                }
            }, executor);
            ApiFuture<Timestamp> commitTimestamp = runner.getCommitTimestamp();
            ImmutableList<Struct> changes = drainChanges();
            Truth.assertThat(changes).hasSize(2);
            Truth.assertThat(changes).containsExactly(
                    Struct.newBuilder().set("ID").to(1L).set("NAME").to("En").set("CHANGE_SET_ID").to(runner.getChangeSetId()).build(),
                    Struct.newBuilder().set("ID").to(5L).set("NAME").to("Fem").set("CHANGE_SET_ID").to(runner.getChangeSetId()).build());
            Truth.assertThat(this.lastCommitTimestamp).isEqualTo(commitTimestamp.get());
        } finally {
            executor.shutdown();
        }
    }

    /**
     * Updates rows 3 and 4 with an asynchronous DML batch through a change-set aware async runner
     * and expects both rows to be reported with the runner's change set id and commit timestamp.
     *
     * @param databaseClientWithChangeSets client used to create the async runner
     * @throws Exception if the transaction fails or waiting for the changes is interrupted
     */
    private void testDmlWithAsyncRunner(DatabaseClientWithChangeSets databaseClientWithChangeSets) throws Exception {
        this.latch = new CountDownLatch(2);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        // Shut the executor down even when an assertion fails so its thread is not leaked.
        try {
            final DatabaseClientWithChangeSets.AsyncRunnerWithChangeSet runner =
                    databaseClientWithChangeSets.runAsync();
            runner.runAsync(new AsyncRunner.AsyncWork<Void>() {
                @Override
                public ApiFuture<Void> doWorkAsync(TransactionContext transaction) {
                    // NOTE(review): the batch-update future is not chained into the returned future;
                    // presumably the client waits for outstanding statements before committing - confirm.
                    transaction.batchUpdateAsync(Arrays.asList(
                            Statement.newBuilder("UPDATE NUMBERS SET NAME=@name, CHANGE_SET_ID=@changeSet WHERE ID=@id")
                                    .bind("name").to("Tre")
                                    .bind("id").to(3L)
                                    .bind("changeSet").to(runner.getChangeSetId())
                                    .build(),
                            Statement.newBuilder("UPDATE NUMBERS SET NAME=@name, CHANGE_SET_ID=@changeSet WHERE ID=@id")
                                    .bind("name").to("Fire")
                                    .bind("id").to(4L)
                                    .bind("changeSet").to(runner.getChangeSetId())
                                    .build()));
                    return ApiFutures.immediateFuture(null);
                }
            }, executor);
            ApiFuture<Timestamp> commitTimestamp = runner.getCommitTimestamp();
            ImmutableList<Struct> changes = drainChanges();
            Truth.assertThat(changes).hasSize(2);
            Truth.assertThat(changes).containsExactly(
                    Struct.newBuilder().set("ID").to(3L).set("NAME").to("Tre").set("CHANGE_SET_ID").to(runner.getChangeSetId()).build(),
                    Struct.newBuilder().set("ID").to(4L).set("NAME").to("Fire").set("CHANGE_SET_ID").to(runner.getChangeSetId()).build());
            Truth.assertThat(this.lastCommitTimestamp).isEqualTo(commitTimestamp.get());
        } finally {
            executor.shutdown();
        }
    }

    /**
     * Updates rows 1 and 5 with buffered mutations using a change-set aware async transaction
     * manager, retrying while the transaction is aborted, and expects both rows to be reported
     * with the manager's change set id and the timestamp returned by the commit.
     *
     * @param databaseClientWithChangeSets client used to create the async transaction manager
     * @throws Exception if the transaction fails or waiting for the changes is interrupted
     */
    private void testMutationsWithAsyncTransactionManager(DatabaseClientWithChangeSets databaseClientWithChangeSets) throws Exception {
        this.latch = new CountDownLatch(2);
        try (DatabaseClientWithChangeSets.AsyncTransactionManagerWithChangeSet manager =
                databaseClientWithChangeSets.transactionManagerAsync()) {
            Timestamp commitTimestamp;
            AsyncTransactionManager.TransactionContextFuture transactionFuture = manager.beginAsync();
            while (true) {
                try {
                    commitTimestamp = transactionFuture
                            .then((transaction, ignored) -> {
                                transaction.buffer(Arrays.asList(
                                        Mutation.newUpdateBuilder("NUMBERS")
                                                .set("ID").to(1L)
                                                .set("NAME").to("Uno")
                                                .set("CHANGE_SET_ID").to(manager.getChangeSetId())
                                                .build(),
                                        Mutation.newUpdateBuilder("NUMBERS")
                                                .set("ID").to(5L)
                                                .set("NAME").to("Cinque")
                                                .set("CHANGE_SET_ID").to(manager.getChangeSetId())
                                                .build()));
                                return ApiFutures.<Void>immediateFuture(null);
                            }, MoreExecutors.directExecutor())
                            .commitAsync()
                            .get();
                    break;
                } catch (AbortedException e) {
                    // The delay is already in milliseconds; clamp because it may be negative
                    // when the exception carries no retry delay.
                    Thread.sleep(Math.max(0L, e.getRetryDelayInMillis()));
                    transactionFuture = manager.resetForRetryAsync();
                }
            }
            ImmutableList<Struct> changes = drainChanges();
            Truth.assertThat(changes).hasSize(2);
            Truth.assertThat(changes).containsExactly(
                    Struct.newBuilder().set("ID").to(1L).set("NAME").to("Uno").set("CHANGE_SET_ID").to(manager.getChangeSetId()).build(),
                    Struct.newBuilder().set("ID").to(5L).set("NAME").to("Cinque").set("CHANGE_SET_ID").to(manager.getChangeSetId()).build());
            Truth.assertThat(this.lastCommitTimestamp).isEqualTo(commitTimestamp);
        }
    }

    /**
     * Updates rows 3 and 4 with an asynchronous DML batch using a change-set aware async
     * transaction manager, retrying while the transaction is aborted, and expects both rows to be
     * reported with the manager's change set id and the timestamp returned by the commit.
     *
     * @param databaseClientWithChangeSets client used to create the async transaction manager
     * @throws Exception if the transaction fails or waiting for the changes is interrupted
     */
    private void testDmlWithAsyncTransactionManager(DatabaseClientWithChangeSets databaseClientWithChangeSets) throws Exception {
        this.latch = new CountDownLatch(2);
        final String updateSql = "UPDATE NUMBERS SET NAME=@name, CHANGE_SET_ID=@changeSet WHERE ID=@id";
        try (DatabaseClientWithChangeSets.AsyncTransactionManagerWithChangeSet manager =
                databaseClientWithChangeSets.transactionManagerAsync()) {
            Timestamp commitTimestamp;
            AsyncTransactionManager.TransactionContextFuture transactionFuture = manager.beginAsync();
            while (true) {
                try {
                    commitTimestamp = transactionFuture
                            .then((transaction, ignored) ->
                                    transaction.batchUpdateAsync(Arrays.asList(
                                            Statement.newBuilder(updateSql)
                                                    .bind("name").to("Tres")
                                                    .bind("id").to(3L)
                                                    .bind("changeSet").to(manager.getChangeSetId())
                                                    .build(),
                                            Statement.newBuilder(updateSql)
                                                    .bind("name").to("Cuatro")
                                                    .bind("id").to(4L)
                                                    .bind("changeSet").to(manager.getChangeSetId())
                                                    .build())),
                                    MoreExecutors.directExecutor())
                            .commitAsync()
                            .get();
                    break;
                } catch (AbortedException e) {
                    // The delay is already in milliseconds; clamp because it may be negative
                    // when the exception carries no retry delay.
                    Thread.sleep(Math.max(0L, e.getRetryDelayInMillis()));
                    transactionFuture = manager.resetForRetryAsync();
                }
            }
            ImmutableList<Struct> changes = drainChanges();
            Truth.assertThat(changes).hasSize(2);
            Truth.assertThat(changes).containsExactly(
                    Struct.newBuilder().set("ID").to(3L).set("NAME").to("Tres").set("CHANGE_SET_ID").to(manager.getChangeSetId()).build(),
                    Struct.newBuilder().set("ID").to(4L).set("NAME").to("Cuatro").set("CHANGE_SET_ID").to(manager.getChangeSetId()).build());
            Truth.assertThat(this.lastCommitTimestamp).isEqualTo(commitTimestamp);
        }
    }

    /**
     * Waits up to 10 seconds for the current latch to reach zero, then removes and returns every
     * row change received so far.
     *
     * <p>Elements are removed one at a time with {@code poll()}, so a change that arrives while
     * draining is either returned or left in the queue for the next check; it is never discarded.
     *
     * @return the row changes that were queued, in arrival order
     * @throws Exception if the wait is interrupted
     */
    private ImmutableList<Struct> drainChanges() throws Exception {
        Truth.assertWithMessage("Timed out waiting for the expected row changes")
                .that(this.latch.await(10L, TimeUnit.SECONDS))
                .isTrue();
        ImmutableList.Builder<Struct> drained = ImmutableList.builder();
        Struct change;
        while ((change = this.receivedChanges.poll()) != null) {
            drained.add(change);
        }
        return drained.build();
    }
}
