/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.cdc.base.source.enumerator;

import io.debezium.relational.TableId;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.SplitAssigner;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.state.IncrementalPhaseState;
import org.apache.seatunnel.connectors.cdc.base.source.event.SnapshotSplitWatermark;
import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
import org.apache.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
import org.apache.seatunnel.connectors.cdc.base.source.split.CompletedSnapshotSplitInfo;
import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.seatunnel.shade.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IncrementalSplitAssigner<C extends SourceConfig>
implements SplitAssigner {
    private static final Logger LOG = LoggerFactory.getLogger(IncrementalSplitAssigner.class);
    protected static final String INCREMENTAL_SPLIT_ID = "incremental-split-%d";
    private final SplitAssigner.Context<C> context;
    private final int incrementalParallelism;
    private final OffsetFactory offsetFactory;
    private final Map<TableId, Offset> tableWatermarks = new HashMap<TableId, Offset>();
    private boolean splitAssigned = false;
    private final List<IncrementalSplit> remainingSplits = new ArrayList<IncrementalSplit>();
    private final Map<String, IncrementalSplit> assignedSplits = new HashMap<String, IncrementalSplit>();
    private boolean startWithSnapshotMinimumOffset = true;
    private List<CatalogTable> checkpointTables;
    private Map<TableId, byte[]> historyTableChanges;

    public IncrementalSplitAssigner(SplitAssigner.Context<C> context, int incrementalParallelism, OffsetFactory offsetFactory) {
        this.context = context;
        this.incrementalParallelism = incrementalParallelism;
        this.offsetFactory = offsetFactory;
    }

    @Override
    public void open() {
    }

    @Override
    public Optional<SourceSplitBase> getNext() {
        if (!this.remainingSplits.isEmpty()) {
            Iterator<IncrementalSplit> iterator = this.remainingSplits.iterator();
            IncrementalSplit split = iterator.next();
            iterator.remove();
            this.assignedSplits.put(split.splitId(), split);
            return Optional.of(split);
        }
        if (this.splitAssigned) {
            return Optional.empty();
        }
        List<IncrementalSplit> incrementalSplits = this.createIncrementalSplits(this.startWithSnapshotMinimumOffset);
        this.remainingSplits.addAll(incrementalSplits);
        this.splitAssigned = true;
        return this.getNext();
    }

    public boolean noMoreSplits() {
        return this.getRemainingTables().isEmpty() && this.remainingSplits.isEmpty();
    }

    private Set<TableId> getRemainingTables() {
        HashSet<TableId> allTables = new HashSet<TableId>(this.context.getCapturedTables());
        this.assignedSplits.values().forEach(split -> split.getTableIds().forEach(allTables::remove));
        return allTables;
    }

    @Override
    public boolean waitingForCompletedSplits() {
        return false;
    }

    @Override
    public void onCompletedSplits(List<SnapshotSplitWatermark> completedSplitWatermarks) {
        completedSplitWatermarks.forEach(watermark -> this.context.getSplitCompletedOffsets().put(watermark.getSplitId(), (SnapshotSplitWatermark)watermark));
    }

    @Override
    public void addSplits(Collection<SourceSplitBase> splits) {
        splits.stream().map(SourceSplitBase::asIncrementalSplit).forEach(incrementalSplit -> {
            Offset startupOffset = incrementalSplit.getStartupOffset();
            List<CompletedSnapshotSplitInfo> completedSnapshotSplitInfos = incrementalSplit.getCompletedSnapshotSplitInfos();
            for (CompletedSnapshotSplitInfo info : completedSnapshotSplitInfos) {
                if (!this.context.getCapturedTables().contains(info.getTableId())) continue;
                this.context.getSplitCompletedOffsets().put(info.getSplitId(), info.getWatermark());
                this.context.getAssignedSnapshotSplit().put(info.getSplitId(), info.asSnapshotSplit());
            }
            for (TableId tableId : incrementalSplit.getTableIds()) {
                if (!this.context.getCapturedTables().contains(tableId)) continue;
                this.tableWatermarks.put(tableId, startupOffset);
            }
            this.checkpointTables = incrementalSplit.getCheckpointTables();
            this.historyTableChanges = incrementalSplit.getHistoryTableChanges();
        });
        if (!this.tableWatermarks.isEmpty()) {
            this.startWithSnapshotMinimumOffset = false;
        }
    }

    @Override
    public IncrementalPhaseState snapshotState(long checkpointId) {
        return new IncrementalPhaseState();
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
    }

    public List<IncrementalSplit> createIncrementalSplits(boolean startWithSnapshotMinimumOffset) {
        HashSet<TableId> allTables = new HashSet<TableId>(this.context.getCapturedTables());
        this.assignedSplits.values().forEach(split -> split.getTableIds().forEach(allTables::remove));
        List[] capturedTables = new List[this.incrementalParallelism];
        int i = 0;
        for (TableId tableId : allTables) {
            int index = i % this.incrementalParallelism;
            if (capturedTables[index] == null) {
                capturedTables[index] = new ArrayList();
            }
            capturedTables[index].add(tableId);
            ++i;
        }
        i = 0;
        ArrayList<IncrementalSplit> incrementalSplits = new ArrayList<IncrementalSplit>();
        for (List capturedTable : capturedTables) {
            incrementalSplits.add(this.createIncrementalSplit(capturedTable, i++, startWithSnapshotMinimumOffset));
        }
        return incrementalSplits;
    }

    private IncrementalSplit createIncrementalSplit(List<TableId> capturedTables, int index, boolean startWithSnapshotMinimumOffset) {
        C sourceConfig = this.context.getSourceConfig();
        List assignedSnapshotSplit = this.context.getAssignedSnapshotSplit().values().stream().filter(split -> capturedTables.contains(split.getTableId())).sorted(Comparator.comparing(SourceSplitBase::splitId)).collect(Collectors.toList());
        Map<String, SnapshotSplitWatermark> splitCompletedOffsets = this.context.getSplitCompletedOffsets();
        ArrayList<CompletedSnapshotSplitInfo> completedSnapshotSplitInfos = new ArrayList<CompletedSnapshotSplitInfo>();
        Offset minOffset = null;
        for (SnapshotSplit split2 : assignedSnapshotSplit) {
            SnapshotSplitWatermark splitWatermark = splitCompletedOffsets.get(split2.splitId());
            if (startWithSnapshotMinimumOffset) {
                Offset splitOffset;
                Offset offset = splitOffset = sourceConfig.isExactlyOnce() ? splitWatermark.getHighWatermark() : splitWatermark.getLowWatermark();
                if (minOffset == null || splitOffset.isBefore(minOffset)) {
                    minOffset = splitOffset;
                    LOG.debug("Find the min offset {} of change log in split {}", (Object)splitOffset, (Object)splitWatermark);
                }
            }
            completedSnapshotSplitInfos.add(new CompletedSnapshotSplitInfo(split2.splitId(), split2.getTableId(), split2.getSplitKeyType(), split2.getSplitStart(), split2.getSplitEnd(), splitWatermark));
        }
        for (TableId tableId : capturedTables) {
            Offset watermark = this.tableWatermarks.get(tableId);
            if (minOffset != null && (watermark == null || !watermark.isBefore(minOffset))) continue;
            minOffset = watermark;
            LOG.debug("Find the min offset {} of change log in table-watermarks {}", (Object)watermark, (Object)tableId);
        }
        Offset incrementalSplitStartOffset = minOffset != null ? minOffset : sourceConfig.getStartupConfig().getStartupOffset(this.offsetFactory);
        return new IncrementalSplit(String.format(INCREMENTAL_SPLIT_ID, index), capturedTables, incrementalSplitStartOffset, sourceConfig.getStopConfig().getStopOffset(this.offsetFactory), completedSnapshotSplitInfos, this.checkpointTables, this.historyTableChanges);
    }

    @VisibleForTesting
    void setSplitAssigned(boolean assigned) {
        this.splitAssigned = assigned;
    }

    public boolean completedSnapshotPhase(List<TableId> tableIds) {
        Preconditions.checkArgument((this.splitAssigned && this.noMoreSplits() ? 1 : 0) != 0);
        for (String splitKey : new ArrayList<String>(this.context.getAssignedSnapshotSplit().keySet())) {
            SnapshotSplit assignedSplit = this.context.getAssignedSnapshotSplit().get(splitKey);
            if (!tableIds.contains(assignedSplit.getTableId())) continue;
            this.context.getAssignedSnapshotSplit().remove(splitKey);
            this.context.getSplitCompletedOffsets().remove(assignedSplit.splitId());
        }
        return this.context.getAssignedSnapshotSplit().isEmpty() && this.context.getSplitCompletedOffsets().isEmpty();
    }

    public boolean waitingForAssignedSplits() {
        return !this.splitAssigned || !this.noMoreSplits();
    }
}

