/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.plugins.source.rds.leader;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
import org.opensearch.dataprepper.plugins.source.rds.RdsSourceConfig;
import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.ExportPartition;
import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.GlobalState;
import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.LeaderPartition;
import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.StreamPartition;
import org.opensearch.dataprepper.plugins.source.rds.coordination.state.ExportProgressState;
import org.opensearch.dataprepper.plugins.source.rds.coordination.state.LeaderProgressState;
import org.opensearch.dataprepper.plugins.source.rds.coordination.state.MySqlStreamState;
import org.opensearch.dataprepper.plugins.source.rds.coordination.state.PostgresStreamState;
import org.opensearch.dataprepper.plugins.source.rds.coordination.state.StreamProgressState;
import org.opensearch.dataprepper.plugins.source.rds.model.BinlogCoordinate;
import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata;
import org.opensearch.dataprepper.plugins.source.rds.schema.MySqlSchemaManager;
import org.opensearch.dataprepper.plugins.source.rds.schema.PostgresSchemaManager;
import org.opensearch.dataprepper.plugins.source.rds.schema.SchemaManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LeaderScheduler
implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(LeaderScheduler.class);
    private static final int DEFAULT_EXTEND_LEASE_MINUTES = 3;
    private static final Duration DEFAULT_LEASE_INTERVAL = Duration.ofMinutes(1L);
    private static final String S3_EXPORT_PREFIX = "rds";
    private final EnhancedSourceCoordinator sourceCoordinator;
    private final RdsSourceConfig sourceConfig;
    private final String s3Prefix;
    private final SchemaManager schemaManager;
    private final DbTableMetadata dbTableMetadata;
    private final String pipelineName;
    private LeaderPartition leaderPartition;
    private List<String> tableNames;
    private StreamPartition streamPartition = null;
    private volatile boolean shutdownRequested = false;

    public LeaderScheduler(EnhancedSourceCoordinator sourceCoordinator, RdsSourceConfig sourceConfig, String s3Prefix, SchemaManager schemaManager, DbTableMetadata dbTableMetadata, String pipelineName) {
        this.sourceCoordinator = sourceCoordinator;
        this.sourceConfig = sourceConfig;
        this.s3Prefix = s3Prefix;
        this.schemaManager = schemaManager;
        this.dbTableMetadata = dbTableMetadata;
        this.pipelineName = pipelineName;
        this.tableNames = new ArrayList<String>(dbTableMetadata.getTableColumnDataTypeMap().keySet());
    }

    @Override
    public void run() {
        LOG.info("Starting Leader Scheduler for initialization.");
        while (!this.shutdownRequested && !Thread.currentThread().isInterrupted()) {
            try {
                LeaderProgressState leaderProgressState;
                Optional sourcePartition;
                if (this.leaderPartition == null && (sourcePartition = this.sourceCoordinator.acquireAvailablePartition("LEADER")).isPresent()) {
                    LOG.info("Running as a LEADER node.");
                    this.leaderPartition = (LeaderPartition)((Object)sourcePartition.get());
                }
                if (this.leaderPartition == null || (leaderProgressState = this.leaderPartition.getProgressState().get()).isInitialized()) continue;
                LOG.info("Performing initialization as LEADER node.");
                this.init();
            }
            catch (Exception e) {
                LOG.error("Exception occurred in primary leader scheduling loop", (Throwable)e);
            }
            finally {
                if (this.leaderPartition != null) {
                    this.sourceCoordinator.saveProgressStateForPartition((EnhancedSourcePartition)this.leaderPartition, Duration.ofMinutes(3L));
                }
                try {
                    Thread.sleep(DEFAULT_LEASE_INTERVAL.toMillis());
                }
                catch (InterruptedException e) {
                    LOG.info("InterruptedException occurred while waiting in leader scheduling loop.");
                    break;
                }
            }
        }
        LOG.warn("Quitting Leader Scheduler");
        if (this.leaderPartition != null) {
            this.sourceCoordinator.giveUpPartition((EnhancedSourcePartition)this.leaderPartition);
        }
    }

    public void shutdown() {
        this.shutdownRequested = true;
    }

    private void init() {
        LOG.info("Initializing RDS source service...");
        this.sourceCoordinator.createPartition((EnhancedSourcePartition)new GlobalState(this.sourceConfig.getDbIdentifier(), this.dbTableMetadata.toMap()));
        LOG.debug("Created global state for DB: {}", (Object)this.sourceConfig.getDbIdentifier());
        if (this.sourceConfig.isExportEnabled()) {
            LOG.debug("Export is enabled. Creating export partition in the source coordination store.");
            this.createExportPartition(this.sourceConfig);
        }
        if (this.sourceConfig.isStreamEnabled()) {
            LOG.debug("Stream is enabled. Creating stream partition in the source coordination store.");
            this.createStreamPartition(this.sourceConfig);
        }
        LOG.debug("Update initialization state");
        LeaderProgressState leaderProgressState = this.leaderPartition.getProgressState().get();
        leaderProgressState.setInitialized(true);
    }

    private void createExportPartition(RdsSourceConfig sourceConfig) {
        ExportProgressState progressState = new ExportProgressState();
        progressState.setEngineType(sourceConfig.getEngine().toString());
        progressState.setIamRoleArn(sourceConfig.getExport().getIamRoleArn());
        progressState.setBucket(sourceConfig.getS3Bucket());
        progressState.setPrefix(this.getS3PrefixForExport(this.s3Prefix));
        progressState.setTables(this.tableNames);
        progressState.setKmsKeyId(sourceConfig.getExport().getKmsKeyId());
        progressState.setPrimaryKeyMap(this.getPrimaryKeyMap());
        ExportPartition exportPartition = new ExportPartition(sourceConfig.getDbIdentifier(), sourceConfig.isCluster(), progressState);
        this.sourceCoordinator.createPartition((EnhancedSourcePartition)exportPartition);
    }

    private String getS3PrefixForExport(String givenS3Prefix) {
        return givenS3Prefix.isEmpty() ? S3_EXPORT_PREFIX : givenS3Prefix + "/rds";
    }

    private Map<String, List<String>> getPrimaryKeyMap() {
        return this.schemaManager.getPrimaryKeys(this.tableNames);
    }

    private Map<String, Set<String>> getPostgresEnumColumnsByTable() {
        return ((PostgresSchemaManager)this.schemaManager).getEnumColumns(this.tableNames);
    }

    private void createStreamPartition(RdsSourceConfig sourceConfig) {
        StreamProgressState progressState = new StreamProgressState();
        progressState.setEngineType(sourceConfig.getEngine().toString());
        progressState.setWaitForExport(sourceConfig.isExportEnabled());
        progressState.setPrimaryKeyMap(this.getPrimaryKeyMap());
        if (sourceConfig.getEngine().isMySql()) {
            MySqlStreamState mySqlStreamState = new MySqlStreamState();
            this.getCurrentBinlogPosition().ifPresent(mySqlStreamState::setCurrentPosition);
            mySqlStreamState.setForeignKeyRelations(((MySqlSchemaManager)this.schemaManager).getForeignKeyRelations(this.tableNames));
            progressState.setMySqlStreamState(mySqlStreamState);
        } else {
            String suffix = UUID.randomUUID().toString().substring(0, 8);
            String publicationName = this.generatePublicationName(suffix);
            String slotName = this.generateReplicationSlotName(suffix);
            ((PostgresSchemaManager)this.schemaManager).createLogicalReplicationSlot(this.tableNames, publicationName, slotName);
            PostgresStreamState postgresStreamState = new PostgresStreamState();
            postgresStreamState.setPublicationName(publicationName);
            postgresStreamState.setReplicationSlotName(slotName);
            postgresStreamState.setEnumColumnsByTable(this.getPostgresEnumColumnsByTable());
            progressState.setPostgresStreamState(postgresStreamState);
        }
        this.streamPartition = new StreamPartition(sourceConfig.getDbIdentifier(), progressState);
        this.sourceCoordinator.createPartition((EnhancedSourcePartition)this.streamPartition);
    }

    private Optional<BinlogCoordinate> getCurrentBinlogPosition() {
        Optional<BinlogCoordinate> binlogCoordinate = ((MySqlSchemaManager)this.schemaManager).getCurrentBinaryLogPosition();
        LOG.debug("Current binlog position: {}", binlogCoordinate.orElse(null));
        return binlogCoordinate;
    }

    private String generatePublicationName(String suffix) {
        return "data_prepper_" + this.getPipelineName() + "_pub_" + suffix;
    }

    private String generateReplicationSlotName(String suffix) {
        return "data_prepper_" + this.getPipelineName() + "_slot_" + suffix;
    }

    private String getPipelineName() {
        String shortenedPipelineName = this.pipelineName.length() <= 16 ? this.pipelineName : this.pipelineName.substring(0, 16);
        return shortenedPipelineName.replaceAll("[^a-zA-Z0-9_]", "_");
    }
}

