/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.mysql.debezium.reader;

import io.debezium.config.Configuration;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.MySqlOffsetContext;
import io.debezium.connector.mysql.MySqlStreamingChangeEventSourceMetrics;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.SnapshotResult;
import io.debezium.util.SchemaNameAdjuster;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils;
import org.apache.flink.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher;
import org.apache.flink.cdc.connectors.mysql.debezium.reader.DebeziumReader;
import org.apache.flink.cdc.connectors.mysql.debezium.reader.StoppableChangeEventSourceContext;
import org.apache.flink.cdc.connectors.mysql.debezium.task.MySqlBinlogSplitReadTask;
import org.apache.flink.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask;
import org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
import org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit;
import org.apache.flink.cdc.connectors.mysql.source.split.SourceRecords;
import org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils;
import org.apache.flink.cdc.connectors.mysql.source.utils.hooks.SnapshotPhaseHooks;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
import org.apache.flink.shaded.guava31.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SnapshotSplitReader
implements DebeziumReader<SourceRecords, MySqlSplit> {
    private static final Logger LOG = LoggerFactory.getLogger(SnapshotSplitReader.class);
    private final StatefulTaskContext statefulTaskContext;
    private final ExecutorService executorService;
    private final SnapshotPhaseHooks hooks;
    private volatile ChangeEventQueue<DataChangeEvent> queue;
    private volatile boolean currentTaskRunning;
    private volatile Throwable readException;
    private MySqlSnapshotSplitReadTask splitSnapshotReadTask;
    private MySqlSnapshotSplit currentSnapshotSplit;
    private SchemaNameAdjuster nameAdjuster;
    public AtomicBoolean hasNextElement;
    public AtomicBoolean reachEnd;
    private final StoppableChangeEventSourceContext changeEventSourceContext = new StoppableChangeEventSourceContext();
    private static final long READER_CLOSE_TIMEOUT = 30L;

    public SnapshotSplitReader(MySqlSourceConfig sourceConfig, int subtaskId, SnapshotPhaseHooks hooks) {
        this(new StatefulTaskContext(sourceConfig, DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration()), DebeziumUtils.createMySqlConnection(sourceConfig)), subtaskId, hooks);
    }

    public SnapshotSplitReader(StatefulTaskContext statefulTaskContext, int subtaskId, SnapshotPhaseHooks hooks) {
        this.statefulTaskContext = statefulTaskContext;
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("debezium-reader-" + subtaskId).setUncaughtExceptionHandler((thread, throwable) -> this.setReadException(throwable)).build();
        this.executorService = Executors.newSingleThreadExecutor(threadFactory);
        this.hooks = hooks;
        this.currentTaskRunning = false;
        this.hasNextElement = new AtomicBoolean(false);
        this.reachEnd = new AtomicBoolean(false);
    }

    public SnapshotSplitReader(StatefulTaskContext statefulTaskContext, int subtaskId) {
        this(statefulTaskContext, subtaskId, SnapshotPhaseHooks.empty());
    }

    @Override
    public void submitSplit(MySqlSplit mySqlSplit) {
        this.currentSnapshotSplit = mySqlSplit.asSnapshotSplit();
        this.statefulTaskContext.configure(this.currentSnapshotSplit);
        this.queue = this.statefulTaskContext.getQueue();
        this.nameAdjuster = this.statefulTaskContext.getSchemaNameAdjuster();
        this.hasNextElement.set(true);
        this.reachEnd.set(false);
        this.splitSnapshotReadTask = new MySqlSnapshotSplitReadTask(this.statefulTaskContext.getSourceConfig(), this.statefulTaskContext.getConnectorConfig(), this.statefulTaskContext.getSnapshotChangeEventSourceMetrics(), this.statefulTaskContext.getDatabaseSchema(), this.statefulTaskContext.getConnection(), this.statefulTaskContext.getDispatcher(), this.statefulTaskContext.getTopicSelector(), this.statefulTaskContext.getSnapshotReceiver(), StatefulTaskContext.getClock(), this.currentSnapshotSplit, this.hooks, this.statefulTaskContext.getSourceConfig().isSkipSnapshotBackfill());
        this.executorService.execute(() -> {
            try {
                this.currentTaskRunning = true;
                SnapshotSplitChangeEventSourceContextImpl sourceContext = new SnapshotSplitChangeEventSourceContextImpl();
                SnapshotResult<MySqlOffsetContext> snapshotResult = this.snapshot(sourceContext);
                this.backfill(snapshotResult, sourceContext);
            }
            catch (Exception e) {
                this.setReadException(e);
            }
            finally {
                this.stopCurrentTask();
            }
        });
    }

    private SnapshotResult<MySqlOffsetContext> snapshot(SnapshotSplitChangeEventSourceContextImpl sourceContext) throws Exception {
        return this.splitSnapshotReadTask.execute((ChangeEventSource.ChangeEventSourceContext)sourceContext, this.statefulTaskContext.getMySqlPartition(), this.statefulTaskContext.getOffsetContext());
    }

    private void backfill(SnapshotResult<MySqlOffsetContext> snapshotResult, SnapshotSplitChangeEventSourceContextImpl sourceContext) throws Exception {
        MySqlBinlogSplit backfillBinlogSplit = this.createBackfillBinlogSplit(sourceContext);
        if (!this.isBackfillRequired(backfillBinlogSplit)) {
            this.dispatchBinlogEndEvent(backfillBinlogSplit);
            this.stopCurrentTask();
            return;
        }
        if (!snapshotResult.isCompletedOrSkipped()) {
            throw new IllegalStateException(String.format("Read snapshot for mysql split %s fail", this.currentSnapshotSplit));
        }
        MySqlBinlogSplitReadTask backfillBinlogReadTask = this.createBackfillBinlogReadTask(backfillBinlogSplit);
        MySqlOffsetContext.Loader loader = new MySqlOffsetContext.Loader(this.statefulTaskContext.getConnectorConfig());
        OffsetContext mySqlOffsetContext = loader.load((Map)backfillBinlogSplit.getStartingOffset().getOffset());
        backfillBinlogReadTask.execute((ChangeEventSource.ChangeEventSourceContext)this.changeEventSourceContext, this.statefulTaskContext.getMySqlPartition(), (MySqlOffsetContext)mySqlOffsetContext);
    }

    private boolean isBackfillRequired(MySqlBinlogSplit backfillBinlogSplit) {
        return !this.statefulTaskContext.getSourceConfig().isSkipSnapshotBackfill() && backfillBinlogSplit.getEndingOffset().isAfter(backfillBinlogSplit.getStartingOffset());
    }

    private MySqlBinlogSplit createBackfillBinlogSplit(SnapshotSplitChangeEventSourceContextImpl sourceContext) {
        return new MySqlBinlogSplit(this.currentSnapshotSplit.splitId(), sourceContext.getLowWatermark(), sourceContext.getHighWatermark(), new ArrayList<FinishedSnapshotSplitInfo>(), this.currentSnapshotSplit.getTableSchemas(), 0);
    }

    private MySqlBinlogSplitReadTask createBackfillBinlogReadTask(MySqlBinlogSplit backfillBinlogSplit) {
        Configuration dezConf = ((Configuration.Builder)this.statefulTaskContext.getSourceConfig().getDbzConfiguration().edit().with("table.include.list", this.currentSnapshotSplit.getTableId().toString()).with(Heartbeat.HEARTBEAT_INTERVAL, 0)).build();
        return new MySqlBinlogSplitReadTask(new MySqlConnectorConfig(dezConf), this.statefulTaskContext.getConnection(), this.statefulTaskContext.getDispatcher(), this.statefulTaskContext.getSignalEventDispatcher(), this.statefulTaskContext.getErrorHandler(), StatefulTaskContext.getClock(), this.statefulTaskContext.getTaskContext(), (MySqlStreamingChangeEventSourceMetrics)this.statefulTaskContext.getStreamingChangeEventSourceMetrics(), backfillBinlogSplit, event -> true);
    }

    private void dispatchBinlogEndEvent(MySqlBinlogSplit backFillBinlogSplit) throws InterruptedException {
        SignalEventDispatcher signalEventDispatcher = new SignalEventDispatcher(this.statefulTaskContext.getOffsetContext().getOffset(), this.statefulTaskContext.getTopicSelector().getPrimaryTopic(), this.statefulTaskContext.getDispatcher().getQueue());
        signalEventDispatcher.dispatchWatermarkEvent(backFillBinlogSplit, backFillBinlogSplit.getEndingOffset(), SignalEventDispatcher.WatermarkKind.BINLOG_END);
    }

    @Override
    public boolean isFinished() {
        return this.currentSnapshotSplit == null || !this.currentTaskRunning && !this.hasNextElement.get() && this.reachEnd.get();
    }

    @Override
    @Nullable
    public Iterator<SourceRecords> pollSplitRecords() throws InterruptedException {
        this.checkReadException();
        if (this.hasNextElement.get()) {
            if (this.statefulTaskContext.getSourceConfig().isSkipSnapshotBackfill()) {
                return this.pollWithoutBuffer();
            }
            return this.pollWithBuffer();
        }
        this.reachEnd.compareAndSet(false, true);
        return null;
    }

    public Iterator<SourceRecords> pollWithoutBuffer() throws InterruptedException {
        this.checkReadException();
        List<DataChangeEvent> batch = this.queue.poll();
        ArrayList<SourceRecord> records = new ArrayList<SourceRecord>();
        for (DataChangeEvent event : batch) {
            if (RecordUtils.isEndWatermarkEvent(event.getRecord())) {
                this.hasNextElement.set(false);
                break;
            }
            records.add(event.getRecord());
        }
        return Collections.singletonList(new SourceRecords(records)).iterator();
    }

    public Iterator<SourceRecords> pollWithBuffer() throws InterruptedException {
        boolean reachBinlogStart = false;
        boolean reachBinlogEnd = false;
        SourceRecord lowWatermark = null;
        SourceRecord highWatermark = null;
        HashMap<Struct, List<SourceRecord>> snapshotRecords = new HashMap<Struct, List<SourceRecord>>();
        block0: while (!reachBinlogEnd) {
            this.checkReadException();
            List<DataChangeEvent> batch = this.queue.poll();
            for (DataChangeEvent event : batch) {
                SourceRecord record = event.getRecord();
                if (lowWatermark == null) {
                    lowWatermark = record;
                    this.assertLowWatermark(lowWatermark);
                    continue;
                }
                if (highWatermark == null && RecordUtils.isHighWatermarkEvent(record)) {
                    highWatermark = record;
                    reachBinlogStart = true;
                    continue;
                }
                if (reachBinlogStart && RecordUtils.isEndWatermarkEvent(record)) {
                    reachBinlogEnd = true;
                    continue block0;
                }
                if (!reachBinlogStart) {
                    if (record.key() != null) {
                        snapshotRecords.put((Struct)record.key(), Collections.singletonList(record));
                        continue;
                    }
                    List records = snapshotRecords.computeIfAbsent((Struct)record.value(), key -> new LinkedList());
                    records.add(record);
                    continue;
                }
                RecordUtils.upsertBinlog(snapshotRecords, record, this.currentSnapshotSplit.getSplitKeyType(), this.nameAdjuster, this.currentSnapshotSplit.getSplitStart(), this.currentSnapshotSplit.getSplitEnd());
            }
        }
        this.hasNextElement.set(false);
        ArrayList<SourceRecord> normalizedRecords = new ArrayList<SourceRecord>();
        normalizedRecords.add(lowWatermark);
        normalizedRecords.addAll(RecordUtils.formatMessageTimestamp(snapshotRecords.values().stream().flatMap(Collection::stream).collect(Collectors.toList())));
        normalizedRecords.add(highWatermark);
        ArrayList<SourceRecords> sourceRecordsSet = new ArrayList<SourceRecords>();
        sourceRecordsSet.add(new SourceRecords(normalizedRecords));
        return sourceRecordsSet.iterator();
    }

    private void checkReadException() {
        if (this.readException != null) {
            throw new FlinkRuntimeException(String.format("Read split %s error due to %s.", this.currentSnapshotSplit, this.readException.getMessage()), this.readException);
        }
    }

    private void assertLowWatermark(SourceRecord lowWatermark) {
        Preconditions.checkState((boolean)RecordUtils.isLowWatermarkEvent(lowWatermark), (Object)String.format("The first record should be low watermark signal event, but actual is %s", lowWatermark));
    }

    private void setReadException(Throwable throwable) {
        this.stopCurrentTask();
        LOG.error(String.format("Execute snapshot read task for mysql split %s fail", this.currentSnapshotSplit), throwable);
        if (this.readException == null) {
            this.readException = throwable;
        } else {
            this.readException.addSuppressed(throwable);
        }
    }

    @Override
    public void close() {
        try {
            this.stopCurrentTask();
            if (this.statefulTaskContext != null) {
                this.statefulTaskContext.close();
            }
            if (this.executorService != null) {
                this.executorService.shutdown();
                if (!this.executorService.awaitTermination(30L, TimeUnit.SECONDS)) {
                    LOG.warn("Failed to close the snapshot split reader in {} seconds.", (Object)30L);
                }
            }
        }
        catch (Exception e) {
            LOG.error("Close snapshot reader error", (Throwable)e);
        }
    }

    private void stopCurrentTask() {
        this.currentTaskRunning = false;
        this.changeEventSourceContext.stopChangeEventSource();
    }

    @VisibleForTesting
    public ExecutorService getExecutorService() {
        return this.executorService;
    }

    public static class SnapshotSplitChangeEventSourceContextImpl
    implements ChangeEventSource.ChangeEventSourceContext {
        private BinlogOffset lowWatermark;
        private BinlogOffset highWatermark;

        public BinlogOffset getLowWatermark() {
            return this.lowWatermark;
        }

        public void setLowWatermark(BinlogOffset lowWatermark) {
            this.lowWatermark = lowWatermark;
        }

        public BinlogOffset getHighWatermark() {
            return this.highWatermark;
        }

        public void setHighWatermark(BinlogOffset highWatermark) {
            this.highWatermark = highWatermark;
        }

        @Override
        public boolean isRunning() {
            return this.lowWatermark != null && this.highWatermark != null;
        }
    }
}

