package com.google.cloud.spanner.watcher.it;

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Database;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.Key;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.Struct;
import com.google.cloud.spanner.Value;
import com.google.cloud.spanner.watcher.SpannerCommitTimestampRepository;
import com.google.cloud.spanner.watcher.SpannerDatabaseTailer;
import com.google.cloud.spanner.watcher.SpannerTableChangeWatcher;
import com.google.cloud.spanner.watcher.TableId;
import com.google.cloud.spanner.watcher.it.SpannerTestHelper;
import com.google.common.collect.ImmutableList;
import com.google.common.truth.Truth;
import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.threeten.bp.Duration;

@RunWith(JUnit4.class)
/* loaded from: input_file:com/google/cloud/spanner/watcher/it/ITSpannerDatabaseTailerTest.class */
public class ITSpannerDatabaseTailerTest {
    private static final Logger logger = Logger.getLogger(ITSpannerDatabaseTailerTest.class.getName());
    private static final SpannerTestHelper.ITSpannerEnv env = new SpannerTestHelper.ITSpannerEnv();
    private static Database database;
    private final Queue<Struct> receivedChanges = new ConcurrentLinkedQueue();
    private volatile CountDownLatch latch = new CountDownLatch(0);

    @BeforeClass
    public static void setup() throws Exception {
        SpannerTestHelper.setupSpanner(env);
        database = env.createTestDb(Arrays.asList("CREATE TABLE NUMBERS1 (ID INT64 NOT NULL, NAME STRING(100), LAST_MODIFIED TIMESTAMP OPTIONS (allow_commit_timestamp=true)) PRIMARY KEY (ID)", "CREATE TABLE NUMBERS2 (ID INT64 NOT NULL, NAME STRING(100), LAST_MODIFIED TIMESTAMP OPTIONS (allow_commit_timestamp=true)) PRIMARY KEY (ID)"));
        logger.info(String.format("Created database %s", database.getId().toString()));
    }

    @AfterClass
    public static void teardown() {
        SpannerTestHelper.teardownSpanner(env);
    }

