/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.graph;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.forwardgroup.ForwardGroupComputeUtil;
import org.apache.flink.runtime.jobgraph.forwardgroup.StreamNodeForwardGroup;
import org.apache.flink.streaming.api.graph.NonChainedOutput;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamGraphContext;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
import org.apache.flink.streaming.api.graph.util.ImmutableStreamGraph;
import org.apache.flink.streaming.api.graph.util.ImmutableStreamNode;
import org.apache.flink.streaming.api.graph.util.StreamEdgeUpdateRequestInfo;
import org.apache.flink.streaming.api.graph.util.StreamNodeUpdateRequestInfo;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.runtime.partitioner.ForwardForConsecutiveHashPartitioner;
import org.apache.flink.streaming.runtime.partitioner.ForwardForUnspecifiedPartitioner;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Default {@link StreamGraphContext} implementation backed by a mutable {@link StreamGraph}.
 *
 * <p>Read access is exposed through an {@link ImmutableStreamGraph} view, while the
 * {@code modify*} methods apply validated update requests to the underlying graph and keep the
 * forward-group, non-chained-output and intermediate-data-set caches handed to the constructor
 * consistent with those updates.
 */
@Internal
public class DefaultStreamGraphContext implements StreamGraphContext {

    private static final Logger LOG = LoggerFactory.getLogger(DefaultStreamGraphContext.class);

    /** The mutable graph that all modifications are applied to. */
    private final StreamGraph streamGraph;

    /** Read-only view over {@link #streamGraph} handed out by {@link #getStreamGraph()}. */
    private final ImmutableStreamGraph immutableStreamGraph;

    /** Forward group of each stream node, keyed by stream node id. */
    private final Map<Integer, StreamNodeForwardGroup> steamNodeIdToForwardGroupMap;

    /** Nodes whose ids are keys of this map are frozen: edges targeting them are not modified. */
    private final Map<Integer, Integer> frozenNodeToStartNodeMap;

    /** Per source node id: the non-chained output each outgoing edge subscribes to. */
    private final Map<Integer, Map<StreamEdge, NonChainedOutput>> opIntermediateOutputsCaches;

    /** Intermediate data set consumed through an edge, keyed by edge id. */
    private final Map<String, IntermediateDataSet> consumerEdgeIdToIntermediateDataSetMap;

    /** Ids of stream nodes regarded as finished by {@link #checkUpstreamNodesFinished}. */
    private final Set<Integer> finishedStreamNodeIds;

    /** Optional callback invoked after a successful graph modification. */
    @Nullable
    private final StreamGraphContext.StreamGraphUpdateListener streamGraphUpdateListener;

    /** Creates a context without an update listener. */
    @VisibleForTesting
    public DefaultStreamGraphContext(
            StreamGraph streamGraph,
            Map<Integer, StreamNodeForwardGroup> steamNodeIdToForwardGroupMap,
            Map<Integer, Integer> frozenNodeToStartNodeMap,
            Map<Integer, Map<StreamEdge, NonChainedOutput>> opIntermediateOutputsCaches,
            Map<String, IntermediateDataSet> consumerEdgeIdToIntermediateDataSetMap,
            Set<Integer> finishedStreamNodeIds,
            ClassLoader userClassloader) {
        this(
                streamGraph,
                steamNodeIdToForwardGroupMap,
                frozenNodeToStartNodeMap,
                opIntermediateOutputsCaches,
                consumerEdgeIdToIntermediateDataSetMap,
                finishedStreamNodeIds,
                userClassloader,
                null);
    }

