/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.plugins.mongo.stream;

import java.time.Duration;
import java.time.Instant;
import java.util.NoSuchElementException;
import java.util.Optional;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
import org.opensearch.dataprepper.plugins.mongo.coordination.partition.GlobalState;
import org.opensearch.dataprepper.plugins.mongo.coordination.partition.StreamPartition;
import org.opensearch.dataprepper.plugins.mongo.coordination.state.StreamProgressState;
import org.opensearch.dataprepper.plugins.mongo.model.StreamLoadStatus;
import org.opensearch.dataprepper.plugins.mongo.s3partition.S3FolderPartitionCoordinator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Records and releases the progress of a single {@link StreamPartition} through an
 * {@link EnhancedSourceCoordinator}.
 *
 * <p>Every operation here acts on the one stream partition supplied at construction time:
 * updating its in-memory {@link StreamProgressState}, saving that state through the coordinator,
 * giving the partition up, or reading the related global stream-load status.
 */
public class DataStreamPartitionCheckpoint extends S3FolderPartitionCoordinator {
    private static final Logger LOG = LoggerFactory.getLogger(DataStreamPartitionCheckpoint.class);

    /**
     * Duration handed to the coordinator whenever this class saves progress from
     * {@link #checkpoint(String, long)} or {@link #extendLease()}.
     */
    static final Duration CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE = Duration.ofMinutes(5L);

    private final EnhancedSourceCoordinator enhancedSourceCoordinator;
    private final StreamPartition streamPartition;

    /**
     * Creates a checkpointer bound to one stream partition.
     *
     * @param enhancedSourceCoordinator coordinator used to save progress and give up the partition;
     *                                  also passed to the superclass constructor
     * @param streamPartition           the partition whose progress this instance manages
     */
    public DataStreamPartitionCheckpoint(EnhancedSourceCoordinator enhancedSourceCoordinator, StreamPartition streamPartition) {
        super(enhancedSourceCoordinator);
        this.enhancedSourceCoordinator = enhancedSourceCoordinator;
        this.streamPartition = streamPartition;
    }

    /**
     * Writes the resume token, the loaded-record count and the current wall-clock time (epoch
     * milliseconds) into the partition's progress state. Nothing is persisted here; callers save
     * the partition through the coordinator afterwards.
     *
     * @param resumeToken  resume token to store; {@code null} is passed by {@link #resetCheckpoint()}
     * @param recordNumber loaded-record count to store
     * @throws NoSuchElementException if the partition carries no progress state
     */
    private void setProgressState(String resumeToken, long recordNumber) {
        // Resolve the Optional exactly once and fail with context instead of a bare get().
        StreamProgressState progressState = this.streamPartition.getProgressState()
                .orElseThrow(() -> new NoSuchElementException(
                        "Stream partition for collection " + this.streamPartition.getCollection()
                                + " has no progress state to update"));
        progressState.setResumeToken(resumeToken);
        progressState.setLoadedRecords(recordNumber);
        progressState.setLastUpdateTimestamp(Instant.now().toEpochMilli());
    }

    /**
     * Gives the partition up through the coordinator and then clears the
     * {@code STOP_S3_SCAN_PROCESSING} JVM system property.
     */
    private void releasePartition() {
        this.enhancedSourceCoordinator.giveUpPartition(this.streamPartition);
        // NOTE(review): presumably this property is a stop signal read by the S3 scan side - confirm.
        System.clearProperty("STOP_S3_SCAN_PROCESSING");
    }

    /**
     * Updates the partition's progress state and saves it through the coordinator with
     * {@link #CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE}.
     *
     * @param resumeToken resume token to record
     * @param recordCount number of records loaded so far
     * @throws NoSuchElementException if the partition carries no progress state
     */
    public void checkpoint(String resumeToken, long recordCount) {
        LOG.debug("Checkpoint stream partition for collection {} with record number {}", this.streamPartition.getCollection(), recordCount);
        this.setProgressState(resumeToken, recordCount);
        this.enhancedSourceCoordinator.saveProgressStateForPartition(this.streamPartition, CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE);
    }

    /**
     * Re-saves the partition, without touching its progress state, using
     * {@link #CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE}.
     */
    public void extendLease() {
        LOG.debug("Extending lease of stream partition for collection {}", this.streamPartition.getCollection());
        this.enhancedSourceCoordinator.saveProgressStateForPartition(this.streamPartition, CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE);
    }

    /**
     * Resets the progress state to a {@code null} resume token and zero loaded records, then gives
     * the partition up and clears the {@code STOP_S3_SCAN_PROCESSING} system property.
     *
     * @throws NoSuchElementException if the partition carries no progress state
     */
    public void resetCheckpoint() {
        LOG.debug("Resetting checkpoint stream partition for collection {}", this.streamPartition.getCollection());
        this.setProgressState(null, 0L);
        this.releasePartition();
    }

    /**
     * Looks up the partition stored under {@code "STREAM-" + partitionKey} and converts its
     * progress state with {@link StreamLoadStatus#fromMap}.
     *
     * @return the stream load status, or an empty {@code Optional} when no such partition exists
     *         or the partition has no progress state
     * @throws ClassCastException if the partition found under that key is not a {@link GlobalState}
     */
    public Optional<StreamLoadStatus> getGlobalStreamLoadStatus() {
        return this.enhancedSourceCoordinator
                .getPartition("STREAM-" + this.streamPartition.getPartitionKey())
                .map(partition -> (GlobalState) partition)
                // A global state without progress yields empty rather than throwing from get().
                .flatMap(GlobalState::getProgressState)
                .map(StreamLoadStatus::fromMap);
    }

    /**
     * Saves the partition through the coordinator using the caller-supplied duration in place of
     * {@link #CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE}.
     *
     * @param acknowledgmentSetTimeout duration forwarded to the coordinator
     */
    public void updateStreamPartitionForAcknowledgmentWait(Duration acknowledgmentSetTimeout) {
        this.enhancedSourceCoordinator.saveProgressStateForPartition(this.streamPartition, acknowledgmentSetTimeout);
    }

    /**
     * Gives the partition up through the coordinator and clears the
     * {@code STOP_S3_SCAN_PROCESSING} system property, leaving the progress state as it is.
     */
    public void giveUpPartition() {
        this.releasePartition();
    }
}