    @Test
    public void testSpannerTailer() throws Exception {
        Spanner spanner = env.getSpanner();
        SpannerDatabaseTailer build = SpannerDatabaseTailer.newBuilder(spanner, database.getId()).allTables().setPollInterval(Duration.ofMillis(10L)).setCommitTimestampRepository(SpannerCommitTimestampRepository.newBuilder(spanner, database.getId()).setInitialCommitTimestamp(Timestamp.MIN_VALUE).build()).build();
        build.addCallback(new SpannerTableChangeWatcher.RowChangeCallback() { // from class: com.google.cloud.spanner.watcher.it.ITSpannerDatabaseTailerTest.1
            public void rowChange(TableId tableId, SpannerTableChangeWatcher.Row row, Timestamp timestamp) {
                ITSpannerDatabaseTailerTest.logger.info(String.format("Received changed for table %s: %s", tableId, row.asStruct().toString()));
                ITSpannerDatabaseTailerTest.this.receivedChanges.add(row.asStruct());
                ITSpannerDatabaseTailerTest.this.latch.countDown();
            }
        });
        build.startAsync().awaitRunning();
        DatabaseClient databaseClient = spanner.getDatabaseClient(database.getId());
        this.latch = new CountDownLatch(3);
        Timestamp writeAtLeastOnce = databaseClient.writeAtLeastOnce(Arrays.asList(((Mutation.WriteBuilder) ((Mutation.WriteBuilder) ((Mutation.WriteBuilder) Mutation.newInsertOrUpdateBuilder("NUMBERS1").set("ID").to(1L)).set("NAME").to("ONE")).set("LAST_MODIFIED").to(Value.COMMIT_TIMESTAMP)).build(), ((Mutation.WriteBuilder) ((Mutation.WriteBuilder) ((Mutation.WriteBuilder) Mutation.newInsertOrUpdateBuilder("NUMBERS2").set("ID").to(2L)).set("NAME").to("TWO")).set("LAST_MODIFIED").to(Value.COMMIT_TIMESTAMP)).build(), ((Mutation.WriteBuilder) ((Mutation.WriteBuilder) ((Mutation.WriteBuilder) Mutation.newInsertOrUpdateBuilder("NUMBERS1").set("ID").to(3L)).set("NAME").to("THREE")).set("LAST_MODIFIED").to(Value.COMMIT_TIMESTAMP)).build()));
        ImmutableList<Struct> drainChanges = drainChanges();
        Truth.assertThat(drainChanges).hasSize(3);
        Truth.assertThat(drainChanges).containsExactly(new Object[]{((Struct.Builder) ((Struct.Builder) ((Struct.Builder) Struct.newBuilder().set("ID").to(1L)).set("NAME").to("ONE")).set("LAST_MODIFIED").to(writeAtLeastOnce)).build(), ((Struct.Builder) ((Struct.Builder) ((Struct.Builder) Struct.newBuilder().set("ID").to(2L)).set("NAME").to("TWO")).set("LAST_MODIFIED").to(writeAtLeastOnce)).build(), ((Struct.Builder) ((Struct.Builder) ((Struct.Builder) Struct.newBuilder().set("ID").to(3L)).set("NAME").to("THREE")).set("LAST_MODIFIED").to(writeAtLeastOnce)).build()});
        this.latch = new CountDownLatch(2);
        Timestamp writeAtLeastOnce2 = databaseClient.writeAtLeastOnce(Arrays.asList(((Mutation.WriteBuilder) ((Mutation.WriteBuilder) ((Mutation.WriteBuilder) Mutation.newInsertOrUpdateBuilder("NUMBERS2").set("ID").to(4L)).set("NAME").to("FOUR")).set("LAST_MODIFIED").to(Value.COMMIT_TIMESTAMP)).build(), ((Mutation.WriteBuilder) ((Mutation.WriteBuilder) ((Mutation.WriteBuilder) Mutation.newInsertOrUpdateBuilder("NUMBERS1").set("ID").to(5L)).set("NAME").to("FIVE")).set("LAST_MODIFIED").to(Value.COMMIT_TIMESTAMP)).build()));
        ImmutableList<Struct> drainChanges2 = drainChanges();
        Truth.assertThat(drainChanges2).hasSize(2);
        Truth.assertThat(drainChanges2).containsExactly(new Object[]{((Struct.Builder) ((Struct.Builder) ((Struct.Builder) Struct.newBuilder().set("ID").to(4L)).set("NAME").to("FOUR")).set("LAST_MODIFIED").to(writeAtLeastOnce2)).build(), ((Struct.Builder) ((Struct.Builder) ((Struct.Builder) Struct.newBuilder().set("ID").to(5L)).set("NAME").to("FIVE")).set("LAST_MODIFIED").to(writeAtLeastOnce2)).build()});
        this.latch = new CountDownLatch(2);
        Timestamp writeAtLeastOnce3 = databaseClient.writeAtLeastOnce(Arrays.asList(((Mutation.WriteBuilder) ((Mutation.WriteBuilder) ((Mutation.WriteBuilder) Mutation.newUpdateBuilder("NUMBERS1").set("ID").to(1L)).set("NAME").to("one")).set("LAST_MODIFIED").to(Value.COMMIT_TIMESTAMP)).build(), ((Mutation.WriteBuilder) ((Mutation.WriteBuilder) ((Mutation.WriteBuilder) Mutation.newUpdateBuilder("NUMBERS1").set("ID").to(5L)).set("NAME").to("five")).set("LAST_MODIFIED").to(Value.COMMIT_TIMESTAMP)).build()));
        ImmutableList<Struct> drainChanges3 = drainChanges();
        Truth.assertThat(drainChanges3).hasSize(2);
        Truth.assertThat(drainChanges3).containsExactly(new Object[]{((Struct.Builder) ((Struct.Builder) ((Struct.Builder) Struct.newBuilder().set("ID").to(1L)).set("NAME").to("one")).set("LAST_MODIFIED").to(writeAtLeastOnce3)).build(), ((Struct.Builder) ((Struct.Builder) ((Struct.Builder) Struct.newBuilder().set("ID").to(5L)).set("NAME").to("five")).set("LAST_MODIFIED").to(writeAtLeastOnce3)).build()});
        databaseClient.writeAtLeastOnce(Arrays.asList(Mutation.delete("NUMBERS2", Key.of(new Object[]{2L})), Mutation.delete("NUMBERS1", Key.of(new Object[]{3L}))));
        Thread.sleep(500L);
        Truth.assertThat(this.receivedChanges).isEmpty();
        build.stopAsync().awaitTerminated();
    }

    private ImmutableList<Struct> drainChanges() throws Exception {
        Truth.assertThat(Boolean.valueOf(this.latch.await(5L, TimeUnit.SECONDS))).isTrue();
        ImmutableList<Struct> copyOf = ImmutableList.copyOf(this.receivedChanges);
        this.receivedChanges.clear();
        return copyOf;
    }
}
