/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.base.source.reader;

import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.cdc.common.annotation.Experimental;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.connectors.base.config.SourceConfig;
import org.apache.flink.cdc.connectors.base.dialect.DataSourceDialect;
import org.apache.flink.cdc.connectors.base.source.meta.events.FinishedSnapshotSplitsAckEvent;
import org.apache.flink.cdc.connectors.base.source.meta.events.FinishedSnapshotSplitsReportEvent;
import org.apache.flink.cdc.connectors.base.source.meta.events.FinishedSnapshotSplitsRequestEvent;
import org.apache.flink.cdc.connectors.base.source.meta.events.LatestFinishedSplitsNumberEvent;
import org.apache.flink.cdc.connectors.base.source.meta.events.LatestFinishedSplitsNumberRequestEvent;
import org.apache.flink.cdc.connectors.base.source.meta.events.StreamSplitAssignedEvent;
import org.apache.flink.cdc.connectors.base.source.meta.events.StreamSplitMetaEvent;
import org.apache.flink.cdc.connectors.base.source.meta.events.StreamSplitMetaRequestEvent;
import org.apache.flink.cdc.connectors.base.source.meta.events.StreamSplitUpdateAckEvent;
import org.apache.flink.cdc.connectors.base.source.meta.events.StreamSplitUpdateRequestEvent;
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import org.apache.flink.cdc.connectors.base.source.meta.split.FinishedSnapshotSplitInfo;
import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplitState;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceRecords;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitSerializer;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitState;
import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplitState;
import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReaderContext;
import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Source reader that consumes both snapshot splits and stream splits and talks to the
 * coordinator through {@link SourceEvent}s.
 *
 * <p>Besides the splits tracked by the base reader, this class keeps three extra pieces of state
 * that are all included in {@link #snapshotState(long)}:
 *
 * <ul>
 *   <li>snapshot splits that finished reading but whose report was not acknowledged yet,
 *   <li>stream splits whose finished-snapshot-split metadata is still being collected,
 *   <li>a stream split that was suspended and waits for the latest finished split number.
 * </ul>
 *
 * @param <T> type of the records emitted downstream
 * @param <C> type of the source configuration
 */
@Experimental
public class IncrementalSourceReader<T, C extends SourceConfig>
        extends SingleThreadMultiplexSourceReaderBase<
                SourceRecords, T, SourceSplitBase, SourceSplitState> {
    private static final Logger LOG = LoggerFactory.getLogger(IncrementalSourceReader.class);

    /** Split id under which the stream split shows up in the finished-splits map. */
    private static final String STREAM_SPLIT_ID = "stream-split";

    /** Finished snapshot splits, keyed by split id, kept until an ack event removes them. */
    private final Map<String, SnapshotSplit> finishedUnackedSplits;

    /** Stream splits, keyed by split id, that are not yet reported as completed. */
    protected final Map<String, StreamSplit> uncompletedStreamSplits;

    /** Stream split parked while the stream split reader is suspended; {@code null} otherwise. */
    protected volatile StreamSplit suspendedStreamSplit;

    private final int subtaskId;
    private final SourceSplitSerializer sourceSplitSerializer;
    protected final C sourceConfig;
    protected final DataSourceDialect<C> dialect;
    private final IncrementalSourceReaderContext incrementalSourceReaderContext;

    /**
     * Creates the reader.
     *
     * @param elementQueue queue shared between the fetcher and this reader
     * @param splitReaderSupplier supplies the split reader used by the single fetcher
     * @param recordEmitter emitter turning fetched records into output records
     * @param config Flink configuration handed to the base reader
     * @param incrementalSourceReaderContext context used to suspend / wake up the stream reader
     * @param sourceConfig source configuration
     * @param sourceSplitSerializer serializer used to decode split metadata; must not be null
     * @param dialect dialect used for table filtering and schema discovery
     */
    public IncrementalSourceReader(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecords>> elementQueue,
            Supplier<IncrementalSourceSplitReader<C>> splitReaderSupplier,
            RecordEmitter<SourceRecords, T, SourceSplitState> recordEmitter,
            Configuration config,
            IncrementalSourceReaderContext incrementalSourceReaderContext,
            C sourceConfig,
            SourceSplitSerializer sourceSplitSerializer,
            DataSourceDialect<C> dialect) {
        super(
                elementQueue,
                new SingleThreadFetcherManager<>(elementQueue, splitReaderSupplier::get),
                recordEmitter,
                config,
                incrementalSourceReaderContext.getSourceReaderContext());
        this.sourceConfig = sourceConfig;
        this.finishedUnackedSplits = new HashMap<>();
        this.uncompletedStreamSplits = new HashMap<>();
        this.subtaskId = context.getIndexOfSubtask();
        this.sourceSplitSerializer = Preconditions.checkNotNull(sourceSplitSerializer);
        this.dialect = dialect;
        this.incrementalSourceReaderContext = incrementalSourceReaderContext;
        this.suspendedStreamSplit = null;
    }

    /** Requests a split on start-up when at most one split is currently assigned. */
    @Override
    public void start() {
        if (getNumberOfCurrentlyAssignedSplits() <= 1) {
            context.sendSplitRequest();
        }
    }

    /**
     * Wraps a newly assigned split into its mutable state object.
     *
     * @param split the assigned split
     * @return a {@link SnapshotSplitState} for snapshot splits, otherwise a {@link
     *     StreamSplitState}
     */
    @Override
    protected SourceSplitState initializedState(SourceSplitBase split) {
        if (split.isSnapshotSplit()) {
            return new SnapshotSplitState(split.asSnapshotSplit());
        }
        return new StreamSplitState(split.asStreamSplit());
    }

    /**
     * Returns the splits to checkpoint: the base reader's splits (minus those already tracked as
     * finished-but-unacked), followed by the finished-but-unacked snapshot splits, the
     * uncompleted stream splits and, if present, the suspended stream split.
     *
     * @param checkpointId id of the checkpoint being taken
     * @return the splits to store in the checkpoint
     */
    @Override
    public List<SourceSplitBase> snapshotState(long checkpointId) {
        List<SourceSplitBase> stateSplits = super.snapshotState(checkpointId);

        // Collect into an explicit ArrayList: the result is extended below, and
        // Collectors.toList() gives no guarantee that the returned list is mutable.
        List<SourceSplitBase> unfinishedSplits =
                stateSplits.stream()
                        .filter(split -> !finishedUnackedSplits.containsKey(split.splitId()))
                        .collect(Collectors.toCollection(ArrayList::new));

        unfinishedSplits.addAll(finishedUnackedSplits.values());
        unfinishedSplits.addAll(uncompletedStreamSplits.values());
        if (suspendedStreamSplit != null) {
            unfinishedSplits.add(suspendedStreamSplit);
        }

        logCurrentStreamOffsets(unfinishedSplits, checkpointId);
        return unfinishedSplits;
    }

    /**
     * Handles splits reported as finished by the fetcher.
     *
     * <p>Two shapes are accepted: a stream split finished together with exactly one snapshot
     * split (the stream split is re-added right away), or exactly one finished split. A single
     * finished stream split is only acted upon when the stream split reader is suspended; it is
     * then parked in {@link #suspendedStreamSplit} and no further split is requested.
     *
     * @param finishedSplitIds finished split states keyed by split id; the stream split entry is
     *     removed from this map in the combined case
     */
    @Override
    protected void onSplitFinished(Map<String, SourceSplitState> finishedSplitIds) {
        boolean requestNextSplit = true;
        if (isNewlyAddedTableSplitAndStreamSplit(finishedSplitIds)) {
            SourceSplitState streamSplitState = finishedSplitIds.remove(STREAM_SPLIT_ID);
            finishedSplitIds
                    .values()
                    .forEach(
                            newAddedSplitState ->
                                    finishedUnackedSplits.put(
                                            newAddedSplitState.toSourceSplit().splitId(),
                                            newAddedSplitState
                                                    .asSnapshotSplitState()
                                                    .toSourceSplit()));
            Preconditions.checkState(finishedSplitIds.values().size() == 1);
            LOG.info(
                    "Source reader {} finished stream split and snapshot split {}",
                    subtaskId,
                    finishedSplitIds.values().iterator().next().toSourceSplit().splitId());
            // Re-adding the stream split also reports the finished snapshot split.
            addSplits(Collections.singletonList(streamSplitState.toSourceSplit()));
        } else {
            Preconditions.checkState(finishedSplitIds.size() == 1);
            for (SourceSplitState splitState : finishedSplitIds.values()) {
                SourceSplitBase sourceSplit = splitState.toSourceSplit();
                if (!sourceSplit.isStreamSplit()) {
                    finishedUnackedSplits.put(sourceSplit.splitId(), sourceSplit.asSnapshotSplit());
                    continue;
                }
                if (!incrementalSourceReaderContext.isStreamSplitReaderSuspended()) {
                    continue;
                }
                suspendedStreamSplit =
                        StreamSplit.toSuspendedStreamSplit(sourceSplit.asStreamSplit());
                LOG.info(
                        "Source reader {} suspended stream split reader success after the newly added table process, current offset {}",
                        subtaskId,
                        suspendedStreamSplit.getStartingOffset());
                context.sendSourceEventToCoordinator(new LatestFinishedSplitsNumberRequestEvent());
                // Wait for the coordinator's answer instead of asking for another split.
                requestNextSplit = false;
            }
            reportFinishedSnapshotSplitsIfNeed();
        }

        if (requestNextSplit) {
            context.sendSplitRequest();
        }
    }

    /** Returns true when the map holds exactly two entries and one of them is the stream split. */
    private boolean isNewlyAddedTableSplitAndStreamSplit(
            Map<String, SourceSplitState> finishedSplitIds) {
        return finishedSplitIds.containsKey(STREAM_SPLIT_ID) && finishedSplitIds.size() == 2;
    }

    /**
     * Adds splits, filtering stream split infos against the current table filter.
     *
     * @param splits splits assigned to this reader
     */
    @Override
    public void addSplits(List<SourceSplitBase> splits) {
        addSplits(splits, true);
    }

    /**
     * Sorts incoming splits into the reader's bookkeeping and forwards the readable ones to the
     * base reader.
     *
     * @param splits splits to add
     * @param checkTableChangeForStreamSplit whether stream splits are first passed through
     *     {@code StreamSplit.filterOutdatedSplitInfos} with the dialect's table filter
     */
    private void addSplits(List<SourceSplitBase> splits, boolean checkTableChangeForStreamSplit) {
        List<SourceSplitBase> unfinishedSplits = new ArrayList<>();
        for (SourceSplitBase split : splits) {
            if (split.isSnapshotSplit()) {
                SnapshotSplit snapshotSplit = split.asSnapshotSplit();
                if (!dialect.isIncludeDataCollection(sourceConfig, snapshotSplit.getTableId())) {
                    LOG.info(
                            "The subtask {} is skipping split {} because it does not match new table filter.",
                            subtaskId,
                            split.splitId());
                } else if (snapshotSplit.isSnapshotReadFinished()) {
                    finishedUnackedSplits.put(snapshotSplit.splitId(), snapshotSplit);
                } else {
                    unfinishedSplits.add(split);
                }
                continue;
            }

            StreamSplit streamSplit = split.asStreamSplit();
            if (checkTableChangeForStreamSplit) {
                LOG.info("before checkTableChangeForStreamSplit: {}", streamSplit);
                streamSplit =
                        StreamSplit.filterOutdatedSplitInfos(
                                streamSplit,
                                tableId ->
                                        dialect.isIncludeDataCollection(
                                                sourceConfig, (TableId) tableId));
                LOG.info("after checkTableChangeForStreamSplit: {}", streamSplit);
            }

            // Must be evaluated before the flag is flipped on the next line.
            boolean checkNewlyAddedTableSchema =
                    !incrementalSourceReaderContext.isHasAssignedStreamSplit()
                            && sourceConfig.isScanNewlyAddedTableEnabled();
            incrementalSourceReaderContext.setHasAssignedStreamSplit(true);

            if (streamSplit.isSuspended()) {
                suspendedStreamSplit = streamSplit;
            } else if (!streamSplit.isCompletedSplit()) {
                // The split as received (not the filtered copy) is tracked and used for the
                // metadata request.
                uncompletedStreamSplits.put(split.splitId(), split.asStreamSplit());
                requestStreamSplitMetaIfNeeded(split.asStreamSplit());
            } else {
                uncompletedStreamSplits.remove(split.splitId());
                streamSplit =
                        discoverTableSchemasForStreamSplit(streamSplit, checkNewlyAddedTableSchema);
                unfinishedSplits.add(streamSplit);
            }
            LOG.info("Source reader {} received the stream split : {}.", subtaskId, streamSplit);
            context.sendSourceEventToCoordinator(new StreamSplitAssignedEvent());
        }

        reportFinishedSnapshotSplitsIfNeed();

        if (!unfinishedSplits.isEmpty()) {
            super.addSplits(unfinishedSplits);
        } else if (suspendedStreamSplit != null || getNumberOfCurrentlyAssignedSplits() <= 1) {
            // Nothing readable was added; ask for more work.
            context.sendSplitRequest();
        }
    }

    /**
     * Fills table schemas into the stream split when it has none, or when a check for newly
     * added tables is requested. Schemas already present in the split take precedence over the
     * discovered ones.
     *
     * @param split the stream split
     * @param checkNewlyAddedTableSchema forces discovery even if the split already has schemas
     * @return the split with schemas filled in, or the given split if discovery was skipped
     * @throws FlinkRuntimeException if schema discovery fails
     */
    private StreamSplit discoverTableSchemasForStreamSplit(
            StreamSplit split, boolean checkNewlyAddedTableSchema) {
        String splitId = split.splitId();
        if (!split.getTableSchemas().isEmpty() && !checkNewlyAddedTableSchema) {
            LOG.warn(
                    "Source reader {} skip the table schema discovery, the stream split {} has table schemas yet.",
                    subtaskId,
                    split);
            return split;
        }

        try {
            Map<TableId, TableChanges.TableChange> existTableSchemas = split.getTableSchemas();
            Map<TableId, TableChanges.TableChange> tableSchemas =
                    dialect.discoverDataCollectionSchemas(sourceConfig);
            // Existing entries overwrite freshly discovered ones.
            tableSchemas.putAll(existTableSchemas);
            LOG.info(
                    "Source reader {} discovers table schema for stream split {} success",
                    subtaskId,
                    splitId);
            return StreamSplit.fillTableSchemas(split, tableSchemas);
        } catch (Exception e) {
            LOG.error(
                    "Source reader {} failed to obtains table schemas due to {}",
                    subtaskId,
                    e.getMessage());
            throw new FlinkRuntimeException(e);
        }
    }

    /**
     * Returns the split ids that belong to the last, partially filled meta group.
     *
     * <p>The ids are sorted before the tail is taken.
     *
     * @param finishedSnapshotSplits finished split infos already held by the stream split
     * @param metaGroupSize number of split infos per meta group
     * @return ids in the trailing partial group; empty when the size is a multiple of the group
     *     size
     */
    private Set<String> getExistedSplitsOfLastGroup(
            List<FinishedSnapshotSplitInfo> finishedSnapshotSplits, int metaGroupSize) {
        int splitsNumOfLastGroup = finishedSnapshotSplits.size() % metaGroupSize;
        if (splitsNumOfLastGroup == 0) {
            return new HashSet<>();
        }
        int lastGroupStart = (finishedSnapshotSplits.size() / metaGroupSize) * metaGroupSize;
        List<String> sortedFinishedSnapshotSplits =
                finishedSnapshotSplits.stream()
                        .map(FinishedSnapshotSplitInfo::getSplitId)
                        .sorted()
                        .collect(Collectors.toList());
        return new HashSet<>(
                sortedFinishedSnapshotSplits.subList(
                        lastGroupStart, lastGroupStart + splitsNumOfLastGroup));
    }

    /**
     * Dispatches events received from the coordinator; unknown events go to the base reader.
     *
     * @param sourceEvent the received event
     */
    @Override
    public void handleSourceEvents(SourceEvent sourceEvent) {
        if (sourceEvent instanceof FinishedSnapshotSplitsAckEvent) {
            FinishedSnapshotSplitsAckEvent ackEvent = (FinishedSnapshotSplitsAckEvent) sourceEvent;
            LOG.debug(
                    "The subtask {} receives ack event for {} from enumerator.",
                    subtaskId,
                    ackEvent.getFinishedSplits());
            for (String splitId : ackEvent.getFinishedSplits()) {
                finishedUnackedSplits.remove(splitId);
            }
        } else if (sourceEvent instanceof FinishedSnapshotSplitsRequestEvent) {
            LOG.debug(
                    "The subtask {} receives request to report finished snapshot splits.",
                    subtaskId);
            reportFinishedSnapshotSplitsIfNeed();
        } else if (sourceEvent instanceof StreamSplitMetaEvent) {
            StreamSplitMetaEvent metaEvent = (StreamSplitMetaEvent) sourceEvent;
            LOG.debug(
                    "The subtask {} receives stream meta with group id {}.",
                    subtaskId,
                    metaEvent.getMetaGroupId());
            fillMetaDataForStreamSplit(metaEvent);
        } else if (sourceEvent instanceof StreamSplitUpdateRequestEvent) {
            suspendStreamSplitReader();
        } else if (sourceEvent instanceof LatestFinishedSplitsNumberEvent) {
            updateStreamSplitFinishedSplitsSize((LatestFinishedSplitsNumberEvent) sourceEvent);
        } else {
            super.handleSourceEvents(sourceEvent);
        }
    }

    /** Asks the reader context to suspend the stream split reader. */
    private void suspendStreamSplitReader() {
        incrementalSourceReaderContext.suspendStreamSplitReader();
    }

    /**
     * Applies a metadata event to the matching uncompleted stream split and then requests the
     * next group (or re-adds the split once it is complete).
     *
     * <p>If the event reports a smaller total than the split currently holds, the split is
     * rebuilt with the received total. If the group id is the expected one, the infos not yet
     * present are appended. Any other group id is ignored.
     *
     * @param metadataEvent the metadata event received from the coordinator
     */
    private void fillMetaDataForStreamSplit(StreamSplitMetaEvent metadataEvent) {
        StreamSplit streamSplit = uncompletedStreamSplits.get(metadataEvent.getSplitId());
        if (streamSplit == null) {
            LOG.warn(
                    "Received metadata event for split {}, but the uncompleted split map does not contain it",
                    metadataEvent.getSplitId());
            return;
        }

        int metaGroupSize = sourceConfig.getSplitMetaGroupSize();
        int receivedMetaGroupId = metadataEvent.getMetaGroupId();
        int receivedTotalFinishedSplitSize = metadataEvent.getTotalFinishedSplitSize();
        int expectedMetaGroupId =
                getNextMetaGroupId(
                        streamSplit.getFinishedSnapshotSplitInfos().size(), metaGroupSize);

        if (receivedTotalFinishedSplitSize < streamSplit.getTotalFinishedSplitSize()) {
            LOG.warn(
                    "Source reader {} receives out of bound finished split size. The received finished split size is {}, but expected is {}, truncate it",
                    subtaskId,
                    receivedTotalFinishedSplitSize,
                    streamSplit.getTotalFinishedSplitSize());
            streamSplit = StreamSplit.toNormalStreamSplit(streamSplit, receivedTotalFinishedSplitSize);
            uncompletedStreamSplits.put(streamSplit.splitId(), streamSplit);
        } else if (receivedMetaGroupId == expectedMetaGroupId) {
            // Skip infos of a partially filled last group that the split already contains.
            Set<String> existedSplitsOfLastGroup =
                    getExistedSplitsOfLastGroup(
                            streamSplit.getFinishedSnapshotSplitInfos(), metaGroupSize);
            List<FinishedSnapshotSplitInfo> newAddedMetadataGroup =
                    metadataEvent.getMetaGroup().stream()
                            .map(sourceSplitSerializer::deserialize)
                            .filter(r -> !existedSplitsOfLastGroup.contains(r.getSplitId()))
                            .collect(Collectors.toList());
            uncompletedStreamSplits.put(
                    streamSplit.splitId(),
                    StreamSplit.appendFinishedSplitInfos(streamSplit, newAddedMetadataGroup));
            // Log the group id, which is what the message announces.
            LOG.info("Fill metadata of group {} to stream split", receivedMetaGroupId);
        } else {
            LOG.warn(
                    "Received out of oder metadata event for split {}, the received meta group id is {}, but expected is {}, ignore it",
                    metadataEvent.getSplitId(),
                    receivedMetaGroupId,
                    expectedMetaGroupId);
        }

        requestStreamSplitMetaIfNeeded(uncompletedStreamSplits.get(streamSplit.splitId()));
    }

    /**
     * Requests the next metadata group for an uncompleted stream split, or re-adds the split
     * through {@link #addSplits(List)} once it is completed.
     *
     * @param streamSplit the stream split to inspect
     */
    private void requestStreamSplitMetaIfNeeded(StreamSplit streamSplit) {
        String splitId = streamSplit.splitId();
        if (streamSplit.isCompletedSplit()) {
            LOG.info("The meta of stream split {} has been collected success", splitId);
            addSplits(Collections.singletonList(streamSplit));
            return;
        }
        int nextMetaGroupId =
                getNextMetaGroupId(
                        streamSplit.getFinishedSnapshotSplitInfos().size(),
                        sourceConfig.getSplitMetaGroupSize());
        StreamSplitMetaRequestEvent splitMetaRequestEvent =
                new StreamSplitMetaRequestEvent(
                        splitId, nextMetaGroupId, streamSplit.getTotalFinishedSplitSize());
        context.sendSourceEventToCoordinator(splitMetaRequestEvent);
    }

    /**
     * Turns the suspended stream split back into a normal one using the finished split number
     * from the event, re-adds it without the table filter pass, acknowledges the update to the
     * coordinator and wakes up the suspended stream split reader.
     *
     * @param sourceEvent event carrying the latest finished split number
     */
    protected void updateStreamSplitFinishedSplitsSize(
            LatestFinishedSplitsNumberEvent sourceEvent) {
        if (suspendedStreamSplit == null) {
            LOG.warn("Unexpected event {}, this should not happen.", sourceEvent);
            return;
        }
        int finishedSplitsSize = sourceEvent.getLatestFinishedSplitsNumber();
        StreamSplit streamSplit =
                StreamSplit.toNormalStreamSplit(suspendedStreamSplit, finishedSplitsSize);
        suspendedStreamSplit = null;
        addSplits(Collections.singletonList(streamSplit), false);

        context.sendSourceEventToCoordinator(new StreamSplitUpdateAckEvent());
        LOG.info(
                "Source reader {} notifies enumerator that stream split has been updated.",
                subtaskId);

        incrementalSourceReaderContext.wakeupSuspendedStreamSplitReader();
        LOG.info(
                "Source reader {} wakes up suspended stream reader as stream split has been updated.",
                subtaskId);
    }

    /**
     * Sends the high watermarks of all finished-but-unacked snapshot splits to the coordinator.
     * Does nothing when there are none.
     */
    private void reportFinishedSnapshotSplitsIfNeed() {
        if (finishedUnackedSplits.isEmpty()) {
            return;
        }
        Map<String, Offset> finishedOffsets = new HashMap<>();
        for (SnapshotSplit split : finishedUnackedSplits.values()) {
            finishedOffsets.put(split.splitId(), split.getHighWatermark());
        }
        FinishedSnapshotSplitsReportEvent reportEvent =
                new FinishedSnapshotSplitsReportEvent(finishedOffsets);
        context.sendSourceEventToCoordinator(reportEvent);
        LOG.debug(
                "The subtask {} reports offsets of finished snapshot splits {}.",
                subtaskId,
                finishedOffsets);
    }

    /**
     * Computes the id of the next meta group to request.
     *
     * @param receivedMetaNum number of split infos received so far
     * @param metaGroupSize number of split infos per group; must be positive
     * @return {@code receivedMetaNum / metaGroupSize} (integer division)
     * @throws IllegalStateException if {@code metaGroupSize} is not positive
     */
    public static int getNextMetaGroupId(int receivedMetaNum, int metaGroupSize) {
        Preconditions.checkState(metaGroupSize > 0);
        return receivedMetaNum / metaGroupSize;
    }

    /**
     * Converts a split state back into its immutable split.
     *
     * @param splitId id of the split (unused)
     * @param splitState the split state
     * @return the split produced by {@code splitState.toSourceSplit()}
     */
    @Override
    protected SourceSplitBase toSplitType(String splitId, SourceSplitState splitState) {
        return splitState.toSourceSplit();
    }

    /**
     * Logs the starting offset of every stream split in the list at INFO level.
     *
     * <p>Non-stream splits are skipped rather than ending the scan, because the list built by
     * {@link #snapshotState(long)} places stream splits after snapshot splits.
     *
     * @param splits the splits being checkpointed
     * @param checkpointId id of the checkpoint, used in the log message
     */
    private void logCurrentStreamOffsets(List<SourceSplitBase> splits, long checkpointId) {
        if (!LOG.isInfoEnabled()) {
            return;
        }
        for (SourceSplitBase split : splits) {
            if (!split.isStreamSplit()) {
                continue;
            }
            Offset offset = split.asStreamSplit().getStartingOffset();
            LOG.info("Stream split offset on checkpoint {}: {}", checkpointId, offset);
        }
    }

    /** Returns the live map of finished-but-unacked snapshot splits (not a copy). */
    @VisibleForTesting
    public Map<String, SnapshotSplit> getFinishedUnackedSplits() {
        return finishedUnackedSplits;
    }
}

