/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.graph;

import java.util.ArrayList;
import java.util.Collections;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.IntegerTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.mocks.MockSource;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.GlobalStreamExchangeMode;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.flink.streaming.api.graph.StreamGraphGeneratorTest;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.transformations.SourceTransformation;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Condition;
import org.assertj.core.api.HamcrestCondition;
import org.hamcrest.Description;
import org.hamcrest.TypeSafeMatcher;
import org.junit.jupiter.api.Test;

class StreamGraphGeneratorExecutionModeDetectionTest {
    StreamGraphGeneratorExecutionModeDetectionTest() {
    }

    @Test
    void testExecutionModePropagationFromEnvWithDefaultAndBoundedSource() {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.enableCheckpointing(100L);
        environment.fromSource((Source)new MockSource(Boundedness.BOUNDED, 100), WatermarkStrategy.noWatermarks(), "bounded-source").print();
        Assertions.assertThat((Object)environment.getStreamGraph()).is((Condition)HamcrestCondition.matching(StreamGraphGeneratorExecutionModeDetectionTest.hasProperties(GlobalStreamExchangeMode.ALL_EDGES_PIPELINED, JobType.STREAMING, true, true)));
    }

    @Test
    void testExecutionModePropagationFromEnvWithDefaultAndUnboundedSource() {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.fromSource((Source)new MockSource(Boundedness.CONTINUOUS_UNBOUNDED, 100), WatermarkStrategy.noWatermarks(), "unbounded-source").print();
        Assertions.assertThat((Object)environment.getStreamGraph()).is((Condition)HamcrestCondition.matching(StreamGraphGeneratorExecutionModeDetectionTest.hasProperties(GlobalStreamExchangeMode.ALL_EDGES_PIPELINED, JobType.STREAMING, false, true)));
    }

    @Test
    void testExecutionModePropagationFromEnvWithAutomaticAndBoundedSource() {
        Configuration config = new Configuration();
        config.set(ExecutionOptions.RUNTIME_MODE, (Object)RuntimeExecutionMode.AUTOMATIC);
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.enableCheckpointing(100L);
        environment.configure((ReadableConfig)config, this.getClass().getClassLoader());
        environment.fromSource((Source)new MockSource(Boundedness.BOUNDED, 100), WatermarkStrategy.noWatermarks(), "bounded-source").print();
        Assertions.assertThat((boolean)environment.isChainingEnabled()).isTrue();
        Assertions.assertThat((long)environment.getCheckpointInterval()).isEqualTo(100L);
        StreamGraph streamGraph = environment.getStreamGraph();
        Assertions.assertThat((Object)streamGraph).is((Condition)HamcrestCondition.matching(StreamGraphGeneratorExecutionModeDetectionTest.hasProperties(GlobalStreamExchangeMode.ALL_EDGES_BLOCKING, JobType.BATCH, false, false)));
    }

