/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.postgresql;

import io.debezium.annotation.ThreadSafe;
import io.debezium.config.Configuration;
import io.debezium.config.ConfigurationDefaults;
import io.debezium.connector.postgresql.ChangeEvent;
import io.debezium.connector.postgresql.PostgresConnector;
import io.debezium.connector.postgresql.PostgresSchema;
import io.debezium.connector.postgresql.PostgresTaskContext;
import io.debezium.connector.postgresql.PostgresType;
import io.debezium.connector.postgresql.PostgresValueConverter;
import io.debezium.connector.postgresql.RecordsProducer;
import io.debezium.connector.postgresql.RecordsStreamProducer;
import io.debezium.connector.postgresql.SourceInfo;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.connector.postgresql.spi.Snapshotter;
import io.debezium.data.Envelope;
import io.debezium.data.SpecialValueDecimal;
import io.debezium.function.BlockingConsumer;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.TableSchema;
import io.debezium.schema.DataCollectionId;
import io.debezium.util.Clock;
import io.debezium.util.LoggingContext;
import io.debezium.util.Metronome;
import io.debezium.util.Strings;
import io.debezium.util.Threads;
import java.sql.Array;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.postgresql.util.PGmoney;

@ThreadSafe
public class RecordsSnapshotProducer
extends RecordsProducer {
    private static final String CONTEXT_NAME = "records-snapshot-producer";
    private final ExecutorService executorService;
    private final Optional<RecordsStreamProducer> streamProducer;
    private final AtomicReference<SourceRecord> currentRecord;
    private final Snapshotter snapshotter;

    public RecordsSnapshotProducer(PostgresTaskContext taskContext, SourceInfo sourceInfo, Snapshotter snapshotter) {
        super(taskContext, sourceInfo);
        this.executorService = Threads.newSingleThreadExecutor(PostgresConnector.class, (String)taskContext.config().getLogicalName(), (String)CONTEXT_NAME);
        this.currentRecord = new AtomicReference();
        this.snapshotter = snapshotter;
        this.streamProducer = snapshotter.shouldStream() ? Optional.of(new RecordsStreamProducer(taskContext, sourceInfo)) : Optional.empty();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void start(BlockingConsumer<ChangeEvent> eventConsumer, Consumer<Throwable> failureConsumer) {
        LoggingContext.PreviousContext previousContext = this.taskContext.configureLoggingContext(CONTEXT_NAME);
        try {
            ((CompletableFuture)((CompletableFuture)CompletableFuture.runAsync(this::delaySnapshotIfNeeded, this.executorService).thenRunAsync(() -> this.takeSnapshot(eventConsumer), this.executorService)).thenRunAsync(() -> this.startStreaming(eventConsumer, failureConsumer), this.executorService)).exceptionally(e -> {
                this.logger.error("unexpected exception", e.getCause() != null ? e.getCause() : e);
                this.stop();
                failureConsumer.accept((Throwable)e);
                return null;
            });
        }
        finally {
            previousContext.restore();
        }
    }

    private void delaySnapshotIfNeeded() {
        Duration delay = this.taskContext.getConfig().getSnapshotDelay();
        if (delay.isZero() || delay.isNegative()) {
            return;
        }
        Threads.Timer timer = Threads.timer((Clock)Clock.SYSTEM, (Duration)delay);
        Metronome metronome = Metronome.parker((Duration)ConfigurationDefaults.RETURN_CONTROL_INTERVAL, (Clock)Clock.SYSTEM);
        while (!timer.expired()) {
            try {
                this.logger.info("The connector will wait for {}s before proceeding", (Object)timer.remaining().getSeconds());
                metronome.pause();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                this.logger.debug("Interrupted while awaiting initial snapshot delay");
                return;
            }
        }
    }

    private void startStreaming(BlockingConsumer<ChangeEvent> consumer, Consumer<Throwable> failureConsumer) {
        try {
            this.streamProducer.ifPresent(producer -> {
                if (this.sourceInfo.lsn() != null && this.logger.isInfoEnabled()) {
                    this.logger.info("Snapshot finished, continuing streaming changes from {}", (Object)ReplicationConnection.format(this.sourceInfo.lsn()));
                }
                producer.start(consumer, failureConsumer);
            });
        }
        finally {
            this.cleanup();
        }
    }

    @Override
    protected void commit(long lsn) {
        this.streamProducer.ifPresent(x -> x.commit(lsn));
    }

    @Override
    protected void stop() {
        try {
            this.streamProducer.ifPresent(RecordsStreamProducer::stop);
        }
        finally {
            this.cleanup();
        }
    }

    private void cleanup() {
        this.currentRecord.set(null);
        this.executorService.shutdownNow();
    }

    private void takeSnapshot(BlockingConsumer<ChangeEvent> consumer) {
        block24: {
            if (this.executorService.isShutdown()) {
                this.logger.info("Not taking snapshot as this task has been cancelled already");
                return;
            }
            long snapshotStart = this.clock().currentTimeInMillis();
            Connection jdbcConnection = null;
            try (PostgresConnection connection = this.taskContext.createConnection();){
                jdbcConnection = connection.connection();
                String lineSeparator = System.lineSeparator();
                this.logger.info("Step 0: disabling autocommit");
                connection.setAutoCommit(false);
                long lockTimeoutMillis = this.taskContext.config().snapshotLockTimeoutMillis();
                this.logger.info("Step 1: starting transaction and refreshing the DB schemas for database '{}' and user '{}'", (Object)connection.database(), (Object)connection.username());
                StringBuilder statements = new StringBuilder("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ ONLY, DEFERRABLE;");
                connection.executeWithoutCommitting(new String[]{statements.toString()});
                statements.delete(0, statements.length());
                PostgresSchema schema = this.schema();
                schema.refresh(connection, false);
                this.logger.info("Step 2: locking each of the database tables, waiting a maximum of '{}' seconds for each lock", (Object)((double)lockTimeoutMillis / 1000.0));
                statements.append("SET lock_timeout = ").append(lockTimeoutMillis).append(";").append(lineSeparator);
                schema.tableIds().forEach(tableId -> statements.append("LOCK TABLE ").append(tableId.toDoubleQuotedString()).append(" IN SHARE UPDATE EXCLUSIVE MODE;").append(lineSeparator));
                connection.executeWithoutCommitting(new String[]{statements.toString()});
                schema.refresh(connection, false);
                long xlogStart = connection.currentXLogLocation();
                long txId = connection.currentTransactionId();
                if (this.logger.isInfoEnabled()) {
                    this.logger.info("\t read xlogStart at '{}' from transaction '{}'", (Object)ReplicationConnection.format(xlogStart), (Object)txId);
                }
                this.sourceInfo.startSnapshot();
                this.sourceInfo.update(xlogStart, this.clock().currentTime(), txId, null, this.sourceInfo.xmin());
                this.logger.info("Step 3: reading and exporting the contents of each table");
                AtomicInteger rowsCounter = new AtomicInteger(0);
                for (TableId tableId2 : schema.tableIds()) {
                    long exportStart = this.clock().currentTimeInMillis();
                    this.logger.info("\t exporting data from table '{}'", (Object)tableId2);
                    try {
                        Optional<String> selectStatement = this.snapshotter.buildSnapshotQuery(tableId2);
                        if (!selectStatement.isPresent()) {
                            this.logger.warn("For table '{}' the select statement was not provided, skipping table", (Object)tableId2);
                            continue;
                        }
                        this.logger.info("For table '{}' using select statement: '{}'", (Object)tableId2, selectStatement);
                        connection.queryWithBlockingConsumer(selectStatement.get(), this::readTableStatement, rs -> this.readTable(tableId2, rs, consumer, rowsCounter));
                        if (this.logger.isInfoEnabled()) {
                            this.logger.info("\t finished exporting '{}' records for '{}'; total duration '{}'", new Object[]{rowsCounter.get(), tableId2, Strings.duration((long)(this.clock().currentTimeInMillis() - exportStart))});
                        }
                        rowsCounter.set(0);
                    }
                    catch (SQLException e) {
                        throw new ConnectException((Throwable)e);
                    }
                }
                this.logger.info("Step 4: committing transaction '{}'", (Object)txId);
                jdbcConnection.commit();
                SourceRecord currentRecord = this.currentRecord.get();
                if (currentRecord != null) {
                    this.logger.info("Step 5: sending the last snapshot record");
                    this.sourceInfo.markLastSnapshotRecord();
                    this.changeSourceToLastSnapshotRecord(currentRecord);
                    this.currentRecord.set(new SourceRecord(currentRecord.sourcePartition(), this.sourceInfo.offset(), currentRecord.topic(), currentRecord.kafkaPartition(), currentRecord.keySchema(), currentRecord.key(), currentRecord.valueSchema(), currentRecord.value()));
                    this.sendCurrentRecord(consumer);
                }
                this.sourceInfo.completeSnapshot();
                if (this.logger.isInfoEnabled()) {
                    this.logger.info("Snapshot completed in '{}'", (Object)Strings.duration((long)(this.clock().currentTimeInMillis() - snapshotStart)));
                }
                Heartbeat.create((Configuration)this.taskContext.config().getConfig(), (String)this.taskContext.topicSelector().getHeartbeatTopic(), (String)this.taskContext.config().getLogicalName()).forcedBeat(this.sourceInfo.partition(), this.sourceInfo.offset(), r -> consumer.accept((Object)new ChangeEvent((SourceRecord)r, this.sourceInfo.lsn())));
            }
            catch (SQLException e) {
                this.rollbackTransaction(jdbcConnection);
                throw new ConnectException((Throwable)e);
            }
            catch (InterruptedException e) {
                Thread.interrupted();
                this.rollbackTransaction(jdbcConnection);
                if (!this.logger.isWarnEnabled()) break block24;
                this.logger.warn("Snapshot aborted after '{}'", (Object)Strings.duration((long)(this.clock().currentTimeInMillis() - snapshotStart)));
            }
        }
    }

    private void changeSourceToLastSnapshotRecord(SourceRecord currentRecord) {
        Struct envelope = (Struct)currentRecord.value();
        Struct source = (Struct)envelope.get("source");
        if (source.getBoolean("last_snapshot_record") != null) {
            source.put("last_snapshot_record", (Object)true);
        }
    }

    private void rollbackTransaction(Connection jdbcConnection) {
        try {
            if (jdbcConnection != null) {
                jdbcConnection.rollback();
            }
        }
        catch (SQLException se) {
            this.logger.error("Cannot rollback snapshot transaction", (Throwable)se);
        }
    }

    private Statement readTableStatement(Connection conn) throws SQLException {
        int fetchSize = this.taskContext.config().getSnapshotFetchSize();
        Statement statement = conn.createStatement();
        statement.setFetchSize(fetchSize);
        return statement;
    }

    private void readTable(TableId tableId, ResultSet rs, BlockingConsumer<ChangeEvent> consumer, AtomicInteger rowsCounter) throws SQLException, InterruptedException {
        Table table = this.schema().tableFor(tableId);
        assert (table != null);
        int numColumns = table.columns().size();
        Object[] row = new Object[numColumns];
        ResultSetMetaData metaData = rs.getMetaData();
        while (rs.next()) {
            rowsCounter.incrementAndGet();
            this.sendCurrentRecord(consumer);
            int i = 0;
            int j = 1;
            while (i != numColumns) {
                row[i] = this.valueForColumn(rs, j, metaData);
                ++i;
                ++j;
            }
            this.generateReadRecord(tableId, row);
        }
    }

    private Object valueForColumn(ResultSet rs, int colIdx, ResultSetMetaData metaData) throws SQLException {
        try {
            String columnTypeName = metaData.getColumnTypeName(colIdx);
            PostgresType type = this.taskContext.schema().getTypeRegistry().get(columnTypeName);
            this.logger.trace("Type of incoming data is: {}", (Object)type.getOid());
            this.logger.trace("ColumnTypeName is: {}", (Object)columnTypeName);
            this.logger.trace("Type is: {}", (Object)type);
            if (type.isArrayType()) {
                Array array = rs.getArray(colIdx);
                if (array == null) {
                    return null;
                }
                return Arrays.asList((Object[])array.getArray());
            }
            switch (type.getOid()) {
                case 790: {
                    return new PGmoney((String)rs.getString((int)colIdx)).val;
                }
                case 1560: {
                    return rs.getString(colIdx);
                }
                case 1700: {
                    String s = rs.getString(colIdx);
                    if (s == null) {
                        return s;
                    }
                    Optional<SpecialValueDecimal> value = PostgresValueConverter.toSpecialValue(s);
                    return value.isPresent() ? value.get() : new SpecialValueDecimal(rs.getBigDecimal(colIdx));
                }
                case 1266: {
                    return rs.getString(colIdx);
                }
            }
            Object x = rs.getObject(colIdx);
            if (x != null) {
                this.logger.trace("rs getobject returns class: {}; rs getObject value is: {}", x.getClass(), x);
            }
            return x;
        }
        catch (SQLException e) {
            return rs.getObject(colIdx);
        }
    }

    protected void generateReadRecord(TableId tableId, Object[] rowData) {
        this.currentRecord.set(null);
        if (rowData.length == 0) {
            return;
        }
        this.logger.trace("tableId value is: {}", (Object)tableId);
        TableSchema tableSchema = this.schema().schemaFor(tableId);
        assert (tableSchema != null);
        Object key = tableSchema.keyFromColumnData(rowData);
        Struct value = tableSchema.valueFromColumnData(rowData);
        if (value == null) {
            this.logger.trace("Read event for null key with value {}", (Object)value);
            return;
        }
        Schema keySchema = tableSchema.keySchema();
        this.sourceInfo.update(this.clock().currentTimeInMicros(), tableId);
        Map<String, String> partition = this.sourceInfo.partition();
        Map<String, ?> offset = this.sourceInfo.offset();
        String topicName = this.topicSelector().topicNameFor((DataCollectionId)tableId);
        Envelope envelope = tableSchema.getEnvelopeSchema();
        this.currentRecord.set(new SourceRecord(partition, offset, topicName, null, keySchema, key, envelope.schema(), (Object)envelope.read((Object)value, this.sourceInfo.source(), Long.valueOf(this.clock().currentTimeInMillis()))));
    }

    private void sendCurrentRecord(BlockingConsumer<ChangeEvent> consumer) throws InterruptedException {
        SourceRecord record = this.currentRecord.get();
        if (record == null) {
            return;
        }
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("sending read event '{}'", (Object)record);
        }
        consumer.accept((Object)new ChangeEvent(record, this.sourceInfo.lsn()));
    }
}

