package com.google.cloud.spanner.watcher;

import com.google.api.core.ApiService;
import com.google.api.core.SettableApiFuture;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.MockSpannerServiceImpl;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.watcher.SpannerTableChangeWatcher;
import com.google.common.truth.Truth;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.spanner.v1.ResultSetMetadata;
import com.google.spanner.v1.StructType;
import com.google.spanner.v1.Type;
import com.google.spanner.v1.TypeCode;
import io.grpc.Status;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.threeten.bp.Duration;

@RunWith(JUnit4.class)
/* loaded from: input_file:com/google/cloud/spanner/watcher/SpannerDatabaseTailerTest.class */
public class SpannerDatabaseTailerTest extends AbstractMockServerTest {
    @Test
    public void testReceiveChanges() throws Exception {
        Spanner spanner = getSpanner();
        DatabaseId of = DatabaseId.of("p", "i", "d");
        SpannerDatabaseTailer build = SpannerDatabaseTailer.newBuilder(spanner, of).allTables().setPollInterval(Duration.ofMillis(10L)).setCommitTimestampRepository(SpannerCommitTimestampRepository.newBuilder(spanner, of).setInitialCommitTimestamp(Timestamp.MIN_VALUE).build()).build();
        final AtomicInteger atomicInteger = new AtomicInteger();
        final CountDownLatch countDownLatch = new CountDownLatch(30);
        build.addCallback(new SpannerTableChangeWatcher.RowChangeCallback() { // from class: com.google.cloud.spanner.watcher.SpannerDatabaseTailerTest.1
            public void rowChange(TableId tableId, SpannerTableChangeWatcher.Row row, Timestamp timestamp) {
                atomicInteger.incrementAndGet();
                countDownLatch.countDown();
            }
        });
        build.startAsync().awaitRunning();
        countDownLatch.await(5L, TimeUnit.SECONDS);
        build.stopAsync().awaitTerminated();
        Truth.assertThat(Integer.valueOf(atomicInteger.get())).isEqualTo(30);
    }

    @Test
    public void testTableNotFoundDuringInitialization() throws Exception {
        SpannerDatabaseTailer build = SpannerDatabaseTailer.newBuilder(getSpanner(), DatabaseId.of("p", "i", "d")).includeTables("Foo", new String[]{"Bar", "NonExistingTable"}).build();
        final SettableApiFuture create = SettableApiFuture.create();
        build.addListener(new ApiService.Listener() { // from class: com.google.cloud.spanner.watcher.SpannerDatabaseTailerTest.2
            public void failed(ApiService.State state, Throwable th) {
                if (state != ApiService.State.STARTING) {
                    create.setException(new AssertionError("expected from State to be STARTING"));
                }
                create.set(Boolean.TRUE);
            }
        }, MoreExecutors.directExecutor());
        build.startAsync();
        Truth.assertThat((Boolean) create.get(5L, TimeUnit.SECONDS)).isTrue();
    }

