/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.plugins.source.rds.schema;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.opensearch.dataprepper.plugins.source.rds.exception.SqlMetadataException;
import org.opensearch.dataprepper.plugins.source.rds.schema.ConnectionManager;
import org.opensearch.dataprepper.plugins.source.rds.schema.SchemaManager;
import org.postgresql.PGConnection;
import org.postgresql.replication.PGReplicationConnection;
import org.postgresql.replication.fluent.logical.ChainedLogicalCreateSlotBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link SchemaManager} implementation for PostgreSQL.
 *
 * <p>Reads schema metadata (table names, primary keys, column data types and enum columns) over
 * connections obtained from the supplied {@link ConnectionManager}, and creates or drops the
 * publication and logical replication slot used for change streaming.
 *
 * <p>Metadata look-ups are attempted up to {@code NUM_OF_RETRIES + 1} times with a fixed pause of
 * {@code BACKOFF_IN_MILLIS} milliseconds after each failed attempt.
 */
public class PostgresSchemaManager implements SchemaManager {
    private static final Logger LOG = LoggerFactory.getLogger(PostgresSchemaManager.class);

    static final int NUM_OF_RETRIES = 3;
    static final int BACKOFF_IN_MILLIS = 500;
    static final String COLUMN_NAME = "COLUMN_NAME";
    static final String TYPE_NAME = "TYPE_NAME";
    static final String TABLE_SCHEMA = "TABLE_SCHEM";
    static final String TABLE_NAME = "TABLE_NAME";
    static final String PGOUTPUT = "pgoutput";
    static final String DROP_PUBLICATION_SQL = "DROP PUBLICATION IF EXISTS ";
    static final String DROP_SLOT_SQL = "SELECT pg_drop_replication_slot(?)";

    private static final String CHECK_SLOT_EXISTS_SQL =
            "SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = ?);";
    private static final String ENUM_COLUMNS_SQL =
            "SELECT a.attname AS column_name FROM pg_database d JOIN pg_namespace n ON n.nspname = ? JOIN pg_class c ON c.relnamespace = n.oid AND c.relname = ? JOIN pg_attribute a ON a.attrelid = c.oid JOIN pg_type t ON t.oid = a.atttypid WHERE d.datname = ? AND t.typtype = 'e'";

    private final ConnectionManager connectionManager;

    /**
     * @param connectionManager source of the JDBC connections used by every operation
     */
    public PostgresSchemaManager(ConnectionManager connectionManager) {
        this.connectionManager = connectionManager;
    }

    /**
     * Creates a publication for the given tables and, unless a slot with the given name is
     * already listed in {@code pg_replication_slots}, a logical replication slot that uses the
     * {@code pgoutput} plugin.
     *
     * <p>If the slot-existence check itself fails, a warning is logged and the method returns
     * without creating the slot.
     *
     * @param tableNames      tables to include in the publication
     * @param publicationName name of the publication to create
     * @param slotName        name of the replication slot to create
     * @throws RuntimeException if the connection cannot be obtained, the publication cannot be
     *                          created, or the slot creation fails
     */
    public void createLogicalReplicationSlot(List<String> tableNames, String publicationName, String slotName) {
        // Identifiers cannot be bound as JDBC parameters, so the statement is assembled as text.
        // NOTE(review): publication and table names are assumed to come from trusted configuration.
        final String createPublicationStatement =
                "CREATE PUBLICATION " + publicationName + " FOR TABLE " + String.join(", ", tableNames) + ";";

        try (Connection conn = connectionManager.getConnection()) {
            LOG.debug("Connecting to create slot");

            try (PreparedStatement statement = conn.prepareStatement(createPublicationStatement)) {
                statement.executeUpdate();
                LOG.info("Publication {} created successfully. ", publicationName);
            } catch (Exception e) {
                LOG.error("Failed to create publication: {}", e.getMessage());
                throw e;
            }

            final PGConnection pgConnection = conn.unwrap(PGConnection.class);
            final PGReplicationConnection replicationConnection = pgConnection.getReplicationAPI();

            final boolean slotExists;
            try (PreparedStatement checkSlotStatement = conn.prepareStatement(CHECK_SLOT_EXISTS_SQL)) {
                checkSlotStatement.setString(1, slotName);
                try (ResultSet resultSet = checkSlotStatement.executeQuery()) {
                    slotExists = resultSet.next() && resultSet.getBoolean(1);
                }
            } catch (Exception e) {
                // Best effort: a failed existence check is logged and ends the operation quietly.
                LOG.warn("Failed to create replication slot {}: {}", slotName, e.getMessage());
                return;
            }

            if (slotExists) {
                LOG.info("Replication slot {} already exists. ", slotName);
                return;
            }

            LOG.info("Creating replication slot {}...", slotName);
            final ChainedLogicalCreateSlotBuilder slotBuilder = replicationConnection.createReplicationSlot()
                    .logical()
                    .withSlotName(slotName);
            slotBuilder.withOutputPlugin(PGOUTPUT).make();
            LOG.info("Replication slot {} created successfully. ", slotName);
        } catch (Exception e) {
            LOG.error("Exception when creating replication slot: {}", e.getMessage());
            throw new RuntimeException(e);
        }
    }

