/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.postgresql;

import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.postgresql.PostgresChangeEventSourceFactory;
import io.debezium.connector.postgresql.PostgresOffsetContext;
import io.debezium.connector.postgresql.PostgresPartition;
import io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource;
import io.debezium.connector.postgresql.spi.SlotState;
import io.debezium.connector.postgresql.spi.Snapshotter;
import io.debezium.pipeline.ChangeEventSourceCoordinator;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.metrics.spi.ChangeEventSourceMetricsFactory;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.ChangeEventSourceFactory;
import io.debezium.pipeline.source.spi.SnapshotChangeEventSource;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.schema.DatabaseSchema;
import java.sql.SQLException;
import org.apache.kafka.connect.source.SourceConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PostgresChangeEventSourceCoordinator
extends ChangeEventSourceCoordinator<PostgresPartition, PostgresOffsetContext> {
    private static final Logger LOGGER = LoggerFactory.getLogger(PostgresChangeEventSourceCoordinator.class);
    private final Snapshotter snapshotter;
    private final SlotState slotInfo;

    public PostgresChangeEventSourceCoordinator(Offsets<PostgresPartition, PostgresOffsetContext> previousOffsets, ErrorHandler errorHandler, Class<? extends SourceConnector> connectorType, CommonConnectorConfig connectorConfig, PostgresChangeEventSourceFactory changeEventSourceFactory, ChangeEventSourceMetricsFactory<PostgresPartition> changeEventSourceMetricsFactory, EventDispatcher<PostgresPartition, ?> eventDispatcher, DatabaseSchema<?> schema, Snapshotter snapshotter, SlotState slotInfo) {
        super(previousOffsets, errorHandler, connectorType, connectorConfig, (ChangeEventSourceFactory)changeEventSourceFactory, changeEventSourceMetricsFactory, eventDispatcher, schema);
        this.snapshotter = snapshotter;
        this.slotInfo = slotInfo;
    }

    protected ChangeEventSourceCoordinator.CatchUpStreamingResult executeCatchUpStreaming(ChangeEventSource.ChangeEventSourceContext context, SnapshotChangeEventSource<PostgresPartition, PostgresOffsetContext> snapshotSource, PostgresPartition partition, PostgresOffsetContext previousOffset) throws InterruptedException {
        if (previousOffset != null && !this.snapshotter.shouldStreamEventsStartingFromSnapshot() && this.slotInfo != null) {
            try {
                this.setSnapshotStartLsn((PostgresSnapshotChangeEventSource)snapshotSource, previousOffset);
            }
            catch (SQLException e) {
                throw new DebeziumException("Failed to determine catch-up streaming stopping LSN");
            }
            LOGGER.info("Previous connector state exists and will stream events until {} then perform snapshot", (Object)previousOffset.getStreamingStoppingLsn());
            this.streamEvents(context, partition, (OffsetContext)previousOffset);
            return new ChangeEventSourceCoordinator.CatchUpStreamingResult((ChangeEventSourceCoordinator)this, true);
        }
        return new ChangeEventSourceCoordinator.CatchUpStreamingResult((ChangeEventSourceCoordinator)this, false);
    }

    private void setSnapshotStartLsn(PostgresSnapshotChangeEventSource snapshotSource, PostgresOffsetContext offsetContext) throws SQLException {
        snapshotSource.createSnapshotConnection();
        snapshotSource.setSnapshotTransactionIsolationLevel();
        snapshotSource.updateOffsetForPreSnapshotCatchUpStreaming(offsetContext);
    }
}