    /**
     * Creates a context.
     *
     * <p>The maps and the set are stored by reference (no defensive copy) and the maps are
     * updated in place by the {@code modify*} methods.
     *
     * @param streamGraph the mutable graph to operate on; must not be null
     * @param steamNodeIdToForwardGroupMap forward group per stream node id; must not be null
     * @param frozenNodeToStartNodeMap frozen nodes, keyed by node id; must not be null
     * @param opIntermediateOutputsCaches non-chained outputs per source node id and edge; must
     *     not be null
     * @param consumerEdgeIdToIntermediateDataSetMap consumed data set per edge id; must not be
     *     null
     * @param finishedStreamNodeIds ids of finished stream nodes
     * @param userClassloader class loader passed to the {@link ImmutableStreamGraph} view
     * @param streamGraphUpdateListener notified after each successful modification, may be null
     */
    public DefaultStreamGraphContext(
            StreamGraph streamGraph,
            Map<Integer, StreamNodeForwardGroup> steamNodeIdToForwardGroupMap,
            Map<Integer, Integer> frozenNodeToStartNodeMap,
            Map<Integer, Map<StreamEdge, NonChainedOutput>> opIntermediateOutputsCaches,
            Map<String, IntermediateDataSet> consumerEdgeIdToIntermediateDataSetMap,
            Set<Integer> finishedStreamNodeIds,
            ClassLoader userClassloader,
            @Nullable StreamGraphContext.StreamGraphUpdateListener streamGraphUpdateListener) {
        this.streamGraph = Preconditions.checkNotNull(streamGraph);
        this.steamNodeIdToForwardGroupMap =
                Preconditions.checkNotNull(steamNodeIdToForwardGroupMap);
        this.frozenNodeToStartNodeMap = Preconditions.checkNotNull(frozenNodeToStartNodeMap);
        this.opIntermediateOutputsCaches = Preconditions.checkNotNull(opIntermediateOutputsCaches);
        this.immutableStreamGraph = new ImmutableStreamGraph(this.streamGraph, userClassloader);
        this.consumerEdgeIdToIntermediateDataSetMap =
                Preconditions.checkNotNull(consumerEdgeIdToIntermediateDataSetMap);
        this.finishedStreamNodeIds = finishedStreamNodeIds;
        this.streamGraphUpdateListener = streamGraphUpdateListener;
    }

    /** Returns the read-only view of the underlying stream graph. */
    @Override
    public ImmutableStreamGraph getStreamGraph() {
        return immutableStreamGraph;
    }

    /** Returns the operator factory of the stream node with the given id. */
    @Override
    @Nullable
    public StreamOperatorFactory<?> getOperatorFactory(Integer streamNodeId) {
        return streamGraph.getStreamNode(streamNodeId).getOperatorFactory();
    }

    /**
     * Applies the given edge update requests as a unit.
     *
     * <p>All requests are validated first; if any of them is rejected, nothing is modified and
     * {@code false} is returned. Otherwise every request is applied and the update listener, if
     * present, is notified once.
     *
     * @param requestInfos the requested edge updates
     * @return {@code true} if all requests were applied, {@code false} if any was rejected
     */
    @Override
    public boolean modifyStreamEdge(List<StreamEdgeUpdateRequestInfo> requestInfos) {
        for (StreamEdgeUpdateRequestInfo requestInfo : requestInfos) {
            if (!validateStreamEdgeUpdateRequest(requestInfo)) {
                return false;
            }
        }

        for (StreamEdgeUpdateRequestInfo requestInfo : requestInfos) {
            StreamEdge targetEdge =
                    getStreamEdge(
                            requestInfo.getSourceId(),
                            requestInfo.getTargetId(),
                            requestInfo.getEdgeId());

            StreamPartitioner<?> newPartitioner = requestInfo.getOutputPartitioner();
            if (newPartitioner != null) {
                modifyOutputPartitioner(targetEdge, newPartitioner);
            }
            // A type number of 0 is treated as "no change requested".
            if (requestInfo.getTypeNumber() != 0) {
                targetEdge.setTypeNumber(requestInfo.getTypeNumber());
            }
            if (requestInfo.getIntraInputKeyCorrelated() != null) {
                modifyIntraInputKeyCorrelation(
                        targetEdge, requestInfo.getIntraInputKeyCorrelated());
            }
        }

        notifyStreamGraphUpdated();
        return true;
    }

    /**
     * Applies the given node update requests as a unit.
     *
     * <p>As with {@link #modifyStreamEdge(List)}, all requests are validated before any node is
     * touched, so a rejected request never leaves the graph partially modified.
     *
     * @param requestInfos the requested node updates
     * @return {@code true} if all requests were applied, {@code false} if any was rejected
     */
    @Override
    public boolean modifyStreamNode(List<StreamNodeUpdateRequestInfo> requestInfos) {
        // Phase 1: validate everything up front. Applying while validating would leave earlier
        // requests applied (and the listener un-notified) when a later request is rejected.
        for (StreamNodeUpdateRequestInfo requestInfo : requestInfos) {
            if (!validateStreamNodeUpdateRequest(requestInfo)) {
                return false;
            }
        }

        // Phase 2: apply. Replacing serializers keeps the array length unchanged, so the
        // validation above stays valid even if several requests address the same node.
        for (StreamNodeUpdateRequestInfo requestInfo : requestInfos) {
            if (requestInfo.getTypeSerializersIn() != null) {
                streamGraph
                        .getStreamNode(requestInfo.getNodeId())
                        .setSerializersIn(requestInfo.getTypeSerializersIn());
            }
        }

        notifyStreamGraphUpdated();
        return true;
    }