    /**
     * Drops the replication slot and the publication, each on a best-effort basis: every failure
     * is logged and never propagated to the caller.
     *
     * @param publicationName publication to drop; skipped when {@code null}
     * @param slotName        replication slot to drop; skipped when {@code null}
     */
    public void deleteLogicalReplicationSlot(String publicationName, String slotName) {
        try (Connection conn = connectionManager.getConnection()) {
            LOG.debug("Connecting to delete slot");

            if (slotName != null) {
                try (PreparedStatement dropSlotStatement = conn.prepareStatement(DROP_SLOT_SQL)) {
                    dropSlotStatement.setString(1, slotName);
                    dropSlotStatement.execute();
                    LOG.info("Replication slot {} dropped successfully.", slotName);
                } catch (Exception e) {
                    LOG.warn("Failed to drop replication slot: {}", e.getMessage());
                }
            }

            if (publicationName != null) {
                try (Statement dropPublicationStatement = conn.createStatement()) {
                    dropPublicationStatement.execute(DROP_PUBLICATION_SQL + publicationName);
                    LOG.info("Publication {} dropped successfully.", publicationName);
                } catch (Exception e) {
                    LOG.warn("Failed to drop publication: {}", e.getMessage());
                }
            }
        } catch (Exception e) {
            LOG.error("Exception when connecting to database to drop replication slot. ", e);
        }
    }

    /**
     * Looks up the primary key columns of each table over a single connection.
     *
     * @param fullTableNames table names in {@code database.schema.table} form
     * @return map from each full table name to its primary key column names
     * @throws RuntimeException if the connection cannot be obtained or closed, or a look-up fails
     */
    @Override
    public Map<String, List<String>> getPrimaryKeys(List<String> fullTableNames) {
        try (Connection connection = connectionManager.getConnection()) {
            LOG.debug("Connecting to get primary keys");
            final Map<String, List<String>> tableToPrimaryKeysMap = new HashMap<>();
            for (String fullTableName : fullTableNames) {
                tableToPrimaryKeysMap.put(fullTableName, getPrimaryKeysForTable(connection, fullTableName));
            }
            return tableToPrimaryKeysMap;
        } catch (Exception e) {
            throw new RuntimeException("Failed to get connection while trying to get primary keys for tables. ", e);
        }
    }

