/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.plugins.mongo.stream;

import com.google.common.annotations.VisibleForTesting;
import io.micrometer.core.instrument.Counter;
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.function.Consumer;
import org.opensearch.dataprepper.common.concurrent.BackgroundThreadFactory;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.plugins.mongo.stream.CheckpointStatus;
import org.opensearch.dataprepper.plugins.mongo.stream.DataStreamPartitionCheckpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamAcknowledgementManager {
    private static final Logger LOG = LoggerFactory.getLogger(StreamAcknowledgementManager.class);
    private static final int CHECKPOINT_RECORD_INTERVAL = 50;
    private static final int NO_ACK_PARTITION_TIME_OUT_SECONDS = 900;
    private final ConcurrentLinkedQueue<CheckpointStatus> checkpoints = new ConcurrentLinkedQueue();
    private final ConcurrentHashMap<String, CheckpointStatus> ackStatus = new ConcurrentHashMap();
    private final AcknowledgementSetManager acknowledgementSetManager;
    private final DataStreamPartitionCheckpoint partitionCheckpoint;
    private final Duration partitionAcknowledgmentTimeout;
    private final int acknowledgementMonitorWaitTimeInMs;
    private final int checkPointIntervalInMs;
    private final ExecutorService executorService;
    private boolean enableAcknowledgement = false;
    private final Counter positiveAcknowledgementSets;
    private final Counter negativeAcknowledgementSets;
    private final Counter recordsCheckpointed;
    private final Counter noDataExtendLeaseCount;
    private final Counter giveupPartitionCount;
    public static final String POSITIVE_ACKNOWLEDGEMENT_SET_METRIC_NAME = "positiveAcknowledgementSets";
    public static final String NEGATIVE_ACKNOWLEDGEMENT_SET_METRIC_NAME = "negativeAcknowledgementSets";
    public static final String RECORDS_CHECKPOINTED = "recordsCheckpointed";
    public static final String NO_DATA_EXTEND_LEASE_COUNT = "noDataExtendLeaseCount";
    public static final String GIVE_UP_PARTITION_COUNT = "giveUpPartitionCount";

    public StreamAcknowledgementManager(AcknowledgementSetManager acknowledgementSetManager, DataStreamPartitionCheckpoint partitionCheckpoint, Duration partitionAcknowledgmentTimeout, int acknowledgementMonitorWaitTimeInMs, int checkPointIntervalInMs, PluginMetrics pluginMetrics) {
        this.acknowledgementSetManager = acknowledgementSetManager;
        this.partitionCheckpoint = partitionCheckpoint;
        this.partitionAcknowledgmentTimeout = partitionAcknowledgmentTimeout;
        this.acknowledgementMonitorWaitTimeInMs = acknowledgementMonitorWaitTimeInMs;
        this.checkPointIntervalInMs = checkPointIntervalInMs;
        this.executorService = Executors.newSingleThreadExecutor((ThreadFactory)BackgroundThreadFactory.defaultExecutorThreadFactory((String)"mongodb-stream-ack-monitor"));
        this.positiveAcknowledgementSets = pluginMetrics.counter(POSITIVE_ACKNOWLEDGEMENT_SET_METRIC_NAME);
        this.negativeAcknowledgementSets = pluginMetrics.counter(NEGATIVE_ACKNOWLEDGEMENT_SET_METRIC_NAME);
        this.recordsCheckpointed = pluginMetrics.counter(RECORDS_CHECKPOINTED);
        this.noDataExtendLeaseCount = pluginMetrics.counter(NO_DATA_EXTEND_LEASE_COUNT);
        this.giveupPartitionCount = pluginMetrics.counter(GIVE_UP_PARTITION_COUNT);
    }

    void init(Consumer<Void> stopWorkerConsumer) {
        this.enableAcknowledgement = true;
        this.executorService.submit(() -> this.monitorAcknowledgment(this.executorService, stopWorkerConsumer));
    }

    private void monitorAcknowledgment(ExecutorService executorService, Consumer<Void> stopWorkerConsumer) {
        long lastCheckpointTime = System.currentTimeMillis();
        CheckpointStatus lastCheckpointStatus = null;
        while (!Thread.currentThread().isInterrupted()) {
            block12: {
                try {
                    CheckpointStatus checkpointStatus = this.checkpoints.peek();
                    if (checkpointStatus != null) {
                        if (checkpointStatus.isPositiveAcknowledgement()) {
                            if (System.currentTimeMillis() - lastCheckpointTime >= (long)this.checkPointIntervalInMs) {
                                long ackCount = 0L;
                                do {
                                    lastCheckpointStatus = this.checkpoints.poll();
                                    this.ackStatus.remove(checkpointStatus.getResumeToken());
                                    checkpointStatus = this.checkpoints.peek();
                                    if (++ackCount % 50L != 0L && System.currentTimeMillis() - lastCheckpointTime < (long)this.checkPointIntervalInMs) continue;
                                    this.checkpoint(lastCheckpointStatus.getResumeToken(), lastCheckpointStatus.getRecordCount());
                                    lastCheckpointTime = System.currentTimeMillis();
                                } while (checkpointStatus != null && checkpointStatus.isPositiveAcknowledgement());
                                this.checkpoint(lastCheckpointStatus.getResumeToken(), lastCheckpointStatus.getRecordCount());
                                lastCheckpointTime = System.currentTimeMillis();
                            }
                        } else {
                            LOG.debug("Checkpoint not complete for resume token {}", (Object)checkpointStatus.getResumeToken());
                            if (checkpointStatus.isNegativeAcknowledgement()) {
                                LOG.warn("Negative Acknowledgement received for the checkpoint {}. Giving up partition.", (Object)checkpointStatus.getResumeToken());
                                this.giveUpPartition(lastCheckpointStatus);
                                break;
                            }
                            Duration ackWaitDuration = Duration.between(Instant.ofEpochMilli(checkpointStatus.getCreateTimestamp()), Instant.now());
                            if (!ackWaitDuration.minusSeconds(900L).isNegative()) {
                                LOG.warn("Acknowledgement not received for the checkpoint {} past wait time. Giving up partition.", (Object)checkpointStatus.getResumeToken());
                                this.giveUpPartition(lastCheckpointStatus);
                                break;
                            }
                        }
                        break block12;
                    }
                    if (System.currentTimeMillis() - lastCheckpointTime < (long)this.checkPointIntervalInMs) break block12;
                    this.partitionCheckpoint.extendLease();
                    this.noDataExtendLeaseCount.increment();
                    lastCheckpointTime = System.currentTimeMillis();
                }
                catch (Exception e) {
                    LOG.warn("Exception monitoring acknowledgments. The stream record processing will start from previous checkpoint.", (Throwable)e);
                    break;
                }
            }
            try {
                Thread.sleep(this.acknowledgementMonitorWaitTimeInMs);
            }
            catch (InterruptedException ex) {
                // empty catch block
                break;
            }
        }
        stopWorkerConsumer.accept(null);
        executorService.shutdown();
    }

    private void giveUpPartition(CheckpointStatus lastCheckpointStatus) {
        if (lastCheckpointStatus != null && lastCheckpointStatus.isPositiveAcknowledgement()) {
            this.checkpoint(lastCheckpointStatus.getResumeToken(), lastCheckpointStatus.getRecordCount());
        }
        this.partitionCheckpoint.giveUpPartition();
        this.giveupPartitionCount.increment();
    }

    private void checkpoint(String resumeToken, long recordCount) {
        LOG.debug("Perform regular checkpointing for resume token {} at record count {}", (Object)resumeToken, (Object)recordCount);
        this.partitionCheckpoint.checkpoint(resumeToken, recordCount);
        this.recordsCheckpointed.increment();
    }

    Optional<AcknowledgementSet> createAcknowledgementSet(String resumeToken, long recordNumber) {
        if (!this.enableAcknowledgement) {
            return Optional.empty();
        }
        CheckpointStatus checkpointStatus = new CheckpointStatus(resumeToken, recordNumber, Instant.now().toEpochMilli());
        this.checkpoints.add(checkpointStatus);
        this.ackStatus.put(resumeToken, checkpointStatus);
        LOG.debug("Creating acknowledgment for resumeToken {}", (Object)checkpointStatus.getResumeToken());
        return Optional.of(this.acknowledgementSetManager.create(result -> {
            CheckpointStatus ackCheckpointStatus = this.ackStatus.get(resumeToken);
            ackCheckpointStatus.setAcknowledgedTimestamp(Instant.now().toEpochMilli());
            if (result.booleanValue()) {
                this.positiveAcknowledgementSets.increment();
                ackCheckpointStatus.setAcknowledged(CheckpointStatus.AcknowledgmentStatus.POSITIVE_ACK);
                LOG.debug("Received acknowledgment of completion from sink for checkpoint {}", (Object)resumeToken);
            } else {
                this.negativeAcknowledgementSets.increment();
                ackCheckpointStatus.setAcknowledged(CheckpointStatus.AcknowledgmentStatus.NEGATIVE_ACK);
                LOG.warn("Negative acknowledgment received for checkpoint {}, resetting checkpoint", (Object)resumeToken);
            }
        }, this.partitionAcknowledgmentTimeout));
    }

    void shutdown() {
        this.executorService.shutdown();
    }

    @VisibleForTesting
    ConcurrentHashMap<String, CheckpointStatus> getAcknowledgementStatus() {
        return this.ackStatus;
    }

    @VisibleForTesting
    ConcurrentLinkedQueue<CheckpointStatus> getCheckpoints() {
        return this.checkpoints;
    }
}

