/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessorFactory;
import org.apache.flink.streaming.runtime.io.StreamTaskSourceInput;
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler;
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate;
import org.apache.flink.streaming.runtime.io.checkpointing.InputProcessorUtil;
import org.apache.flink.streaming.runtime.metrics.MinWatermarkGauge;
import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.function.ThrowingRunnable;

/**
 * A {@link StreamTask} whose main operator is a {@link MultipleInputStreamOperator}.
 *
 * <p>During {@link #init()} the task groups its network input gates by the type number of the
 * corresponding physical in-edge, registers watermark gauges, and builds the multiple-input
 * processor. Checkpoints requested through {@link #triggerCheckpointAsync} are turned into
 * {@link CheckpointBarrier}s that are fed to the {@link CheckpointBarrierHandler} for every
 * channel of every chained source input; the returned future is completed once the matching
 * {@link #triggerCheckpointOnBarrier} or {@link #abortCheckpointOnBarrier} call arrives.
 *
 * @param <OUT> type of the records emitted by the main operator
 */
@Internal
public class MultipleInputStreamTask<OUT>
extends StreamTask<OUT, MultipleInputStreamOperator<OUT>> {
    /** Upper bound on the number of entries kept in {@link #pendingCheckpointCompletedFutures}. */
    private static final int MAX_TRACKED_CHECKPOINTS = 100000;

    /**
     * Result futures of checkpoints triggered via {@link #triggerCheckpointAsync}, keyed by
     * checkpoint ID, that have not yet been completed or aborted.
     *
     * <p>NOTE(review): a plain {@link HashMap} is used, so this is presumably only touched from
     * the task's mailbox thread - confirm against the callers of the barrier callbacks.
     */
    private final HashMap<Long, CompletableFuture<Boolean>> pendingCheckpointCompletedFutures = new HashMap<>();

    /** Created in {@link #createInputProcessor}; {@code null} until then. */
    @Nullable
    private CheckpointBarrierHandler checkpointBarrierHandler;

    /**
     * Creates the task for the given environment.
     *
     * @param env the task's runtime environment, forwarded to {@link StreamTask}
     * @throws Exception if the super constructor fails
     */
    public MultipleInputStreamTask(Environment env) throws Exception {
        super(env);
    }

    /**
     * Registers the per-input and minimum watermark gauges, groups the network input gates by
     * logical input and creates the input processor.
     *
     * @throws Exception if reading the configuration or building the input processor fails
     */
    @Override
    public void init() throws Exception {
        StreamConfig streamConfig = getConfiguration();
        ClassLoader userClassLoader = getUserCodeClassLoader();
        StreamConfig.InputConfig[] inputs = streamConfig.getInputs(userClassLoader);

        // One watermark gauge per configured input; metric names are 1-based.
        WatermarkGauge[] watermarkGauges = new WatermarkGauge[inputs.length];
        for (int i = 0; i < inputs.length; i++) {
            watermarkGauges[i] = new WatermarkGauge();
            mainOperator.getMetricGroup().gauge(MetricNames.currentInputWatermarkName(i + 1), watermarkGauges[i]);
        }
        MinWatermarkGauge minInputWatermarkGauge = new MinWatermarkGauge(watermarkGauges);
        mainOperator.getMetricGroup().gauge("currentInputWatermark", minInputWatermarkGauge);

        List<StreamEdge> inEdges = streamConfig.getInPhysicalEdges(userClassLoader);
        int numberOfNetworkInputs = streamConfig.getNumberOfNetworkInputs();

        // Bucket every network input gate under the (1-based) type number of its in-edge.
        List<List<IndexedInputGate>> gatesPerInput = new ArrayList<>(inputs.length);
        for (int i = 0; i < inputs.length; i++) {
            gatesPerInput.add(new ArrayList<>());
        }
        for (int i = 0; i < numberOfNetworkInputs; i++) {
            int inputType = inEdges.get(i).getTypeNumber();
            IndexedInputGate reader = getEnvironment().getInputGate(i);
            gatesPerInput.get(inputType - 1).add(reader);
        }

        // Inputs that received no gate are dropped before the processor is built.
        List<List<IndexedInputGate>> networkInputLists = new ArrayList<>();
        for (List<IndexedInputGate> gates : gatesPerInput) {
            if (!gates.isEmpty()) {
                networkInputLists.add(gates);
            }
        }

        // Generic arrays cannot be created directly; every element is a List<IndexedInputGate>.
        @SuppressWarnings({"unchecked", "rawtypes"})
        List<IndexedInputGate>[] networkInputGates = networkInputLists.toArray(new List[0]);

        createInputProcessor(networkInputGates, inputs, watermarkGauges, index -> inEdges.get(index).getPartitioner());

        // The minimum gauge is exposed on the task metric group as well, wrapped in a method
        // reference so that a distinct gauge object is registered there.
        getEnvironment().getMetricGroup().gauge("currentInputWatermark", minInputWatermarkGauge::getValue);
    }

    /**
     * Creates the checkpoint barrier handler, the checkpointed input gates and the input
     * processor, and stores the handler and the processor on this task.
     *
     * @param inputGates network input gates, one list per logical network input
     * @param inputs configuration of all inputs of the main operator
     * @param inputWatermarkGauges watermark gauges, one per entry of {@code inputs}
     * @param gatePartitioners lookup from gate index to the partitioner of that gate's in-edge
     */
    protected void createInputProcessor(List<IndexedInputGate>[] inputGates, StreamConfig.InputConfig[] inputs, WatermarkGauge[] inputWatermarkGauges, Function<Integer, StreamPartitioner<?>> gatePartitioners) {
        this.checkpointBarrierHandler = InputProcessorUtil.createCheckpointBarrierHandler(
                this,
                getConfiguration(),
                getCheckpointCoordinator(),
                getTaskNameWithSubtaskAndId(),
                inputGates,
                operatorChain.getSourceTaskInputs(),
                mainMailboxExecutor,
                timerService);

        CheckpointedInputGate[] checkpointedInputGates = InputProcessorUtil.createCheckpointedMultipleInputGate(
                mainMailboxExecutor,
                inputGates,
                getEnvironment().getMetricGroup().getIOMetricGroup(),
                checkpointBarrierHandler,
                configuration);

        this.inputProcessor = StreamMultipleInputProcessorFactory.create(
                this,
                checkpointedInputGates,
                inputs,
                getEnvironment().getIOManager(),
                getEnvironment().getMemoryManager(),
                getEnvironment().getMetricGroup().getIOMetricGroup(),
                setupNumRecordsInCounter(mainOperator),
                getStreamStatusMaintainer(),
                mainOperator,
                inputWatermarkGauges,
                getConfiguration(),
                getTaskConfiguration(),
                getJobConfiguration(),
                getExecutionConfig(),
                getUserCodeClassLoader(),
                operatorChain,
                getEnvironment().getTaskStateManager().getInputRescalingDescriptor(),
                gatePartitioners,
                getEnvironment().getTaskInfo());
    }

    /**
     * Schedules a checkpoint on the mailbox executor by injecting a barrier for every chained
     * source input channel.
     *
     * @param metadata ID and timestamp of the checkpoint
     * @param options options carried by the generated barrier
     * @return a future that completes with {@code true} once the checkpoint was taken on barrier,
     *     or exceptionally if triggering fails, the checkpoint is aborted, or it is evicted
     *     because too many checkpoints are pending
     */
    @Override
    public Future<Boolean> triggerCheckpointAsync(CheckpointMetaData metadata, CheckpointOptions options) {
        CompletableFuture<Boolean> resultFuture = new CompletableFuture<>();
        mainMailboxExecutor.execute(
                () -> {
                    try {
                        pendingCheckpointCompletedFutures.put(metadata.getCheckpointId(), resultFuture);
                        checkPendingCheckpointCompletedFuturesSize();
                        triggerSourcesCheckpoint(new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options));
                    } catch (Exception ex) {
                        // Report the failure through the future and, by rethrowing, to the executor.
                        pendingCheckpointCompletedFutures.remove(metadata.getCheckpointId());
                        resultFuture.completeExceptionally(ex);
                        throw ex;
                    }
                },
                "checkpoint %s with %s",
                metadata,
                options);
        return resultFuture;
    }

    /**
     * Keeps {@link #pendingCheckpointCompletedFutures} at no more than
     * {@link #MAX_TRACKED_CHECKPOINTS} entries by failing and removing the entries with the
     * lowest checkpoint IDs.
     */
    private void checkPendingCheckpointCompletedFuturesSize() {
        if (pendingCheckpointCompletedFutures.size() <= MAX_TRACKED_CHECKPOINTS) {
            return;
        }
        List<Long> pendingCheckpointIds = new ArrayList<>(pendingCheckpointCompletedFutures.keySet());
        pendingCheckpointIds.sort(Long::compareTo);
        int excess = pendingCheckpointIds.size() - MAX_TRACKED_CHECKPOINTS;
        for (Long checkpointId : pendingCheckpointIds.subList(0, excess)) {
            pendingCheckpointCompletedFutures
                    .remove(checkpointId)
                    .completeExceptionally(new IllegalStateException("Too many pending checkpoints"));
        }
    }

    /**
     * Passes the barrier to the checkpoint barrier handler once per channel of every chained
     * source input.
     *
     * @param checkpointBarrier barrier to inject
     * @throws IOException if the barrier handler fails to process the barrier
     */
    private void triggerSourcesCheckpoint(CheckpointBarrier checkpointBarrier) throws IOException {
        for (StreamTaskSourceInput<?> sourceInput : operatorChain.getSourceTaskInputs()) {
            for (InputChannelInfo channelInfo : sourceInput.getChannelInfos()) {
                checkpointBarrierHandler.processBarrier(checkpointBarrier, channelInfo);
            }
        }
    }

    /**
     * Delegates to {@link StreamTask#triggerCheckpointOnBarrier} and then resolves the future
     * registered for this checkpoint by {@link #triggerCheckpointAsync}, if there is one.
     *
     * @throws IOException if the super implementation fails with an I/O error
     */
    @Override
    public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetricsBuilder checkpointMetrics) throws IOException {
        CompletableFuture<Boolean> resultFuture = pendingCheckpointCompletedFutures.remove(checkpointMetaData.getCheckpointId());
        try {
            super.triggerCheckpointOnBarrier(checkpointMetaData, checkpointOptions, checkpointMetrics);
            if (resultFuture != null) {
                resultFuture.complete(true);
            }
        } catch (IOException | RuntimeException ex) {
            // The future was already removed from the map above, so this is the last chance to
            // complete it; unchecked failures must not leave it pending forever.
            if (resultFuture != null) {
                resultFuture.completeExceptionally(ex);
            }
            throw ex;
        }
    }

    /**
     * Fails the future registered for the checkpoint (if any) with {@code cause}, then delegates
     * to {@link StreamTask#abortCheckpointOnBarrier}.
     *
     * @param checkpointId ID of the aborted checkpoint
     * @param cause reason for the abort
     * @throws IOException if the super implementation fails
     */
    @Override
    public void abortCheckpointOnBarrier(long checkpointId, CheckpointException cause) throws IOException {
        CompletableFuture<Boolean> resultFuture = pendingCheckpointCompletedFutures.remove(checkpointId);
        if (resultFuture != null) {
            resultFuture.completeExceptionally(cause);
        }
        super.abortCheckpointOnBarrier(checkpointId, cause);
    }

    /**
     * Emits {@link Watermark#MAX_WATERMARK} on every chained source output.
     *
     * @throws Exception if emitting a watermark fails
     */
    @Override
    protected void advanceToEndOfEventTime() throws Exception {
        for (Output<StreamRecord<?>> sourceOutput : operatorChain.getChainedSourceOutputs()) {
            sourceOutput.emitWatermark(Watermark.MAX_WATERMARK);
        }
    }
}

