/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.plugins.source.rds.stream;

import io.micrometer.core.instrument.Counter;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.plugins.source.rds.configuration.EngineType;
import org.opensearch.dataprepper.plugins.source.rds.model.BinlogCoordinate;
import org.opensearch.dataprepper.plugins.source.rds.stream.ChangeEventStatus;
import org.opensearch.dataprepper.plugins.source.rds.stream.StreamCheckpointer;
import org.postgresql.replication.LogSequenceNumber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tracks the change events read from an RDS stream and periodically persists
 * progress through a {@link StreamCheckpointer}.
 *
 * <p>Producers register every batch of change events through
 * {@code saveChangeEventsStatus} or {@code createAcknowledgmentSet}; each call
 * appends a {@link ChangeEventStatus} to an internal queue. A single background
 * thread started by {@link #start()} drains that queue in
 * {@link #runCheckpointing()}.
 */
public class StreamCheckpointManager {
    private static final Logger LOG = LoggerFactory.getLogger(StreamCheckpointManager.class);

    /** Pause between two checkpointing passes, in milliseconds. */
    static final int REGULAR_CHECKPOINT_INTERVAL_MILLIS = 60000;

    /** When acknowledgments are disabled, a checkpoint is written after every this many drained entries. */
    static final int CHANGE_EVENT_COUNT_PER_CHECKPOINT_BATCH = 1000;

    static final String POSITIVE_ACKNOWLEDGEMENT_SET_METRIC_NAME = "positiveAcknowledgementSets";
    static final String NEGATIVE_ACKNOWLEDGEMENT_SET_METRIC_NAME = "negativeAcknowledgementSets";
    // Not referenced inside this class.
    static final String CHECKPOINT_COUNT = "checkpointCount";
    static final String NO_DATA_EXTEND_LEASE_COUNT = "noDataExtendLeaseCount";
    static final String GIVE_UP_PARTITION_COUNT = "giveupPartitionCount";

    /** Pending change event statuses in arrival order; appended by callers, drained by the checkpointing thread. */
    private final ConcurrentLinkedQueue<ChangeEventStatus> changeEventStatuses = new ConcurrentLinkedQueue<>();
    private final StreamCheckpointer streamCheckpointer;
    private final ExecutorService executorService;
    private final Runnable stopStreamRunnable;
    private final boolean isAcknowledgmentEnabled;
    private final AcknowledgementSetManager acknowledgementSetManager;
    private final Duration acknowledgmentTimeout;
    private final EngineType engineType;
    private final PluginMetrics pluginMetrics;
    private final Counter positiveAcknowledgementSets;
    private final Counter negativeAcknowledgementSets;
    private final Counter noDataExtendLeaseCount;
    private final Counter giveupPartitionCount;

    /**
     * Creates a manager with its own single-threaded executor and registers the counters it reports.
     *
     * @param streamCheckpointer        used to checkpoint, extend the lease and give up the partition
     * @param isAcknowledgmentEnabled   whether checkpoints wait for positive acknowledgments
     * @param acknowledgementSetManager factory for acknowledgment sets
     * @param stopStreamRunnable        callback run once the checkpointing loop exits
     * @param acknowledgmentTimeout     timeout passed to every acknowledgment set that is created
     * @param engineType                engine type forwarded to the checkpointer and used to pick the logged position
     * @param pluginMetrics             source of the counters
     */
    public StreamCheckpointManager(StreamCheckpointer streamCheckpointer, boolean isAcknowledgmentEnabled, AcknowledgementSetManager acknowledgementSetManager, Runnable stopStreamRunnable, Duration acknowledgmentTimeout, EngineType engineType, PluginMetrics pluginMetrics) {
        this.acknowledgementSetManager = acknowledgementSetManager;
        this.streamCheckpointer = streamCheckpointer;
        this.isAcknowledgmentEnabled = isAcknowledgmentEnabled;
        this.stopStreamRunnable = stopStreamRunnable;
        this.acknowledgmentTimeout = acknowledgmentTimeout;
        this.engineType = engineType;
        this.pluginMetrics = pluginMetrics;
        this.executorService = Executors.newSingleThreadExecutor();
        this.positiveAcknowledgementSets = pluginMetrics.counter(POSITIVE_ACKNOWLEDGEMENT_SET_METRIC_NAME);
        this.negativeAcknowledgementSets = pluginMetrics.counter(NEGATIVE_ACKNOWLEDGEMENT_SET_METRIC_NAME);
        this.noDataExtendLeaseCount = pluginMetrics.counter(NO_DATA_EXTEND_LEASE_COUNT);
        this.giveupPartitionCount = pluginMetrics.counter(GIVE_UP_PARTITION_COUNT);
    }

    /** Submits the checkpointing loop to the background executor. */
    public void start() {
        executorService.submit(this::runCheckpointing);
    }

    /**
     * Runs the checkpointing loop until the thread is interrupted, a checkpointing
     * call throws, or a negative acknowledgment is seen.
     *
     * <p>Each pass does one of the following and then sleeps for
     * {@link #REGULAR_CHECKPOINT_INTERVAL_MILLIS}:
     * <ul>
     *   <li>queue empty: extend the lease on the stream partition;</li>
     *   <li>acknowledgments enabled: checkpoint the positively acknowledged prefix of the queue;</li>
     *   <li>acknowledgments disabled: drain the whole queue and checkpoint.</li>
     * </ul>
     *
     * <p>When the loop exits, the stop callback is run and the executor is shut down.
     */
    void runCheckpointing() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                if (changeEventStatuses.isEmpty()) {
                    LOG.debug("No records processed. Extend the lease on stream partition.");
                    noDataExtendLeaseCount.increment();
                    streamCheckpointer.extendLease();
                } else if (isAcknowledgmentEnabled) {
                    if (checkpointAcknowledgedEvents()) {
                        break;
                    }
                } else {
                    checkpointAllPendingEvents();
                }
            } catch (Exception e) {
                LOG.warn("Exception while checkpointing. The stream processing will start from previous checkpoint.", e);
                break;
            }
            try {
                Thread.sleep(REGULAR_CHECKPOINT_INTERVAL_MILLIS);
            } catch (InterruptedException ignored) {
                // Interruption is the shutdown signal. The flag is deliberately not
                // restored so the stop callback below does not start with a pending
                // interrupt; stop() shuts the executor down right afterwards.
                break;
            }
        }
        stopStreamRunnable.run();
        stop();
    }

    /**
     * Removes the leading run of positively acknowledged entries from the queue and
     * checkpoints at the last one removed. An entry is only removed after it has been
     * checked, so the first pending or negatively acknowledged entry stays at the head
     * of the queue and is examined again on a later pass.
     *
     * @return {@code true} if the entry now at the head was negatively acknowledged and
     *         the partition was given up, meaning the caller must stop the loop
     */
    private boolean checkpointAcknowledgedEvents() {
        ChangeEventStatus lastAcknowledged = null;
        ChangeEventStatus head = changeEventStatuses.peek();
        while (head != null && head.isPositiveAcknowledgment()) {
            lastAcknowledged = head;
            // Only this thread removes entries, so poll() removes exactly the entry just inspected.
            changeEventStatuses.poll();
            head = changeEventStatuses.peek();
        }
        if (lastAcknowledged != null) {
            checkpoint(engineType, lastAcknowledged);
        }
        if (head != null && head.isNegativeAcknowledgment()) {
            LOG.info("Received negative acknowledgement for change event at {}. Will restart from most recent checkpoint", positionOf(engineType, head));
            streamCheckpointer.giveUpPartition();
            giveupPartitionCount.increment();
            return true;
        }
        return false;
    }

    /**
     * Drains every queued entry, checkpointing after each
     * {@link #CHANGE_EVENT_COUNT_PER_CHECKPOINT_BATCH} entries and once more at the
     * last entry drained. Must only be called when the queue is not empty.
     */
    private void checkpointAllPendingEvents() {
        ChangeEventStatus latest;
        int drainedCount = 0;
        do {
            latest = changeEventStatuses.poll();
            drainedCount++;
            // Intermediate checkpoints keep progress moving while producers keep appending.
            if (drainedCount % CHANGE_EVENT_COUNT_PER_CHECKPOINT_BATCH == 0) {
                checkpoint(engineType, latest);
            }
        } while (!changeEventStatuses.isEmpty());
        checkpoint(engineType, latest);
    }

    /** Shuts the background executor down immediately. */
    public void stop() {
        executorService.shutdownNow();
    }

    /**
     * Queues a status entry for change events read up to the given binlog coordinate.
     *
     * @param binlogCoordinate position the events were read up to
     * @param recordCount      record count stored on the status
     * @return the queued status
     */
    public ChangeEventStatus saveChangeEventsStatus(BinlogCoordinate binlogCoordinate, long recordCount) {
        ChangeEventStatus changeEventStatus = new ChangeEventStatus(binlogCoordinate, Instant.now().toEpochMilli(), recordCount);
        changeEventStatuses.add(changeEventStatus);
        return changeEventStatus;
    }

    /**
     * Queues a status entry for change events read up to the given log sequence number.
     *
     * @param logSequenceNumber position the events were read up to
     * @param recordCount       record count stored on the status
     * @return the queued status
     */
    public ChangeEventStatus saveChangeEventsStatus(LogSequenceNumber logSequenceNumber, long recordCount) {
        ChangeEventStatus changeEventStatus = new ChangeEventStatus(logSequenceNumber, Instant.now().toEpochMilli(), recordCount);
        changeEventStatuses.add(changeEventStatus);
        return changeEventStatus;
    }

    /**
     * Queues a status entry for the given binlog coordinate and returns an
     * acknowledgment set whose outcome is recorded on that entry.
     *
     * @param binlogCoordinate position the events were read up to
     * @param recordCount      record count stored on the status
     * @return the acknowledgment set bound to the queued status
     */
    public AcknowledgementSet createAcknowledgmentSet(BinlogCoordinate binlogCoordinate, long recordCount) {
        LOG.debug("Create acknowledgment set for events receive prior to {}", binlogCoordinate);
        ChangeEventStatus changeEventStatus = new ChangeEventStatus(binlogCoordinate, Instant.now().toEpochMilli(), recordCount);
        changeEventStatuses.add(changeEventStatus);
        return getAcknowledgementSet(changeEventStatus);
    }

    /**
     * Queues a status entry for the given log sequence number and returns an
     * acknowledgment set whose outcome is recorded on that entry.
     *
     * @param logSequenceNumber position the events were read up to
     * @param recordCount       record count stored on the status
     * @return the acknowledgment set bound to the queued status
     */
    public AcknowledgementSet createAcknowledgmentSet(LogSequenceNumber logSequenceNumber, long recordCount) {
        LOG.debug("Create acknowledgment set for events receive prior to {}", logSequenceNumber);
        ChangeEventStatus changeEventStatus = new ChangeEventStatus(logSequenceNumber, Instant.now().toEpochMilli(), recordCount);
        changeEventStatuses.add(changeEventStatus);
        return getAcknowledgementSet(changeEventStatus);
    }

    /**
     * Creates an acknowledgment set whose callback marks the given status as
     * positively or negatively acknowledged and increments the matching counter.
     */
    private AcknowledgementSet getAcknowledgementSet(ChangeEventStatus changeEventStatus) {
        return acknowledgementSetManager.create(result -> {
            if (result) {
                positiveAcknowledgementSets.increment();
                changeEventStatus.setAcknowledgmentStatus(ChangeEventStatus.AcknowledgmentStatus.POSITIVE_ACK);
            } else {
                negativeAcknowledgementSets.increment();
                changeEventStatus.setAcknowledgmentStatus(ChangeEventStatus.AcknowledgmentStatus.NEGATIVE_ACK);
            }
        }, acknowledgmentTimeout);
    }

    /** Logs the position being checkpointed and delegates to the stream checkpointer. */
    private void checkpoint(EngineType engineType, ChangeEventStatus changeEventStatus) {
        LOG.debug("Checkpoint at {} with record count {}. ", positionOf(engineType, changeEventStatus), changeEventStatus.getRecordCount());
        streamCheckpointer.checkpoint(engineType, changeEventStatus);
    }

    /** Returns the binlog coordinate for MySQL engines and the log sequence number otherwise. */
    private static Object positionOf(EngineType engineType, ChangeEventStatus changeEventStatus) {
        return engineType.isMySql() ? changeEventStatus.getBinlogCoordinate() : changeEventStatus.getLogSequenceNumber();
    }

    /** Exposes the live internal queue (package-private). */
    ConcurrentLinkedQueue<ChangeEventStatus> getChangeEventStatuses() {
        return changeEventStatuses;
    }
}

