package org.apache.flink.streaming.graph;

import java.lang.invoke.SerializedLambda;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

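/**
 * Tests for the {@link JobVertexID}s assigned to the vertices of the {@link JobGraph} that is
 * generated from a streaming topology.
 *
 * <p>Decompiled from: org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.class
 */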
class StreamingJobGraphGeneratorNodeHashTest {

    /* loaded from: input_file:org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest$NoOpFilterFunction.class */
    private static class NoOpFilterFunction implements FilterFunction<String> {
        private static final long serialVersionUID = 500005424900187476L;

        private NoOpFilterFunction() {
        }

        public boolean filter(String str) throws Exception {
            return true;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest$NoOpKeySelector.class */
    private static class NoOpKeySelector implements KeySelector<String, String> {
        private static final long serialVersionUID = -96127515593422991L;

        private NoOpKeySelector() {
        }

        public String getKey(String str) throws Exception {
            return str;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest$NoOpMapFunction.class */
    private static class NoOpMapFunction implements MapFunction<String, String> {
        private static final long serialVersionUID = 6584823409744624276L;

        private NoOpMapFunction() {
        }

        public String map(String str) throws Exception {
            return str;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest$NoOpReduceFunction.class */
    private static class NoOpReduceFunction implements ReduceFunction<String> {
        private static final long serialVersionUID = -8775747640749256372L;

        private NoOpReduceFunction() {
        }

        public String reduce(String str, String str2) throws Exception {
            return str;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest$NoOpSourceFunction.class */
    private static class NoOpSourceFunction implements ParallelSourceFunction<String> {
        private static final long serialVersionUID = -5459224792698512636L;

        private NoOpSourceFunction() {
        }

        public void run(SourceFunction.SourceContext<String> sourceContext) throws Exception {
        }

        public void cancel() {
        }
    }

    /** Package-private no-argument constructor; the test class holds no state to initialise. */
    StreamingJobGraphGeneratorNodeHashTest() {}

    /**
     * Builds the same topology in two independent local environments and verifies, via
     * {@code rememberIds} and {@code verifyIdsEqual}, that the second job graph has the same number
     * of vertices and that each of its vertex IDs maps to the same vertex name as in the first.
     */
    @Test
    void testNodeHashIsDeterministic() {
        LocalStreamEnvironment firstEnv = StreamExecutionEnvironment.createLocalEnvironment();
        firstEnv.setParallelism(4);
        buildThreeSourceUnionTopology(firstEnv);
        Map<JobVertexID, String> expectedIds = rememberIds(firstEnv.getStreamGraph().getJobGraph());

        // Do it again in a fresh environment and compare against the remembered IDs.
        LocalStreamEnvironment secondEnv = StreamExecutionEnvironment.createLocalEnvironment();
        secondEnv.setParallelism(4);
        buildThreeSourceUnionTopology(secondEnv);
        verifyIdsEqual(secondEnv.getStreamGraph().getJobGraph(), expectedIds);
    }

    /**
     * Adds the topology used by {@code testNodeHashIsDeterministic} to {@code env}: a
     * src0 -> map -> filter -> keyBy -> reduce -> map branch unioned with two source -> filter
     * branches (src1, src2), all feeding a single discarding sink named "sink".
     *
     * <p>Both runs of the test go through this one method so that the two topologies cannot drift
     * apart.
     */
    private static void buildThreeSourceUnionTopology(StreamExecutionEnvironment env) {
        // The main branch (including its trailing map) is created before the two side branches.
        SingleOutputStreamOperator<String> mainBranch =
                env.addSource(new NoOpSourceFunction(), "src0")
                        .map(new NoOpMapFunction())
                        .filter(new NoOpFilterFunction())
                        .keyBy(new NoOpKeySelector())
                        .reduce(new NoOpReduceFunction())
                        .name("reduce")
                        .map(new NoOpMapFunction());
        SingleOutputStreamOperator<String> firstSideBranch =
                env.addSource(new NoOpSourceFunction(), "src1").filter(new NoOpFilterFunction());
        SingleOutputStreamOperator<String> secondSideBranch =
                env.addSource(new NoOpSourceFunction(), "src2").filter(new NoOpFilterFunction());

        mainBranch
                .union(firstSideBranch, secondSideBranch)
                .sinkTo(new DiscardingSink<>())
                .name("sink");
    }

    /**
     * Unions two sources built from identical arguments (chaining disabled) and verifies that the
     * first two vertices in topological order are both input vertices with non-null, distinct IDs.
     */
    @Test
    void testNodeHashIdenticalSources() {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(4);
        env.disableOperatorChaining();

        DataStreamSource<String> firstSource = env.addSource(new NoOpSourceFunction());
        DataStreamSource<String> secondSource = env.addSource(new NoOpSourceFunction());
        firstSource.union(secondSource).sinkTo(new DiscardingSink<>());

        List<JobVertex> vertices =
                env.getStreamGraph().getJobGraph().getVerticesSortedTopologicallyFromSources();
        JobVertex firstVertex = vertices.get(0);
        JobVertex secondVertex = vertices.get(1);

        Assertions.assertThat(firstVertex.isInputVertex()).isTrue();
        Assertions.assertThat(secondVertex.isInputVertex()).isTrue();

        Assertions.assertThat(firstVertex.getID()).isNotNull();
        Assertions.assertThat(secondVertex.getID()).isNotNull();

        Assertions.assertThat(firstVertex.getID()).isNotEqualTo(secondVertex.getID());
    }

    /**
     * Compares the ID of the first vertex (in topological order) of a source -> map -> filter ->
     * sink job with that of the same job where the map operator is marked with
     * {@code startNewChain()}, and expects the two IDs to differ.
     */
    @Test
    void testNodeHashAfterSourceUnchaining() throws Exception {
        // Baseline job: no explicit chain boundaries.
        LocalStreamEnvironment baselineEnv = StreamExecutionEnvironment.createLocalEnvironment();
        baselineEnv.setParallelism(4);
        SingleOutputStreamOperator<String> baselineFilter =
                baselineEnv
                        .addSource(new NoOpSourceFunction())
                        .map(new NoOpMapFunction())
                        .filter(new NoOpFilterFunction());
        baselineFilter.sinkTo(new DiscardingSink<>());
        JobGraph baselineGraph = baselineEnv.getStreamGraph().getJobGraph();
        JobVertex baselineHead = baselineGraph.getVerticesSortedTopologicallyFromSources().get(0);
        JobVertexID baselineHeadId = baselineHead.getID();

        // Same operators, but the map requests a new chain.
        LocalStreamEnvironment unchainedEnv = StreamExecutionEnvironment.createLocalEnvironment();
        unchainedEnv.setParallelism(4);
        SingleOutputStreamOperator<String> unchainedFilter =
                unchainedEnv
                        .addSource(new NoOpSourceFunction())
                        .map(new NoOpMapFunction())
                        .startNewChain()
                        .filter(new NoOpFilterFunction());
        unchainedFilter.sinkTo(new DiscardingSink<>());
        JobGraph unchainedGraph = unchainedEnv.getStreamGraph().getJobGraph();
        JobVertex unchainedHead = unchainedGraph.getVerticesSortedTopologicallyFromSources().get(0);

        Assertions.assertThat(unchainedHead.getID()).isNotEqualTo(baselineHeadId);
    }

    /**
     * Compares the second vertex (in topological order) of two jobs that differ only in whether
     * the filter following the "map" operator is marked with {@code startNewChain()}. In the first
     * job the vertex name must start with "map", in the second it must be exactly "map", and the
     * two vertex IDs must differ.
     */
    @Test
    void testNodeHashAfterIntermediateUnchaining() {
        // First job: only the map requests a new chain.
        LocalStreamEnvironment firstEnv = StreamExecutionEnvironment.createLocalEnvironment();
        firstEnv.setParallelism(4);
        SingleOutputStreamOperator<String> firstFilter =
                firstEnv.addSource(new NoOpSourceFunction())
                        .map(new NoOpMapFunction())
                        .name("map")
                        .startNewChain()
                        .filter(new NoOpFilterFunction());
        firstFilter.sinkTo(new DiscardingSink<>());
        JobGraph firstGraph = firstEnv.getStreamGraph().getJobGraph();
        JobVertex mapVertexBefore = firstGraph.getVerticesSortedTopologicallyFromSources().get(1);
        Assertions.assertThat(mapVertexBefore.getName()).startsWith("map");
        JobVertexID mapIdBefore = mapVertexBefore.getID();

        // Second job: the filter requests a new chain as well.
        LocalStreamEnvironment secondEnv = StreamExecutionEnvironment.createLocalEnvironment();
        secondEnv.setParallelism(4);
        SingleOutputStreamOperator<String> secondFilter =
                secondEnv
                        .addSource(new NoOpSourceFunction())
                        .map(new NoOpMapFunction())
                        .name("map")
                        .startNewChain()
                        .filter(new NoOpFilterFunction())
                        .startNewChain();
        secondFilter.sinkTo(new DiscardingSink<>());
        JobGraph secondGraph = secondEnv.getStreamGraph().getJobGraph();
        JobVertex mapVertexAfter = secondGraph.getVerticesSortedTopologicallyFromSources().get(1);
        Assertions.assertThat(mapVertexAfter.getName()).isEqualTo("map");
        Assertions.assertThat(mapVertexAfter.getID()).isNotEqualTo(mapIdBefore);
    }

    /**
     * Fans one source out into two identical map -> sink branches (chaining disabled) and verifies
     * that no two vertices of the resulting job graph share an ID.
     */
    @Test
    void testNodeHashIdenticalNodes() {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(4);
        env.disableOperatorChaining();

        DataStreamSource<String> source = env.addSource(new NoOpSourceFunction());
        source.map(new NoOpMapFunction()).sinkTo(new DiscardingSink<>());
        source.map(new NoOpMapFunction()).sinkTo(new DiscardingSink<>());

        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
        HashSet<JobVertexID> seenIds = new HashSet<>();
        for (JobVertex vertex : jobGraph.getVertices()) {
            // add() returns false when the ID is already present, i.e. on a duplicate.
            Assertions.assertThat(seenIds.add(vertex.getID())).isTrue();
        }
    }

    /**
     * Builds a source -> map job twice, naming the source "A" the first time and "B" the second,
     * and expects the first vertex returned by {@code getVerticesAsArray()} to have the same ID in
     * both job graphs.
     */
    @Test
    void testChangedOperatorName() {
        LocalStreamEnvironment envWithNameA = StreamExecutionEnvironment.createLocalEnvironment();
        envWithNameA.addSource(new NoOpSourceFunction(), "A").map(new NoOpMapFunction());
        JobGraph graphWithNameA = envWithNameA.getStreamGraph().getJobGraph();
        JobVertexID expectedId = graphWithNameA.getVerticesAsArray()[0].getID();

        LocalStreamEnvironment envWithNameB = StreamExecutionEnvironment.createLocalEnvironment();
        envWithNameB.addSource(new NoOpSourceFunction(), "B").map(new NoOpMapFunction());
        JobGraph graphWithNameB = envWithNameB.getStreamGraph().getJobGraph();
        JobVertexID actualId = graphWithNameB.getVerticesAsArray()[0].getID();

        Assertions.assertThat(actualId).isEqualTo(expectedId);
    }

    /**
     * Verifies that operators carrying an explicit {@code uid} keep their vertex ID when the
     * surrounding topology changes.
     *
     * <p>The first job assigns the uids "source", "sink0" and "sink1" and records all (unique)
     * vertex IDs. The second job has a different shape (extra map, keyBy and reduce operators) but
     * reuses the same names and uids; every vertex whose name ends with one of those three names
     * must have an ID that was already present in the first job. The two job IDs themselves must
     * differ.
     */
    @Test
    void testManualHashAssignment() {
        LocalStreamEnvironment firstEnv = StreamExecutionEnvironment.createLocalEnvironment();
        firstEnv.setParallelism(4);
        firstEnv.disableOperatorChaining();

        SingleOutputStreamOperator<String> firstSource =
                firstEnv.addSource(new NoOpSourceFunction()).name("source").uid("source");
        firstSource
                .map(new NoOpMapFunction())
                .sinkTo(new DiscardingSink<>())
                .name("sink0")
                .uid("sink0");
        firstSource
                .map(new NoOpMapFunction())
                .sinkTo(new DiscardingSink<>())
                .name("sink1")
                .uid("sink1");

        JobGraph firstGraph = firstEnv.getStreamGraph().getJobGraph();
        HashSet<JobVertexID> firstIds = new HashSet<>();
        for (JobVertex vertex : firstGraph.getVertices()) {
            // add() returns false on a duplicate, so this also checks uniqueness.
            Assertions.assertThat(firstIds.add(vertex.getID())).isTrue();
        }

        // Second, slightly different program that reuses the same uids.
        LocalStreamEnvironment secondEnv = StreamExecutionEnvironment.createLocalEnvironment();
        secondEnv.setParallelism(4);
        secondEnv.disableOperatorChaining();

        // Here the uid "source" is attached to a map that follows the actual source.
        SingleOutputStreamOperator<String> secondSource =
                secondEnv
                        .addSource(new NoOpSourceFunction())
                        .map(new NoOpMapFunction())
                        .name("source")
                        .uid("source");
        secondSource
                .map(new NoOpMapFunction())
                .keyBy(new NoOpKeySelector())
                .reduce(new NoOpReduceFunction())
                .sinkTo(new DiscardingSink<>())
                .name("sink0")
                .uid("sink0");
        secondSource
                .map(new NoOpMapFunction())
                .keyBy(new NoOpKeySelector())
                .reduce(new NoOpReduceFunction())
                .sinkTo(new DiscardingSink<>())
                .name("sink1")
                .uid("sink1");

        JobGraph secondGraph = secondEnv.getStreamGraph().getJobGraph();
        Assertions.assertThat(secondGraph.getJobID()).isNotEqualTo(firstGraph.getJobID());

        for (JobVertex vertex : secondGraph.getVertices()) {
            String vertexName = vertex.getName();
            boolean hasManualUid =
                    vertexName.endsWith("source")
                            || vertexName.endsWith("sink0")
                            || vertexName.endsWith("sink1");
            if (hasManualUid) {
                Assertions.assertThat(vertex.getID()).isIn(firstIds);
            }
        }
    }

    /**
     * Assigns the uid "source" to two different operators of one job and expects job graph
     * generation to fail with an {@link IllegalArgumentException}.
     */
    @Test
    void testManualHashAssignmentCollisionThrowsException() {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(4);
        env.disableOperatorChaining();

        env.addSource(new NoOpSourceFunction())
                .uid("source")
                .map(new NoOpMapFunction())
                .uid("source")
                .sinkTo(new DiscardingSink<>());

        Assertions.assertThatThrownBy(() -> env.getStreamGraph().getJobGraph())
                .isInstanceOf(IllegalArgumentException.class);
    }

    /**
     * Smoke test: assigns a uid only to the map operator of a source -> map -> sink job and
     * generates the job graph. The test passes as long as no exception is thrown.
     */
    @Test
    void testManualHashAssignmentForIntermediateNodeInChain() throws Exception {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(4);

        SingleOutputStreamOperator<String> mapWithUid =
                env.addSource(new NoOpSourceFunction()).map(new NoOpMapFunction()).uid("map");
        mapWithUid.sinkTo(new DiscardingSink<>());

        env.getStreamGraph().getJobGraph();
    }

    /**
     * Smoke test: assigns a uid only to the source of a source -> map -> sink job and generates
     * the job graph. The test passes as long as no exception is thrown.
     */
    @Test
    void testManualHashAssignmentForStartNodeInInChain() throws Exception {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(4);

        SingleOutputStreamOperator<String> sourceWithUid =
                env.addSource(new NoOpSourceFunction()).uid("source");
        sourceWithUid.map(new NoOpMapFunction()).sinkTo(new DiscardingSink<>());

        env.getStreamGraph().getJobGraph();
    }

    /**
     * Smoke test: sets an explicit uid hash on the source, map, filter and reduce operators of a
     * single pipeline and generates the job graph. The test passes as long as no exception is
     * thrown.
     */
    @Test
    void testUserProvidedHashingOnChainSupported() {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

        SingleOutputStreamOperator<String> source =
                env.addSource(new NoOpSourceFunction(), "src")
                        .setUidHash("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa");
        SingleOutputStreamOperator<String> mapped =
                source.map(new NoOpMapFunction()).setUidHash("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb");
        SingleOutputStreamOperator<String> filtered =
                mapped.filter(new NoOpFilterFunction())
                        .setUidHash("cccccccccccccccccccccccccccccccc");
        filtered.keyBy(new NoOpKeySelector())
                .reduce(new NoOpReduceFunction())
                .name("reduce")
                .setUidHash("dddddddddddddddddddddddddddddddd");

        env.getStreamGraph().getJobGraph();
    }

    /**
     * Disables auto-generated UIDs, builds a source -> sink job without any explicit uid and
     * expects {@code getStreamGraph()} to fail with an {@link IllegalStateException}.
     */
    @Test
    void testDisablingAutoUidsFailsStreamGraphCreation() {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.getConfig().disableAutoGeneratedUIDs();

        env.addSource(new NoOpSourceFunction()).sinkTo(new DiscardingSink<>());

        Assertions.assertThatThrownBy(env::getStreamGraph)
                .isInstanceOf(IllegalStateException.class);
    }

    /**
     * Smoke test: with auto-generated UIDs disabled, a job whose source and sink both carry an
     * explicit uid must produce a stream graph without throwing.
     */
    @Test
    void testDisablingAutoUidsAcceptsManuallySetId() {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.getConfig().disableAutoGeneratedUIDs();

        SingleOutputStreamOperator<String> source =
                env.addSource(new NoOpSourceFunction()).uid("uid1");
        source.sinkTo(new DiscardingSink<>()).uid("uid2");

        env.getStreamGraph();
    }

    /**
     * Smoke test: with auto-generated UIDs disabled, a job whose source and sink both carry an
     * explicit uid hash must produce a stream graph without throwing.
     */
    @Test
    void testDisablingAutoUidsAcceptsManuallySetHash() {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.getConfig().disableAutoGeneratedUIDs();

        SingleOutputStreamOperator<String> source =
                env.addSource(new NoOpSourceFunction())
                        .setUidHash("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa");
        // This test attaches the sink through addSink, hence the fully qualified non-v2 DiscardingSink.
        source.addSink(new org.apache.flink.streaming.api.functions.sink.DiscardingSink<>())
                .setUidHash("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb");

        env.getStreamGraph();
    }

    /**
     * Smoke test: with auto-generated UIDs disabled, a source -> keyBy -> sink job where only the
     * source and the sink carry an explicit uid hash must produce a stream graph without throwing.
     */
    @Test
    void testDisablingAutoUidsWorksWithKeyBy() throws Exception {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.getConfig().disableAutoGeneratedUIDs();

        SingleOutputStreamOperator<String> source =
                env.addSource(new NoOpSourceFunction())
                        .setUidHash("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa");
        // Each record is used as its own key.
        source.keyBy(value -> value)
                .addSink(new org.apache.flink.streaming.api.functions.sink.DiscardingSink<>())
                .setUidHash("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb");

        env.getStreamGraph();
    }

    /**
     * Records the ID and name of every vertex of the given job graph.
     *
     * @param jobGraph job graph whose vertices are inspected
     * @return a new map from vertex ID to vertex name
     */
    private Map<JobVertexID, String> rememberIds(JobGraph jobGraph) {
        Map<JobVertexID, String> namesById = new HashMap<>();
        for (JobVertex vertex : jobGraph.getVertices()) {
            namesById.put(vertex.getID(), vertex.getName());
        }
        return namesById;
    }

    /**
     * Asserts that {@code map} has exactly as many entries as the job graph has vertices, and that
     * the name of every vertex is non-null and equal to the name stored in {@code map} under that
     * vertex's ID.
     *
     * @param jobGraph job graph to check
     * @param map previously recorded mapping from vertex ID to vertex name
     */
    private void verifyIdsEqual(JobGraph jobGraph, Map<JobVertexID, String> map) {
        int vertexCount = jobGraph.getNumberOfVertices();
        Assertions.assertThat(map).hasSize(vertexCount);

        for (JobVertex vertex : jobGraph.getVertices()) {
            String rememberedName = map.get(vertex.getID());
            // A missing entry yields null here, which cannot equal the non-null vertex name.
            Assertions.assertThat(vertex.getName()).isNotNull().isEqualTo(rememberedName);
        }
    }

    /**
     * Asserts that {@code map} has exactly as many entries as the job graph has vertices, and that
     * none of the job graph's vertex IDs appears as a key in {@code map}.
     *
     * <p>No test in this class currently calls this helper.
     *
     * @param jobGraph job graph to check
     * @param map previously recorded mapping from vertex ID to vertex name
     */
    private void verifyIdsNotEqual(JobGraph jobGraph, Map<JobVertexID, String> map) {
        Assertions.assertThat(map).hasSize(jobGraph.getNumberOfVertices());
        for (JobVertex vertex : jobGraph.getVertices()) {
            Assertions.assertThat(map).doesNotContainKey(vertex.getID());
        }
    }

    /**
     * Recreates the key-selector lambda of {@code testDisablingAutoUidsWorksWithKeyBy} from its
     * serialized description.
     *
     * <p>NOTE(review): this looks like the method the Java compiler generates on its own for
     * serializable lambdas, surfaced here by a decompiler. Presumably it should be deleted from
     * hand-maintained source rather than kept - confirm before relying on it.
     *
     * @param serializedLambda description of the lambda to recreate
     * @return a key selector that returns its input, if every field of the description matches
     * @throws IllegalArgumentException if the description does not match the expected lambda
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        // Literal-first equals() keeps the checks null-safe.
        boolean describesKeyByLambda =
                "lambda$testDisablingAutoUidsWorksWithKeyBy$3558be8e$1"
                                .equals(serializedLambda.getImplMethodName())
                        // 6 is the value of java.lang.invoke.MethodHandleInfo.REF_invokeStatic.
                        && serializedLambda.getImplMethodKind() == 6
                        && "org/apache/flink/api/java/functions/KeySelector"
                                .equals(serializedLambda.getFunctionalInterfaceClass())
                        && "getKey".equals(serializedLambda.getFunctionalInterfaceMethodName())
                        && "(Ljava/lang/Object;)Ljava/lang/Object;"
                                .equals(serializedLambda.getFunctionalInterfaceMethodSignature())
                        && "org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest"
                                .equals(serializedLambda.getImplClass())
                        && "(Ljava/lang/String;)Ljava/lang/String;"
                                .equals(serializedLambda.getImplMethodSignature());

        if (describesKeyByLambda) {
            // A lambda needs a functional-interface target type; it cannot be returned as Object directly.
            KeySelector<String, String> identityKeySelector = str -> str;
            return identityKeySelector;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
