/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.postgresql.connection;

import com.google.protobuf.InvalidProtocolBufferException;
import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.connector.postgresql.connection.ReplicationStream;
import io.debezium.connector.postgresql.connection.ServerInfo;
import io.debezium.connector.postgresql.proto.PgProto;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.jdbc.JdbcConnectionException;
import java.nio.ByteBuffer;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.postgresql.PGConnection;
import org.postgresql.replication.LogSequenceNumber;
import org.postgresql.replication.PGReplicationStream;
import org.postgresql.replication.fluent.logical.ChainedLogicalCreateSlotBuilder;
import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PostgresReplicationConnection
extends JdbcConnection
implements ReplicationConnection {
    private static Logger LOGGER = LoggerFactory.getLogger(PostgresReplicationConnection.class);
    private final String slotName;
    private final String pluginName;
    private final boolean dropSlotOnClose;
    private final Configuration originalConfig;
    private final Integer statusUpdateIntervalSeconds;
    private long defaultStartingPos;

    private PostgresReplicationConnection(Configuration config, String slotName, String pluginName, boolean dropSlotOnClose, Integer statusUpdateIntervalSeconds) {
        super(config, PostgresConnection.FACTORY, null, PostgresReplicationConnection::defaultSettings);
        this.originalConfig = config;
        this.slotName = slotName;
        this.pluginName = pluginName;
        this.dropSlotOnClose = dropSlotOnClose;
        this.statusUpdateIntervalSeconds = statusUpdateIntervalSeconds;
        try {
            this.initReplicationSlot();
        }
        catch (SQLException e) {
            throw new JdbcConnectionException("Cannot create replication connection", e);
        }
    }

    protected void initReplicationSlot() throws SQLException {
        ServerInfo.ReplicationSlot slotInfo;
        try (PostgresConnection connection = new PostgresConnection(this.originalConfig);){
            slotInfo = connection.readReplicationSlotInfo(this.slotName, this.pluginName);
        }
        boolean shouldCreateSlot = ServerInfo.ReplicationSlot.INVALID == slotInfo;
        try {
            if (shouldCreateSlot) {
                LOGGER.debug("Creating new replication slot '{}' for plugin '{}'", (Object)this.slotName, (Object)this.pluginName);
                ((ChainedLogicalCreateSlotBuilder)this.pgConnection().getReplicationAPI().createReplicationSlot().logical().withSlotName(this.slotName)).withOutputPlugin(this.pluginName).make();
            } else if (slotInfo.active()) {
                LOGGER.error("A logical replication slot named '{}' for plugin '{}' and database '{}' is already active on the server.You cannot have multiple slots with the same name active for the same database", new Object[]{this.slotName, this.pluginName, this.database()});
                throw new IllegalStateException();
            }
            AtomicLong xlogStart = new AtomicLong();
            this.execute(statement -> {
                String identifySystemStatement = "IDENTIFY_SYSTEM";
                LOGGER.debug("running '{}' to validate replication connection", (Object)identifySystemStatement);
                try (ResultSet rs = statement.executeQuery(identifySystemStatement);){
                    if (!rs.next()) {
                        throw new IllegalStateException("The DB connection is not a valid replication connection");
                    }
                    String xlogpos = rs.getString("xlogpos");
                    LOGGER.debug("received latest xlogpos '{}'", (Object)xlogpos);
                    xlogStart.compareAndSet(0L, LogSequenceNumber.valueOf((String)xlogpos).asLong());
                }
            });
            if (shouldCreateSlot || !slotInfo.hasValidFlushedLSN()) {
                this.defaultStartingPos = xlogStart.get();
            } else {
                Long latestFlushedLSN = slotInfo.latestFlushedLSN();
                long l = this.defaultStartingPos = latestFlushedLSN < xlogStart.get() ? latestFlushedLSN.longValue() : xlogStart.get();
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("found previous flushed LSN '{}'", (Object)ReplicationConnection.format(latestFlushedLSN));
                }
            }
        }
        catch (SQLException e) {
            throw new JdbcConnectionException(e);
        }
    }

    @Override
    public ReplicationStream startStreaming() throws SQLException {
        return this.startStreaming(this.defaultStartingPos);
    }

    @Override
    public ReplicationStream startStreaming(Long offset) throws SQLException {
        this.connect();
        if (offset == null || offset <= 0L) {
            offset = this.defaultStartingPos;
        }
        LogSequenceNumber lsn = LogSequenceNumber.valueOf((long)offset);
        LOGGER.debug("starting streaming from LSN '{}'", (Object)lsn.asString());
        return this.createReplicationStream(lsn);
    }

    protected PGConnection pgConnection() throws SQLException {
        return (PGConnection)this.connection();
    }

    private ReplicationStream createReplicationStream(LogSequenceNumber lsn) throws SQLException {
        assert (lsn != null);
        ChainedLogicalStreamBuilder streamBuilder = (ChainedLogicalStreamBuilder)((ChainedLogicalStreamBuilder)this.pgConnection().getReplicationAPI().replicationStream().logical().withSlotName(this.slotName)).withStartPosition(lsn);
        if (this.statusUpdateIntervalSeconds != null) {
            streamBuilder.withStatusInterval(this.statusUpdateIntervalSeconds.intValue(), TimeUnit.SECONDS);
        }
        final PGReplicationStream stream = streamBuilder.start();
        final long lsnLong = lsn.asLong();
        return new ReplicationStream(){
            private volatile LogSequenceNumber lastReceivedLSN;

            @Override
            public PgProto.RowMessage read() throws SQLException {
                ByteBuffer read = stream.read();
                if (lsnLong >= stream.getLastReceiveLSN().asLong()) {
                    return null;
                }
                return this.deserializeMessage(read);
            }

            @Override
            public PgProto.RowMessage readPending() throws SQLException {
                ByteBuffer read = stream.readPending();
                if (read == null || lsnLong >= stream.getLastReceiveLSN().asLong()) {
                    return null;
                }
                return this.deserializeMessage(read);
            }

            private PgProto.RowMessage deserializeMessage(ByteBuffer buffer) {
                try {
                    if (!buffer.hasArray()) {
                        throw new IllegalStateException("Invalid buffer received from PG server during streaming replication");
                    }
                    byte[] source = buffer.array();
                    byte[] content = Arrays.copyOfRange(source, buffer.arrayOffset(), source.length);
                    this.lastReceivedLSN = stream.getLastReceiveLSN();
                    return PgProto.RowMessage.parseFrom(content);
                }
                catch (InvalidProtocolBufferException e) {
                    throw new RuntimeException(e);
                }
            }

            @Override
            public void close() throws SQLException {
                stream.close();
            }

            @Override
            public void flushLSN() throws SQLException {
                if (this.lastReceivedLSN == null) {
                    return;
                }
                stream.setFlushedLSN(this.lastReceivedLSN);
                stream.setAppliedLSN(this.lastReceivedLSN);
                stream.forceUpdateStatus();
            }

            @Override
            public Long lastReceivedLSN() {
                return this.lastReceivedLSN != null ? Long.valueOf(this.lastReceivedLSN.asLong()) : null;
            }
        };
    }

    @Override
    public synchronized void close() {
        try {
            super.close();
        }
        catch (SQLException e) {
            LOGGER.error("Unexpected error while closing Postgres connection", (Throwable)e);
        }
        if (this.dropSlotOnClose) {
            try (PostgresConnection connection = new PostgresConnection(this.originalConfig);){
                connection.dropReplicationSlot(this.slotName);
            }
        }
    }

    protected static void defaultSettings(Configuration.Builder builder) {
        PostgresConnection.defaultSettings(builder);
        builder.with("replication", "database").with("preferQueryMode", "simple");
    }

    protected static class ReplicationConnectionBuilder
    implements ReplicationConnection.Builder {
        private Configuration config;
        private String slotName = "debezium";
        private String pluginName = "decoderbufs";
        private boolean dropSlotOnClose = true;
        private int statusUpdateIntervalSeconds = 10;

        protected ReplicationConnectionBuilder(Configuration config) {
            assert (config != null);
            this.config = config;
        }

        @Override
        public ReplicationConnectionBuilder withSlot(String slotName) {
            assert (slotName != null);
            this.slotName = slotName;
            return this;
        }

        @Override
        public ReplicationConnectionBuilder withPlugin(String pluginName) {
            assert (pluginName != null);
            this.pluginName = pluginName;
            return this;
        }

        @Override
        public ReplicationConnectionBuilder dropSlotOnClose(boolean dropSlotOnClose) {
            this.dropSlotOnClose = dropSlotOnClose;
            return this;
        }

        @Override
        public ReplicationConnectionBuilder statusUpdateIntervalSeconds(int statusUpdateIntervalSeconds) {
            assert (statusUpdateIntervalSeconds >= 0);
            this.statusUpdateIntervalSeconds = statusUpdateIntervalSeconds;
            return this;
        }

        @Override
        public ReplicationConnection build() {
            return new PostgresReplicationConnection(this.config, this.slotName, this.pluginName, this.dropSlotOnClose, this.statusUpdateIntervalSeconds);
        }
    }
}