    /**
     * Checks whether all upstream nodes feeding the given node are finished.
     *
     * @param streamNode the node whose input edges are inspected
     * @param typeNumber only input edges with this type number are considered; {@code null}
     *     considers every input edge
     * @return {@code true} if the source of every considered edge is in the finished set
     * @throws IllegalStateException if no input edge matches {@code typeNumber}
     */
    @Override
    public boolean checkUpstreamNodesFinished(ImmutableStreamNode streamNode, Integer typeNumber) {
        List<Integer> upstreamNodeIds =
                streamNode.getInEdges().stream()
                        .filter(
                                edge ->
                                        typeNumber == null
                                                || edge.getTypeNumber() == typeNumber.intValue())
                        .map(edge -> edge.getSourceId())
                        .collect(Collectors.toList());

        Preconditions.checkState(
                !upstreamNodeIds.isEmpty(),
                "The stream edge with typeNumber %s does not exist.",
                typeNumber);

        return upstreamNodeIds.stream().allMatch(finishedStreamNodeIds::contains);
    }

    /**
     * Returns the id of the intermediate data set consumed through the given edge.
     *
     * @param edgeId id of the consuming edge
     * @throws NullPointerException if no data set is registered for {@code edgeId}
     */
    @Override
    public IntermediateDataSetID getConsumedIntermediateDataSetId(String edgeId) {
        IntermediateDataSet dataSet = consumerEdgeIdToIntermediateDataSetMap.get(edgeId);
        Preconditions.checkNotNull(
                dataSet, "No intermediate data set is registered for consumer edge '%s'.", edgeId);
        return dataSet.getId();
    }

    /**
     * Returns the current output partitioner of the identified edge.
     *
     * @throws RuntimeException if no such edge exists
     */
    @Override
    public StreamPartitioner<?> getOutputPartitioner(
            String edgeId, Integer sourceId, Integer targetId) {
        // getStreamEdge throws instead of returning null, so no extra null check is needed.
        return getStreamEdge(sourceId, targetId, edgeId).getPartitioner();
    }

    /** Invokes the update listener, if one was supplied. */
    private void notifyStreamGraphUpdated() {
        if (streamGraphUpdateListener != null) {
            streamGraphUpdateListener.onStreamGraphUpdated();
        }
    }

    /**
     * Checks whether a node update request may be applied: a request carrying input serializers
     * must supply exactly as many as the node currently has.
     */
    private boolean validateStreamNodeUpdateRequest(StreamNodeUpdateRequestInfo requestInfo) {
        if (requestInfo.getTypeSerializersIn() == null) {
            return true;
        }
        StreamNode streamNode = streamGraph.getStreamNode(requestInfo.getNodeId());
        if (requestInfo.getTypeSerializersIn().length
                != streamNode.getTypeSerializersIn().length) {
            LOG.info(
                    "Modification for node {} is not allowed as the array size of typeSerializersIn is not matched.",
                    requestInfo.getNodeId());
            return false;
        }
        return true;
    }

    /**
     * Checks whether an edge update request may be applied.
     *
     * <p>A request is rejected when it changes the partitioner of an edge whose subscribed
     * output is shared with other edges, when the target node is frozen, when the edge currently
     * uses exactly {@link ForwardPartitioner}, or when it asks for exactly
     * {@link ForwardPartitioner} but the forward groups of source and target cannot be merged.
     */
    private boolean validateStreamEdgeUpdateRequest(StreamEdgeUpdateRequestInfo requestInfo) {
        Integer sourceNodeId = requestInfo.getSourceId();
        Integer targetNodeId = requestInfo.getTargetId();
        StreamEdge targetEdge = getStreamEdge(sourceNodeId, targetNodeId, requestInfo.getEdgeId());
        StreamPartitioner<?> newPartitioner = requestInfo.getOutputPartitioner();

        if (newPartitioner != null && isSubscribedOutputReused(sourceNodeId, targetEdge)) {
            LOG.info(
                    "Skip modifying edge {} because the subscribing output is reused.",
                    targetEdge);
            return false;
        }

        if (frozenNodeToStartNodeMap.containsKey(targetNodeId)) {
            LOG.info(
                    "Skip modifying edge {} because the target node with id {} is in frozen list.",
                    targetEdge,
                    targetNodeId);
            return false;
        }

        if (newPartitioner != null) {
            // Exact class comparison on purpose: subclasses of ForwardPartitioner are allowed.
            if (targetEdge.getPartitioner().getClass().equals(ForwardPartitioner.class)) {
                LOG.info(
                        "Modification for edge {} is not allowed as the origin partitioner is ForwardPartitioner.",
                        targetEdge);
                return false;
            }
            if (newPartitioner.getClass().equals(ForwardPartitioner.class)
                    && !ForwardGroupComputeUtil.canTargetMergeIntoSourceForwardGroup(
                            steamNodeIdToForwardGroupMap.get(targetEdge.getSourceId()),
                            steamNodeIdToForwardGroupMap.get(targetEdge.getTargetId()))) {
                LOG.info(
                        "Skip modifying edge {} because forward groups can not be merged.",
                        targetEdge);
                return false;
            }
        }
        return true;
    }

