/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.plugins.mongo.export;

import com.google.common.base.Preconditions;
import io.micrometer.core.instrument.Counter;
import java.time.Duration;
import java.time.Instant;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
import org.opensearch.dataprepper.plugins.mongo.buffer.RecordBufferWriter;
import org.opensearch.dataprepper.plugins.mongo.configuration.MongoDBSourceConfig;
import org.opensearch.dataprepper.plugins.mongo.converter.PartitionKeyRecordConverter;
import org.opensearch.dataprepper.plugins.mongo.coordination.partition.DataQueryPartition;
import org.opensearch.dataprepper.plugins.mongo.coordination.partition.GlobalState;
import org.opensearch.dataprepper.plugins.mongo.export.DataQueryPartitionCheckpoint;
import org.opensearch.dataprepper.plugins.mongo.export.ExportPartitionWorker;
import org.opensearch.dataprepper.plugins.mongo.model.ExportLoadStatus;
import org.opensearch.dataprepper.plugins.mongo.model.StreamLoadStatus;
import org.opensearch.dataprepper.plugins.mongo.utils.DocumentDBSourceAggregateMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExportWorker
implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(ExportWorker.class);
    private final AtomicInteger numOfWorkers = new AtomicInteger(0);
    static final String SUCCESS_PARTITION_COUNTER_NAME = "exportPartitionSuccessTotal";
    static final String FAILURE_PARTITION_COUNTER_NAME = "exportPartitionFailureTotal";
    static final String ACTIVE_EXPORT_PARTITION_CONSUMERS_GAUGE = "activeExportPartitionConsumers";
    private static final int MAX_JOB_COUNT = 1;
    private static final int DEFAULT_LEASE_INTERVAL_MILLIS = 2000;
    private final int startLine;
    private final AcknowledgementSetManager acknowledgementSetManager;
    private final MongoDBSourceConfig sourceConfig;
    private final Counter successPartitionCounter;
    private final Counter failureParitionCounter;
    private final AtomicInteger activeExportPartitionConsumerGauge;
    static final Duration BUFFER_TIMEOUT = Duration.ofSeconds(60L);
    static final int DEFAULT_BUFFER_BATCH_SIZE = 10;
    private final RecordBufferWriter recordBufferWriter;
    private final EnhancedSourceCoordinator sourceCoordinator;
    private final ExecutorService executor;
    private final PluginMetrics pluginMetrics;
    private final String s3PathPrefix;
    private final DocumentDBSourceAggregateMetrics documentDBAggregateMetrics;

    public ExportWorker(EnhancedSourceCoordinator sourceCoordinator, Buffer<Record<Event>> buffer, PluginMetrics pluginMetrics, AcknowledgementSetManager acknowledgementSetManager, MongoDBSourceConfig sourceConfig, String s3PathPrefix, DocumentDBSourceAggregateMetrics documentDBAggregateMetrics) {
        this.sourceCoordinator = sourceCoordinator;
        this.executor = Executors.newFixedThreadPool(1);
        BufferAccumulator bufferAccumulator = BufferAccumulator.create(buffer, (int)10, (Duration)BUFFER_TIMEOUT);
        this.recordBufferWriter = RecordBufferWriter.create((BufferAccumulator<Record<Event>>)bufferAccumulator, pluginMetrics);
        this.acknowledgementSetManager = acknowledgementSetManager;
        this.sourceConfig = sourceConfig;
        this.startLine = 0;
        this.pluginMetrics = pluginMetrics;
        Preconditions.checkArgument((boolean)Objects.nonNull(s3PathPrefix), (Object)"S3 path prefix must not be null");
        this.s3PathPrefix = s3PathPrefix;
        this.documentDBAggregateMetrics = documentDBAggregateMetrics;
        this.successPartitionCounter = pluginMetrics.counter(SUCCESS_PARTITION_COUNTER_NAME);
        this.failureParitionCounter = pluginMetrics.counter(FAILURE_PARTITION_COUNTER_NAME);
        this.activeExportPartitionConsumerGauge = (AtomicInteger)pluginMetrics.gauge(ACTIVE_EXPORT_PARTITION_CONSUMERS_GAUGE, (Number)this.numOfWorkers);
    }

    @Override
    public void run() {
        LOG.info("Starting Export worker to process partitions for export");
        while (!Thread.currentThread().isInterrupted()) {
            DataQueryPartition dataQueryPartition = null;
            try {
                Optional sourcePartition;
                if (this.numOfWorkers.get() < 1 && (sourcePartition = this.sourceCoordinator.acquireAvailablePartition("DATA_QUERY")).isPresent()) {
                    dataQueryPartition = (DataQueryPartition)((Object)sourcePartition.get());
                    AcknowledgementSet acknowledgementSet = this.createAcknowledgementSet(dataQueryPartition).orElse(null);
                    String s3Prefix = this.s3PathPrefix + dataQueryPartition.getCollection();
                    DataQueryPartitionCheckpoint partitionCheckpoint = new DataQueryPartitionCheckpoint(this.sourceCoordinator, dataQueryPartition);
                    PartitionKeyRecordConverter recordConverter = new PartitionKeyRecordConverter(dataQueryPartition.getCollection(), "EXPORT", s3Prefix);
                    ExportPartitionWorker exportPartitionWorker = new ExportPartitionWorker(this.recordBufferWriter, recordConverter, dataQueryPartition, acknowledgementSet, this.sourceConfig, partitionCheckpoint, Instant.now().toEpochMilli(), this.pluginMetrics, this.documentDBAggregateMetrics);
                    CompletableFuture<Void> runLoader = CompletableFuture.runAsync(exportPartitionWorker, this.executor);
                    runLoader.whenComplete((BiConsumer)this.completePartitionLoader(dataQueryPartition));
                    this.numOfWorkers.incrementAndGet();
                    this.activeExportPartitionConsumerGauge.incrementAndGet();
                }
                try {
                    Thread.sleep(2000L);
                    continue;
                }
                catch (InterruptedException e) {
                    LOG.info("The ExportWorker was interrupted while waiting to retry, stopping processing");
                }
            }
            catch (Exception e) {
                LOG.error("Received an exception while processing an export data partition, backing off and retrying", (Throwable)e);
                if (dataQueryPartition != null) {
                    this.sourceCoordinator.giveUpPartition(dataQueryPartition);
                }
                try {
                    Thread.sleep(2000L);
                    continue;
                }
                catch (InterruptedException ex) {
                    LOG.info("The ExportWorker was interrupted while waiting to retry, stopping processing");
                }
            }
            break;
        }
        LOG.warn("ExportWorker is interrupted, stopping all data partition loaders...");
        this.executor.shutdown();
        ExportPartitionWorker.stopAll();
    }

    private Optional<AcknowledgementSet> createAcknowledgementSet(DataQueryPartition partition) {
        if (this.sourceConfig.isAcknowledgmentsEnabled()) {
            return Optional.of(this.acknowledgementSetManager.create(result -> {
                if (result.booleanValue()) {
                    this.completeDataLoader(partition).accept(null, null);
                    LOG.info("Received acknowledgment of completion from sink for data query {}", (Object)partition.getPartitionKey());
                } else {
                    LOG.warn("Negative acknowledgment received for data query {}, retrying", (Object)partition.getPartitionKey());
                    this.sourceCoordinator.giveUpPartition((EnhancedSourcePartition)partition);
                }
            }, this.sourceConfig.getPartitionAcknowledgmentTimeout()));
        }
        return Optional.empty();
    }

    private BiConsumer<Void, Throwable> completeDataLoader(DataQueryPartition dataQueryPartition) {
        return (v, ex) -> {
            if (!this.sourceConfig.isAcknowledgmentsEnabled()) {
                this.numOfWorkers.decrementAndGet();
                this.activeExportPartitionConsumerGauge.decrementAndGet();
            }
            if (ex == null) {
                this.successPartitionCounter.increment();
                this.updateState(dataQueryPartition.getCollection(), dataQueryPartition.getProgressState().get().getLoadedRecords());
                this.sourceCoordinator.completePartition((EnhancedSourcePartition)dataQueryPartition);
            } else {
                this.giveUpPartition(dataQueryPartition, (Throwable)ex);
            }
        };
    }

    private void giveUpPartition(DataQueryPartition dataQueryPartition, Throwable ex) {
        LOG.error("Loading Data Query partition completed with an exception.", ex);
        this.failureParitionCounter.increment();
        this.sourceCoordinator.giveUpPartition((EnhancedSourcePartition)dataQueryPartition);
    }

    private BiConsumer<Void, Throwable> completePartitionLoader(DataQueryPartition dataQueryPartition) {
        if (!this.sourceConfig.isAcknowledgmentsEnabled()) {
            return this.completeDataLoader(dataQueryPartition);
        }
        return (v, ex) -> {
            if (ex != null) {
                this.giveUpPartition(dataQueryPartition, (Throwable)ex);
            }
            this.numOfWorkers.decrementAndGet();
            this.activeExportPartitionConsumerGauge.decrementAndGet();
        };
    }

    private void updateState(String collection, long loaded) {
        String exportPartitionKey = "EXPORT-" + collection;
        while (!Thread.currentThread().isInterrupted()) {
            Optional globalPartition = this.sourceCoordinator.getPartition(exportPartitionKey);
            if (globalPartition.isEmpty()) {
                LOG.error("Failed to get load status for " + exportPartitionKey);
                continue;
            }
            GlobalState globalState = (GlobalState)((Object)globalPartition.get());
            ExportLoadStatus exportLoadStatus = ExportLoadStatus.fromMap(globalState.getProgressState().get());
            exportLoadStatus.setLoadedPartitions(exportLoadStatus.getLoadedPartitions() + 1L);
            LOG.info("Current status: total {}, loaded {}", (Object)exportLoadStatus.getTotalPartitions(), (Object)exportLoadStatus.getLoadedPartitions());
            exportLoadStatus.setLoadedRecords(exportLoadStatus.getLoadedRecords() + loaded);
            exportLoadStatus.setLastUpdateTimestamp(Instant.now().toEpochMilli());
            globalState.setProgressState(exportLoadStatus.toMap());
            try {
                this.sourceCoordinator.saveProgressStateForPartition((EnhancedSourcePartition)globalState, null);
                if (!exportLoadStatus.isTotalParitionsComplete() || exportLoadStatus.getLoadedPartitions() != exportLoadStatus.getTotalPartitions()) break;
                LOG.info("All Exports are done, streaming can continue...");
                StreamLoadStatus streamLoadStatus = new StreamLoadStatus(Instant.now().toEpochMilli());
                this.sourceCoordinator.createPartition((EnhancedSourcePartition)new GlobalState("STREAM-" + collection, streamLoadStatus.toMap()));
                break;
            }
            catch (Exception e) {
                LOG.error("Failed to update the global status, looks like the status was out of date, will retry..");
            }
        }
    }
}