    @Test
    void testExecutionModePropagationFromEnvWithBatchAndUnboundedSource() {
        Configuration config = new Configuration();
        config.set(ExecutionOptions.RUNTIME_MODE, (Object)RuntimeExecutionMode.BATCH);
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.configure((ReadableConfig)config, this.getClass().getClassLoader());
        environment.fromSource((Source)new MockSource(Boundedness.CONTINUOUS_UNBOUNDED, 100), WatermarkStrategy.noWatermarks(), "unbounded-source").print();
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> ((StreamExecutionEnvironment)environment).getStreamGraph()).isInstanceOf(IllegalStateException.class)).hasMessageContaining("combination is not allowed");
    }

    @Test
    void testDetectionThroughTransitivePredecessors() {
        SourceTransformation<Integer, ?, ?> bounded = this.getSourceTransformation("Bounded Source", Boundedness.BOUNDED);
        Assertions.assertThat((Comparable)bounded.getBoundedness()).isEqualTo((Object)Boundedness.BOUNDED);
        SourceTransformation<Integer, ?, ?> unbounded = this.getSourceTransformation("Unbounded Source", Boundedness.CONTINUOUS_UNBOUNDED);
        Assertions.assertThat((Comparable)unbounded.getBoundedness()).isEqualTo((Object)Boundedness.CONTINUOUS_UNBOUNDED);
        TwoInputTransformation resultTransform = new TwoInputTransformation(bounded, unbounded, "Test Two Input Transformation", (StreamOperatorFactory)SimpleOperatorFactory.of((StreamOperator)new StreamGraphGeneratorTest.OutputTypeConfigurableOperationWithTwoInputs()), (TypeInformation)BasicTypeInfo.INT_TYPE_INFO, 1);
        StreamGraph graph = this.generateStreamGraph(RuntimeExecutionMode.AUTOMATIC, new Transformation[]{resultTransform});
        Assertions.assertThat((Object)graph).is((Condition)HamcrestCondition.matching(StreamGraphGeneratorExecutionModeDetectionTest.hasProperties(GlobalStreamExchangeMode.ALL_EDGES_PIPELINED, JobType.STREAMING, false, true)));
    }

    @Test
    void testBoundedDetection() {
        SourceTransformation<Integer, ?, ?> bounded = this.getSourceTransformation("Bounded Source", Boundedness.BOUNDED);
        Assertions.assertThat((Comparable)bounded.getBoundedness()).isEqualTo((Object)Boundedness.BOUNDED);
        StreamGraph graph = this.generateStreamGraph(RuntimeExecutionMode.AUTOMATIC, new Transformation[]{bounded});
        Assertions.assertThat((Object)graph).is((Condition)HamcrestCondition.matching(StreamGraphGeneratorExecutionModeDetectionTest.hasProperties(GlobalStreamExchangeMode.ALL_EDGES_BLOCKING, JobType.BATCH, false, false)));
    }

    @Test
    void testUnboundedDetection() {
        SourceTransformation<Integer, ?, ?> unbounded = this.getSourceTransformation("Unbounded Source", Boundedness.CONTINUOUS_UNBOUNDED);
        Assertions.assertThat((Comparable)unbounded.getBoundedness()).isEqualTo((Object)Boundedness.CONTINUOUS_UNBOUNDED);
        StreamGraph graph = this.generateStreamGraph(RuntimeExecutionMode.AUTOMATIC, new Transformation[]{unbounded});
        Assertions.assertThat((Object)graph).is((Condition)HamcrestCondition.matching(StreamGraphGeneratorExecutionModeDetectionTest.hasProperties(GlobalStreamExchangeMode.ALL_EDGES_PIPELINED, JobType.STREAMING, false, true)));
    }

    @Test
    void testMixedDetection() {
        SourceTransformation<Integer, ?, ?> unbounded = this.getSourceTransformation("Unbounded Source", Boundedness.CONTINUOUS_UNBOUNDED);
        Assertions.assertThat((Comparable)unbounded.getBoundedness()).isEqualTo((Object)Boundedness.CONTINUOUS_UNBOUNDED);
        SourceTransformation<Integer, ?, ?> bounded = this.getSourceTransformation("Bounded Source", Boundedness.BOUNDED);
        Assertions.assertThat((Comparable)bounded.getBoundedness()).isEqualTo((Object)Boundedness.BOUNDED);
        StreamGraph graph = this.generateStreamGraph(RuntimeExecutionMode.AUTOMATIC, new Transformation[]{unbounded});
        Assertions.assertThat((Object)graph).is((Condition)HamcrestCondition.matching(StreamGraphGeneratorExecutionModeDetectionTest.hasProperties(GlobalStreamExchangeMode.ALL_EDGES_PIPELINED, JobType.STREAMING, false, true)));
    }

    @Test
    void testExplicitOverridesDetectedMode() {
        SourceTransformation<Integer, ?, ?> bounded = this.getSourceTransformation("Bounded Source", Boundedness.BOUNDED);
        Assertions.assertThat((Comparable)bounded.getBoundedness()).isEqualTo((Object)Boundedness.BOUNDED);
        StreamGraph graph = this.generateStreamGraph(RuntimeExecutionMode.AUTOMATIC, new Transformation[]{bounded});
        Assertions.assertThat((Object)graph).is((Condition)HamcrestCondition.matching(StreamGraphGeneratorExecutionModeDetectionTest.hasProperties(GlobalStreamExchangeMode.ALL_EDGES_BLOCKING, JobType.BATCH, false, false)));
        StreamGraph streamingGraph = this.generateStreamGraph(RuntimeExecutionMode.STREAMING, new Transformation[]{bounded});
        Assertions.assertThat((Object)streamingGraph).is((Condition)HamcrestCondition.matching(StreamGraphGeneratorExecutionModeDetectionTest.hasProperties(GlobalStreamExchangeMode.ALL_EDGES_PIPELINED, JobType.STREAMING, false, true)));
    }

    private StreamGraph generateStreamGraph(RuntimeExecutionMode initMode, Transformation<?> ... transformations) {
        ArrayList registeredTransformations = new ArrayList();
        Collections.addAll(registeredTransformations, transformations);
        Configuration configuration = new Configuration();
        configuration.set(ExecutionOptions.RUNTIME_MODE, (Object)initMode);
        return new StreamGraphGenerator(registeredTransformations, new ExecutionConfig(), new CheckpointConfig(), configuration).generate();
    }

    private SourceTransformation<Integer, ?, ?> getSourceTransformation(String name, Boundedness boundedness) {
        return new SourceTransformation(name, (Source)new MockSource(boundedness, 100), WatermarkStrategy.noWatermarks(), IntegerTypeInfo.of(Integer.class), 1);
    }

    private static TypeSafeMatcher<StreamGraph> hasProperties(final GlobalStreamExchangeMode exchangeMode, final JobType jobType, final boolean isCheckpointingEnabled, final boolean isAllVerticesInSameSlotSharingGroupByDefault) {
        return new TypeSafeMatcher<StreamGraph>(){

            protected boolean matchesSafely(StreamGraph actualStreamGraph) {
                return exchangeMode == actualStreamGraph.getGlobalStreamExchangeMode() && jobType == actualStreamGraph.getJobType() && actualStreamGraph.getCheckpointConfig().isCheckpointingEnabled() == isCheckpointingEnabled && actualStreamGraph.isAllVerticesInSameSlotSharingGroupByDefault() == isAllVerticesInSameSlotSharingGroupByDefault;
            }

            public void describeTo(Description description) {
                description.appendText("a StreamGraph with exchangeMode=").appendValue((Object)exchangeMode).appendText(", jobType=").appendValue((Object)jobType).appendText(", isCheckpointingEnabled=").appendValue((Object)isCheckpointingEnabled).appendText(", isAllVerticesInSameSlotSharingGroupByDefault=").appendValue((Object)isAllVerticesInSameSlotSharingGroupByDefault);
            }

            protected void describeMismatchSafely(StreamGraph item, Description mismatchDescription) {
                mismatchDescription.appendText("was ").appendText("a StreamGraph with exchangeMode=").appendValue((Object)exchangeMode).appendText(", jobType=").appendValue((Object)jobType).appendText(", isCheckpointingEnabled=").appendValue((Object)isCheckpointingEnabled).appendText(", isAllVerticesInSameSlotSharingGroupByDefault=").appendValue((Object)isAllVerticesInSameSlotSharingGroupByDefault);
            }
        };
    }
}

