/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.postgres.source.reader;

import java.util.List;
import java.util.PriorityQueue;
import java.util.function.Supplier;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.cdc.common.annotation.Experimental;
import org.apache.flink.cdc.connectors.base.config.SourceConfig;
import org.apache.flink.cdc.connectors.base.dialect.DataSourceDialect;
import org.apache.flink.cdc.connectors.base.source.meta.events.LatestFinishedSplitsNumberEvent;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitSerializer;
import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReaderContext;
import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReaderWithCommit;
import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
import org.apache.flink.cdc.connectors.postgres.source.events.OffsetCommitAckEvent;
import org.apache.flink.cdc.connectors.postgres.source.events.OffsetCommitEvent;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Experimental
public class PostgresSourceReader
extends IncrementalSourceReaderWithCommit {
    private static final Logger LOG = LoggerFactory.getLogger(PostgresSourceReader.class);
    private volatile boolean isCommitOffset = false;
    private final PriorityQueue<Long> minHeap;
    private final int lsnCommitCheckpointsDelay;

    public PostgresSourceReader(FutureCompletingBlockingQueue elementQueue, Supplier supplier, RecordEmitter recordEmitter, Configuration config, IncrementalSourceReaderContext incrementalSourceReaderContext, SourceConfig sourceConfig, SourceSplitSerializer sourceSplitSerializer, DataSourceDialect dialect) {
        super(elementQueue, supplier, recordEmitter, config, incrementalSourceReaderContext, sourceConfig, sourceSplitSerializer, dialect);
        this.lsnCommitCheckpointsDelay = ((PostgresSourceConfig)sourceConfig).getLsnCommitCheckpointsDelay();
        this.minHeap = new PriorityQueue();
    }

    public void handleSourceEvents(SourceEvent sourceEvent) {
        if (sourceEvent instanceof OffsetCommitEvent) {
            this.isCommitOffset = ((OffsetCommitEvent)sourceEvent).isCommitOffset();
            this.context.sendSourceEventToCoordinator((SourceEvent)new OffsetCommitAckEvent());
        } else {
            super.handleSourceEvents(sourceEvent);
        }
    }

    protected void updateStreamSplitFinishedSplitsSize(LatestFinishedSplitsNumberEvent sourceEvent) {
        super.updateStreamSplitFinishedSplitsSize(sourceEvent);
        this.isCommitOffset = true;
    }

    public List<SourceSplitBase> snapshotState(long checkpointId) {
        List sourceSplitBases = super.snapshotState(checkpointId);
        if (!this.isCommitOffset()) {
            LOG.debug("Close offset commit of checkpoint {}", (Object)checkpointId);
            this.lastCheckpointOffsets.remove(checkpointId);
        }
        return sourceSplitBases;
    }

    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        this.minHeap.add(checkpointId);
        if (this.minHeap.size() <= this.lsnCommitCheckpointsDelay) {
            LOG.info("Pending checkpoints '{}'.", this.minHeap);
            return;
        }
        long checkpointIdToCommit = this.minHeap.poll();
        LOG.info("Pending checkpoints '{}', to be committed checkpoint id '{}'.", this.minHeap, (Object)checkpointIdToCommit);
        if (this.isCommitOffset()) {
            super.notifyCheckpointComplete(checkpointIdToCommit);
        }
    }

    private boolean isCommitOffset() {
        return !this.sourceConfig.isScanNewlyAddedTableEnabled() || this.isCommitOffset;
    }
}

