/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.streaming.api.graph.NonChainedOutput;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;

public class StreamConfigChainer<OWNER> {
    public static final int MAIN_NODE_ID = 0;
    private final OWNER owner;
    private final StreamConfig headConfig;
    private final Map<Integer, StreamConfig> chainedConfigs = new HashMap<Integer, StreamConfig>();
    private final int numberOfNonChainedOutputs;
    private int bufferTimeout;
    private StreamConfig tailConfig;
    private int chainIndex = 0;
    private final List<List<NonChainedOutput>> outEdgesInOrder = new LinkedList<List<NonChainedOutput>>();
    private boolean setTailNonChainedOutputs = true;

    StreamConfigChainer(OperatorID headOperatorID, StreamConfig headConfig, OWNER owner, int numberOfNonChainedOutputs) {
        this.owner = Preconditions.checkNotNull(owner);
        this.headConfig = (StreamConfig)Preconditions.checkNotNull((Object)headConfig);
        this.tailConfig = (StreamConfig)Preconditions.checkNotNull((Object)headConfig);
        this.numberOfNonChainedOutputs = numberOfNonChainedOutputs;
        this.head(headOperatorID);
    }

    private void head(OperatorID headOperatorID) {
        this.headConfig.setOperatorID(headOperatorID);
        this.headConfig.setChainStart();
        this.headConfig.setChainIndex(this.chainIndex);
    }

    @Deprecated
    public <T> StreamConfigChainer<OWNER> chain(OperatorID operatorID, OneInputStreamOperator<T, T> operator, TypeSerializer<T> typeSerializer, boolean createKeyedStateBackend) {
        return this.chain(operatorID, operator, typeSerializer, typeSerializer, createKeyedStateBackend);
    }

    @Deprecated
    public <T> StreamConfigChainer<OWNER> chain(OneInputStreamOperator<T, T> operator, TypeSerializer<T> typeSerializer) {
        return this.chain(new OperatorID(), operator, typeSerializer);
    }

    @Deprecated
    public <T> StreamConfigChainer<OWNER> chain(OperatorID operatorID, OneInputStreamOperator<T, T> operator, TypeSerializer<T> typeSerializer) {
        return this.chain(operatorID, operator, typeSerializer, typeSerializer, false);
    }

    @Deprecated
    public <T> StreamConfigChainer<OWNER> chain(OneInputStreamOperatorFactory<T, T> operatorFactory, TypeSerializer<T> typeSerializer) {
        return this.chain(new OperatorID(), operatorFactory, typeSerializer);
    }

    @Deprecated
    public <T> StreamConfigChainer<OWNER> chain(OperatorID operatorID, OneInputStreamOperatorFactory<T, T> operatorFactory, TypeSerializer<T> typeSerializer) {
        return this.chain(operatorID, (StreamOperatorFactory)operatorFactory, (TypeSerializer)typeSerializer, (TypeSerializer)typeSerializer, false);
    }

    @Deprecated
    private <IN, OUT> StreamConfigChainer<OWNER> chain(OperatorID operatorID, OneInputStreamOperator<IN, OUT> operator, TypeSerializer<IN> inputSerializer, TypeSerializer<OUT> outputSerializer, boolean createKeyedStateBackend) {
        return this.chain(operatorID, (StreamOperatorFactory<OUT>)SimpleOperatorFactory.of(operator), inputSerializer, outputSerializer, createKeyedStateBackend);
    }

    @Deprecated
    public <IN, OUT> StreamConfigChainer<OWNER> chain(OperatorID operatorID, StreamOperatorFactory<OUT> operatorFactory, TypeSerializer<IN> inputSerializer, TypeSerializer<OUT> outputSerializer, boolean createKeyedStateBackend) {
        ++this.chainIndex;
        StreamEdge streamEdge = new StreamEdge(new StreamNode(Integer.valueOf(this.tailConfig.getChainIndex()), null, null, (StreamOperator)null, null, null), new StreamNode(Integer.valueOf(this.chainIndex), null, null, (StreamOperator)null, null, null), 0, null, null);
        streamEdge.setBufferTimeout((long)this.bufferTimeout);
        this.tailConfig.setChainedOutputs(Collections.singletonList(streamEdge));
        this.tailConfig = new StreamConfig(new Configuration());
        this.tailConfig.setStreamOperatorFactory((StreamOperatorFactory)Preconditions.checkNotNull(operatorFactory));
        this.tailConfig.setOperatorID((OperatorID)Preconditions.checkNotNull((Object)operatorID));
        this.tailConfig.setupNetworkInputs(new TypeSerializer[]{inputSerializer});
        this.tailConfig.setTypeSerializerOut(outputSerializer);
        if (createKeyedStateBackend) {
            this.tailConfig.setStateKeySerializer(inputSerializer);
            this.tailConfig.setStateBackendUsesManagedMemory(true);
            this.tailConfig.setManagedMemoryFractionOperatorOfUseCase(ManagedMemoryUseCase.STATE_BACKEND, 1.0);
        }
        this.tailConfig.setChainIndex(this.chainIndex);
        this.tailConfig.serializeAllConfigs();
        this.chainedConfigs.put(this.chainIndex, this.tailConfig);
        return this;
    }