    @Test
    public void testTableDeleted() throws Exception {
        Spanner spanner = getSpanner();
        DatabaseId of = DatabaseId.of("p", "i", "d");
        SpannerDatabaseTailer build = SpannerDatabaseTailer.newBuilder(spanner, of).allTables().setPollInterval(Duration.ofMillis(10L)).setCommitTimestampRepository(SpannerCommitTimestampRepository.newBuilder(spanner, of).setInitialCommitTimestamp(Timestamp.MIN_VALUE).build()).build();
        final AtomicInteger atomicInteger = new AtomicInteger();
        final CountDownLatch countDownLatch = new CountDownLatch(30);
        build.addCallback(new SpannerTableChangeWatcher.RowChangeCallback() { // from class: com.google.cloud.spanner.watcher.SpannerDatabaseTailerTest.3
            public void rowChange(TableId tableId, SpannerTableChangeWatcher.Row row, Timestamp timestamp) {
                atomicInteger.incrementAndGet();
                countDownLatch.countDown();
            }
        });
        final SettableApiFuture create = SettableApiFuture.create();
        build.addListener(new ApiService.Listener() { // from class: com.google.cloud.spanner.watcher.SpannerDatabaseTailerTest.4
            public void failed(ApiService.State state, Throwable th) {
                if (state != ApiService.State.RUNNING) {
                    create.setException(new AssertionError("expected from State to be RUNNING"));
                }
                create.set(Boolean.TRUE);
            }
        }, MoreExecutors.directExecutor());
        build.startAsync().awaitRunning();
        countDownLatch.await(20L, TimeUnit.SECONDS);
        Truth.assertThat(Integer.valueOf(atomicInteger.get())).isEqualTo(30);
        Level level = SpannerTableTailer.logger.getLevel();
        try {
            SpannerTableTailer.logger.setLevel(Level.OFF);
            mockSpanner.putStatementResult(MockSpannerServiceImpl.StatementResult.exception(getCurrentBarPollStatement(), Status.NOT_FOUND.withDescription("Table not found").asRuntimeException()));
            Truth.assertThat((Boolean) create.get(5L, TimeUnit.SECONDS)).isTrue();
            SpannerTableTailer.logger.setLevel(level);
            Truth.assertThat(build.state()).isEqualTo(ApiService.State.FAILED);
        } catch (Throwable th) {
            SpannerTableTailer.logger.setLevel(level);
            throw th;
        }
    }

    @Test
    public void testCustomExecutor() throws Exception {
        Spanner spanner = getSpanner();
        DatabaseId of = DatabaseId.of("p", "i", "d");
        ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        SpannerDatabaseTailer build = SpannerDatabaseTailer.newBuilder(spanner, of).allTables().setExecutor(newSingleThreadScheduledExecutor).setPollInterval(Duration.ofMillis(10L)).setCommitTimestampRepository(SpannerCommitTimestampRepository.newBuilder(spanner, of).setInitialCommitTimestamp(Timestamp.MIN_VALUE).build()).build();
        final AtomicInteger atomicInteger = new AtomicInteger();
        final CountDownLatch countDownLatch = new CountDownLatch(30);
        build.addCallback(new SpannerTableChangeWatcher.RowChangeCallback() { // from class: com.google.cloud.spanner.watcher.SpannerDatabaseTailerTest.5
            public void rowChange(TableId tableId, SpannerTableChangeWatcher.Row row, Timestamp timestamp) {
                atomicInteger.incrementAndGet();
                countDownLatch.countDown();
            }
        });
        build.startAsync().awaitRunning();
        countDownLatch.await(5L, TimeUnit.SECONDS);
        build.stopAsync().awaitTerminated();
        Truth.assertThat(Integer.valueOf(atomicInteger.get())).isEqualTo(30);
        Truth.assertThat(Boolean.valueOf(newSingleThreadScheduledExecutor.isShutdown())).isFalse();
        newSingleThreadScheduledExecutor.shutdown();
    }

    @Test
    public void testBuildWithoutTables() {
        try {
            SpannerDatabaseTailer.newBuilder(getSpanner(), DatabaseId.of("p", "i", "d")).includeTables((String) null, new String[0]).build();
            Assert.fail("missing expected exception");
        } catch (NullPointerException e) {
        }
    }

    @Test
    public void testTableNotFound() {
        SpannerDatabaseTailer build = SpannerDatabaseTailer.newBuilder(getSpanner(), DatabaseId.of("p", "i", "d")).includeTables("NonExistingTable", new String[0]).build();
        build.addCallback(new SpannerTableChangeWatcher.RowChangeCallback() { // from class: com.google.cloud.spanner.watcher.SpannerDatabaseTailerTest.6
            public void rowChange(TableId tableId, SpannerTableChangeWatcher.Row row, Timestamp timestamp) {
                Assert.fail("Received unexpected row change");
            }
        });
        try {
            build.startAsync().awaitRunning();
            Assert.fail("missing expected exception");
        } catch (IllegalStateException e) {
            Truth.assertThat(e.getCause()).isInstanceOf(SpannerException.class);
            Truth.assertThat(e.getCause().getErrorCode()).isEqualTo(ErrorCode.NOT_FOUND);
        }
    }