    /**
     * Looks up the column data types of each table over a single connection.
     *
     * @param fullTableNames table names in {@code database.schema.table} form
     * @return map from each full table name to a map of column name to type name; enum columns
     *         are reported with the type {@code "enum"}
     * @throws RuntimeException if the connection cannot be obtained or closed
     * @throws SqlMetadataException if the metadata of a table cannot be read after all retries
     */
    @Override
    public Map<String, Map<String, String>> getColumnDataTypes(List<String> fullTableNames) {
        try (Connection connection = connectionManager.getConnection()) {
            LOG.debug("Connecting to get column types");
            final Map<String, Map<String, String>> tableToColumnDataTypesMap = new HashMap<>();
            for (String fullTableName : fullTableNames) {
                tableToColumnDataTypesMap.put(fullTableName, getColumnDataTypesForTable(connection, fullTableName));
            }
            return tableToColumnDataTypesMap;
        } catch (SQLException e) {
            throw new RuntimeException("Failed to get connection while trying to get column data types for tables. ", e);
        }
    }

    /**
     * Reads the column types of one table, retrying on failure.
     *
     * @param connection    open connection to query
     * @param fullTableName table name in {@code database.schema.table} form
     * @return map of column name to type name, with enum columns mapped to {@code "enum"}
     * @throws SqlMetadataException if every attempt fails
     */
    private Map<String, String> getColumnDataTypesForTable(Connection connection, String fullTableName) {
        final String[] splits = fullTableName.split("\\.");
        final String database = splits[0];
        final String schema = splits[1];
        final String table = splits[2];

        for (int retry = 0; retry <= NUM_OF_RETRIES; retry++) {
            // Fresh map per attempt so a half-read result from a failed attempt is never returned.
            final Map<String, String> columnsToDataType = new HashMap<>();
            try {
                final Set<String> enumColumns = getEnumColumnsForTable(connection, fullTableName);
                try (ResultSet columns = connection.getMetaData().getColumns(database, schema, table, null)) {
                    while (columns.next()) {
                        final String columnName = columns.getString(COLUMN_NAME);
                        final String typeName = columns.getString(TYPE_NAME);
                        columnsToDataType.put(columnName, enumColumns.contains(columnName) ? "enum" : typeName);
                    }
                }
                return columnsToDataType;
            } catch (Exception e) {
                LOG.error("Failed to get dataTypes for database {} schema {} table {}, retrying",
                        database, schema, table, e);
                applyBackoff();
            }
        }
        throw new SqlMetadataException(String.format(
                "Failed to get dataTypes for database %s schema %s table %s after %d retries",
                database, schema, table, NUM_OF_RETRIES));
    }

    /**
     * Looks up the enum-typed columns of each table over a single connection.
     *
     * @param fullTableNames table names in {@code database.schema.table} form
     * @return map from each full table name to the names of its enum columns
     * @throws RuntimeException if the connection cannot be obtained or closed
     * @throws SqlMetadataException if the enum columns of a table cannot be read after all retries
     */
    public Map<String, Set<String>> getEnumColumns(List<String> fullTableNames) {
        try (Connection connection = connectionManager.getConnection()) {
            LOG.debug("Connecting to get enums");
            final Map<String, Set<String>> tableToEnumColumnsMap = new HashMap<>();
            for (String fullTableName : fullTableNames) {
                tableToEnumColumnsMap.put(fullTableName, getEnumColumnsForTable(connection, fullTableName));
            }
            return tableToEnumColumnsMap;
        } catch (SQLException e) {
            throw new RuntimeException("Failed to get connection while trying to get enum columns for tables. ", e);
        }
    }

