/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.plugins.source.rds.stream;

import java.nio.ByteBuffer;
import java.sql.Connection;
import java.util.regex.Pattern;
import org.opensearch.dataprepper.plugins.source.rds.schema.ConnectionManager;
import org.opensearch.dataprepper.plugins.source.rds.stream.LogicalReplicationEventProcessor;
import org.opensearch.dataprepper.plugins.source.rds.stream.ReplicationLogClient;
import org.opensearch.dataprepper.plugins.source.rds.utils.RdsSourceAggregateMetrics;
import org.postgresql.PGConnection;
import org.postgresql.replication.LogSequenceNumber;
import org.postgresql.replication.PGReplicationStream;
import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link ReplicationLogClient} that opens a PostgreSQL logical replication stream on a
 * replication slot and hands every received message to a
 * {@link LogicalReplicationEventProcessor}.
 *
 * <p>{@link #connect()} runs the read loop on the calling thread until
 * {@link #disconnect()} is invoked or the thread is interrupted. The fields touched by
 * both methods ({@code stream}, {@code disconnectRequested}) are {@code volatile} so a
 * {@code disconnect()} issued from another thread observes the stream opened by
 * {@code connect()}.
 */
public class LogicalReplicationClient implements ReplicationLogClient {
    private static final Logger LOG = LoggerFactory.getLogger(LogicalReplicationClient.class);
    static final String PROTO_VERSION_KEY = "proto_version";
    static final String VERSION_ONE = "1";
    static final String PUBLICATION_NAMES_KEY = "publication_names";
    static final String AUTHENTICATION_FAILED = "authentication failed";
    static final String CONNECTION_REFUSED = "Connection refused";
    static final String REPLICATION_SLOT_DOES_NOT_EXIST = ".*replication slot .* does not exist.*";
    static final String PERMISSION_DENIED = "permission denied";

    /**
     * Precompiled form of {@link #REPLICATION_SLOT_DOES_NOT_EXIST}. DOTALL lets {@code .}
     * span line terminators so a message carrying extra lines is still recognised.
     */
    private static final Pattern REPLICATION_SLOT_DOES_NOT_EXIST_PATTERN =
            Pattern.compile(REPLICATION_SLOT_DOES_NOT_EXIST, Pattern.DOTALL);

    /** Pause between polls when the stream has no pending message, in milliseconds. */
    private static final long IDLE_POLL_INTERVAL_MILLIS = 10L;

    private final ConnectionManager connectionManager;
    private final String publicationName;
    private final String replicationSlotName;
    private final RdsSourceAggregateMetrics rdsSourceAggregateMetrics;
    private LogSequenceNumber startLsn;
    private LogicalReplicationEventProcessor eventProcessor;
    // Written by connect(), read by disconnect()/closeStream(); volatile for cross-thread visibility.
    private volatile PGReplicationStream stream = null;
    private volatile boolean disconnectRequested = false;

    /**
     * Creates a client; no connection is opened until {@link #connect()} is called.
     *
     * @param connectionManager         supplies the JDBC connection used for replication
     * @param publicationName           value sent as the {@code publication_names} slot option
     * @param replicationSlotName       name of the replication slot to stream from
     * @param rdsSourceAggregateMetrics counters updated on invocation and on failure
     */
    public LogicalReplicationClient(ConnectionManager connectionManager, String publicationName, String replicationSlotName, RdsSourceAggregateMetrics rdsSourceAggregateMetrics) {
        this.connectionManager = connectionManager;
        this.publicationName = publicationName;
        this.replicationSlotName = replicationSlotName;
        this.rdsSourceAggregateMetrics = rdsSourceAggregateMetrics;
    }

    /**
     * Starts the logical replication stream and, when an event processor is set, processes
     * messages until a disconnect is requested or the current thread is interrupted.
     *
     * <p>After each processed message the last received LSN is reported back on the stream
     * as both flushed and applied. When no event processor is set the stream is started and
     * closed again without reading.
     *
     * @throws RuntimeException wrapping any exception raised while connecting or streaming;
     *                          the failure is first recorded via the aggregate metrics. If the
     *                          cause is an {@link InterruptedException} the thread's interrupt
     *                          status is restored before throwing.
     */
    @Override
    public void connect() {
        rdsSourceAggregateMetrics.getStreamApiInvocations().increment();
        LOG.debug("Start connecting logical replication stream. ");
        try (Connection conn = connectionManager.getConnection()) {
            final PGConnection pgConnection = conn.unwrap(PGConnection.class);
            final ChainedLogicalStreamBuilder logicalStreamBuilder = pgConnection.getReplicationAPI()
                    .replicationStream()
                    .logical()
                    .withSlotName(replicationSlotName)
                    .withSlotOption(PROTO_VERSION_KEY, VERSION_ONE)
                    .withSlotOption(PUBLICATION_NAMES_KEY, publicationName);
            if (startLsn != null) {
                logicalStreamBuilder.withStartPosition(startLsn);
            }
            final PGReplicationStream activeStream = logicalStreamBuilder.start();
            stream = activeStream;
            LOG.debug("Logical replication stream started. ");

            if (eventProcessor != null) {
                while (!disconnectRequested && !Thread.currentThread().isInterrupted()) {
                    try {
                        final ByteBuffer msg = activeStream.readPending();
                        if (msg == null) {
                            // Nothing pending: back off briefly instead of spinning.
                            Thread.sleep(IDLE_POLL_INTERVAL_MILLIS);
                            continue;
                        }
                        eventProcessor.process(msg);
                        // Report the position reached on the stream as flushed and applied.
                        final LogSequenceNumber lsn = activeStream.getLastReceiveLSN();
                        activeStream.setFlushedLSN(lsn);
                        activeStream.setAppliedLSN(lsn);
                    } catch (Exception e) {
                        LOG.error("Exception while processing Postgres replication stream. ");
                        closeStream();
                        throw e;
                    }
                }
            }
            closeStream();
            disconnectRequested = false;
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Thread.sleep cleared the flag when it threw; restore it so callers can see it.
                Thread.currentThread().interrupt();
            }
            categorizeError(e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Requests the read loop in {@link #connect()} to stop, closes the replication stream
     * and stops the event processor's checkpoint manager when a processor is set.
     */
    @Override
    public void disconnect() {
        disconnectRequested = true;
        LOG.debug("Requested to disconnect logical replication stream.");
        closeStream();
        if (eventProcessor != null) {
            eventProcessor.stopCheckpointManager();
            LOG.debug("Stopped checkpoint manager.");
        }
    }

    /**
     * Sets the processor that receives each replication message read by {@link #connect()}.
     *
     * @param eventProcessor the processor to use; when {@code null}, {@code connect()} does not read
     */
    public void setEventProcessor(LogicalReplicationEventProcessor eventProcessor) {
        this.eventProcessor = eventProcessor;
    }

    /**
     * Sets the position passed to the stream builder as its start position.
     *
     * @param startLsn the start position; when {@code null}, no start position is passed
     */
    public void setStartLsn(LogSequenceNumber startLsn) {
        this.startLsn = startLsn;
    }

    /**
     * Closes the replication stream if one is open. Failures while closing are logged and
     * not propagated.
     */
    private void closeStream() {
        // Read the field once so the null check, isClosed() and close() use the same instance.
        final PGReplicationStream current = stream;
        if (current == null || current.isClosed()) {
            return;
        }
        try {
            current.close();
            LOG.debug("Replication stream closed.");
        } catch (Exception e) {
            LOG.error("Exception while closing replication stream. ", e);
        }
    }

    /**
     * Records a failure in the aggregate metrics and logs it. Authentication failures,
     * refused connections, a missing replication slot and permission errors are counted as
     * 4xx errors together with their specific counter; anything else is counted as 5xx.
     *
     * @param e the exception raised while creating or processing the stream
     */
    private void categorizeError(Exception e) {
        final String message = e.getMessage();
        final Throwable cause = e.getCause();
        final String causeMessage = cause == null ? null : cause.getMessage();

        if (message != null && message.contains(AUTHENTICATION_FAILED)) {
            rdsSourceAggregateMetrics.getStream4xxErrors().increment();
            rdsSourceAggregateMetrics.getStreamAuthErrors().increment();
            LOG.error("Failed to create or process PostgreSQL replication stream: Authentication failed. [{}]", message);
        } else if (causeMessage != null && causeMessage.contains(CONNECTION_REFUSED)) {
            rdsSourceAggregateMetrics.getStream4xxErrors().increment();
            rdsSourceAggregateMetrics.getStreamServerNotFoundErrors().increment();
            LOG.error("Failed to create or process PostgreSQL replication stream: Cannot connect to PostgreSQL server. [{}]", message);
        } else if (message != null && REPLICATION_SLOT_DOES_NOT_EXIST_PATTERN.matcher(message).matches()) {
            rdsSourceAggregateMetrics.getStream4xxErrors().increment();
            rdsSourceAggregateMetrics.getStreamReplicationNotEnabledErrors().increment();
            LOG.error("Failed to create or process PostgreSQL replication stream: Replication slot does not exist. [{}]", message);
        } else if (message != null && message.contains(PERMISSION_DENIED)) {
            rdsSourceAggregateMetrics.getStream4xxErrors().increment();
            rdsSourceAggregateMetrics.getStreamAccessDeniedErrors().increment();
            LOG.error("Failed to create or process PostgreSQL replication stream: Insufficient privileges. [{}]", message);
        } else {
            rdsSourceAggregateMetrics.getStream5xxErrors().increment();
            LOG.error("Failed to create or process PostgreSQL replication stream. ", e);
        }
    }
}