    public <T> StreamConfigEdgeChainer<OWNER, T, T> chain(TypeSerializer<T> typeSerializer) {
        return this.chain(typeSerializer, typeSerializer);
    }

    public <IN, OUT> StreamConfigEdgeChainer<OWNER, IN, OUT> chain(TypeSerializer<IN> inputSerializer, TypeSerializer<OUT> outputSerializer) {
        return new StreamConfigEdgeChainer(this, inputSerializer, outputSerializer);
    }

    public OWNER finish() {
        if (this.setTailNonChainedOutputs) {
            ArrayList<NonChainedOutput> nonChainedOutputs = new ArrayList<NonChainedOutput>();
            for (int i = 0; i < this.numberOfNonChainedOutputs; ++i) {
                NonChainedOutput streamOutput = new NonChainedOutput(true, this.chainIndex, 1, 1, 100L, false, new IntermediateDataSetID(), null, (StreamPartitioner)new BroadcastPartitioner(), ResultPartitionType.PIPELINED_BOUNDED);
                nonChainedOutputs.add(streamOutput);
            }
            this.outEdgesInOrder.add(nonChainedOutputs);
            this.tailConfig.setNumberOfOutputs(this.numberOfNonChainedOutputs);
            this.tailConfig.setVertexNonChainedOutputs(nonChainedOutputs);
            this.tailConfig.setOperatorNonChainedOutputs(nonChainedOutputs);
        }
        Collections.reverse(this.outEdgesInOrder);
        List allOutEdgesInOrder = this.outEdgesInOrder.stream().flatMap(Collection::stream).collect(Collectors.toList());
        this.tailConfig.setChainEnd();
        this.chainedConfigs.values().forEach(StreamConfig::serializeAllConfigs);
        this.headConfig.setAndSerializeTransitiveChainedTaskConfigs(this.chainedConfigs);
        this.headConfig.setVertexNonChainedOutputs(allOutEdgesInOrder);
        this.headConfig.serializeAllConfigs();
        return this.owner;
    }

    public <OUT> OWNER finishForSingletonOperatorChain(TypeSerializer<OUT> outputSerializer) {
        return this.finishForSingletonOperatorChain(outputSerializer, (StreamPartitioner<?>)new BroadcastPartitioner());
    }

    public <OUT> OWNER finishForSingletonOperatorChain(TypeSerializer<OUT> outputSerializer, StreamPartitioner<?> partitioner) {
        Preconditions.checkState((this.chainIndex == 0 ? 1 : 0) != 0, (Object)"Use finishForSingletonOperatorChain");
        Preconditions.checkState((this.headConfig == this.tailConfig ? 1 : 0) != 0);
        AbstractStreamOperator dummyOperator = new AbstractStreamOperator<OUT>(){
            private static final long serialVersionUID = 1L;
        };
        LinkedList<NonChainedOutput> streamOutputs = new LinkedList<NonChainedOutput>();
        StreamNode sourceVertexDummy = new StreamNode(Integer.valueOf(0), "group", null, (StreamOperator)dummyOperator, "source dummy", SourceStreamTask.class);
        for (int i = 0; i < this.numberOfNonChainedOutputs; ++i) {
            streamOutputs.add(new NonChainedOutput(true, sourceVertexDummy.getId(), 1, 1, 100L, false, new IntermediateDataSetID(), null, partitioner, ResultPartitionType.PIPELINED_BOUNDED));
        }
        this.headConfig.setVertexID(Integer.valueOf(0));
        this.headConfig.setNumberOfOutputs(1);
        this.headConfig.setVertexNonChainedOutputs(streamOutputs);
        this.headConfig.setOperatorNonChainedOutputs(streamOutputs);
        this.chainedConfigs.values().forEach(StreamConfig::serializeAllConfigs);
        this.headConfig.setAndSerializeTransitiveChainedTaskConfigs(this.chainedConfigs);
        this.headConfig.setVertexNonChainedOutputs(streamOutputs);
        this.headConfig.setTypeSerializerOut(outputSerializer);
        this.headConfig.serializeAllConfigs();
        return this.owner;
    }

    public StreamConfigChainer<OWNER> name(String name) {
        this.tailConfig.setOperatorName(name);
        return this;
    }

    public void setBufferTimeout(int bufferTimeout) {
        this.bufferTimeout = bufferTimeout;
    }

    public static class StreamConfigEdgeChainer<OWNER, IN, OUT> {
        private final OutputTag<Void> placeHolderTag;
        private StreamConfigChainer<OWNER> parent;
        private OperatorID operatorID;
        private final TypeSerializer<IN> inputSerializer;
        private final TypeSerializer<OUT> outputSerializer;
        private StreamOperatorFactory<OUT> operatorFactory;
        private Map<OutputTag<?>, Integer> nonChainedOutPuts;
        private boolean createKeyedStateBackend;