    @Test
    public void testNoTablesFound() {
        SpannerDatabaseTailer build = SpannerDatabaseTailer.newBuilder(getSpanner(), DatabaseId.of("p", "i", "d")).allTables().except(new String[]{"Foo", "Bar"}).build();
        build.addCallback(new SpannerTableChangeWatcher.RowChangeCallback() { // from class: com.google.cloud.spanner.watcher.SpannerDatabaseTailerTest.7
            public void rowChange(TableId tableId, SpannerTableChangeWatcher.Row row, Timestamp timestamp) {
                Assert.fail("Received unexpected row change");
            }
        });
        try {
            build.startAsync().awaitRunning();
            Assert.fail("missing expected exception");
        } catch (IllegalStateException e) {
            Truth.assertThat(e.getCause()).isInstanceOf(SpannerException.class);
            Truth.assertThat(e.getCause().getErrorCode()).isEqualTo(ErrorCode.NOT_FOUND);
        }
    }

    @Test
    public void testCustomCommitTimestampColumn() throws Exception {
        Timestamp now = Timestamp.now();
        ResultSetMetadata build = RandomResultSetGenerator.METADATA.toBuilder().setRowType(RandomResultSetGenerator.METADATA.getRowType().toBuilder().setFields(RandomResultSetGenerator.METADATA.getRowType().getFieldsCount() - 1, StructType.Field.newBuilder().setName("AlternativeCommitTS").setType(Type.newBuilder().setCode(TypeCode.TIMESTAMP).build()).build()).build()).build();
        Statement build2 = ((Statement.Builder) ((Statement.Builder) Statement.newBuilder("SELECT *\nFROM `Foo`\nWHERE `AlternativeCommitTS`>@prevCommitTimestamp\nORDER BY `AlternativeCommitTS`, `COL0`\nLIMIT @limit").bind("limit").to(10000L)).bind("prevCommitTimestamp").to(Timestamp.MIN_VALUE)).build();
        mockSpanner.putStatementResult(MockSpannerServiceImpl.StatementResult.query(build2, new RandomResultSetGenerator(1).generateWithFixedCommitTimestamp(now).toBuilder().setMetadata(build).build()));
        mockSpanner.putStatementResult(MockSpannerServiceImpl.StatementResult.query(((Statement.Builder) build2.toBuilder().bind("prevCommitTimestamp").to(now)).build(), new RandomResultSetGenerator(0).generate().toBuilder().setMetadata(build).build()));
        Spanner spanner = getSpanner();
        DatabaseId of = DatabaseId.of("p", "i", "d");
        SpannerDatabaseTailer build3 = SpannerDatabaseTailer.newBuilder(spanner, of).allTables().setPollInterval(Duration.ofMillis(10L)).setCommitTimestampRepository(SpannerCommitTimestampRepository.newBuilder(spanner, of).setInitialCommitTimestamp(Timestamp.MIN_VALUE).build()).setCommitTimestampColumnFunction(tableId -> {
            if (tableId.getTable().equals("Foo")) {
                return "AlternativeCommitTS";
            }
            return null;
        }).build();
        final AtomicInteger atomicInteger = new AtomicInteger();
        final CountDownLatch countDownLatch = new CountDownLatch(21);
        build3.addCallback(new SpannerTableChangeWatcher.RowChangeCallback() { // from class: com.google.cloud.spanner.watcher.SpannerDatabaseTailerTest.8
            public void rowChange(TableId tableId2, SpannerTableChangeWatcher.Row row, Timestamp timestamp) {
                atomicInteger.incrementAndGet();
                countDownLatch.countDown();
            }
        });
        build3.startAsync().awaitRunning();
        countDownLatch.await(5L, TimeUnit.SECONDS);
        build3.stopAsync().awaitTerminated();
        Truth.assertThat(Integer.valueOf(atomicInteger.get())).isEqualTo(21);
    }
}
