/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.binlog;

import io.debezium.DebeziumException;
import io.debezium.connector.SnapshotRecord;
import io.debezium.connector.binlog.BinlogConnectorConfig;
import io.debezium.connector.binlog.BinlogDatabaseSchema;
import io.debezium.connector.binlog.BinlogOffsetContext;
import io.debezium.connector.binlog.BinlogPartition;
import io.debezium.connector.binlog.jdbc.BinlogConnectorConnection;
import io.debezium.connector.binlog.metrics.BinlogSnapshotChangeEventSourceMetrics;
import io.debezium.function.BlockingConsumer;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.jdbc.MainConnectionProvidingConnectionFactory;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.notification.NotificationService;
import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
import io.debezium.pipeline.source.SnapshottingTask;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.RelationalSnapshotChangeEventSource;
import io.debezium.relational.RelationalTableFilters;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.snapshot.SnapshotterService;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Clock;
import io.debezium.util.Collect;
import io.debezium.util.Strings;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class BinlogSnapshotChangeEventSource<P extends BinlogPartition, O extends BinlogOffsetContext<?>>
extends RelationalSnapshotChangeEventSource<P, O> {
    /** Logger shared by every instance of this snapshot source. */
    private static final Logger LOGGER = LoggerFactory.getLogger(BinlogSnapshotChangeEventSource.class);
    /** Connector configuration; read for the locking strategy, lock timeout, fetch size and table filters. */
    private final BinlogConnectorConfig connectorConfig;
    /** Main connection handed out by the connection factory; runs the lock statements and metadata queries. */
    private final BinlogConnectorConnection connection;
    /** Table filters taken from the connector configuration at construction time. */
    private final RelationalTableFilters filters;
    /** Snapshot metrics; notified when a lock is acquired or released. */
    private final BinlogSnapshotChangeEventSourceMetrics<P> metrics;
    /** Database schema used to parse snapshot DDL and to look up table definitions. */
    private final BinlogDatabaseSchema<P, O, ?, ?> databaseSchema;
    /** Schema change events collected by {@code addSchemaEvent}, kept in insertion order until they are dispatched. */
    private final Set<SchemaChangeEvent> schemaEvents = new LinkedHashSet<SchemaChangeEvent>();
    /** Callback that receives a record-rewriting function in {@code postSnapshot}. */
    private final BlockingConsumer<Function<SourceRecord, SourceRecord>> lastEventProcessor;
    /** Action run at the start of {@code preSnapshot}, before the superclass hook. */
    private final Runnable preSnapshotAction;
    /** Tables whose schema capture is postponed until the table locks are released; empty unless a two-phase snapshot is in progress. */
    private Set<TableId> delayedSchemaSnapshotTables = Collections.emptySet();
    /** Clock time (ms) at which the global read lock was taken, or -1 while it is not held. */
    private long globalLockAcquiredAt = -1L;
    /** Clock time (ms) at which the table read locks were taken, or -1 while they are not held. */
    private long tableLockAcquiredAt = -1L;

    /**
     * Creates the snapshot source and stores the collaborators that the snapshot phases use later.
     *
     * @param connectorConfig connector configuration; also the source of the table filters
     * @param connectionFactory factory whose main connection is used for locking and metadata queries
     * @param schema database schema used to parse snapshot DDL
     * @param dispatcher event dispatcher passed on to the superclass
     * @param clock clock passed on to the superclass
     * @param metrics snapshot metrics, notified about lock acquisition and release
     * @param lastEventProcessor callback invoked from {@code postSnapshot}
     * @param preSnapshotAction action invoked from {@code preSnapshot}
     * @param notificationService notification service passed on to the superclass
     * @param snapshotterService snapshotter service passed on to the superclass
     */
    public BinlogSnapshotChangeEventSource(BinlogConnectorConfig connectorConfig,
                                           MainConnectionProvidingConnectionFactory<BinlogConnectorConnection> connectionFactory,
                                           BinlogDatabaseSchema<P, O, ?, ?> schema,
                                           EventDispatcher<P, TableId> dispatcher,
                                           Clock clock,
                                           BinlogSnapshotChangeEventSourceMetrics<P> metrics,
                                           BlockingConsumer<Function<SourceRecord, SourceRecord>> lastEventProcessor,
                                           Runnable preSnapshotAction,
                                           NotificationService<P, O> notificationService,
                                           SnapshotterService snapshotterService) {
        super(connectorConfig, connectionFactory, schema, dispatcher, clock, metrics, notificationService, snapshotterService);
        // Collaborators handed in by the caller.
        this.connectorConfig = connectorConfig;
        this.databaseSchema = schema;
        this.metrics = metrics;
        this.lastEventProcessor = lastEventProcessor;
        this.preSnapshotAction = preSnapshotAction;
        // Collaborators derived from the arguments.
        this.connection = connectionFactory.mainConnection();
        this.filters = connectorConfig.getTableFilters();
    }

    /**
     * Creates the snapshot context for one snapshot run.
     *
     * @param partition partition being snapshotted
     * @param onDemand flag forwarded unchanged to the context constructor
     * @return a new {@code BinlogSnapshotContext} for the partition
     */
    protected AbstractSnapshotChangeEventSource.SnapshotContext<P, O> prepare(P partition, boolean onDemand) {
        // Diamond instead of the raw type, so the context carries the P/O type arguments.
        return new BinlogSnapshotContext<>(partition, onDemand);
    }

    /**
     * Lists every base table in every database the connection can read.
     *
     * <p>A database whose table listing fails with an {@link SQLException} is logged and skipped. The
     * database filter is applied only to the names that are logged; the returned set is not filtered.
     *
     * @param ctx snapshot context (not used by this implementation)
     * @return ids of all base tables found, with the database name as catalog and a {@code null} schema
     * @throws Exception if the list of available databases cannot be read
     */
    protected Set<TableId> getAllTableIds(RelationalSnapshotChangeEventSource.RelationalSnapshotContext<P, O> ctx) throws Exception {
        LOGGER.info("Read list of available databases");
        final List<String> availableDatabases = connection.availableDatabases();
        LOGGER.info("\t list of available databases is: {}", availableDatabases);
        LOGGER.info("Read list of available tables in each database");

        final Set<TableId> discoveredTables = new HashSet<>();
        final Set<String> readableDatabases = new HashSet<>();
        for (String database : availableDatabases) {
            final String listTablesSql = "SHOW FULL TABLES IN " + quote(database) + " where Table_Type = 'BASE TABLE'";
            try {
                connection.query(listTablesSql, rs -> {
                    while (rs.next()) {
                        discoveredTables.add(new TableId(database, null, rs.getString(1)));
                    }
                });
                // Only reached when the listing succeeded.
                readableDatabases.add(database);
            }
            catch (SQLException e) {
                LOGGER.warn("\t skipping database '{}' due to error reading tables: {}", database, e.getMessage());
            }
        }

        final Set<String> includedDatabases = readableDatabases.stream()
                .filter(filters.databaseFilter())
                .collect(Collectors.toSet());
        LOGGER.info("\tsnapshot continuing with database(s): {}", includedDatabases);
        return discoveredTables;
    }

    /**
     * Prepares the session for a consistent snapshot and, when configured, tries to take the global read lock.
     *
     * <p>Sets REPEATABLE READ isolation and the session lock-wait timeouts. Failing to set
     * {@code innodb_lock_wait_timeout} is logged and tolerated. When locking is enabled and the global lock is
     * requested, a failure to acquire it is also tolerated: it is logged and nothing is marked as locked.
     *
     * @param sourceContext change event source context (not used by this implementation)
     * @param snapshotContext snapshot context (not used by this implementation)
     * @throws SQLException if the isolation level, {@code lock_wait_timeout} or the isolation reset cannot be applied
     */
    protected void lockTablesForSchemaSnapshot(ChangeEventSource.ChangeEventSourceContext sourceContext, RelationalSnapshotChangeEventSource.RelationalSnapshotContext<P, O> snapshotContext) throws SQLException {
        // Same value as the former literal 4, spelled with the JDBC constant.
        this.connection.connection().setTransactionIsolation(java.sql.Connection.TRANSACTION_REPEATABLE_READ);
        this.connection.executeWithoutCommitting("SET SESSION lock_wait_timeout=" + this.connectorConfig.snapshotLockTimeout().getSeconds());
        try {
            this.connection.executeWithoutCommitting("SET SESSION innodb_lock_wait_timeout=" + this.connectorConfig.snapshotLockTimeout().getSeconds());
        }
        catch (SQLException e) {
            LOGGER.warn("Unable to set innodb_lock_wait_timeout", e);
        }

        if (this.connectorConfig.getSnapshotLockingStrategy().isLockingEnabled() && this.connectorConfig.isGlobalLockUseRequested()) {
            try {
                this.globalLock();
                this.metrics.setGlobalLockAcquired();
            }
            catch (SQLException e) {
                LOGGER.info("Unable to flush and acquire global read lock, will use table read locks after reading table names");
                // Best effort: continue without the global lock, which must not be marked as held after a failure.
                assert !this.isGloballyLocked();
            }
            if (this.connectorConfig.getSnapshotLockingStrategy().isIsolationLevelResetOnFlush()) {
                // The strategy reports that the flush resets the isolation level, so set it again.
                this.connection.executeWithoutCommitting("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ");
            }
        }
    }

    /**
     * Releases the global read lock once the schema has been captured, but only under minimal locking.
     *
     * <p>Table-level locks are deliberately kept; a warning explains that they stay until all tables are read.
     *
     * @param snapshotContext snapshot context (not used by this implementation)
     * @throws SQLException if releasing the global lock fails
     */
    protected void releaseSchemaSnapshotLocks(RelationalSnapshotChangeEventSource.RelationalSnapshotContext<P, O> snapshotContext) throws SQLException {
        final boolean minimalLocking = connectorConfig.getSnapshotLockingStrategy().isMinimalLockingEnabled();
        if (!minimalLocking) {
            return;
        }
        if (isGloballyLocked()) {
            globalUnlock();
        }
        if (isTablesLocked()) {
            LOGGER.warn("Tables were locked explicitly, but to get a consistent snapshot we cannot release the locks until we've read all tables.");
        }
    }

    /**
     * Releases any lock still held after the data snapshot and completes the delayed schema capture.
     *
     * <p>If table locks were held and some tables had their schema capture delayed, the collected schema
     * events are cleared and rebuilt for those tables. This runs on the main connection when locking is
     * enabled, otherwise on a temporary worker pool sized to the connection pool. The rebuilt events are
     * dispatched, and every table known to the database schema is written back into the snapshot context.
     *
     * @param snapshotContext context of the running snapshot
     * @throws Exception if unlocking, reading DDL or dispatching an event fails
     */
    protected void releaseDataSnapshotLocks(RelationalSnapshotChangeEventSource.RelationalSnapshotContext<P, O> snapshotContext) throws Exception {
        if (isGloballyLocked()) {
            globalUnlock();
        }
        if (!isTablesLocked()) {
            return;
        }
        tableUnlock();
        if (delayedSchemaSnapshotTables.isEmpty()) {
            return;
        }

        // Second phase: capture the schema of the tables that were skipped while locks were held.
        schemaEvents.clear();
        if (connectorConfig.getSnapshotLockingStrategy().isLockingEnabled()) {
            createSchemaEventsForTables(snapshotContext, delayedSchemaSnapshotTables, false);
        }
        else {
            final int workerCount = connectionPool.size();
            LOGGER.info("Creating delayed schema snapshot worker pool with {} worker thread(s)", workerCount);
            final ExecutorService workers = Executors.newFixedThreadPool(workerCount);
            try {
                createSchemaEventsForTables(snapshotContext, delayedSchemaSnapshotTables, false, workers);
            }
            finally {
                workers.shutdownNow();
            }
        }

        for (SchemaChangeEvent event : schemaEvents) {
            final boolean fromNonCapturedDatabase = databaseSchema.storeOnlyCapturedTables()
                    && event.getDatabase() != null
                    && !event.getDatabase().isEmpty()
                    && !connectorConfig.getTableFilters().databaseFilter().test(event.getDatabase());
            if (fromNonCapturedDatabase) {
                LOGGER.debug("Skipping schema event as it belongs to a non-captured database: '{}'", event);
                continue;
            }
            LOGGER.debug("Processing schema event {}", event);

            // Events that affect no table are dispatched with a null table id.
            final TableId tableId;
            if (event.getTables().isEmpty()) {
                tableId = null;
            }
            else {
                tableId = event.getTables().iterator().next().id();
            }
            snapshotContext.offset.event(tableId, getClock().currentTime());
            dispatcher.dispatchSchemaChangeEvent(snapshotContext.partition, snapshotContext.offset, tableId,
                    receiver -> receiver.schemaChangeEvent(event));
        }

        databaseSchema.tableIds().forEach(id -> snapshotContext.tables.overwriteTable(databaseSchema.tableFor(id)));
    }

    /**
     * Chooses the offset the snapshot starts from and marks the snapshot as started.
     *
     * <p>Does nothing when locking is enabled but neither the global nor the table lock is held. Otherwise
     * the previous offset is reused if there is one; if not, an initial offset is created and populated by
     * {@code setOffsetContextBinlogPositionAndGtidDetailsForSnapshot}.
     *
     * @param ctx snapshot context whose {@code offset} is set
     * @param previousOffset offset of an earlier run, or {@code null} if there is none
     * @throws Exception if the offset details cannot be read or the snapshot cannot be started
     */
    protected void determineSnapshotOffset(RelationalSnapshotChangeEventSource.RelationalSnapshotContext<P, O> ctx, O previousOffset) throws Exception {
        final boolean noLockHeld = !isGloballyLocked() && !isTablesLocked();
        if (noLockHeld && connectorConfig.getSnapshotLockingStrategy().isLockingEnabled()) {
            return;
        }

        if (previousOffset == null) {
            final O initialOffset = getInitialOffsetContext(connectorConfig);
            ctx.offset = initialOffset;
            setOffsetContextBinlogPositionAndGtidDetailsForSnapshot(initialOffset, connection, snapshotterService);
        }
        else {
            ctx.offset = previousOffset;
        }
        tryStartingSnapshot(ctx);
    }

    /**
     * Creates the offset context used when a snapshot starts without a previous offset.
     * Called from {@code determineSnapshotOffset} with this source's connector configuration.
     *
     * @param var1 connector configuration
     * @return the new offset context
     */
    protected abstract O getInitialOffsetContext(BinlogConnectorConfig var1);

    /**
     * Populates a freshly created offset context before the snapshot starts.
     * Called from {@code determineSnapshotOffset} right after {@code getInitialOffsetContext}.
     * NOTE(review): judging by the name, this records the current binlog position and GTID details; confirm in the implementations.
     *
     * @param var1 offset context to populate
     * @param var2 main connector connection
     * @param var3 snapshotter service
     * @throws Exception if the implementation fails to populate the offset
     */
    protected abstract void setOffsetContextBinlogPositionAndGtidDetailsForSnapshot(O var1, BinlogConnectorConnection var2, SnapshotterService var3) throws Exception;

    /**
     * Parses a DDL statement against the database schema and records the resulting schema change events.
     *
     * @param snapshotContext context supplying the partition and offset for parsing
     * @param database database the statement applies to (callers pass an empty string for the charset statement)
     * @param ddl DDL statement to parse
     */
    private void addSchemaEvent(RelationalSnapshotChangeEventSource.RelationalSnapshotContext<P, O> snapshotContext, String database, String ddl) {
        final List<SchemaChangeEvent> parsedEvents = databaseSchema.parseSnapshotDdl(
                snapshotContext.partition,
                ddl,
                database,
                snapshotContext.offset,
                clock.currentTimeAsInstant());
        // De-duplicate in parse order before adding to the ordered event set.
        final Set<SchemaChangeEvent> uniqueEvents = new LinkedHashSet<>(parsedEvents);
        schemaEvents.addAll(uniqueEvents);
    }

    /**
     * Collects the schema change events that describe the structure of the tables to snapshot.
     *
     * <p>Steps, in order:
     * <ol>
     * <li>For a two-phase snapshot: take table locks, determine the snapshot offset and record which tables
     * have their schema capture delayed.</li>
     * <li>Select the tables to describe: only the captured tables when the schema stores only captured
     * tables, otherwise all tables eligible for schema capture.</li>
     * <li>Unless the snapshot is on demand, add the charset system-variable statement.</li>
     * <li>Add a {@code DROP TABLE IF EXISTS} event per table.</li>
     * <li>Per database: unless on demand, add the DROP/CREATE/USE database events; then add the
     * {@code SHOW CREATE TABLE} result for each table, on the main connection when locking is enabled or on
     * a worker pool otherwise.</li>
     * </ol>
     *
     * @param sourceContext used to detect that the source has been stopped
     * @param snapshotContext context of the running snapshot
     * @param offsetContext previous offset handed to {@code determineSnapshotOffset}; may be {@code null}
     * @param snapshottingTask describes whether this is an on-demand snapshot
     * @throws InterruptedException if the source stops running while events are being collected
     * @throws Exception if locking, querying or DDL parsing fails
     */
    protected void readTableStructure(ChangeEventSource.ChangeEventSourceContext sourceContext, RelationalSnapshotChangeEventSource.RelationalSnapshotContext<P, O> snapshotContext, O offsetContext, SnapshottingTask snapshottingTask) throws Exception {
        if (this.twoPhaseSchemaSnapshot()) {
            this.tableLock(snapshotContext);
            this.determineSnapshotOffset(snapshotContext, offsetContext);
            LOGGER.info("Table level locking is in place, the schema will be capture in two phases, now capturing: {}", snapshotContext.capturedTables);
            this.delayedSchemaSnapshotTables = Collect.minus(snapshotContext.capturedSchemaTables, snapshotContext.capturedTables);
            LOGGER.info("Tables for delayed schema capture: {}", this.delayedSchemaSnapshotTables);
        }

        final Set<TableId> capturedSchemaTables;
        if (this.databaseSchema.storeOnlyCapturedTables()) {
            capturedSchemaTables = snapshotContext.capturedTables;
            LOGGER.info("Only captured tables schema should be captured, capturing: {}", capturedSchemaTables);
        }
        else {
            capturedSchemaTables = snapshotContext.capturedSchemaTables;
            LOGGER.info("All eligible tables schema should be captured, capturing: {}", capturedSchemaTables);
        }

        // Group by database; the LinkedHashMap keeps databases in first-encounter order.
        final Map<String, List<TableId>> tablesToRead = capturedSchemaTables.stream()
                .collect(Collectors.groupingBy(TableId::catalog, LinkedHashMap::new, Collectors.toList()));
        final Set<String> databases = tablesToRead.keySet();

        if (!snapshottingTask.isOnDemand()) {
            this.addSchemaEvent(snapshotContext, "", this.connection.setStatementFor(this.connection.readCharsetSystemVariables()));
        }

        for (TableId tableId : capturedSchemaTables) {
            if (!sourceContext.isRunning()) {
                throw new InterruptedException("Interrupted while emitting initial DROP TABLE events");
            }
            this.addSchemaEvent(snapshotContext, tableId.catalog(), "DROP TABLE IF EXISTS " + this.quote(tableId));
        }

        final Map<String, BinlogConnectorConnection.DatabaseLocales> databaseCharsets = this.connection.readDatabaseCollations();

        // A worker pool is only used when locking is disabled.
        ExecutorService executorService = null;
        if (!this.connectorConfig.getSnapshotLockingStrategy().isLockingEnabled()) {
            final int snapshotMaxThreads = this.connectionPool.size();
            LOGGER.info("Creating schema snapshot worker pool with {} worker thread(s)", snapshotMaxThreads);
            executorService = Executors.newFixedThreadPool(snapshotMaxThreads);
        }
        try {
            for (String database : databases) {
                if (!sourceContext.isRunning()) {
                    throw new InterruptedException("Interrupted while reading structure of schema " + databases);
                }
                if (!snapshottingTask.isOnDemand()) {
                    LOGGER.info("Reading structure of database '{}'", database);
                    this.addSchemaEvent(snapshotContext, database, "DROP DATABASE IF EXISTS " + this.quote(database));
                    final StringBuilder createDatabaseDdl = new StringBuilder("CREATE DATABASE " + this.quote(database));
                    final BinlogConnectorConnection.DatabaseLocales defaultDatabaseLocales = databaseCharsets.get(database);
                    if (defaultDatabaseLocales != null) {
                        defaultDatabaseLocales.appendToDdlStatement(database, createDatabaseDdl);
                    }
                    this.addSchemaEvent(snapshotContext, database, createDatabaseDdl.toString());
                    this.addSchemaEvent(snapshotContext, database, "USE " + this.quote(database));
                }
                if (this.connectorConfig.getSnapshotLockingStrategy().isLockingEnabled()) {
                    this.createSchemaEventsForTables(snapshotContext, tablesToRead.get(database), true);
                    continue;
                }
                assert executorService != null;
                this.createSchemaEventsForTables(snapshotContext, tablesToRead.get(database), true, executorService);
            }
        }
        finally {
            if (executorService != null) {
                executorService.shutdownNow();
            }
        }
    }

    /**
     * Reads {@code SHOW CREATE TABLE} for each table on the main connection and records the DDL as schema events.
     *
     * @param snapshotContext context of the running snapshot
     * @param tablesToRead tables whose DDL should be read
     * @param firstPhase when {@code true}, tables scheduled for delayed schema capture are skipped
     * @throws Exception if a query fails
     */
    private void createSchemaEventsForTables(RelationalSnapshotChangeEventSource.RelationalSnapshotContext<P, O> snapshotContext, Collection<TableId> tablesToRead, boolean firstPhase) throws Exception {
        // Work on a copy so the caller's collection is never modified.
        final List<TableId> realTablesToRead = new ArrayList<>(tablesToRead);
        if (firstPhase) {
            realTablesToRead.removeIf(this.delayedSchemaSnapshotTables::contains);
        }
        for (TableId tableId : realTablesToRead) {
            this.connection.query("SHOW CREATE TABLE " + this.quote(tableId), rs -> {
                if (rs.next()) {
                    // Column 2 is read as the CREATE TABLE statement.
                    this.addSchemaEvent(snapshotContext, tableId.catalog(), rs.getString(2));
                }
            });
        }
    }

    /*
     * WARNING - void declaration
     */
    private void createSchemaEventsForTables(RelationalSnapshotChangeEventSource.RelationalSnapshotContext<P, O> snapshotContext, Collection<TableId> tablesToRead, boolean firstPhase, ExecutorService executorService) throws Exception {
        List<Object> realTablesToRead = new ArrayList<TableId>(tablesToRead);
        if (firstPhase) {
            realTablesToRead = realTablesToRead.stream().filter(id -> !this.delayedSchemaSnapshotTables.contains(id)).collect(Collectors.toList());
        }
        if (!realTablesToRead.isEmpty()) {
            void var8_10;
            ExecutorCompletionService<Map<TableId, String>> completionService = new ExecutorCompletionService<Map<TableId, String>>(executorService);
            for (TableId tableId : realTablesToRead) {
                completionService.submit(this.createDdlForTableCallable(tableId, this.connectionPool));
            }
            HashMap<TableId, String> ddls = new HashMap<TableId, String>();
            boolean bl = false;
            while (var8_10 < realTablesToRead.size()) {
                Map ddl = (Map)completionService.take().get();
                if (ddl != null) {
                    ddls.putAll(ddl);
                }
                ++var8_10;
            }
            ddls.forEach((key, value) -> this.addSchemaEvent(snapshotContext, key.catalog(), (String)value));
        }
    }

    /**
     * Builds a task that reads the {@code SHOW CREATE TABLE} output for one table.
     *
     * <p>The task takes a connection from the pool, runs the query and always puts the connection back.
     *
     * @param tableId table to describe
     * @param connectionPool pool the task takes its connection from
     * @return a task yielding a map from the table id to its DDL; the map is empty when the query returns no row
     */
    private Callable<Map<TableId, String>> createDdlForTableCallable(TableId tableId, Queue<JdbcConnection> connectionPool) {
        return () -> {
            final JdbcConnection pooled = connectionPool.poll();
            assert pooled != null;
            try {
                final Map<TableId, String> ddlByTable = new HashMap<>();
                pooled.query("SHOW CREATE TABLE " + quote(tableId), rs -> {
                    if (rs.next()) {
                        ddlByTable.put(tableId, rs.getString(2));
                    }
                });
                return ddlByTable;
            }
            finally {
                connectionPool.add(pooled);
            }
        };
    }

    /**
     * Tells whether the schema must be captured in two phases.
     *
     * @return {@code true} when locking is enabled but the global read lock is not held
     */
    private boolean twoPhaseSchemaSnapshot() {
        if (!connectorConfig.getSnapshotLockingStrategy().isLockingEnabled()) {
            return false;
        }
        return !isGloballyLocked();
    }

    /**
     * Builds the snapshot "create" schema change event for a table.
     *
     * @param snapshotContext context supplying the partition, offset and catalog name
     * @param table table the event describes
     * @return the event produced by {@code SchemaChangeEvent.ofSnapshotCreate}
     */
    protected SchemaChangeEvent getCreateTableEvent(RelationalSnapshotChangeEventSource.RelationalSnapshotContext<P, O> snapshotContext, Table table) {
        final SchemaChangeEvent createEvent = SchemaChangeEvent.ofSnapshotCreate(
                snapshotContext.partition,
                snapshotContext.offset,
                snapshotContext.catalogName,
                table);
        return createEvent;
    }

    /**
     * Returns the snapshot SELECT statement for a table; the snapshot context is not consulted.
     *
     * @param snapshotContext context of the running snapshot (unused)
     * @param tableId table to select from
     * @param columns columns to select
     * @return the statement supplied by the snapshot query provider, possibly empty
     */
    protected Optional<String> getSnapshotSelect(RelationalSnapshotChangeEventSource.RelationalSnapshotContext<P, O> snapshotContext, TableId tableId, List<String> columns) {
        final Optional<String> select = getSnapshotSelect(tableId, columns);
        return select;
    }

    /**
     * Asks the snapshotter service's snapshot query provider for the SELECT statement of a table.
     *
     * @param tableId table to select from; passed on as its backtick-quoted name
     * @param columns columns to select
     * @return the statement, or an empty optional if the provider supplies none
     */
    private Optional<String> getSnapshotSelect(TableId tableId, List<String> columns) {
        final String quotedTableName = tableId.toQuotedString('`');
        return snapshotterService.getSnapshotQuery().snapshotQuery(quotedTableName, columns);
    }

    /**
     * Returns the select-all snapshot statement for a table with {@code LIMIT 1} appended.
     *
     * <p>The underlying statement is requested once and transformed only if present, so the result cannot
     * depend on two separate lookups agreeing.
     *
     * @param snapshotContext context of the running snapshot (unused)
     * @param tableId table to select from
     * @return the limited statement, or an empty optional when no snapshot statement exists for the table
     */
    protected Optional<String> getSnapshotConnectionFirstSelect(RelationalSnapshotChangeEventSource.RelationalSnapshotContext<P, O> snapshotContext, TableId tableId) {
        return this.getSnapshotSelect(tableId, List.of("*")).map(select -> select + " LIMIT 1");
    }

    /**
     * Tells whether the global read lock is currently held.
     *
     * @return {@code true} when a lock acquisition time is recorded (anything other than -1)
     */
    private boolean isGloballyLocked() {
        final boolean notHeld = globalLockAcquiredAt == -1L;
        return !notHeld;
    }

    /**
     * Tells whether table-level read locks are currently held.
     *
     * @return {@code true} when a lock acquisition time is recorded (anything other than -1)
     */
    private boolean isTablesLocked() {
        final boolean notHeld = tableLockAcquiredAt == -1L;
        return !notHeld;
    }

    /**
     * Runs the locking statement supplied by the snapshot lock provider and records when the lock was taken.
     *
     * <p>When the provider supplies no statement, nothing is executed and the lock stays marked as not held.
     *
     * @throws SQLException if the locking statement fails
     */
    private void globalLock() throws SQLException {
        LOGGER.info("Flush and obtain global read lock to prevent writes to database");
        final Optional<String> lockingStatement = snapshotterService.getSnapshotLock().tableLockingStatement(null, null);
        if (!lockingStatement.isPresent()) {
            return;
        }
        connection.executeWithoutCommitting(lockingStatement.get());
        globalLockAcquiredAt = clock.currentTimeInMillis();
    }

    /**
     * Releases the global read lock, updates the metrics and logs how long the lock was held.
     *
     * @throws SQLException if {@code UNLOCK TABLES} fails
     */
    private void globalUnlock() throws SQLException {
        LOGGER.info("Releasing global read lock to enable MySQL writes");
        connection.executeWithoutCommitting("UNLOCK TABLES");
        final long releasedAt = clock.currentTimeInMillis();
        metrics.setGlobalLockReleased();
        final long heldForMillis = releasedAt - globalLockAcquiredAt;
        LOGGER.info("Writes to MySQL tables prevented for a total of {}", Strings.duration(heldForMillis));
        // -1 marks the lock as no longer held.
        globalLockAcquiredAt = -1L;
    }

    /**
     * Takes read locks on all captured tables and records when they were taken.
     *
     * <p>The acquisition time and the lock metric are updated even when there are no captured tables and
     * therefore no statement is executed.
     *
     * @param snapshotContext context supplying the captured tables
     * @throws DebeziumException if the user lacks the {@code LOCK TABLES} privilege
     * @throws SQLException if the privilege check or the lock statement fails
     */
    private void tableLock(RelationalSnapshotChangeEventSource.RelationalSnapshotContext<P, O> snapshotContext) throws SQLException {
        final boolean mayLockTables = connection.userHasPrivileges("LOCK TABLES");
        if (!mayLockTables) {
            throw new DebeziumException("User does not have the 'LOCK TABLES' privilege required to obtain a consistent snapshot by preventing concurrent writes to tables.");
        }

        LOGGER.info("Flush and obtain read lock for {} tables (preventing writes)", snapshotContext.capturedTables);
        if (!snapshotContext.capturedTables.isEmpty()) {
            final String quotedTables = snapshotContext.capturedTables.stream()
                    .map(id -> quote(id))
                    .collect(Collectors.joining(","));
            connection.executeWithoutCommitting("FLUSH TABLES " + quotedTables + " WITH READ LOCK");
        }
        tableLockAcquiredAt = clock.currentTimeInMillis();
        metrics.setGlobalLockAcquired();
    }

    /**
     * Releases the table read locks, updates the metrics and logs how long the locks were held.
     *
     * @throws SQLException if {@code UNLOCK TABLES} fails
     */
    private void tableUnlock() throws SQLException {
        LOGGER.info("Releasing table read lock to enable MySQL writes");
        connection.executeWithoutCommitting("UNLOCK TABLES");
        final long releasedAt = clock.currentTimeInMillis();
        metrics.setGlobalLockReleased();
        final long heldForMillis = releasedAt - tableLockAcquiredAt;
        LOGGER.info("Writes to MySQL tables prevented for a total of {}", Strings.duration(heldForMillis));
        // -1 marks the locks as no longer held.
        tableLockAcquiredAt = -1L;
    }

    /**
     * Wraps a database or table name in backticks for use in a SQL statement.
     *
     * <p>A backtick inside the name is doubled so that it cannot end the quoted identifier early. A
     * {@code null} name is rendered as the text {@code null}, as plain string concatenation would.
     *
     * @param dbOrTableName unquoted database or table name
     * @return the backtick-quoted identifier
     */
    private String quote(String dbOrTableName) {
        return "`" + String.valueOf(dbOrTableName).replace("`", "``") + "`";
    }

    /**
     * Builds the quoted, catalog-qualified name of a table.
     *
     * @param id table id whose catalog and table name are quoted
     * @return the quoted catalog and the quoted table name joined by a dot
     */
    private String quote(TableId id) {
        final String quotedCatalog = quote(id.catalog());
        final String quotedTable = quote(id.table());
        return String.join(".", quotedCatalog, quotedTable);
    }

    /**
     * Returns the row count used for a table during the snapshot.
     *
     * <p>When a snapshot select override is configured for the table, the superclass result is returned.
     * Otherwise the connection's estimated table size is logged and returned.
     *
     * @param tableId table to count
     * @return the row count, possibly empty
     */
    protected OptionalLong rowCountForTable(TableId tableId) {
        final boolean hasSelectOverride =
                getSnapshotSelectOverridesByTable(tableId, connectorConfig.getSnapshotSelectOverridesByTable()) != null;
        if (hasSelectOverride) {
            return super.rowCountForTable(tableId);
        }
        final OptionalLong estimate = connection.getEstimatedTableSize(tableId);
        LOGGER.info("Estimated row count for table {} is {}", tableId, estimate);
        return estimate;
    }

    /**
     * Creates the statement used to read a table's rows.
     *
     * <p>A statement with the configured snapshot fetch size is used only when the row count is known, the
     * large-table threshold is non-zero and the row count exceeds it. In every other case the superclass
     * creates the statement.
     *
     * @param jdbcConnection connection to create the statement on; cast to {@code BinlogConnectorConnection}
     * @param rowCount row count of the table, possibly empty
     * @return the statement to read the table with
     * @throws SQLException if the statement cannot be created
     */
    protected Statement readTableStatement(JdbcConnection jdbcConnection, OptionalLong rowCount) throws SQLException {
        final BinlogConnectorConnection binlogConnection = (BinlogConnectorConnection) jdbcConnection;
        final long largeTableThreshold = connectorConfig.getRowCountForLargeTable();
        final boolean largeTable = rowCount.isPresent()
                && largeTableThreshold != 0L
                && rowCount.getAsLong() > largeTableThreshold;
        if (largeTable) {
            return createStatementWithLargeResultSet(binlogConnection);
        }
        return super.readTableStatement(binlogConnection, rowCount);
    }

    /**
     * Creates a forward-only, read-only statement that uses the configured snapshot fetch size.
     *
     * @param connection connection to create the statement on
     * @return the configured statement
     * @throws SQLException if the statement cannot be created or configured
     */
    private Statement createStatementWithLargeResultSet(BinlogConnectorConnection connection) throws SQLException {
        // TYPE_FORWARD_ONLY == 1003 and CONCUR_READ_ONLY == 1007.
        final Statement statement = connection.connection().createStatement(
                java.sql.ResultSet.TYPE_FORWARD_ONLY,
                java.sql.ResultSet.CONCUR_READ_ONLY);
        statement.setFetchSize(connectorConfig.getSnapshotFetchSize());
        return statement;
    }

    /**
     * Dispatches the collected schema change events and syncs the table definitions into the snapshot context.
     *
     * <p>Events the database schema asks to skip are ignored. For an on-demand snapshot, events whose table
     * is not among the captured tables are ignored as well; this includes events that affect no table.
     *
     * @param sourceContext used to detect that the source has been stopped
     * @param snapshotContext context of the running snapshot
     * @param snapshottingTask describes whether this is an on-demand snapshot
     * @throws InterruptedException if the source stops running while events are being processed
     * @throws Exception if starting the snapshot or dispatching an event fails
     */
    protected void createSchemaChangeEventsForTables(ChangeEventSource.ChangeEventSourceContext sourceContext, RelationalSnapshotChangeEventSource.RelationalSnapshotContext<P, O> snapshotContext, SnapshottingTask snapshottingTask) throws Exception {
        tryStartingSnapshot(snapshotContext);

        for (SchemaChangeEvent event : schemaEvents) {
            if (!sourceContext.isRunning()) {
                throw new InterruptedException("Interrupted while processing event " + event);
            }
            if (databaseSchema.skipSchemaChangeEvent(event)) {
                continue;
            }
            LOGGER.debug("Processing schema event {}", event);

            // Events that affect no table are handled with a null table id.
            final TableId tableId;
            if (event.getTables().isEmpty()) {
                tableId = null;
            }
            else {
                tableId = event.getTables().iterator().next().id();
            }

            final boolean outsideOnDemandScope = snapshottingTask.isOnDemand()
                    && !snapshotContext.capturedTables.contains(tableId);
            if (outsideOnDemandScope) {
                LOGGER.debug("Event {} will be skipped since it's not related to blocking snapshot captured table {}", event, snapshotContext.capturedTables);
                continue;
            }

            snapshotContext.offset.event(tableId, getClock().currentTime());
            dispatcher.dispatchSchemaChangeEvent(snapshotContext.partition, snapshotContext.offset, tableId,
                    receiver -> receiver.schemaChangeEvent(event));
        }

        databaseSchema.tableIds().forEach(id -> snapshotContext.tables.overwriteTable(databaseSchema.tableFor(id)));
    }

    /**
     * Hands the last-event processor a function that marks a record as the final snapshot record, then
     * runs the superclass hook.
     *
     * <p>The function removes the {@code snapshot} entry from the record's source offset and sets the
     * {@code snapshot} field of the value's {@code source} struct to the lower-cased name of
     * {@code SnapshotRecord.LAST}.
     *
     * @throws InterruptedException if the processor or the superclass hook is interrupted
     */
    protected void postSnapshot() throws InterruptedException {
        lastEventProcessor.accept(record -> {
            record.sourceOffset().remove("snapshot");
            final Struct source = ((Struct) record.value()).getStruct("source");
            source.put("snapshot", SnapshotRecord.LAST.toString().toLowerCase());
            return record;
        });
        super.postSnapshot();
    }

    /**
     * Runs the configured pre-snapshot action and then the superclass hook, in that order.
     *
     * @throws InterruptedException if the superclass hook is interrupted
     */
    protected void preSnapshot() throws InterruptedException {
        final Runnable action = preSnapshotAction;
        action.run();
        super.preSnapshot();
    }

    /**
     * Snapshot context for binlog-based connectors; always created with an empty catalog name.
     *
     * @param <P> partition type
     * @param <O> offset context type
     */
    private static final class BinlogSnapshotContext<P extends BinlogPartition, O extends BinlogOffsetContext>
            extends RelationalSnapshotChangeEventSource.RelationalSnapshotContext<P, O> {

        /**
         * @param partition partition being snapshotted
         * @param onDemand flag forwarded unchanged to the superclass constructor
         */
        BinlogSnapshotContext(P partition, boolean onDemand) {
            super(partition, "", onDemand);
        }
    }
}