        private StreamConfigEdgeChainer(StreamConfigChainer<OWNER> parent, TypeSerializer<IN> inputSerializer, TypeSerializer<OUT> outputSerializer) {
            this.parent = parent;
            this.parent.setTailNonChainedOutputs = true;
            this.inputSerializer = inputSerializer;
            this.outputSerializer = outputSerializer;
            this.placeHolderTag = new OutputTag("FLINK_PLACEHOLDER", (TypeInformation)BasicTypeInfo.VOID_TYPE_INFO);
            this.nonChainedOutPuts = new HashMap(4);
        }

        public StreamConfigEdgeChainer<OWNER, IN, OUT> setOperatorID(OperatorID operatorID) {
            this.operatorID = operatorID;
            return this;
        }

        public StreamConfigEdgeChainer<OWNER, IN, OUT> setOperatorFactory(StreamOperatorFactory operatorFactory) {
            this.operatorFactory = operatorFactory;
            return this;
        }

        public StreamConfigEdgeChainer<OWNER, IN, OUT> addNonChainedOutputsCount(int nonChainedOutputsCount) {
            return this.addNonChainedOutputsCount(this.placeHolderTag, nonChainedOutputsCount);
        }

        public StreamConfigEdgeChainer<OWNER, IN, OUT> addNonChainedOutputsCount(OutputTag<?> outputTag, int nonChainedOutputsCount) {
            Preconditions.checkArgument((nonChainedOutputsCount >= 0 && outputTag != null ? 1 : 0) != 0);
            this.nonChainedOutPuts.put(outputTag, nonChainedOutputsCount);
            return this;
        }

        public StreamConfigEdgeChainer<OWNER, IN, OUT> setCreateKeyedStateBackend(boolean createKeyedStateBackend) {
            this.createKeyedStateBackend = createKeyedStateBackend;
            return this;
        }

        public StreamConfigChainer<OWNER> build() {
            ++this.parent.chainIndex;
            StreamEdge streamEdge = new StreamEdge(new StreamNode(Integer.valueOf(this.parent.tailConfig.getChainIndex()), null, null, (StreamOperator)null, null, null), new StreamNode(Integer.valueOf(this.parent.chainIndex), null, null, (StreamOperator)null, null, null), 0, null, null);
            streamEdge.setBufferTimeout((long)this.parent.bufferTimeout);
            this.parent.tailConfig.setChainedOutputs(Collections.singletonList(streamEdge));
            this.parent.tailConfig = new StreamConfig(new Configuration());
            this.parent.tailConfig.setStreamOperatorFactory((StreamOperatorFactory)Preconditions.checkNotNull(this.operatorFactory));
            this.parent.tailConfig.setOperatorID(this.operatorID == null ? new OperatorID() : this.operatorID);
            this.parent.tailConfig.setupNetworkInputs(new TypeSerializer[]{this.inputSerializer});
            this.parent.tailConfig.setTypeSerializerOut(this.outputSerializer);
            if (this.createKeyedStateBackend) {
                this.parent.tailConfig.setStateKeySerializer(this.inputSerializer);
                this.parent.tailConfig.setStateBackendUsesManagedMemory(true);
                this.parent.tailConfig.setManagedMemoryFractionOperatorOfUseCase(ManagedMemoryUseCase.STATE_BACKEND, 1.0);
            }
            if (!this.nonChainedOutPuts.isEmpty()) {
                List<NonChainedOutput> nonChainedOutputs = this.createNonChainedOutputs(this.nonChainedOutPuts, streamEdge);
                this.parent.tailConfig.setVertexNonChainedOutputs(nonChainedOutputs);
                this.parent.tailConfig.setOperatorNonChainedOutputs(nonChainedOutputs);
                this.parent.chainedConfigs.values().forEach(StreamConfig::serializeAllConfigs);
                this.parent.tailConfig.setNumberOfOutputs(nonChainedOutputs.size());
                this.parent.outEdgesInOrder.add(nonChainedOutputs);
                this.parent.setTailNonChainedOutputs = false;
            }
            this.parent.tailConfig.setChainIndex(this.parent.chainIndex);
            this.parent.tailConfig.serializeAllConfigs();
            this.parent.chainedConfigs.put(this.parent.chainIndex, this.parent.tailConfig);
            return this.parent;
        }

        private List<NonChainedOutput> createNonChainedOutputs(Map<OutputTag<?>, Integer> nonChainedOutputsCount, StreamEdge streamEdge) {
            ArrayList<NonChainedOutput> nonChainedOutputs = new ArrayList<NonChainedOutput>();
            nonChainedOutputsCount.forEach((outputTag, value) -> {
                for (int i = 0; i < value; ++i) {
                    nonChainedOutputs.add(new NonChainedOutput(true, streamEdge.getTargetId(), 1, 1, 100L, false, new IntermediateDataSetID(), this.placeHolderTag.equals(outputTag) ? null : outputTag, (StreamPartitioner)new BroadcastPartitioner(), ResultPartitionType.PIPELINED_BOUNDED));
                    if (this.placeHolderTag.equals(outputTag)) continue;
                    this.parent.tailConfig.setTypeSerializerSideOut(outputTag, this.outputSerializer);
                }
            });
            return nonChainedOutputs;
        }
    }
}

