/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.graph;

import java.util.Collection;
import java.util.List;
import java.util.function.Predicate;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.io.SimpleVersionedSerializerTypeSerializerProxy;
import org.apache.flink.streaming.api.datastream.CustomSinkOperatorUidHashes;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.EnumSource;

class SinkV2TransformationTranslatorITCase {
    static final String NAME = "FileSink";
    static final String SLOT_SHARE_GROUP = "FileGroup";
    static final String UID = "FileUid";
    static final int PARALLELISM = 2;

    SinkV2TransformationTranslatorITCase() {
    }

    private static void assertNoUnalignedOutput(StreamNode src) {
        Assertions.assertThat((List)src.getOutEdges()).allMatch(e -> !e.supportsUnalignedCheckpoints());
    }

    private static void assertUnalignedOutput(StreamNode src) {
        Assertions.assertThat((List)src.getOutEdges()).allMatch(StreamEdge::supportsUnalignedCheckpoints);
    }

    Sink<Integer> simpleSink() {
        return TestSinkV2.newBuilder().build();
    }

    Sink<Integer> sinkWithCommitter() {
        return TestSinkV2.newBuilder().setCommitter(new TestSinkV2.DefaultCommitter(), TestSinkV2.RecordSerializer::new).build();
    }

    Sink<Integer> sinkWithCommitterAndGlobalCommitter() {
        return TestSinkV2.newBuilder().setCommitter(new TestSinkV2.DefaultCommitter(), TestSinkV2.RecordSerializer::new).setWithPostCommitTopology(true).build();
    }

    DataStreamSink<Integer> sinkTo(DataStream<Integer> stream, Sink<Integer> sink) {
        return stream.sinkTo(sink);
    }

