/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.mysql.debezium.task.context;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.base.ChangeEventQueueMetrics;
import io.debezium.connector.mysql.GtidSet;
import io.debezium.connector.mysql.GtidUtils;
import io.debezium.connector.mysql.MySqlChangeEventSourceMetricsFactory;
import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.MySqlDatabaseSchema;
import io.debezium.connector.mysql.MySqlOffsetContext;
import io.debezium.connector.mysql.MySqlPartition;
import io.debezium.connector.mysql.MySqlStreamingChangeEventSourceMetrics;
import io.debezium.connector.mysql.MySqlTopicSelector;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.metrics.SnapshotChangeEventSourceMetrics;
import io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.relational.HistorizedRelationalDatabaseSchema;
import io.debezium.relational.TableId;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.TopicSelector;
import io.debezium.util.Clock;
import io.debezium.util.Collect;
import io.debezium.util.SchemaNameAdjuster;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils;
import org.apache.flink.cdc.connectors.mysql.debezium.EmbeddedFlinkDatabaseHistory;
import org.apache.flink.cdc.connectors.mysql.debezium.dispatcher.EventDispatcherImpl;
import org.apache.flink.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher;
import org.apache.flink.cdc.connectors.mysql.debezium.task.context.MySqlErrorHandler;
import org.apache.flink.cdc.connectors.mysql.debezium.task.context.MySqlTaskContextImpl;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffsetUtils;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Bundles the Debezium objects (schema, offset context, event queue, dispatchers, metrics and
 * error handler) that are needed to read one {@link MySqlSplit}.
 *
 * <p>The connection, binlog client and configuration are fixed at construction time. All other
 * members are (re)created by {@link #configure(MySqlSplit)} and are {@code null} until that method
 * has been called.
 */
public class StatefulTaskContext {
    private static final Logger LOG = LoggerFactory.getLogger(StatefulTaskContext.class);
    /** Extra queue capacity added on top of the split size when reading a snapshot split. */
    private static final int DEFAULT_BINLOG_QUEUE_SIZE_IN_SNAPSHOT_SCAN = 1024;
    private static final Clock clock = Clock.SYSTEM;

    // Fixed for the lifetime of this context.
    private final MySqlSourceConfig sourceConfig;
    private final MySqlConnectorConfig connectorConfig;
    private final MySqlEventMetadataProvider metadataProvider;
    private final SchemaNameAdjuster schemaNameAdjuster;
    private final MySqlConnection connection;
    private final BinaryLogClient binaryLogClient;

    // Rebuilt on every configure(MySqlSplit) call.
    private MySqlDatabaseSchema databaseSchema;
    private MySqlTaskContextImpl taskContext;
    private MySqlOffsetContext offsetContext;
    private MySqlPartition mySqlPartition;
    private TopicSelector<TableId> topicSelector;
    private SnapshotChangeEventSourceMetrics<MySqlPartition> snapshotChangeEventSourceMetrics;
    private StreamingChangeEventSourceMetrics<MySqlPartition> streamingChangeEventSourceMetrics;
    private EventDispatcherImpl<TableId> dispatcher;
    private EventDispatcher.SnapshotReceiver<MySqlPartition> snapshotReceiver;
    private SignalEventDispatcher signalEventDispatcher;
    private ChangeEventQueue<DataChangeEvent> queue;
    private ErrorHandler errorHandler;

    /**
     * Creates a context that is not yet bound to any split.
     *
     * @param sourceConfig source configuration; its Debezium connector config is cached as well
     * @param binaryLogClient binlog client handed to the task context created in {@code configure}
     * @param connection JDBC connection used for case-sensitivity, GTID and binlog file lookups
     */
    public StatefulTaskContext(MySqlSourceConfig sourceConfig, BinaryLogClient binaryLogClient, MySqlConnection connection) {
        this.sourceConfig = sourceConfig;
        this.connectorConfig = sourceConfig.getMySqlConnectorConfig();
        this.schemaNameAdjuster = SchemaNameAdjuster.create();
        this.metadataProvider = new MySqlEventMetadataProvider();
        this.binaryLogClient = binaryLogClient;
        this.connection = connection;
    }

    /**
     * Prepares this context for reading the given split.
     *
     * <p>Registers the split's table schemas with {@link EmbeddedFlinkDatabaseHistory}, closes any
     * database schema left over from a previous call, and then rebuilds the schema, partition,
     * starting offset, task context, event queue, dispatchers, metrics and error handler.
     *
     * @param mySqlSplit the split that is about to be read
     * @throws IllegalStateException if the starting binlog position of the split is no longer
     *     available on the server
     */
    public void configure(MySqlSplit mySqlSplit) {
        // Named after the getter it is read from; the value is forwarded unchanged to the
        // schema factory below.
        final boolean tableIdCaseSensitive = connection.isTableIdCaseSensitive();
        this.topicSelector = MySqlTopicSelector.defaultSelector(connectorConfig);
        EmbeddedFlinkDatabaseHistory.registerHistory(
                sourceConfig.getDbzConfiguration().getString("database.history.instance.name"),
                mySqlSplit.getTableSchemas().values());

        // Release the schema of a previously configured split before replacing it.
        Optional.ofNullable(databaseSchema).ifPresent(HistorizedRelationalDatabaseSchema::close);
        this.databaseSchema = DebeziumUtils.createMySqlDatabaseSchema(connectorConfig, tableIdCaseSensitive);

        // The partition must exist before the history is recovered, because recovery reads it.
        this.mySqlPartition = new MySqlPartition(connectorConfig.getLogicalName());
        this.offsetContext = loadStartingOffsetState(new MySqlOffsetContext.Loader(connectorConfig), mySqlSplit);
        validateAndLoadDatabaseHistory(offsetContext, databaseSchema);

        this.taskContext = new MySqlTaskContextImpl(connectorConfig, databaseSchema, binaryLogClient);

        // Snapshot splits size the queue from the split size plus fixed headroom; binlog splits
        // use the queue size from the connector configuration.
        final int queueSize = mySqlSplit.isSnapshotSplit()
                ? sourceConfig.getSplitSize() + DEFAULT_BINLOG_QUEUE_SIZE_IN_SNAPSHOT_SCAN
                : connectorConfig.getMaxQueueSize();
        this.queue = new ChangeEventQueue.Builder<DataChangeEvent>()
                .pollInterval(connectorConfig.getPollInterval())
                .maxBatchSize(connectorConfig.getMaxBatchSize())
                .maxQueueSize(queueSize)
                .maxQueueSizeInBytes(connectorConfig.getMaxQueueSizeInBytes())
                .loggingContextSupplier(() -> this.taskContext.configureLoggingContext("mysql-cdc-connector-task"))
                .build();

        this.dispatcher = new EventDispatcherImpl<TableId>(
                connectorConfig,
                topicSelector,
                databaseSchema,
                queue,
                connectorConfig.getTableFilters().dataCollectionFilter(),
                DataChangeEvent::new,
                metadataProvider,
                schemaNameAdjuster);
        this.snapshotReceiver = dispatcher.getSnapshotChangeEventReceiver();
        this.signalEventDispatcher = new SignalEventDispatcher(
                offsetContext.getOffset(), topicSelector.getPrimaryTopic(), queue);

        final MySqlChangeEventSourceMetricsFactory changeEventSourceMetricsFactory =
                new MySqlChangeEventSourceMetricsFactory(
                        new MySqlStreamingChangeEventSourceMetrics(
                                taskContext,
                                (ChangeEventQueueMetrics) queue,
                                (EventMetadataProvider) metadataProvider));
        this.snapshotChangeEventSourceMetrics =
                changeEventSourceMetricsFactory.getSnapshotMetrics(taskContext, queue, metadataProvider);
        this.streamingChangeEventSourceMetrics =
                changeEventSourceMetricsFactory.getStreamingMetrics(taskContext, queue, metadataProvider);
        this.errorHandler = new MySqlErrorHandler(connectorConfig, queue, taskContext, sourceConfig);
    }

    /** Initializes the schema's history storage and recovers it for the current partition and offset. */
    private void validateAndLoadDatabaseHistory(MySqlOffsetContext offset, MySqlDatabaseSchema schema) {
        schema.initializeStorage();
        schema.recover(Offsets.of(mySqlPartition, offset));
    }

    /**
     * Determines the offset the given split starts from and verifies that it can still be read.
     *
     * <p>Snapshot splits start from {@link BinlogOffset#ofEarliest()}; binlog splits start from
     * the effective offset derived from the split's configured starting offset.
     *
     * @param loader loader that turns the raw offset map into a {@link MySqlOffsetContext}
     * @param mySqlSplit the split being configured
     * @return the loaded offset context
     * @throws IllegalStateException if the required binlog position is no longer available
     */
    protected MySqlOffsetContext loadStartingOffsetState(OffsetContext.Loader<MySqlOffsetContext> loader, MySqlSplit mySqlSplit) {
        final BinlogOffset offset = mySqlSplit.isSnapshotSplit()
                ? BinlogOffset.ofEarliest()
                : BinlogOffsetUtils.initializeEffectiveOffset(mySqlSplit.asBinlogSplit().getStartingOffset(), connection);
        LOG.info("Starting offset is initialized to {}", offset);

        final MySqlOffsetContext mySqlOffsetContext = loader.load(offset.getOffset());
        if (!isBinlogAvailable(mySqlOffsetContext)) {
            throw new IllegalStateException("The connector is trying to read binlog starting at " + mySqlOffsetContext.getSourceInfo() + ", but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed.");
        }
        return mySqlOffsetContext;
    }

    /**
     * Checks availability by GTID set when the offset carries one, otherwise by binlog file name.
     */
    private boolean isBinlogAvailable(MySqlOffsetContext offset) {
        return offset.gtidSet() != null ? checkGtidSet(offset) : checkBinlogFilename(offset);
    }

    /**
     * Verifies that every GTID the connector still has to replicate is known to the server and
     * has not been purged. Must only be called when {@code offset.gtidSet()} is non-null.
     */
    private boolean checkGtidSet(MySqlOffsetContext offset) {
        final String gtidStr = offset.gtidSet();
        if (gtidStr.trim().isEmpty()) {
            // An empty GTID set places no requirement on the server.
            return true;
        }

        final String availableGtidStr = connection.knownGtidSet();
        if (availableGtidStr == null || availableGtidStr.trim().isEmpty()) {
            LOG.warn("Connector used GTIDs previously, but MySQL does not know of any GTIDs or they are not enabled");
            return false;
        }

        final GtidSet availableGtidSet = new GtidSet(availableGtidStr);
        LOG.info("Merging server GTID set {} with restored GTID set {}", availableGtidSet, gtidStr);
        final GtidSet gtidSet = GtidUtils.fixRestoredGtidSet(availableGtidSet, new GtidSet(gtidStr));
        LOG.info("Merged GTID set is {}", gtidSet);

        if (!gtidSet.isContainedWithin(availableGtidSet)) {
            LOG.info("Connector last known GTIDs are {}, but MySQL has {}", gtidSet, availableGtidSet);
            return false;
        }

        LOG.info("MySQL current GTID set {} does contain the GTID set {} required by the connector.", availableGtidSet, gtidSet);
        final GtidSet gtidSetToReplicate = connection.subtractGtidSet(availableGtidSet, gtidSet);
        final GtidSet purgedGtidSet = connection.purgedGtidSet();
        LOG.info("Server has already purged {} GTIDs", purgedGtidSet);
        final GtidSet nonPurgedGtidSetToReplicate = connection.subtractGtidSet(gtidSetToReplicate, purgedGtidSet);
        LOG.info("GTID set {} known by the server but not processed yet, for replication are available only GTID set {}", gtidSetToReplicate, nonPurgedGtidSetToReplicate);

        // If removing the purged GTIDs changed the set, part of what we need is gone.
        if (!gtidSetToReplicate.equals(nonPurgedGtidSetToReplicate)) {
            LOG.warn("Some of the GTIDs needed to replicate have been already purged");
            return false;
        }
        return true;
    }

    /**
     * Verifies that the binlog file named in the offset is among the files the server still has.
     * An offset without a file name (null or empty) is treated as available.
     */
    private boolean checkBinlogFilename(MySqlOffsetContext offset) {
        final String binlogFilename = offset.getSourceInfo().getString("file");
        if (binlogFilename == null || binlogFilename.isEmpty()) {
            return true;
        }

        final List<String> logNames = connection.availableBinlogFiles();
        final boolean found = logNames.stream().anyMatch(binlogFilename::equals);
        if (found) {
            LOG.info("MySQL has the binlog file '{}' required by the connector", binlogFilename);
        } else {
            LOG.info("Connector requires binlog file '{}', but MySQL only has {}", binlogFilename, String.join(", ", logNames));
        }
        return found;
    }

    /** Returns the shared system clock. */
    public static Clock getClock() {
        return clock;
    }

    public MySqlSourceConfig getSourceConfig() {
        return this.sourceConfig;
    }

    public MySqlConnectorConfig getConnectorConfig() {
        return this.connectorConfig;
    }

    public MySqlConnection getConnection() {
        return this.connection;
    }

    public BinaryLogClient getBinaryLogClient() {
        return this.binaryLogClient;
    }

    /** Returns the schema built by the last {@code configure} call, or {@code null} before it. */
    public MySqlDatabaseSchema getDatabaseSchema() {
        return this.databaseSchema;
    }

    public MySqlTaskContextImpl getTaskContext() {
        return this.taskContext;
    }

    public EventDispatcherImpl<TableId> getDispatcher() {
        return this.dispatcher;
    }

    public EventDispatcher.SnapshotReceiver<MySqlPartition> getSnapshotReceiver() {
        return this.snapshotReceiver;
    }

    public SignalEventDispatcher getSignalEventDispatcher() {
        return this.signalEventDispatcher;
    }

    public ChangeEventQueue<DataChangeEvent> getQueue() {
        return this.queue;
    }

    public ErrorHandler getErrorHandler() {
        return this.errorHandler;
    }

    public MySqlOffsetContext getOffsetContext() {
        return this.offsetContext;
    }

    public MySqlPartition getMySqlPartition() {
        return this.mySqlPartition;
    }

    public TopicSelector<TableId> getTopicSelector() {
        return this.topicSelector;
    }

    public SnapshotChangeEventSourceMetrics<MySqlPartition> getSnapshotChangeEventSourceMetrics() {
        return this.snapshotChangeEventSourceMetrics;
    }

    public StreamingChangeEventSourceMetrics<MySqlPartition> getStreamingChangeEventSourceMetrics() {
        return this.streamingChangeEventSourceMetrics;
    }

    public SchemaNameAdjuster getSchemaNameAdjuster() {
        return this.schemaNameAdjuster;
    }

    /**
     * Extracts timestamp, binlog position and transaction id metadata from change events by
     * reading the {@code source} struct of the event value.
     */
    public static class MySqlEventMetadataProvider
    implements EventMetadataProvider {
        public static final String SERVER_ID_KEY = "server_id";
        public static final String GTID_KEY = "gtid";
        public static final String BINLOG_FILENAME_OFFSET_KEY = "file";
        public static final String BINLOG_POSITION_OFFSET_KEY = "pos";
        public static final String BINLOG_ROW_IN_EVENT_OFFSET_KEY = "row";
        public static final String THREAD_KEY = "thread";
        public static final String QUERY_KEY = "query";

        /**
         * Returns the event time stored in {@code source.ts_ms}.
         *
         * @return the timestamp, or {@code null} if {@code value}, {@code source}, the
         *     {@code source} struct of the value, or its {@code ts_ms} field is {@code null}
         */
        @Override
        public Instant getEventTimestamp(DataCollectionId source, OffsetContext offset, Object key, Struct value) {
            if (value == null) {
                return null;
            }
            final Struct sourceInfo = value.getStruct("source");
            // Guard the struct that is dereferenced below, not only the collection id;
            // a record without a source block would otherwise fail with an NPE.
            if (source == null || sourceInfo == null) {
                return null;
            }
            final Long timestamp = sourceInfo.getInt64("ts_ms");
            return timestamp == null ? null : Instant.ofEpochMilli(timestamp);
        }

        /**
         * Returns the binlog file, position and row-in-event of the record as strings.
         *
         * @return a map keyed by {@code file}, {@code pos} and {@code row}, or {@code null} if
         *     {@code value}, {@code source}, or the {@code source} struct of the value is
         *     {@code null}
         */
        @Override
        public Map<String, String> getEventSourcePosition(DataCollectionId source, OffsetContext offset, Object key, Struct value) {
            if (value == null) {
                return null;
            }
            final Struct sourceInfo = value.getStruct("source");
            // Same guard as in getEventTimestamp: sourceInfo is what gets dereferenced.
            if (source == null || sourceInfo == null) {
                return null;
            }
            // NOTE(review): pos/row are unboxed here, so a source struct lacking them still
            // fails; presumably they are always populated - confirm against the source schema.
            return Collect.hashMapOf(
                    BINLOG_FILENAME_OFFSET_KEY, sourceInfo.getString(BINLOG_FILENAME_OFFSET_KEY),
                    BINLOG_POSITION_OFFSET_KEY, Long.toString(sourceInfo.getInt64(BINLOG_POSITION_OFFSET_KEY)),
                    BINLOG_ROW_IN_EVENT_OFFSET_KEY, Integer.toString(sourceInfo.getInt32(BINLOG_ROW_IN_EVENT_OFFSET_KEY)));
        }

        /** Returns the transaction id tracked by the given offset, which must be a {@link MySqlOffsetContext}. */
        @Override
        public String getTransactionId(DataCollectionId source, OffsetContext offset, Object key, Struct value) {
            return ((MySqlOffsetContext) offset).getTransactionId();
        }
    }
}

