/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.sqlserver;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.common.CdcSourceTaskContext;
import io.debezium.connector.sqlserver.SqlServerOffsetContext;
import io.debezium.connector.sqlserver.SqlServerPartition;
import io.debezium.pipeline.ChangeEventSourceCoordinator;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.metrics.spi.ChangeEventSourceMetricsFactory;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.ChangeEventSourceFactory;
import io.debezium.pipeline.source.spi.SnapshotChangeEventSource;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.pipeline.spi.SnapshotResult;
import io.debezium.schema.DatabaseSchema;
import io.debezium.util.Clock;
import io.debezium.util.LoggingContext;
import io.debezium.util.Metronome;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.connect.source.SourceConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SqlServerChangeEventSourceCoordinator
extends ChangeEventSourceCoordinator<SqlServerPartition, SqlServerOffsetContext> {
    private static final Logger LOGGER = LoggerFactory.getLogger(SqlServerChangeEventSourceCoordinator.class);
    private final Clock clock;
    private final Duration pollInterval;

    public SqlServerChangeEventSourceCoordinator(Offsets<SqlServerPartition, SqlServerOffsetContext> previousOffsets, ErrorHandler errorHandler, Class<? extends SourceConnector> connectorType, CommonConnectorConfig connectorConfig, ChangeEventSourceFactory<SqlServerPartition, SqlServerOffsetContext> changeEventSourceFactory, ChangeEventSourceMetricsFactory<SqlServerPartition> changeEventSourceMetricsFactory, EventDispatcher<SqlServerPartition, ?> eventDispatcher, DatabaseSchema<?> schema, Clock clock) {
        super(previousOffsets, errorHandler, connectorType, connectorConfig, changeEventSourceFactory, changeEventSourceMetricsFactory, eventDispatcher, schema);
        this.clock = clock;
        this.pollInterval = connectorConfig.getPollInterval();
    }

    @Override
    protected void executeChangeEventSources(CdcSourceTaskContext taskContext, SnapshotChangeEventSource<SqlServerPartition, SqlServerOffsetContext> snapshotSource, Offsets<SqlServerPartition, SqlServerOffsetContext> previousOffsets, AtomicReference<LoggingContext.PreviousContext> previousLogContext, ChangeEventSource.ChangeEventSourceContext context) throws InterruptedException {
        Offsets streamingOffsets = Offsets.of(new HashMap());
        for (Map.Entry<SqlServerPartition, SqlServerOffsetContext> entry : previousOffsets) {
            SqlServerPartition partition = entry.getKey();
            SqlServerOffsetContext sqlServerOffsetContext = entry.getValue();
            previousLogContext.set(taskContext.configureLoggingContext("snapshot", partition));
            SnapshotResult<SqlServerOffsetContext> snapshotResult = this.doSnapshot(snapshotSource, context, partition, sqlServerOffsetContext);
            if (!snapshotResult.isCompletedOrSkipped()) continue;
            streamingOffsets.getOffsets().put(partition, snapshotResult.getOffset());
        }
        previousLogContext.set(taskContext.configureLoggingContext("streaming"));
        for (Map.Entry<SqlServerPartition, SqlServerOffsetContext> entry : streamingOffsets) {
            this.initStreamEvents(entry.getKey(), entry.getValue());
        }
        Metronome metronome = Metronome.sleeper(this.pollInterval, this.clock);
        LOGGER.info("Starting streaming");
        while (context.isRunning()) {
            boolean bl = false;
            for (Map.Entry entry : streamingOffsets) {
                SqlServerPartition partition = (SqlServerPartition)entry.getKey();
                SqlServerOffsetContext previousOffset = (SqlServerOffsetContext)entry.getValue();
                previousLogContext.set(taskContext.configureLoggingContext("streaming", partition));
                if (!context.isRunning() || !this.streamingSource.executeIteration(context, partition, previousOffset)) continue;
                bl = true;
            }
            if (bl) continue;
            metronome.pause();
        }
        LOGGER.info("Finished streaming");
    }
}

