/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.postgresql;

import io.debezium.annotation.ThreadSafe;
import io.debezium.connector.postgresql.ChangeEvent;
import io.debezium.connector.postgresql.PostgresConnector;
import io.debezium.connector.postgresql.PostgresSchema;
import io.debezium.connector.postgresql.PostgresTaskContext;
import io.debezium.connector.postgresql.PostgresType;
import io.debezium.connector.postgresql.PostgresValueConverter;
import io.debezium.connector.postgresql.RecordsProducer;
import io.debezium.connector.postgresql.RecordsStreamProducer;
import io.debezium.connector.postgresql.SourceInfo;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.data.Envelope;
import io.debezium.data.SpecialValueDecimal;
import io.debezium.function.BlockingConsumer;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.TableSchema;
import io.debezium.util.LoggingContext;
import io.debezium.util.Strings;
import io.debezium.util.Threads;
import java.sql.Array;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.postgresql.util.PGmoney;

@ThreadSafe
public class RecordsSnapshotProducer
extends RecordsProducer {
    private static final String CONTEXT_NAME = "records-snapshot-producer";
    private final ExecutorService executorService;
    private final Optional<RecordsStreamProducer> streamProducer;
    private final AtomicReference<SourceRecord> currentRecord;

    public RecordsSnapshotProducer(PostgresTaskContext taskContext, SourceInfo sourceInfo, boolean continueStreamingAfterCompletion) {
        super(taskContext, sourceInfo);
        this.executorService = Threads.newSingleThreadExecutor(PostgresConnector.class, (String)taskContext.config().serverName(), (String)CONTEXT_NAME);
        this.currentRecord = new AtomicReference();
        this.streamProducer = continueStreamingAfterCompletion ? Optional.of(new RecordsStreamProducer(taskContext, sourceInfo)) : Optional.empty();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void start(BlockingConsumer<ChangeEvent> eventConsumer, Consumer<Throwable> failureConsumer) {
        LoggingContext.PreviousContext previousContext = this.taskContext.configureLoggingContext(CONTEXT_NAME);
        try {
            ((CompletableFuture)CompletableFuture.runAsync(() -> this.takeSnapshot(eventConsumer), this.executorService).thenRun(() -> this.startStreaming(eventConsumer, failureConsumer))).exceptionally(e -> {
                this.logger.error("unexpected exception", e.getCause() != null ? e.getCause() : e);
                this.stop();
                failureConsumer.accept((Throwable)e);
                return null;
            });
        }
        finally {
            previousContext.restore();
        }
    }

    private void startStreaming(BlockingConsumer<ChangeEvent> consumer, Consumer<Throwable> failureConsumer) {
        try {
            this.streamProducer.ifPresent(producer -> {
                this.logger.info("Snapshot finished, continuing streaming changes from {}", (Object)ReplicationConnection.format(this.sourceInfo.lsn()));
                producer.start(consumer, failureConsumer);
            });
        }
        finally {
            this.cleanup();
        }
    }

    @Override
    protected void commit(long lsn) {
        this.streamProducer.ifPresent(x -> x.commit(lsn));
    }

    @Override
    protected void stop() {
        try {
            this.streamProducer.ifPresent(RecordsStreamProducer::stop);
        }
        finally {
            this.cleanup();
        }
    }

    private void cleanup() {
        this.currentRecord.set(null);
        this.executorService.shutdownNow();
    }

    private void takeSnapshot(BlockingConsumer<ChangeEvent> consumer) {
        long snapshotStart = this.clock().currentTimeInMillis();
        Connection jdbcConnection = null;
        try (PostgresConnection connection = this.taskContext.createConnection();){
            jdbcConnection = connection.connection();
            String lineSeparator = System.lineSeparator();
            this.logger.info("Step 0: disabling autocommit");
            connection.setAutoCommit(false);
            long lockTimeoutMillis = this.taskContext.config().snapshotLockTimeoutMillis();
            this.logger.info("Step 1: starting transaction and refreshing the DB schemas for database '{}' and user '{}'", (Object)connection.database(), (Object)connection.username());
            StringBuilder statements = new StringBuilder("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ ONLY, DEFERRABLE;");
            connection.executeWithoutCommitting(statements.toString());
            statements.delete(0, statements.length());
            PostgresSchema schema = this.schema();
            schema.refresh(connection, false);
            this.logger.info("Step 2: locking each of the database tables, waiting a maximum of '{}' seconds for each lock", (Object)((double)lockTimeoutMillis / 1000.0));
            statements.append("SET lock_timeout = ").append(lockTimeoutMillis).append(";").append(lineSeparator);
            schema.tables().forEach(tableId -> statements.append("LOCK TABLE ").append(tableId.toDoubleQuotedString()).append(" IN SHARE UPDATE EXCLUSIVE MODE;").append(lineSeparator));
            connection.executeWithoutCommitting(statements.toString());
            schema.refresh(connection, false);
            long xlogStart = connection.currentXLogLocation();
            long txId = connection.currentTransactionId();
            this.logger.info("\t read xlogStart at '{}' from transaction '{}'", (Object)ReplicationConnection.format(xlogStart), (Object)txId);
            this.sourceInfo.startSnapshot();
            this.sourceInfo.update(xlogStart, this.clock().currentTimeInMicros(), txId);
            this.logger.info("Step 3: reading and exporting the contents of each table");
            AtomicInteger rowsCounter = new AtomicInteger(0);
            Map<TableId, String> selectOverrides = this.getSnapshotSelectOverridesByTable();
            for (TableId tableId2 : schema.tables()) {
                if (schema.isFilteredOut(tableId2)) {
                    this.logger.info("\t table '{}' is filtered out, ignoring", (Object)tableId2);
                    continue;
                }
                long exportStart = this.clock().currentTimeInMillis();
                this.logger.info("\t exporting data from table '{}'", (Object)tableId2);
                try {
                    String selectStatement = selectOverrides.getOrDefault(tableId2, "SELECT * FROM " + tableId2.toDoubleQuotedString());
                    this.logger.info("For table '{}' using select statement: '{}'", (Object)tableId2, (Object)selectStatement);
                    connection.queryWithBlockingConsumer(selectStatement, this::readTableStatement, rs -> this.readTable(tableId2, rs, consumer, rowsCounter));
                    this.logger.info("\t finished exporting '{}' records for '{}'; total duration '{}'", new Object[]{rowsCounter.get(), tableId2, Strings.duration((long)(this.clock().currentTimeInMillis() - exportStart))});
                    rowsCounter.set(0);
                }
                catch (SQLException e) {
                    throw new ConnectException((Throwable)e);
                }
            }
            this.logger.info("Step 4: committing transaction '{}'", (Object)txId);
            jdbcConnection.commit();
            SourceRecord currentRecord = this.currentRecord.get();
            if (currentRecord != null) {
                this.logger.info("Step 5: sending the last snapshot record");
                this.sourceInfo.markLastSnapshotRecord();
                this.currentRecord.set(new SourceRecord(currentRecord.sourcePartition(), this.sourceInfo.offset(), currentRecord.topic(), currentRecord.kafkaPartition(), currentRecord.keySchema(), currentRecord.key(), currentRecord.valueSchema(), currentRecord.value()));
                this.sendCurrentRecord(consumer);
            }
            this.sourceInfo.completeSnapshot();
            this.logger.info("Snapshot completed in '{}'", (Object)Strings.duration((long)(this.clock().currentTimeInMillis() - snapshotStart)));
        }
        catch (SQLException e) {
            this.rollbackTransaction(jdbcConnection);
            throw new ConnectException((Throwable)e);
        }
        catch (InterruptedException e) {
            Thread.interrupted();
            this.rollbackTransaction(jdbcConnection);
            this.logger.warn("Snapshot aborted after '{}'", (Object)Strings.duration((long)(this.clock().currentTimeInMillis() - snapshotStart)));
        }
    }

    private void rollbackTransaction(Connection jdbcConnection) {
        try {
            if (jdbcConnection != null) {
                jdbcConnection.rollback();
            }
        }
        catch (SQLException se) {
            this.logger.error("Cannot rollback snapshot transaction", (Throwable)se);
        }
    }

    private Statement readTableStatement(Connection conn) throws SQLException {
        int rowsFetchSize = this.taskContext.config().rowsFetchSize();
        Statement statement = conn.createStatement();
        statement.setFetchSize(rowsFetchSize);
        return statement;
    }

    private void readTable(TableId tableId, ResultSet rs, BlockingConsumer<ChangeEvent> consumer, AtomicInteger rowsCounter) throws SQLException, InterruptedException {
        Table table = this.schema().tableFor(tableId);
        assert (table != null);
        int numColumns = table.columns().size();
        Object[] row = new Object[numColumns];
        ResultSetMetaData metaData = rs.getMetaData();
        while (rs.next()) {
            rowsCounter.incrementAndGet();
            this.sendCurrentRecord(consumer);
            int i = 0;
            int j = 1;
            while (i != numColumns) {
                row[i] = this.valueForColumn(rs, j, metaData);
                ++i;
                ++j;
            }
            this.generateReadRecord(tableId, row);
        }
    }

    private Object valueForColumn(ResultSet rs, int colIdx, ResultSetMetaData metaData) throws SQLException {
        try {
            String columnTypeName = metaData.getColumnTypeName(colIdx);
            PostgresType type = this.taskContext.schema().getTypeRegistry().get(columnTypeName);
            if (type.isArrayType()) {
                Array array = rs.getArray(colIdx);
                if (array == null) {
                    return null;
                }
                return Arrays.asList((Object[])array.getArray());
            }
            switch (type.getOid()) {
                case 790: {
                    return new PGmoney((String)rs.getString((int)colIdx)).val;
                }
                case 1560: {
                    return rs.getString(colIdx);
                }
                case 1700: {
                    String s = rs.getString(colIdx);
                    if (s == null) {
                        return s;
                    }
                    Optional<SpecialValueDecimal> value = PostgresValueConverter.toSpecialValue(s);
                    return value.isPresent() ? value.get() : new SpecialValueDecimal(rs.getBigDecimal(colIdx));
                }
            }
            return rs.getObject(colIdx);
        }
        catch (SQLException e) {
            return rs.getObject(colIdx);
        }
    }

    protected void generateReadRecord(TableId tableId, Object[] rowData) {
        if (rowData.length == 0) {
            return;
        }
        TableSchema tableSchema = this.schema().schemaFor(tableId);
        assert (tableSchema != null);
        Object key = tableSchema.keyFromColumnData(rowData);
        Struct value = tableSchema.valueFromColumnData(rowData);
        if (key == null || value == null) {
            return;
        }
        Schema keySchema = tableSchema.keySchema();
        this.sourceInfo.update(this.clock().currentTimeInMicros());
        Map<String, String> partition = this.sourceInfo.partition();
        Map<String, ?> offset = this.sourceInfo.offset();
        String topicName = this.topicSelector().topicNameFor(tableId);
        Envelope envelope = tableSchema.getEnvelopeSchema();
        this.currentRecord.set(new SourceRecord(partition, offset, topicName, null, keySchema, key, envelope.schema(), (Object)envelope.read((Object)value, this.sourceInfo.source(), Long.valueOf(this.clock().currentTimeInMillis()))));
    }

    private void sendCurrentRecord(BlockingConsumer<ChangeEvent> consumer) throws InterruptedException {
        SourceRecord record = this.currentRecord.get();
        if (record == null) {
            return;
        }
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("sending read event '{}'", (Object)record);
        }
        consumer.accept((Object)new ChangeEvent(record));
    }

    private Map<TableId, String> getSnapshotSelectOverridesByTable() {
        String tableList = this.taskContext.getConfig().snapshotSelectOverrides();
        if (tableList == null) {
            return Collections.emptyMap();
        }
        HashMap<TableId, String> snapshotSelectOverridesByTable = new HashMap<TableId, String>();
        for (String table : tableList.split(",")) {
            snapshotSelectOverridesByTable.put(TableId.parse((String)table), this.taskContext.getConfig().snapshotSelectOverrideForTable(table));
        }
        return snapshotSelectOverridesByTable;
    }
}