    /**
     * Lists the tables of a database, opening a new connection for each attempt.
     *
     * @param databaseName catalog whose tables are listed
     * @return full table names in {@code database.schema.table} form
     * @throws RuntimeException if every attempt fails
     */
    @Override
    public Set<String> getTableNames(String databaseName) {
        for (int retry = 0; retry <= NUM_OF_RETRIES; retry++) {
            // Fresh set per attempt so names collected before a failure are discarded.
            final Set<String> tableNames = new HashSet<>();
            try (Connection connection = connectionManager.getConnection()) {
                LOG.debug("Connecting to get table names");
                try (ResultSet tables = connection.getMetaData()
                        .getTables(databaseName, null, null, new String[]{"TABLE"})) {
                    while (tables.next()) {
                        final String schemaName = tables.getString(TABLE_SCHEMA);
                        final String tableName = tables.getString(TABLE_NAME);
                        tableNames.add(databaseName + "." + schemaName + "." + tableName);
                    }
                }
                return tableNames;
            } catch (Exception e) {
                LOG.warn("Failed to get table names, retrying", e);
                applyBackoff();
            }
        }
        throw new RuntimeException("Failed to get table names for database: " + databaseName);
    }

    /**
     * Reads the primary key columns of one table, retrying on failure. A table without primary
     * keys yields an empty list and a warning.
     *
     * @param connection    open connection to query
     * @param fullTableName table name in {@code database.schema.table} form
     * @return primary key column names in the order the driver reports them
     * @throws RuntimeException if every attempt fails
     */
    private List<String> getPrimaryKeysForTable(Connection connection, String fullTableName) {
        final String[] splits = fullTableName.split("\\.");
        final String database = splits[0];
        final String schema = splits[1];
        final String table = splits[2];

        for (int retry = 0; retry <= NUM_OF_RETRIES; retry++) {
            final List<String> primaryKeys = new ArrayList<>();
            try (ResultSet rs = connection.getMetaData().getPrimaryKeys(database, schema, table)) {
                while (rs.next()) {
                    primaryKeys.add(rs.getString(COLUMN_NAME));
                }
                if (primaryKeys.isEmpty()) {
                    LOG.warn("No primary keys found for table {}", fullTableName);
                }
                return primaryKeys;
            } catch (Exception e) {
                LOG.error("Failed to get primary keys for table {}, retrying", fullTableName, e);
                applyBackoff();
            }
        }
        throw new RuntimeException("Failed to get primary keys for table " + fullTableName);
    }

    /**
     * Queries the system catalogs for the columns of one table whose type is an enum
     * ({@code pg_type.typtype = 'e'}), retrying on failure.
     *
     * @param connection    open connection to query
     * @param fullTableName table name in {@code database.schema.table} form
     * @return names of the enum-typed columns; empty when the query returns no rows
     * @throws SqlMetadataException if every attempt fails
     */
    Set<String> getEnumColumnsForTable(Connection connection, String fullTableName) {
        final String[] splits = fullTableName.split("\\.");
        final String database = splits[0];
        final String schema = splits[1];
        final String table = splits[2];

        for (int retry = 0; retry <= NUM_OF_RETRIES; retry++) {
            // Fresh set per attempt so a half-read result from a failed attempt is never returned.
            final Set<String> enumColumns = new HashSet<>();
            try (PreparedStatement getEnumStatement = connection.prepareStatement(ENUM_COLUMNS_SQL)) {
                getEnumStatement.setString(1, schema);
                getEnumStatement.setString(2, table);
                getEnumStatement.setString(3, database);
                try (ResultSet rs = getEnumStatement.executeQuery()) {
                    while (rs.next()) {
                        enumColumns.add(rs.getString("column_name"));
                    }
                }
                return enumColumns;
            } catch (Exception e) {
                LOG.error("Failed to get enum columns for database {} schema {} table {}, retrying",
                        database, schema, table, e);
                applyBackoff();
            }
        }
        throw new SqlMetadataException(String.format(
                "Failed to get enum columns for database %s schema %s table %s after %d retries",
                database, schema, table, NUM_OF_RETRIES));
    }

    /**
     * Pauses the calling thread for {@code BACKOFF_IN_MILLIS} milliseconds. If the thread is
     * interrupted while sleeping, its interrupt flag is restored and the method returns early.
     */
    private void applyBackoff() {
        try {
            Thread.sleep(BACKOFF_IN_MILLIS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

