/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.plugins.source.rds.stream;

import io.micrometer.core.instrument.Counter;
import java.time.Duration;
import java.util.Optional;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
import org.opensearch.dataprepper.plugins.source.rds.configuration.EngineType;
import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.StreamPartition;
import org.opensearch.dataprepper.plugins.source.rds.coordination.state.StreamProgressState;
import org.opensearch.dataprepper.plugins.source.rds.model.BinlogCoordinate;
import org.opensearch.dataprepper.plugins.source.rds.stream.ChangeEventStatus;
import org.postgresql.replication.LogSequenceNumber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamCheckpointer {
    private static final Logger LOG = LoggerFactory.getLogger(StreamCheckpointer.class);
    static final Duration CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE = Duration.ofMinutes(5L);
    static final String CHECKPOINT_COUNT = "checkpointCount";
    private final EnhancedSourceCoordinator sourceCoordinator;
    private final StreamPartition streamPartition;
    private final PluginMetrics pluginMetrics;
    private final Counter checkpointCounter;

    public StreamCheckpointer(EnhancedSourceCoordinator sourceCoordinator, StreamPartition streamPartition, PluginMetrics pluginMetrics) {
        this.sourceCoordinator = sourceCoordinator;
        this.streamPartition = streamPartition;
        this.pluginMetrics = pluginMetrics;
        this.checkpointCounter = pluginMetrics.counter(CHECKPOINT_COUNT);
    }

    public void checkpoint(EngineType engineType, ChangeEventStatus changeEventStatus) {
        if (engineType.isMySql()) {
            this.checkpoint(changeEventStatus.getBinlogCoordinate());
        } else if (engineType.isPostgres()) {
            this.checkpoint(changeEventStatus.getLogSequenceNumber());
        } else {
            throw new IllegalArgumentException("Unsupported engine type " + String.valueOf((Object)engineType));
        }
    }

    private void checkpoint(BinlogCoordinate binlogCoordinate) {
        LOG.debug("Checkpointing stream partition {} with binlog coordinate {}", (Object)this.streamPartition.getPartitionKey(), (Object)binlogCoordinate);
        Optional<StreamProgressState> progressState = this.streamPartition.getProgressState();
        progressState.get().getMySqlStreamState().setCurrentPosition(binlogCoordinate);
        this.sourceCoordinator.saveProgressStateForPartition((EnhancedSourcePartition)this.streamPartition, CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE);
        this.checkpointCounter.increment();
    }

    private void checkpoint(LogSequenceNumber logSequenceNumber) {
        LOG.debug("Checkpointing stream partition {} with log sequence number {}", (Object)this.streamPartition.getPartitionKey(), (Object)logSequenceNumber);
        Optional<StreamProgressState> progressState = this.streamPartition.getProgressState();
        progressState.get().getPostgresStreamState().setCurrentLsn(logSequenceNumber.asString());
        this.sourceCoordinator.saveProgressStateForPartition((EnhancedSourcePartition)this.streamPartition, CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE);
        this.checkpointCounter.increment();
    }

    public void extendLease() {
        LOG.debug("Extending lease of stream partition {}", (Object)this.streamPartition.getPartitionKey());
        this.sourceCoordinator.saveProgressStateForPartition((EnhancedSourcePartition)this.streamPartition, CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE);
    }

    public void giveUpPartition() {
        LOG.debug("Giving up stream partition {}", (Object)this.streamPartition.getPartitionKey());
        this.sourceCoordinator.giveUpPartition((EnhancedSourcePartition)this.streamPartition);
    }
}