    @Test
    void testSettingOperatorUidHash() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource src = env.fromData((Object[])new Integer[]{1, 2});
        String writerHash = "f6b178ce445dc3ffaa06bad27a51fead";
        String committerHash = "68ac8ae79eae4e3135a54f9689c4aa10";
        String globalCommitterHash = "77e6aa6eeb1643b3765e1e4a7a672f37";
        CustomSinkOperatorUidHashes operatorsUidHashes = CustomSinkOperatorUidHashes.builder().setWriterUidHash("f6b178ce445dc3ffaa06bad27a51fead").setCommitterUidHash("68ac8ae79eae4e3135a54f9689c4aa10").setGlobalCommitterUidHash("77e6aa6eeb1643b3765e1e4a7a672f37").build();
        src.sinkTo(this.sinkWithCommitterAndGlobalCommitter(), operatorsUidHashes).name(NAME);
        StreamGraph streamGraph = env.getStreamGraph();
        Assertions.assertThat((String)this.findWriter(streamGraph).getUserHash()).isEqualTo("f6b178ce445dc3ffaa06bad27a51fead");
        Assertions.assertThat((String)this.findCommitter(streamGraph).getUserHash()).isEqualTo("68ac8ae79eae4e3135a54f9689c4aa10");
        Assertions.assertThat((String)this.findGlobalCommitter(streamGraph).getUserHash()).isEqualTo("77e6aa6eeb1643b3765e1e4a7a672f37");
    }

    @Test
    void testSettingOperatorUids() {
        String sinkUid = "f6b178ce445dc3ffaa06bad27a51fead";
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource src = env.fromData((Object[])new Integer[]{1, 2});
        src.sinkTo(this.sinkWithCommitterAndGlobalCommitter()).name(NAME).uid("f6b178ce445dc3ffaa06bad27a51fead");
        StreamGraph streamGraph = env.getStreamGraph();
        Assertions.assertThat((String)this.findWriter(streamGraph).getTransformationUID()).isEqualTo("f6b178ce445dc3ffaa06bad27a51fead");
        Assertions.assertThat((String)this.findCommitter(streamGraph).getTransformationUID()).isEqualTo(String.format("Sink Committer: %s", "f6b178ce445dc3ffaa06bad27a51fead"));
        Assertions.assertThat((String)this.findGlobalCommitter(streamGraph).getTransformationUID()).isEqualTo(String.format("Sink %s Global Committer", "f6b178ce445dc3ffaa06bad27a51fead"));
    }

    @Test
    void testSettingOperatorNames() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource src = env.fromData((Object[])new Integer[]{1, 2});
        src.sinkTo(this.sinkWithCommitterAndGlobalCommitter()).name(NAME);
        StreamGraph streamGraph = env.getStreamGraph();
        Assertions.assertThat((String)this.findWriter(streamGraph).getOperatorName()).isEqualTo(String.format("%s: Writer", NAME));
        Assertions.assertThat((String)this.findCommitter(streamGraph).getOperatorName()).isEqualTo(String.format("%s: Committer", NAME));
        Assertions.assertThat((String)this.findGlobalCommitter(streamGraph).getOperatorName()).isEqualTo(String.format("%s: Global Committer", NAME));
    }

    @ParameterizedTest
    @EnumSource(value=RuntimeExecutionMode.class)
    void generateWriterTopology(RuntimeExecutionMode runtimeExecutionMode) {
        StreamGraph streamGraph = this.buildGraph(this.simpleSink(), runtimeExecutionMode);
        StreamNode sourceNode = this.findNodeName(streamGraph, node -> node.contains("Source"));
        StreamNode writerNode = this.findWriter(streamGraph);
        Assertions.assertThat((Collection)streamGraph.getStreamNodes()).hasSize(2);
        this.validateTopology(sourceNode, IntSerializer.class, writerNode, SinkWriterOperatorFactory.class, 2, -1);
    }

    @ParameterizedTest
    @EnumSource(value=RuntimeExecutionMode.class)
    void generateWriterCommitterTopology(RuntimeExecutionMode runtimeExecutionMode) {
        StreamGraph streamGraph = this.buildGraph(this.sinkWithCommitter(), runtimeExecutionMode);
        StreamNode sourceNode = this.findNodeName(streamGraph, node -> node.contains("Source"));
        StreamNode writerNode = this.findWriter(streamGraph);
        this.validateTopology(sourceNode, IntSerializer.class, writerNode, SinkWriterOperatorFactory.class, 2, -1);
        StreamNode committerNode = this.findNodeName(streamGraph, name -> name.contains("Committer"));
        Assertions.assertThat((Collection)streamGraph.getStreamNodes()).hasSize(3);
        SinkV2TransformationTranslatorITCase.assertNoUnalignedOutput(writerNode);
        SinkV2TransformationTranslatorITCase.assertUnalignedOutput(sourceNode);
        this.validateTopology(writerNode, SimpleVersionedSerializerTypeSerializerProxy.class, committerNode, CommitterOperatorFactory.class, 2, -1);
    }

    @ParameterizedTest
    @CsvSource(value={"STREAMING, true", "STREAMING, false", "BATCH, true", "BATCH, false"})
    void testParallelismConfigured(RuntimeExecutionMode runtimeExecutionMode, boolean setSinkParallelism) {
        StreamGraph streamGraph = this.buildGraph(this.sinkWithCommitter(), runtimeExecutionMode, setSinkParallelism);
        StreamNode writerNode = this.findWriter(streamGraph);
        StreamNode committerNode = this.findCommitter(streamGraph);
        Assertions.assertThat((boolean)writerNode.isParallelismConfigured()).isEqualTo(setSinkParallelism);
        Assertions.assertThat((boolean)committerNode.isParallelismConfigured()).isEqualTo(setSinkParallelism);
    }

    @ParameterizedTest
    @EnumSource(value=RuntimeExecutionMode.class)
    void throwExceptionWithoutSettingUid(RuntimeExecutionMode runtimeExecutionMode) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Configuration config = new Configuration();
        config.set(ExecutionOptions.RUNTIME_MODE, (Object)runtimeExecutionMode);
        env.configure((ReadableConfig)config, this.getClass().getClassLoader());
        env.getConfig().disableAutoGeneratedUIDs();
        this.sinkTo((DataStream<Integer>)env.fromData((Object[])new Integer[]{1, 2}), this.simpleSink());
        Assertions.assertThatThrownBy(() -> ((StreamExecutionEnvironment)env).getStreamGraph()).isInstanceOf(IllegalStateException.class);
    }

    @Test
    void disableOperatorChain() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource src = env.fromData((Object[])new Integer[]{1, 2});
        DataStreamSink dataStreamSink = this.sinkTo((DataStream<Integer>)src, this.sinkWithCommitter()).name(NAME);
        dataStreamSink.disableChaining();
        StreamGraph streamGraph = env.getStreamGraph();
        StreamNode writer = this.findWriter(streamGraph);
        StreamNode committer = this.findCommitter(streamGraph);
        Assertions.assertThat((Comparable)writer.getOperatorFactory().getChainingStrategy()).isEqualTo((Object)ChainingStrategy.NEVER);
        Assertions.assertThat((Comparable)committer.getOperatorFactory().getChainingStrategy()).isEqualTo((Object)ChainingStrategy.NEVER);
    }

    void validateTopology(StreamNode src, Class<?> srcOutTypeInfo, StreamNode dest, Class<? extends StreamOperatorFactory> operatorFactoryClass, int expectedParallelism, int expectedMaxParallelism) {
        StreamEdge srcOutEdge = (StreamEdge)src.getOutEdges().get(0);
        Assertions.assertThat((int)srcOutEdge.getTargetId()).isEqualTo(dest.getId());
        Assertions.assertThat((Object)src.getTypeSerializerOut()).isInstanceOf(srcOutTypeInfo);
        StreamEdge destInputEdge = (StreamEdge)dest.getInEdges().get(0);
        Assertions.assertThat((int)destInputEdge.getTargetId()).isEqualTo(dest.getId());
        Assertions.assertThat((Object)dest.getTypeSerializersIn()[0]).isInstanceOf(srcOutTypeInfo);
        Assertions.assertThat((String)dest.getOperatorName()).isNotEqualTo((Object)src.getOperatorName());
        Assertions.assertThat((String)dest.getTransformationUID()).isNotEqualTo((Object)src.getTransformationUID());
        Assertions.assertThat((Object)dest.getOperatorFactory()).isInstanceOf(operatorFactoryClass);
        Assertions.assertThat((int)dest.getParallelism()).isEqualTo(expectedParallelism);
        Assertions.assertThat((int)dest.getMaxParallelism()).isEqualTo(expectedMaxParallelism);
        Assertions.assertThat((Comparable)dest.getOperatorFactory().getChainingStrategy()).isEqualTo((Object)ChainingStrategy.ALWAYS);
        Assertions.assertThat((String)dest.getSlotSharingGroup()).isEqualTo(SLOT_SHARE_GROUP);
    }

    StreamGraph buildGraph(Sink<Integer> sink, RuntimeExecutionMode runtimeExecutionMode) {
        return this.buildGraph(sink, runtimeExecutionMode, true);
    }

    StreamGraph buildGraph(Sink<Integer> sink, RuntimeExecutionMode runtimeExecutionMode, boolean setSinkParallelism) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Configuration config = new Configuration();
        config.set(ExecutionOptions.RUNTIME_MODE, (Object)runtimeExecutionMode);
        env.configure((ReadableConfig)config, this.getClass().getClassLoader());
        DataStreamSource src = env.fromData((Object[])new Integer[]{1, 2});
        DataStreamSink<Integer> dataStreamSink = this.sinkTo((DataStream<Integer>)src.rebalance(), sink);
        this.setSinkProperty(dataStreamSink, setSinkParallelism);
        env.getExecutionPlan();
        return env.getStreamGraph();
    }

    private void setSinkProperty(DataStreamSink<Integer> dataStreamSink, boolean setSinkParallelism) {
        dataStreamSink.name(NAME);
        dataStreamSink.uid(UID);
        if (setSinkParallelism) {
            dataStreamSink.setParallelism(2);
        }
        dataStreamSink.slotSharingGroup(SLOT_SHARE_GROUP);
    }

    StreamNode findNodeName(StreamGraph streamGraph, Predicate<String> predicate) {
        return streamGraph.getStreamNodes().stream().filter(node -> predicate.test(node.getOperatorName())).findFirst().orElseThrow(() -> new IllegalStateException("Can not find the node"));
    }

    StreamNode findWriter(StreamGraph streamGraph) {
        return this.findNodeName(streamGraph, name -> name.contains("Writer") && !name.contains("Committer"));
    }

    StreamNode findCommitter(StreamGraph streamGraph) {
        return this.findNodeName(streamGraph, name -> name.contains("Committer") && !name.contains("Global Committer"));
    }

    StreamNode findGlobalCommitter(StreamGraph streamGraph) {
        return this.findNodeName(streamGraph, name -> name.contains("Global Committer"));
    }
}