    /**
     * Returns {@code true} if a non-chained output is cached for {@code targetEdge} and the
     * number of edges of the same source node mapped to an equal output is not exactly one.
     */
    private boolean isSubscribedOutputReused(Integer sourceNodeId, StreamEdge targetEdge) {
        Map<StreamEdge, NonChainedOutput> opIntermediateOutputs =
                opIntermediateOutputsCaches.get(sourceNodeId);
        if (opIntermediateOutputs == null) {
            return false;
        }
        NonChainedOutput output = opIntermediateOutputs.get(targetEdge);
        if (output == null) {
            return false;
        }
        long consumerEdgeCount =
                opIntermediateOutputs.values().stream()
                        .filter(candidate -> candidate.equals(output))
                        .count();
        return consumerEdgeCount != 1;
    }

    /**
     * Sets a new partitioner on the edge and propagates the effective partitioner to the cached
     * non-chained output and to the consumed intermediate data set, when those exist.
     *
     * <p>The partitioner finally installed may differ from {@code newPartitioner}, see
     * {@link #tryConvertForwardPartitionerAndMergeForwardGroup(StreamEdge)}.
     */
    private void modifyOutputPartitioner(
            StreamEdge targetEdge, StreamPartitioner<?> newPartitioner) {
        if (newPartitioner == null) {
            return;
        }

        StreamPartitioner<?> oldPartitioner = targetEdge.getPartitioner();
        targetEdge.setPartitioner(newPartitioner);
        if (targetEdge.getPartitioner() instanceof ForwardPartitioner) {
            tryConvertForwardPartitionerAndMergeForwardGroup(targetEdge);
        }
        // Read back from the edge: the conversion above may have replaced the partitioner.
        StreamPartitioner<?> finalPartitioner = targetEdge.getPartitioner();

        Map<StreamEdge, NonChainedOutput> opIntermediateOutputs =
                opIntermediateOutputsCaches.get(targetEdge.getSourceId());
        NonChainedOutput output =
                opIntermediateOutputs != null ? opIntermediateOutputs.get(targetEdge) : null;
        if (output != null) {
            output.setPartitioner(finalPartitioner);
        }

        IntermediateDataSet dataSet =
                consumerEdgeIdToIntermediateDataSetMap.get(targetEdge.getEdgeId());
        if (dataSet != null) {
            DistributionPattern distributionPattern =
                    finalPartitioner.isPointwise()
                            ? DistributionPattern.POINTWISE
                            : DistributionPattern.ALL_TO_ALL;
            dataSet.updateOutputPattern(
                    distributionPattern,
                    finalPartitioner.isBroadcast(),
                    finalPartitioner.getClass().equals(ForwardPartitioner.class));
        }

        LOG.info(
                "The original partitioner of the edge {} is: {} , requested change to: {} , and finally modified to: {}.",
                targetEdge,
                oldPartitioner,
                newPartitioner,
                finalPartitioner);
    }

    /** Updates the intra-input key correlation flag of the edge if it differs. */
    private void modifyIntraInputKeyCorrelation(
            StreamEdge targetEdge, boolean existIntraInputKeyCorrelation) {
        if (targetEdge.isIntraInputKeyCorrelated() != existIntraInputKeyCorrelation) {
            targetEdge.setIntraInputKeyCorrelated(existIntraInputKeyCorrelation);
        }
    }

