/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.plugins.source.dynamodb.export;

import io.micrometer.core.instrument.Counter;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
import org.opensearch.dataprepper.plugins.source.dynamodb.DynamoDBSourceConfig;
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.DataFilePartition;
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.GlobalState;
import org.opensearch.dataprepper.plugins.source.dynamodb.export.DataFileLoader;
import org.opensearch.dataprepper.plugins.source.dynamodb.export.DataFileLoaderFactory;
import org.opensearch.dataprepper.plugins.source.dynamodb.model.LoadStatus;
import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo;
import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableMetadata;
import org.opensearch.dataprepper.plugins.source.dynamodb.utils.TableUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DataFileScheduler
implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(DataFileScheduler.class);
    private final AtomicInteger numOfWorkers = new AtomicInteger(0);
    private static final int MAX_JOB_COUNT = 1;
    private static final int DEFAULT_LEASE_INTERVAL_MILLIS = 2000;
    static final String EXPORT_S3_OBJECTS_PROCESSED_COUNT = "exportS3ObjectsProcessed";
    static final String ACTIVE_EXPORT_S3_OBJECT_CONSUMERS_GAUGE = "activeExportS3ObjectConsumers";
    private final EnhancedSourceCoordinator coordinator;
    private final ExecutorService executor;
    private final DataFileLoaderFactory loaderFactory;
    private final PluginMetrics pluginMetrics;
    private final AcknowledgementSetManager acknowledgementSetManager;
    private final DynamoDBSourceConfig dynamoDBSourceConfig;
    private final Counter exportFileSuccessCounter;
    private final AtomicInteger activeExportS3ObjectConsumersGauge;

    public DataFileScheduler(EnhancedSourceCoordinator coordinator, DataFileLoaderFactory loaderFactory, PluginMetrics pluginMetrics, AcknowledgementSetManager acknowledgementSetManager, DynamoDBSourceConfig dynamoDBSourceConfig) {
        this.coordinator = coordinator;
        this.pluginMetrics = pluginMetrics;
        this.loaderFactory = loaderFactory;
        this.acknowledgementSetManager = acknowledgementSetManager;
        this.dynamoDBSourceConfig = dynamoDBSourceConfig;
        this.executor = Executors.newFixedThreadPool(1);
        this.exportFileSuccessCounter = pluginMetrics.counter(EXPORT_S3_OBJECTS_PROCESSED_COUNT);
        this.activeExportS3ObjectConsumersGauge = (AtomicInteger)pluginMetrics.gauge(ACTIVE_EXPORT_S3_OBJECT_CONSUMERS_GAUGE, (Number)this.numOfWorkers);
    }

    private void processDataFilePartition(DataFilePartition dataFilePartition) {
        String exportArn = dataFilePartition.getExportArn();
        String tableArn = TableUtil.getTableArnFromExportArn(exportArn);
        TableInfo tableInfo = this.getTableInfo(tableArn);
        boolean acknowledgmentsEnabled = this.dynamoDBSourceConfig.isAcknowledgmentsEnabled();
        AcknowledgementSet acknowledgementSet = null;
        if (acknowledgmentsEnabled) {
            acknowledgementSet = this.acknowledgementSetManager.create(result -> {
                if (result.booleanValue()) {
                    this.completeDataLoader(dataFilePartition).accept(null, null);
                    LOG.info("Received acknowledgment of completion from sink for data file {}", (Object)dataFilePartition.getKey());
                } else {
                    LOG.warn("Negative acknowledgment received for data file {}, retrying", (Object)dataFilePartition.getKey());
                    this.coordinator.giveUpPartition((EnhancedSourcePartition)dataFilePartition);
                }
            }, this.dynamoDBSourceConfig.getDataFileAcknowledgmentTimeout());
        }
        Runnable loader = this.loaderFactory.createDataFileLoader(dataFilePartition, tableInfo, acknowledgementSet, this.dynamoDBSourceConfig.getDataFileAcknowledgmentTimeout());
        CompletableFuture<Void> runLoader = CompletableFuture.runAsync(loader, this.executor);
        if (!acknowledgmentsEnabled) {
            runLoader.whenComplete(this.completeDataLoader(dataFilePartition));
        } else {
            runLoader.whenComplete((v, ex) -> {
                if (ex != null) {
                    LOG.error("There was an exception while processing an S3 data file: {}", ex);
                    this.coordinator.giveUpPartition((EnhancedSourcePartition)dataFilePartition);
                }
                this.numOfWorkers.decrementAndGet();
            });
        }
        this.numOfWorkers.incrementAndGet();
    }

    @Override
    public void run() {
        LOG.debug("Starting Data File Scheduler to process S3 data files for export");
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Optional sourcePartition;
                if (this.numOfWorkers.get() < 1 && (sourcePartition = this.coordinator.acquireAvailablePartition("DATAFILE")).isPresent()) {
                    DataFilePartition dataFilePartition = (DataFilePartition)((Object)sourcePartition.get());
                    this.processDataFilePartition(dataFilePartition);
                }
                try {
                    Thread.sleep(2000L);
                    continue;
                }
                catch (InterruptedException e) {
                    LOG.info("The DataFileScheduler was interrupted while waiting to retry, stopping processing");
                }
            }
            catch (Exception e) {
                LOG.error("Received an exception while processing an S3 data file, backing off and retrying", (Throwable)e);
                try {
                    Thread.sleep(2000L);
                    continue;
                }
                catch (InterruptedException ex) {
                    LOG.info("The DataFileScheduler was interrupted while waiting to retry, stopping processing");
                }
            }
            break;
        }
        LOG.warn("Data file scheduler is interrupted, stopping all data file loaders...");
        this.executor.shutdown();
        DataFileLoader.stopAll();
    }

    private TableInfo getTableInfo(String tableArn) {
        GlobalState tableState = (GlobalState)((Object)this.coordinator.getPartition(tableArn).get());
        TableInfo tableInfo = new TableInfo(tableArn, TableMetadata.fromMap(tableState.getProgressState().get()));
        return tableInfo;
    }

    private String getStreamArn(String exportArn) {
        String tableArn = TableUtil.getTableArnFromExportArn(exportArn);
        TableInfo tableInfo = this.getTableInfo(tableArn);
        if (tableInfo.getMetadata().isStreamRequired()) {
            return tableInfo.getMetadata().getStreamArn();
        }
        return null;
    }

    private BiConsumer completeDataLoader(DataFilePartition dataFilePartition) {
        return (v, ex) -> {
            if (!this.dynamoDBSourceConfig.isAcknowledgmentsEnabled()) {
                this.numOfWorkers.decrementAndGet();
                if (this.numOfWorkers.get() == 0) {
                    this.activeExportS3ObjectConsumersGauge.decrementAndGet();
                }
            }
            if (ex == null) {
                this.exportFileSuccessCounter.increment();
                this.updateState(dataFilePartition.getExportArn(), dataFilePartition.getProgressState().get().getLoaded());
                this.coordinator.completePartition((EnhancedSourcePartition)dataFilePartition);
            } else {
                LOG.error("Loading S3 data files completed with an exception: {}", ex);
                this.coordinator.giveUpPartition((EnhancedSourcePartition)dataFilePartition);
            }
        };
    }

    private void updateState(String exportArn, int loaded) {
        String streamArn = this.getStreamArn(exportArn);
        while (true) {
            Optional globalPartition;
            if ((globalPartition = this.coordinator.getPartition(exportArn)).isEmpty()) {
                LOG.error("Failed to get load status for " + exportArn);
                return;
            }
            GlobalState globalState = (GlobalState)((Object)globalPartition.get());
            LoadStatus loadStatus = LoadStatus.fromMap(globalState.getProgressState().get());
            loadStatus.setLoadedFiles(loadStatus.getLoadedFiles() + 1);
            LOG.info("Current status: total {} loaded {}", (Object)loadStatus.getTotalFiles(), (Object)loadStatus.getLoadedFiles());
            loadStatus.setLoadedRecords(loadStatus.getLoadedRecords() + (long)loaded);
            globalState.setProgressState(loadStatus.toMap());
            try {
                this.coordinator.saveProgressStateForPartition((EnhancedSourcePartition)globalState, null);
                if (streamArn == null || loadStatus.getLoadedFiles() < loadStatus.getTotalFiles()) break;
                LOG.info("All Exports are done, streaming can continue...");
                this.coordinator.createPartition((EnhancedSourcePartition)new GlobalState(streamArn, Optional.empty()));
            }
            catch (Exception e) {
                LOG.error("Failed to update the global status, looks like the status was out of date, will retry..");
                continue;
            }
            break;
        }
    }
}

