/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.graph;

import java.io.IOException;
import java.io.Serializable;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.attribute.Attribute;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.MissingTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExternalizedCheckpointRetention;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.StateChangelogOptions;
import org.apache.flink.core.execution.CheckpointingMode;
import org.apache.flink.core.execution.JobStatusHook;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobgraph.tasks.TaskInvokable;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.util.Hardware;
import org.apache.flink.streaming.api.checkpoint.WithMasterCheckpointHook;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.graph.ExecutionPlan;
import org.apache.flink.streaming.api.graph.FunctionMasterCheckpointHookFactory;
import org.apache.flink.streaming.api.graph.GlobalStreamExchangeMode;
import org.apache.flink.streaming.api.graph.JSONGenerator;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
import org.apache.flink.streaming.api.lineage.LineageGraph;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.OutputFormatOperatorFactory;
import org.apache.flink.streaming.api.operators.SourceOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.UdfStreamOperatorFactory;
import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
import org.apache.flink.streaming.runtime.partitioner.ForwardForConsecutiveHashPartitioner;
import org.apache.flink.streaming.runtime.partitioner.ForwardForUnspecifiedPartitioner;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.tasks.MultipleInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskException;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
import org.apache.flink.streaming.runtime.watermark.AbstractInternalWatermarkDeclaration;
import org.apache.flink.streaming.util.watermark.WatermarkUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TernaryBoolean;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class StreamGraph
implements Pipeline,
ExecutionPlan {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(StreamGraph.class);
    private long initialClientHeartbeatTimeout;
    private String jobName;
    private JobID jobId;
    private final Configuration jobConfiguration;
    private transient ExecutionConfig executionConfig;
    private final CheckpointConfig checkpointConfig;
    private SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.none();
    private GlobalStreamExchangeMode globalExchangeMode;
    private boolean enableCheckpointsAfterTasksFinish;
    private boolean allVerticesInSameSlotSharingGroupByDefault = true;
    private transient Map<Integer, StreamNode> streamNodes;
    private Set<Integer> sources;
    private Set<Integer> sinks;
    private transient Map<Integer, Tuple2<Integer, OutputTag>> virtualSideOutputNodes;
    private transient Map<Integer, Tuple3<Integer, StreamPartitioner<?>, StreamExchangeMode>> virtualPartitionNodes;
    protected Map<Integer, String> vertexIDtoBrokerID;
    protected Map<Integer, Long> vertexIDtoLoopTimeout;
    private transient StateBackend stateBackend;
    private transient CheckpointStorage checkpointStorage;
    private InternalTimeServiceManager.Provider timerServiceProvider;
    private transient LineageGraph lineageGraph;
    private JobType jobType = JobType.STREAMING;
    private Map<String, ResourceProfile> slotSharingGroupResources;
    private PipelineOptions.VertexDescriptionMode descriptionMode = PipelineOptions.VertexDescriptionMode.TREE;
    private boolean vertexNameIncludeIndexPrefix = false;
    private final List<JobStatusHook> jobStatusHooks = new ArrayList<JobStatusHook>();
    private boolean dynamic;
    private boolean autoParallelismEnabled;
    private final Map<StreamNode, StreamOperatorFactory<?>> nodeToHeadOperatorCache = new HashMap();
    private JobCheckpointingSettings checkpointingSettings;
    private final List<PermanentBlobKey> userJarBlobKeys = new ArrayList<PermanentBlobKey>();
    private final Map<String, DistributedCache.DistributedCacheEntry> userArtifacts = new HashMap<String, DistributedCache.DistributedCacheEntry>();
    private List<URL> classpath = Collections.emptyList();
    private final List<Path> userJars = new ArrayList<Path>();
    private boolean isEmpty;
    private UserDefinedObjectsHolder userDefinedObjectsHolder;
    private final Map<Integer, ResourceSpec> streamNodeMinResources = new HashMap<Integer, ResourceSpec>();
    private byte[] serializedWatermarkDeclarations;

    public StreamGraph(Configuration jobConfiguration, ExecutionConfig executionConfig, CheckpointConfig checkpointConfig, SavepointRestoreSettings savepointRestoreSettings) {
        this.jobConfiguration = new Configuration(Preconditions.checkNotNull(jobConfiguration));
        this.executionConfig = Preconditions.checkNotNull(executionConfig);
        this.checkpointConfig = Preconditions.checkNotNull(checkpointConfig);
        this.savepointRestoreSettings = Preconditions.checkNotNull(savepointRestoreSettings);
        this.jobId = new JobID();
        this.jobName = "(unnamed job)";
        this.clear();
    }

    public void clear() {
        this.streamNodes = new HashMap<Integer, StreamNode>();
        this.virtualSideOutputNodes = new HashMap<Integer, Tuple2<Integer, OutputTag>>();
        this.virtualPartitionNodes = new HashMap();
        this.vertexIDtoBrokerID = new HashMap<Integer, String>();
        this.vertexIDtoLoopTimeout = new HashMap<Integer, Long>();
        this.sources = new HashSet<Integer>();
        this.sinks = new HashSet<Integer>();
        this.slotSharingGroupResources = new HashMap<String, ResourceProfile>();
    }

    public ExecutionConfig getExecutionConfig() {
        return this.executionConfig;
    }

    @Override
    public Configuration getJobConfiguration() {
        return this.jobConfiguration;
    }

    public CheckpointConfig getCheckpointConfig() {
        return this.checkpointConfig;
    }

    void cacheHeadOperatorForNode(StreamNode node, StreamOperatorFactory<?> headOperator) {
        this.nodeToHeadOperatorCache.put(node, headOperator);
    }

    StreamOperatorFactory<?> getHeadOperatorForNodeFromCache(StreamNode node) {
        return this.nodeToHeadOperatorCache.get(node);
    }

    public CheckpointingMode getCheckpointingMode() {
        return StreamGraph.getCheckpointingMode(this.checkpointConfig);
    }

    public static CheckpointingMode getCheckpointingMode(CheckpointConfig checkpointConfig) {
        CheckpointingMode checkpointingMode = checkpointConfig.getCheckpointingConsistencyMode();
        Preconditions.checkArgument(checkpointingMode == CheckpointingMode.EXACTLY_ONCE || checkpointingMode == CheckpointingMode.AT_LEAST_ONCE, "Unexpected checkpointing mode.");
        if (checkpointConfig.isCheckpointingEnabled()) {
            return checkpointingMode;
        }
        return CheckpointingMode.AT_LEAST_ONCE;
    }

    public void addJar(Path jar) {
        if (jar == null) {
            throw new IllegalArgumentException();
        }
        if (!this.userJars.contains(jar)) {
            this.userJars.add(jar);
        }
    }

    @Override
    public List<Path> getUserJars() {
        return this.userJars;
    }

    public void createJobCheckpointingSettings() {
        this.checkpointingSettings = this.createJobCheckpointingSettingsInternal();
    }

    private JobCheckpointingSettings createJobCheckpointingSettingsInternal() {
        SerializedValue<CheckpointStorage> serializedCheckpointStorage;
        SerializedValue<StateBackend> serializedStateBackend;
        SerializedValue<MasterTriggerRestoreHook.Factory[]> serializedHooks;
        CheckpointRetentionPolicy retentionAfterTermination;
        CheckpointConfig cfg = this.getCheckpointConfig();
        long interval = cfg.getCheckpointInterval();
        if (interval < 10L) {
            interval = Long.MAX_VALUE;
        }
        if (cfg.isExternalizedCheckpointsEnabled()) {
            ExternalizedCheckpointRetention cleanup = cfg.getExternalizedCheckpointRetention();
            if (cleanup == null) {
                throw new IllegalStateException("Externalized checkpoints enabled, but no cleanup mode configured.");
            }
            retentionAfterTermination = cleanup.deleteOnCancellation() ? CheckpointRetentionPolicy.RETAIN_ON_FAILURE : CheckpointRetentionPolicy.RETAIN_ON_CANCELLATION;
        } else {
            retentionAfterTermination = CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION;
        }
        ArrayList<FunctionMasterCheckpointHookFactory> hooks = new ArrayList<FunctionMasterCheckpointHookFactory>();
        for (StreamNode node : this.getStreamNodes()) {
            Function f;
            if (node.getOperatorFactory() == null || !(node.getOperatorFactory() instanceof UdfStreamOperatorFactory) || !((f = ((UdfStreamOperatorFactory)node.getOperatorFactory()).getUserFunction()) instanceof WithMasterCheckpointHook)) continue;
            hooks.add(new FunctionMasterCheckpointHookFactory((WithMasterCheckpointHook)((Object)f)));
        }
        if (hooks.isEmpty()) {
            serializedHooks = null;
        } else {
            try {
                MasterTriggerRestoreHook.Factory[] asArray = hooks.toArray(new MasterTriggerRestoreHook.Factory[0]);
                serializedHooks = new SerializedValue<MasterTriggerRestoreHook.Factory[]>(asArray);
            }
            catch (IOException e) {
                throw new FlinkRuntimeException("Trigger/restore hook is not serializable", e);
            }
        }
        if (this.stateBackend == null) {
            serializedStateBackend = null;
        } else {
            try {
                serializedStateBackend = new SerializedValue<StateBackend>(this.stateBackend);
            }
            catch (IOException e) {
                throw new FlinkRuntimeException("State backend is not serializable", e);
            }
        }
        if (this.checkpointStorage == null) {
            serializedCheckpointStorage = null;
        } else {
            try {
                serializedCheckpointStorage = new SerializedValue<CheckpointStorage>(this.checkpointStorage);
            }
            catch (IOException e) {
                throw new FlinkRuntimeException("Checkpoint storage is not serializable", e);
            }
        }
        return new JobCheckpointingSettings(CheckpointCoordinatorConfiguration.builder().setCheckpointInterval(interval).setCheckpointIntervalDuringBacklog(cfg.getCheckpointIntervalDuringBacklog()).setCheckpointTimeout(cfg.getCheckpointTimeout()).setMinPauseBetweenCheckpoints(cfg.getMinPauseBetweenCheckpoints()).setMaxConcurrentCheckpoints(cfg.getMaxConcurrentCheckpoints()).setCheckpointRetentionPolicy(retentionAfterTermination).setExactlyOnce(this.getCheckpointingMode() == CheckpointingMode.EXACTLY_ONCE).setTolerableCheckpointFailureNumber(cfg.getTolerableCheckpointFailureNumber()).setUnalignedCheckpointsEnabled(cfg.isUnalignedCheckpointsEnabled()).setCheckpointIdOfIgnoredInFlightData(cfg.getCheckpointIdOfIgnoredInFlightData()).setAlignedCheckpointTimeout(cfg.getAlignedCheckpointTimeout().toMillis()).setEnableCheckpointsAfterTasksFinish(this.isEnableCheckpointsAfterTasksFinish()).build(), serializedStateBackend, this.getJobConfiguration().getOptional(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG).map(TernaryBoolean::fromBoolean).orElse(TernaryBoolean.UNDEFINED), serializedCheckpointStorage, serializedHooks, Optional.ofNullable(this.stateBackend).map(StateBackend::useManagedMemory).map(TernaryBoolean::fromBoolean).orElse(TernaryBoolean.UNDEFINED));
    }

    @Override
    public void setSavepointRestoreSettings(SavepointRestoreSettings savepointRestoreSettings) {
        this.savepointRestoreSettings = savepointRestoreSettings;
    }

    @Override
    public SerializedValue<ExecutionConfig> getSerializedExecutionConfig() {
        Preconditions.checkNotNull(this.userDefinedObjectsHolder);
        return this.userDefinedObjectsHolder.serializedExecutionConfig;
    }

    @Override
    public SavepointRestoreSettings getSavepointRestoreSettings() {
        return this.savepointRestoreSettings;
    }

    public String getJobName() {
        return this.jobName;
    }

    public void setJobName(String jobName) {
        this.jobName = jobName;
    }

    public LineageGraph getLineageGraph() {
        return this.lineageGraph;
    }

    public void setLineageGraph(LineageGraph lineageGraph) {
        this.lineageGraph = lineageGraph;
    }

    public void setStateBackend(StateBackend backend) {
        this.stateBackend = backend;
    }

    @VisibleForTesting
    public StateBackend getStateBackend() {
        return this.stateBackend;
    }

    public void setCheckpointStorage(CheckpointStorage checkpointStorage) {
        this.checkpointStorage = checkpointStorage;
    }

    public InternalTimeServiceManager.Provider getTimerServiceProvider() {
        return this.timerServiceProvider;
    }

    public void setTimerServiceProvider(InternalTimeServiceManager.Provider timerServiceProvider) {
        this.timerServiceProvider = Preconditions.checkNotNull(timerServiceProvider);
    }

    public GlobalStreamExchangeMode getGlobalStreamExchangeMode() {
        return this.globalExchangeMode;
    }

    public void setGlobalStreamExchangeMode(GlobalStreamExchangeMode globalExchangeMode) {
        this.globalExchangeMode = globalExchangeMode;
    }

    public void setSlotSharingGroupResource(Map<String, ResourceProfile> slotSharingGroupResources) {
        this.slotSharingGroupResources.putAll(slotSharingGroupResources);
    }

    public Optional<ResourceProfile> getSlotSharingGroupResource(String groupId) {
        return Optional.ofNullable(this.slotSharingGroupResources.get(groupId));
    }

    public boolean hasFineGrainedResource() {
        return this.slotSharingGroupResources.values().stream().anyMatch(resourceProfile -> !resourceProfile.equals(ResourceProfile.UNKNOWN));
    }

    public void setAllVerticesInSameSlotSharingGroupByDefault(boolean allVerticesInSameSlotSharingGroupByDefault) {
        this.allVerticesInSameSlotSharingGroupByDefault = allVerticesInSameSlotSharingGroupByDefault;
    }

    public boolean isAllVerticesInSameSlotSharingGroupByDefault() {
        return this.allVerticesInSameSlotSharingGroupByDefault;
    }

    public boolean isEnableCheckpointsAfterTasksFinish() {
        return this.enableCheckpointsAfterTasksFinish;
    }

    public void setEnableCheckpointsAfterTasksFinish(boolean enableCheckpointsAfterTasksFinish) {
        this.enableCheckpointsAfterTasksFinish = enableCheckpointsAfterTasksFinish;
    }

    public boolean isChainingEnabled() {
        return this.jobConfiguration.get(PipelineOptions.OPERATOR_CHAINING);
    }

    public boolean isChainingOfOperatorsWithDifferentMaxParallelismEnabled() {
        return this.jobConfiguration.get(PipelineOptions.OPERATOR_CHAINING_CHAIN_OPERATORS_WITH_DIFFERENT_MAX_PARALLELISM);
    }

    public boolean isIterative() {
        return !this.vertexIDtoLoopTimeout.isEmpty();
    }

    public <IN, OUT> void addSource(Integer vertexID, @Nullable String slotSharingGroup, @Nullable String coLocationGroup, SourceOperatorFactory<OUT> operatorFactory, TypeInformation<IN> inTypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName) {
        this.addOperator(vertexID, slotSharingGroup, coLocationGroup, operatorFactory, inTypeInfo, outTypeInfo, operatorName, SourceOperatorStreamTask.class);
        this.sources.add(vertexID);
    }

    public <IN, OUT> void addLegacySource(Integer vertexID, @Nullable String slotSharingGroup, @Nullable String coLocationGroup, StreamOperatorFactory<OUT> operatorFactory, TypeInformation<IN> inTypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName) {
        this.addOperator(vertexID, slotSharingGroup, coLocationGroup, operatorFactory, inTypeInfo, outTypeInfo, operatorName);
        this.sources.add(vertexID);
    }

    public <IN, OUT> void addSink(Integer vertexID, @Nullable String slotSharingGroup, @Nullable String coLocationGroup, StreamOperatorFactory<OUT> operatorFactory, TypeInformation<IN> inTypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName) {
        this.addOperator(vertexID, slotSharingGroup, coLocationGroup, operatorFactory, inTypeInfo, outTypeInfo, operatorName);
        if (operatorFactory instanceof OutputFormatOperatorFactory) {
            this.setOutputFormat(vertexID, ((OutputFormatOperatorFactory)operatorFactory).getOutputFormat());
        }
        this.sinks.add(vertexID);
    }

    public <IN, OUT> void addOperator(Integer vertexID, @Nullable String slotSharingGroup, @Nullable String coLocationGroup, StreamOperatorFactory<OUT> operatorFactory, TypeInformation<IN> inTypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName) {
        Class invokableClass = operatorFactory.isStreamSource() ? SourceStreamTask.class : OneInputStreamTask.class;
        this.addOperator(vertexID, slotSharingGroup, coLocationGroup, operatorFactory, inTypeInfo, outTypeInfo, operatorName, invokableClass);
    }

    private <IN, OUT> void addOperator(Integer vertexID, @Nullable String slotSharingGroup, @Nullable String coLocationGroup, StreamOperatorFactory<OUT> operatorFactory, TypeInformation<IN> inTypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName, Class<? extends TaskInvokable> invokableClass) {
        this.addNode(vertexID, slotSharingGroup, coLocationGroup, invokableClass, operatorFactory, operatorName);
        this.setSerializers(vertexID, this.createSerializer(inTypeInfo), null, this.createSerializer(outTypeInfo));
        if (operatorFactory.isOutputTypeConfigurable() && outTypeInfo != null) {
            operatorFactory.setOutputType(outTypeInfo, this.executionConfig);
        }
        if (operatorFactory.isInputTypeConfigurable()) {
            operatorFactory.setInputType(inTypeInfo, this.executionConfig);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Vertex: {}", (Object)vertexID);
        }
    }

    public <IN1, IN2, OUT> void addCoOperator(Integer vertexID, String slotSharingGroup, @Nullable String coLocationGroup, StreamOperatorFactory<OUT> taskOperatorFactory, TypeInformation<IN1> in1TypeInfo, TypeInformation<IN2> in2TypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName) {
        Class<TwoInputStreamTask> vertexClass = TwoInputStreamTask.class;
        this.addNode(vertexID, slotSharingGroup, coLocationGroup, vertexClass, taskOperatorFactory, operatorName);
        TypeSerializer<OUT> outSerializer = this.createSerializer(outTypeInfo);
        this.setSerializers(vertexID, in1TypeInfo.createSerializer(this.executionConfig.getSerializerConfig()), in2TypeInfo.createSerializer(this.executionConfig.getSerializerConfig()), outSerializer);
        if (taskOperatorFactory.isOutputTypeConfigurable()) {
            taskOperatorFactory.setOutputType(outTypeInfo, this.executionConfig);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("CO-TASK: {}", (Object)vertexID);
        }
    }

    public <OUT> void addMultipleInputOperator(Integer vertexID, String slotSharingGroup, @Nullable String coLocationGroup, StreamOperatorFactory<OUT> operatorFactory, List<TypeInformation<?>> inTypeInfos, TypeInformation<OUT> outTypeInfo, String operatorName) {
        Class<MultipleInputStreamTask> vertexClass = MultipleInputStreamTask.class;
        this.addNode(vertexID, slotSharingGroup, coLocationGroup, vertexClass, operatorFactory, operatorName);
        this.setSerializers(vertexID, inTypeInfos, this.createSerializer(outTypeInfo));
        if (operatorFactory.isOutputTypeConfigurable()) {
            operatorFactory.setOutputType(outTypeInfo, this.executionConfig);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("CO-TASK: {}", (Object)vertexID);
        }
    }

    protected StreamNode addNode(Integer vertexID, @Nullable String slotSharingGroup, @Nullable String coLocationGroup, Class<? extends TaskInvokable> vertexClass, @Nullable StreamOperatorFactory<?> operatorFactory, String operatorName) {
        if (this.streamNodes.containsKey(vertexID)) {
            throw new RuntimeException("Duplicate vertexID " + vertexID);
        }
        StreamNode vertex = new StreamNode(vertexID, slotSharingGroup, coLocationGroup, operatorFactory, operatorName, vertexClass);
        this.streamNodes.put(vertexID, vertex);
        this.isEmpty = false;
        return vertex;
    }

    public void addVirtualSideOutputNode(Integer originalId, Integer virtualId, OutputTag outputTag) {
        if (this.virtualSideOutputNodes.containsKey(virtualId)) {
            throw new IllegalStateException("Already has virtual output node with id " + virtualId);
        }
        for (Tuple2<Integer, OutputTag> tag : this.virtualSideOutputNodes.values()) {
            if (!((Integer)tag.f0).equals(originalId) || !((OutputTag)tag.f1).getId().equals(outputTag.getId()) || ((OutputTag)tag.f1).getTypeInfo().equals(outputTag.getTypeInfo())) continue;
            throw new IllegalArgumentException("Trying to add a side output for the same side-output id with a different type. This is not allowed. Side-output ID: " + ((OutputTag)tag.f1).getId());
        }
        this.virtualSideOutputNodes.put(virtualId, new Tuple2<Integer, OutputTag>(originalId, outputTag));
    }

    public void addVirtualPartitionNode(Integer originalId, Integer virtualId, StreamPartitioner<?> partitioner, StreamExchangeMode exchangeMode) {
        if (this.virtualPartitionNodes.containsKey(virtualId)) {
            throw new IllegalStateException("Already has virtual partition node with id " + virtualId);
        }
        this.virtualPartitionNodes.put(virtualId, new Tuple3(originalId, partitioner, exchangeMode));
    }

    public String getSlotSharingGroup(Integer id) {
        if (this.virtualSideOutputNodes.containsKey(id)) {
            Integer mappedId = (Integer)this.virtualSideOutputNodes.get((Object)id).f0;
            return this.getSlotSharingGroup(mappedId);
        }
        if (this.virtualPartitionNodes.containsKey(id)) {
            Integer mappedId = (Integer)this.virtualPartitionNodes.get((Object)id).f0;
            return this.getSlotSharingGroup(mappedId);
        }
        StreamNode node = this.getStreamNode(id);
        return node.getSlotSharingGroup();
    }

    public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber) {
        this.addEdge(upStreamVertexID, downStreamVertexID, typeNumber, null);
    }

    public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber, IntermediateDataSetID intermediateDataSetId) {
        this.addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, null, new ArrayList<String>(), null, null, intermediateDataSetId);
    }

    private void addEdgeInternal(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber, StreamPartitioner<?> partitioner, List<String> outputNames, OutputTag outputTag, StreamExchangeMode exchangeMode, IntermediateDataSetID intermediateDataSetId) {
        if (this.virtualSideOutputNodes.containsKey(upStreamVertexID)) {
            int virtualId = upStreamVertexID;
            upStreamVertexID = (Integer)this.virtualSideOutputNodes.get((Object)Integer.valueOf((int)virtualId)).f0;
            if (outputTag == null) {
                outputTag = (OutputTag)this.virtualSideOutputNodes.get((Object)Integer.valueOf((int)virtualId)).f1;
            }
            this.addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag, exchangeMode, intermediateDataSetId);
        } else if (this.virtualPartitionNodes.containsKey(upStreamVertexID)) {
            int virtualId = upStreamVertexID;
            upStreamVertexID = (Integer)this.virtualPartitionNodes.get((Object)Integer.valueOf((int)virtualId)).f0;
            if (partitioner == null) {
                partitioner = (StreamPartitioner)this.virtualPartitionNodes.get((Object)Integer.valueOf((int)virtualId)).f1;
            }
            exchangeMode = (StreamExchangeMode)((Object)this.virtualPartitionNodes.get((Object)Integer.valueOf((int)virtualId)).f2);
            this.addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, exchangeMode, intermediateDataSetId);
        } else {
            this.createActualEdge(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputTag, exchangeMode, intermediateDataSetId);
        }
    }

    private void createActualEdge(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber, StreamPartitioner<?> partitioner, OutputTag outputTag, StreamExchangeMode exchangeMode, IntermediateDataSetID intermediateDataSetId) {
        StreamNode upstreamNode = this.getStreamNode(upStreamVertexID);
        StreamNode downstreamNode = this.getStreamNode(downStreamVertexID);
        if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
            partitioner = this.dynamic ? new ForwardForUnspecifiedPartitioner() : new ForwardPartitioner();
        } else if (partitioner == null) {
            partitioner = new RebalancePartitioner();
        }
        if (partitioner instanceof ForwardPartitioner && upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
            if (partitioner instanceof ForwardForConsecutiveHashPartitioner) {
                partitioner = ((ForwardForConsecutiveHashPartitioner)partitioner).getHashPartitioner();
            } else {
                throw new UnsupportedOperationException("Forward partitioning does not allow change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() + ", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() + " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
            }
        }
        if (exchangeMode == null) {
            exchangeMode = StreamExchangeMode.UNDEFINED;
        }
        int uniqueId = this.getStreamEdges(upstreamNode.getId(), downstreamNode.getId()).size();
        StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, partitioner, outputTag, exchangeMode, uniqueId, intermediateDataSetId);
        this.getStreamNode(edge.getSourceId()).addOutEdge(edge);
        this.getStreamNode(edge.getTargetId()).addInEdge(edge);
    }

    public void setParallelism(Integer vertexID, int parallelism) {
        if (this.getStreamNode(vertexID) != null) {
            this.getStreamNode(vertexID).setParallelism(parallelism);
        }
    }

    @Override
    public boolean isDynamic() {
        return this.dynamic;
    }

    @Override
    public JobCheckpointingSettings getCheckpointingSettings() {
        if (this.checkpointingSettings == null) {
            this.createJobCheckpointingSettings();
        }
        return this.checkpointingSettings;
    }

    @Override
    public boolean isEmpty() {
        return this.streamNodes == null ? this.isEmpty : this.streamNodes.isEmpty();
    }

    public void setParallelism(Integer vertexId, int parallelism, boolean parallelismConfigured) {
        if (this.getStreamNode(vertexId) != null) {
            this.getStreamNode(vertexId).setParallelism(parallelism, parallelismConfigured);
        }
    }

    public void setDynamic(boolean dynamic) {
        this.dynamic = dynamic;
    }

    public void setMaxParallelism(int vertexID, int maxParallelism) {
        if (this.getStreamNode(vertexID) != null) {
            this.getStreamNode(vertexID).setMaxParallelism(maxParallelism);
        }
    }

    public void setResources(int vertexID, ResourceSpec minResources, ResourceSpec preferredResources) {
        if (this.getStreamNode(vertexID) != null) {
            this.getStreamNode(vertexID).setResources(minResources, preferredResources);
            this.streamNodeMinResources.put(vertexID, minResources);
        }
    }

    public void setManagedMemoryUseCaseWeights(int vertexID, Map<ManagedMemoryUseCase, Integer> operatorScopeUseCaseWeights, Set<ManagedMemoryUseCase> slotScopeUseCases) {
        if (this.getStreamNode(vertexID) != null) {
            this.getStreamNode(vertexID).setManagedMemoryUseCaseWeights(operatorScopeUseCaseWeights, slotScopeUseCases);
        }
    }

    public void setOneInputStateKey(Integer vertexID, KeySelector<?, ?> keySelector, TypeSerializer<?> keySerializer) {
        StreamNode node = this.getStreamNode(vertexID);
        node.setStatePartitioners(keySelector);
        node.setStateKeySerializer(keySerializer);
    }

    public void setTwoInputStateKey(Integer vertexID, KeySelector<?, ?> keySelector1, KeySelector<?, ?> keySelector2, TypeSerializer<?> keySerializer) {
        StreamNode node = this.getStreamNode(vertexID);
        node.setStatePartitioners(keySelector1, keySelector2);
        node.setStateKeySerializer(keySerializer);
    }

    public void setMultipleInputStateKey(Integer vertexID, List<KeySelector<?, ?>> keySelectors, TypeSerializer<?> keySerializer) {
        StreamNode node = this.getStreamNode(vertexID);
        node.setStatePartitioners((KeySelector[])keySelectors.stream().toArray(KeySelector[]::new));
        node.setStateKeySerializer(keySerializer);
    }

    public void setBufferTimeout(Integer vertexID, long bufferTimeout) {
        if (this.getStreamNode(vertexID) != null) {
            this.getStreamNode(vertexID).setBufferTimeout(bufferTimeout);
        }
    }

    public void setSerializers(Integer vertexID, TypeSerializer<?> in1, TypeSerializer<?> in2, TypeSerializer<?> out) {
        StreamNode vertex = this.getStreamNode(vertexID);
        vertex.setSerializersIn(in1, in2);
        vertex.setSerializerOut(out);
    }

    private <OUT> void setSerializers(Integer vertexID, List<TypeInformation<?>> inTypeInfos, TypeSerializer<OUT> out) {
        StreamNode vertex = this.getStreamNode(vertexID);
        vertex.setSerializersIn((TypeSerializer[])inTypeInfos.stream().map(typeInfo -> typeInfo.createSerializer(this.executionConfig.getSerializerConfig())).toArray(TypeSerializer[]::new));
        vertex.setSerializerOut(out);
    }

    public void setInputFormat(Integer vertexID, InputFormat<?, ?> inputFormat) {
        this.getStreamNode(vertexID).setInputFormat(inputFormat);
    }

    public void setOutputFormat(Integer vertexID, OutputFormat<?> outputFormat) {
        this.getStreamNode(vertexID).setOutputFormat(outputFormat);
    }

    public void setTransformationUID(Integer nodeId, String transformationId) {
        StreamNode node = this.streamNodes.get(nodeId);
        if (node != null) {
            node.setTransformationUID(transformationId);
        }
    }

    void setTransformationUserHash(Integer nodeId, String nodeHash) {
        StreamNode node = this.streamNodes.get(nodeId);
        if (node != null) {
            node.setUserHash(nodeHash);
        }
    }

    public StreamNode getStreamNode(Integer vertexID) {
        return this.streamNodes.get(vertexID);
    }

    protected Collection<? extends Integer> getVertexIDs() {
        return this.streamNodes.keySet();
    }

    @VisibleForTesting
    public List<StreamEdge> getStreamEdges(int sourceId) {
        return this.getStreamNode(sourceId).getOutEdges();
    }

    public List<StreamEdge> getStreamEdges(int sourceId, int targetId) {
        ArrayList<StreamEdge> result = new ArrayList<StreamEdge>();
        for (StreamEdge edge : this.getStreamNode(sourceId).getOutEdges()) {
            if (edge.getTargetId() != targetId) continue;
            result.add(edge);
        }
        return result;
    }

    @Deprecated
    @VisibleForTesting
    public List<StreamEdge> getStreamEdgesOrThrow(int sourceId, int targetId) {
        List<StreamEdge> result = this.getStreamEdges(sourceId, targetId);
        if (result.isEmpty()) {
            throw new RuntimeException("No such edge in stream graph: " + sourceId + " -> " + targetId);
        }
        return result;
    }

    public Collection<Integer> getSourceIDs() {
        return this.sources;
    }

    public Collection<Integer> getSinkIDs() {
        return this.sinks;
    }

    public Collection<StreamNode> getStreamNodes() {
        return this.streamNodes.values();
    }

    public String getBrokerID(Integer vertexID) {
        return this.vertexIDtoBrokerID.get(vertexID);
    }

    public long getLoopTimeout(Integer vertexID) {
        return this.vertexIDtoLoopTimeout.get(vertexID);
    }

    public StreamNode getSourceVertex(StreamEdge edge) {
        return this.streamNodes.get(edge.getSourceId());
    }

    public StreamNode getTargetVertex(StreamEdge edge) {
        return this.streamNodes.get(edge.getTargetId());
    }

    @VisibleForTesting
    public JobGraph getJobGraph() {
        return this.getJobGraph(Thread.currentThread().getContextClassLoader(), this.jobId);
    }

    public JobGraph getJobGraph(ClassLoader userClassLoader) {
        return this.getJobGraph(userClassLoader, this.jobId);
    }

    public JobGraph getJobGraph(ClassLoader userClassLoader, @Nullable JobID jobID) {
        return StreamingJobGraphGenerator.createJobGraph(userClassLoader, this, jobID);
    }

    public String getStreamingPlanAsJSON() {
        try {
            return new JSONGenerator(this).getJSON();
        }
        catch (Exception e) {
            throw new RuntimeException("JSON plan creation failed", e);
        }
    }

    private <T> TypeSerializer<T> createSerializer(TypeInformation<T> typeInfo) {
        return typeInfo != null && !(typeInfo instanceof MissingTypeInfo) ? typeInfo.createSerializer(this.executionConfig.getSerializerConfig()) : null;
    }

    public void setJobType(JobType jobType) {
        this.jobType = jobType;
    }

    @Override
    public String getName() {
        return this.jobName;
    }

    @Override
    public JobType getJobType() {
        return this.jobType;
    }

    public boolean isAutoParallelismEnabled() {
        return this.autoParallelismEnabled;
    }

    public void setAutoParallelismEnabled(boolean autoParallelismEnabled) {
        this.autoParallelismEnabled = autoParallelismEnabled;
    }

    public PipelineOptions.VertexDescriptionMode getVertexDescriptionMode() {
        return this.descriptionMode;
    }

    public void setVertexDescriptionMode(PipelineOptions.VertexDescriptionMode mode) {
        this.descriptionMode = mode;
    }

    public void setVertexNameIncludeIndexPrefix(boolean includePrefix) {
        this.vertexNameIncludeIndexPrefix = includePrefix;
    }

    public boolean isVertexNameIncludeIndexPrefix() {
        return this.vertexNameIncludeIndexPrefix;
    }

    public void registerJobStatusHook(JobStatusHook hook) {
        Preconditions.checkNotNull(hook, "Registering a null JobStatusHook is not allowed. ");
        if (!this.jobStatusHooks.contains(hook)) {
            this.jobStatusHooks.add(hook);
        }
    }

    public List<JobStatusHook> getJobStatusHooks() {
        return this.jobStatusHooks;
    }

    public void setSupportsConcurrentExecutionAttempts(Integer vertexId, boolean supportsConcurrentExecutionAttempts) {
        StreamNode streamNode = this.getStreamNode(vertexId);
        if (streamNode != null) {
            streamNode.setSupportsConcurrentExecutionAttempts(supportsConcurrentExecutionAttempts);
        }
    }

    public void setAttribute(Integer vertexId, Attribute attribute) {
        if (this.getStreamNode(vertexId) != null) {
            this.getStreamNode(vertexId).setAttribute(attribute);
        }
    }

    public void setJobId(JobID jobId) {
        this.jobId = jobId;
    }

    @Override
    public JobID getJobID() {
        return this.jobId;
    }

    public void setClasspath(List<URL> paths) {
        this.classpath = paths;
    }

    public List<URL> getClasspath() {
        return this.classpath;
    }

    public void addJars(List<URL> jarFilesToAttach) {
        for (URL jar : jarFilesToAttach) {
            try {
                this.addJar(new Path(jar.toURI()));
            }
            catch (URISyntaxException e) {
                throw new RuntimeException("URL is invalid. This should not happen.", e);
            }
        }
    }

    @Override
    public List<PermanentBlobKey> getUserJarBlobKeys() {
        return this.userJarBlobKeys;
    }

    @Override
    public List<URL> getClasspaths() {
        return this.classpath;
    }

    public void addUserArtifact(String name, DistributedCache.DistributedCacheEntry file) {
        if (file == null) {
            throw new IllegalArgumentException();
        }
        this.userArtifacts.putIfAbsent(name, file);
    }

    @Override
    public Map<String, DistributedCache.DistributedCacheEntry> getUserArtifacts() {
        return this.userArtifacts;
    }

    @Override
    public void addUserJarBlobKey(PermanentBlobKey key) {
        if (key == null) {
            throw new IllegalArgumentException();
        }
        if (!this.userJarBlobKeys.contains(key)) {
            this.userJarBlobKeys.add(key);
        }
    }

    @Override
    public void setUserArtifactBlobKey(String entryName, PermanentBlobKey blobKey) throws IOException {
        byte[] serializedBlobKey = InstantiationUtil.serializeObject(blobKey);
        this.userArtifacts.computeIfPresent(entryName, (key, originalEntry) -> new DistributedCache.DistributedCacheEntry(originalEntry.filePath, originalEntry.isExecutable, serializedBlobKey, originalEntry.isZipped));
    }

    @Override
    public void writeUserArtifactEntriesToConfiguration() {
        for (Map.Entry<String, DistributedCache.DistributedCacheEntry> userArtifact : this.userArtifacts.entrySet()) {
            DistributedCache.writeFileInfoToConfig(userArtifact.getKey(), userArtifact.getValue(), this.jobConfiguration);
        }
    }

    @Override
    public int getMaximumParallelism() {
        int maxParallelism = -1;
        for (StreamNode node : this.streamNodes.values()) {
            maxParallelism = Math.max(node.getParallelism(), maxParallelism);
        }
        return maxParallelism;
    }

    public void setInitialClientHeartbeatTimeout(long initialClientHeartbeatTimeout) {
        this.initialClientHeartbeatTimeout = initialClientHeartbeatTimeout;
    }

    @Override
    public long getInitialClientHeartbeatTimeout() {
        return this.initialClientHeartbeatTimeout;
    }

    @Override
    public boolean isPartialResourceConfigured() {
        boolean hasVerticesWithUnknownResource = false;
        boolean hasVerticesWithConfiguredResource = false;
        for (ResourceSpec minResource : this.streamNodeMinResources.values()) {
            if (minResource == ResourceSpec.UNKNOWN) {
                hasVerticesWithUnknownResource = true;
            } else {
                hasVerticesWithConfiguredResource = true;
            }
            if (!hasVerticesWithUnknownResource || !hasVerticesWithConfiguredResource) continue;
            return true;
        }
        return false;
    }

    public void serializeUserDefinedInstances() throws IOException {
        ExecutorService serializationExecutor = Executors.newFixedThreadPool(Math.max(1, Math.min(Hardware.getNumberCPUCores(), this.getExecutionConfig().getParallelism())), new ExecutorThreadFactory("flink-operator-serialization-io"));
        try {
            this.userDefinedObjectsHolder = new UserDefinedObjectsHolder(this.streamNodes, this.virtualSideOutputNodes, this.virtualPartitionNodes, this.executionConfig, serializationExecutor);
        }
        finally {
            serializationExecutor.shutdown();
        }
    }

    public void deserializeUserDefinedInstances(ClassLoader userClassLoader, Executor serializationExecutor) throws Exception {
        this.userDefinedObjectsHolder.deserialize(userClassLoader, serializationExecutor);
    }

    public List<StreamNode> getStreamNodesSortedTopologicallyFromSources() throws InvalidProgramException {
        if (this.streamNodes.isEmpty()) {
            return Collections.emptyList();
        }
        ArrayList<StreamNode> sorted = new ArrayList<StreamNode>(this.streamNodes.size());
        LinkedHashSet<StreamNode> remaining = new LinkedHashSet<StreamNode>(this.streamNodes.values());
        for (Integer sourceNodeId : this.sources) {
            StreamNode streamNode = this.getStreamNode(sourceNodeId);
            sorted.add(streamNode);
            remaining.remove(streamNode);
        }
        int startNodePos = 0;
        while (!remaining.isEmpty()) {
            if (startNodePos >= sorted.size()) {
                throw new InvalidProgramException("The stream graph is cyclic.");
            }
            StreamNode current = (StreamNode)sorted.get(startNodePos++);
            this.addNodesThatHaveNoNewPredecessors(current, sorted, remaining);
        }
        return sorted;
    }

    private void addNodesThatHaveNoNewPredecessors(StreamNode start, List<StreamNode> target, Set<StreamNode> remaining) {
        for (StreamEdge outEdge : start.getOutEdges()) {
            StreamNode v = this.getStreamNode(outEdge.getTargetId());
            if (!remaining.contains(v)) continue;
            boolean hasNewPredecessors = false;
            for (StreamEdge e : v.getInEdges()) {
                StreamNode source;
                if (e == outEdge || !remaining.contains(source = this.getStreamNode(e.getSourceId()))) continue;
                hasNewPredecessors = true;
                break;
            }
            if (hasNewPredecessors) continue;
            target.add(v);
            remaining.remove(v);
            this.addNodesThatHaveNoNewPredecessors(v, target, remaining);
        }
    }

    public void serializeAndSaveWatermarkDeclarations() {
        Set<AbstractInternalWatermarkDeclaration<?>> watermarkDeclarations = WatermarkUtils.getInternalWatermarkDeclarationsFromStreamGraph(this);
        if (!watermarkDeclarations.isEmpty()) {
            try {
                this.serializedWatermarkDeclarations = InstantiationUtil.serializeObject(watermarkDeclarations);
            }
            catch (IOException e) {
                throw new StreamTaskException("Could not serialize watermark declarations.", e);
            }
        }
    }

    public byte[] getSerializedWatermarkDeclarations() {
        return this.serializedWatermarkDeclarations;
    }

    public String toString() {
        return "StreamGraph(jobId: " + this.jobId + ")";
    }

    private class UserDefinedObjectsHolder
    implements Serializable {
        private static final long serialVersionUID = 1L;
        private final SerializedValue<Map<Integer, Tuple3<Integer, StreamPartitioner<?>, StreamExchangeMode>>> serializedVirtualPartitionNodes;
        private final SerializedValue<ExecutionConfig> serializedExecutionConfig;
        private SerializedValue<Map<Integer, StreamNode>> serializedStreamNodes;
        private Collection<Tuple2<Integer, SerializedValue<StreamOperatorFactory<?>>>> streamNodeToSerializedOperatorFactories;
        private final SerializedValue<Map<Integer, Tuple2<Integer, OutputTag>>> serializedVirtualSideOutputNodes;

        public UserDefinedObjectsHolder(Map<Integer, StreamNode> streamNodes, Map<Integer, Tuple2<Integer, OutputTag>> virtualSideOutputNodes, Map<Integer, Tuple3<Integer, StreamPartitioner<?>, StreamExchangeMode>> virtualPartitionNodes, ExecutionConfig executionConfig, Executor serializationExecutor) throws IOException {
            this.serializeStreamNodes(streamNodes, serializationExecutor);
            this.serializedVirtualSideOutputNodes = new SerializedValue<Map<Integer, Tuple2<Integer, OutputTag>>>(virtualSideOutputNodes);
            this.serializedVirtualPartitionNodes = new SerializedValue(virtualPartitionNodes);
            this.serializedExecutionConfig = new SerializedValue<ExecutionConfig>(executionConfig);
        }

        private void serializeStreamNodes(Map<Integer, StreamNode> toBeSerializedStreamNodes, Executor serializationExecutor) {
            try {
                this.streamNodeToSerializedOperatorFactories = this.serializeOperatorFactories(toBeSerializedStreamNodes.values(), serializationExecutor);
                this.serializedStreamNodes = new SerializedValue<Map<Integer, StreamNode>>(toBeSerializedStreamNodes);
            }
            catch (Exception e) {
                throw new RuntimeException("Could not serialize stream nodes", e);
            }
        }

        private Collection<Tuple2<Integer, SerializedValue<StreamOperatorFactory<?>>>> serializeOperatorFactories(Collection<StreamNode> streamNodes, Executor serializationExecutor) throws Exception {
            List futures = streamNodes.stream().filter(node -> node.getOperatorFactory() != null).map(node -> this.serializeOperatorFactoriesAsync(serializationExecutor, (StreamNode)node)).collect(Collectors.toList());
            return (Collection)FutureUtils.combineAll(futures).get();
        }

        private CompletableFuture<Tuple2<Integer, SerializedValue<StreamOperatorFactory<?>>>> serializeOperatorFactoriesAsync(Executor serializationExecutor, StreamNode node) {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    return Tuple2.of(node.getId(), new SerializedValue(node.getOperatorFactory()));
                }
                catch (Throwable throwable) {
                    throw new RuntimeException(String.format("Could not serialize stream node %s", node), throwable);
                }
            }, serializationExecutor);
        }

        private void deserialize(ClassLoader userClassLoader, Executor serializationExecutor) throws Exception {
            Collection<Tuple2<Integer, StreamOperatorFactory<?>>> streamNodeToOperatorFactories = this.deserializeOperators(userClassLoader, serializationExecutor);
            StreamGraph.this.virtualSideOutputNodes = this.serializedVirtualSideOutputNodes.deserializeValue(userClassLoader);
            StreamGraph.this.virtualPartitionNodes = this.serializedVirtualPartitionNodes.deserializeValue(userClassLoader);
            StreamGraph.this.executionConfig = this.serializedExecutionConfig.deserializeValue(userClassLoader);
            StreamGraph.this.streamNodes = this.serializedStreamNodes.deserializeValue(userClassLoader);
            streamNodeToOperatorFactories.forEach(tuple2 -> StreamGraph.this.getStreamNode((Integer)tuple2.f0).setOperatorFactory((StreamOperatorFactory)tuple2.f1));
        }

        private Collection<Tuple2<Integer, StreamOperatorFactory<?>>> deserializeOperators(ClassLoader userClassLoader, Executor serializationExecutor) throws Exception {
            List futures = this.streamNodeToSerializedOperatorFactories.stream().map(tuple2 -> this.deserializeOperatorFactoriesAsync(userClassLoader, serializationExecutor, (Tuple2<Integer, SerializedValue<StreamOperatorFactory<?>>>)tuple2)).collect(Collectors.toList());
            return (Collection)FutureUtils.combineAll(futures).get();
        }

        private CompletableFuture<Tuple2<Integer, StreamOperatorFactory<?>>> deserializeOperatorFactoriesAsync(ClassLoader userClassLoader, Executor serializationExecutor, Tuple2<Integer, SerializedValue<StreamOperatorFactory<?>>> tuple2) {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    StreamOperatorFactory streamOperatorFactory = (StreamOperatorFactory)((SerializedValue)tuple2.f1).deserializeValue(userClassLoader);
                    return Tuple2.of((Integer)tuple2.f0, streamOperatorFactory);
                }
                catch (Throwable throwable) {
                    throw new RuntimeException(String.format("Could not deserialize stream node %s", tuple2.f0), throwable);
                }
            }, serializationExecutor);
        }
    }
}