    /**
     * Resolves the forward-like partitioner currently set on the edge into its final form.
     *
     * <ul>
     *   <li>If {@link #canConvertToForwardPartitioner(StreamEdge)} holds, the edge gets a plain
     *       {@link ForwardPartitioner} and the forward groups of both ends are merged.
     *   <li>Otherwise a {@link ForwardForUnspecifiedPartitioner} falls back to a
     *       {@link RescalePartitioner}, and a {@link ForwardForConsecutiveHashPartitioner} falls
     *       back to its wrapped hash partitioner.
     *   <li>Any other forward partitioner is kept and the forward groups are merged.
     * </ul>
     *
     * @throws IllegalStateException if the edge's partitioner is not a {@link ForwardPartitioner}
     *     or a required forward group merge fails
     */
    private void tryConvertForwardPartitionerAndMergeForwardGroup(StreamEdge targetEdge) {
        Preconditions.checkState(targetEdge.getPartitioner() instanceof ForwardPartitioner);
        Integer sourceNodeId = targetEdge.getSourceId();
        Integer targetNodeId = targetEdge.getTargetId();

        if (canConvertToForwardPartitioner(targetEdge)) {
            targetEdge.setPartitioner(new ForwardPartitioner<>());
            Preconditions.checkState(mergeForwardGroups(sourceNodeId, targetNodeId));
        } else if (targetEdge.getPartitioner() instanceof ForwardForUnspecifiedPartitioner) {
            targetEdge.setPartitioner(new RescalePartitioner<>());
        } else if (targetEdge.getPartitioner() instanceof ForwardForConsecutiveHashPartitioner) {
            targetEdge.setPartitioner(
                    ((ForwardForConsecutiveHashPartitioner<?>) targetEdge.getPartitioner())
                            .getHashPartitioner());
        } else {
            Preconditions.checkState(mergeForwardGroups(sourceNodeId, targetNodeId));
        }
    }

    /**
     * Decides whether a tentative forward partitioner on the edge can become a real
     * {@link ForwardPartitioner}.
     *
     * <p>For {@link ForwardForUnspecifiedPartitioner} the source node must not be frozen, the
     * edge must be chainable, and the forward groups must be mergeable. For
     * {@link ForwardForConsecutiveHashPartitioner} only the forward groups must be mergeable.
     * Any other partitioner yields {@code false}.
     */
    private boolean canConvertToForwardPartitioner(StreamEdge targetEdge) {
        Integer sourceNodeId = targetEdge.getSourceId();
        Integer targetNodeId = targetEdge.getTargetId();

        if (targetEdge.getPartitioner() instanceof ForwardForUnspecifiedPartitioner) {
            return !frozenNodeToStartNodeMap.containsKey(sourceNodeId)
                    && StreamingJobGraphGenerator.isChainable(targetEdge, streamGraph, true)
                    && ForwardGroupComputeUtil.canTargetMergeIntoSourceForwardGroup(
                            steamNodeIdToForwardGroupMap.get(sourceNodeId),
                            steamNodeIdToForwardGroupMap.get(targetNodeId));
        }
        if (targetEdge.getPartitioner() instanceof ForwardForConsecutiveHashPartitioner) {
            return ForwardGroupComputeUtil.canTargetMergeIntoSourceForwardGroup(
                    steamNodeIdToForwardGroupMap.get(sourceNodeId),
                    steamNodeIdToForwardGroupMap.get(targetNodeId));
        }
        return false;
    }

    /**
     * Merges the target node's forward group into the source node's forward group and re-points
     * every vertex of the merged group to the source group in the lookup map.
     *
     * @return {@code false} if either node has no forward group or the merge is refused
     */
    private boolean mergeForwardGroups(Integer sourceNodeId, Integer targetNodeId) {
        StreamNodeForwardGroup sourceForwardGroup = steamNodeIdToForwardGroupMap.get(sourceNodeId);
        StreamNodeForwardGroup forwardGroupToMerge =
                steamNodeIdToForwardGroupMap.get(targetNodeId);
        if (sourceForwardGroup == null || forwardGroupToMerge == null) {
            return false;
        }
        if (!sourceForwardGroup.mergeForwardGroup(forwardGroupToMerge)) {
            return false;
        }
        forwardGroupToMerge
                .getVertexIds()
                .forEach(nodeId -> steamNodeIdToForwardGroupMap.put(nodeId, sourceForwardGroup));
        return true;
    }

    /**
     * Looks up the edge with the given id among the edges connecting the two nodes.
     *
     * @throws RuntimeException if no edge between the nodes has the given id
     */
    private StreamEdge getStreamEdge(Integer sourceId, Integer targetId, String edgeId) {
        for (StreamEdge edge : streamGraph.getStreamEdges(sourceId, targetId)) {
            if (edge.getEdgeId().equals(edgeId)) {
                return edge;
            }
        }
        throw new RuntimeException(
                String.format(
                        "Stream edge with id '%s' is not found whose source id is %d, target id is %d.",
                        edgeId, sourceId, targetId));
    }
}

