/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api;

import java.io.Serializable;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.EnumTypeInfo;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator;
import org.apache.flink.runtime.asyncprocessing.operators.AsyncKeyedProcessOperator;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.runtime.operators.util.WatermarkStrategyWithPunctuatedWatermarks;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.runtime.partitioner.CustomPartitionerWrapper;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner;
import org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.util.Collector;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class DataStreamTest {
    DataStreamTest() {
    }

    @Test
    void testErgonomicWatermarkStrategy() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource input = env.fromData((Object[])new String[]{"bonjour"});
        input.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness((Duration)Duration.ofMillis(10L)));
        input.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness((Duration)Duration.ofMillis(10L)).withTimestampAssigner((SerializableTimestampAssigner & Serializable)(event, timestamp) -> 42L));
    }

    @Test
    void testUnion() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        SingleOutputStreamOperator input1 = env.fromSequence(0L, 0L).map((MapFunction)new MapFunction<Long, Long>(){

            public Long map(Long value) throws Exception {
                return null;
            }
        });
        SingleOutputStreamOperator selfUnion = input1.union(new DataStream[]{input1}).map((MapFunction)new MapFunction<Long, Long>(){

            public Long map(Long value) throws Exception {
                return null;
            }
        });
        SingleOutputStreamOperator input6 = env.fromSequence(0L, 0L).map((MapFunction)new MapFunction<Long, Long>(){

            public Long map(Long value) throws Exception {
                return null;
            }
        });
        SingleOutputStreamOperator selfUnionDifferentPartition = input6.broadcast().union(new DataStream[]{input6}).map((MapFunction)new MapFunction<Long, Long>(){

            public Long map(Long value) throws Exception {
                return null;
            }
        });
        SingleOutputStreamOperator input2 = env.fromSequence(0L, 0L).map((MapFunction)new MapFunction<Long, Long>(){

            public Long map(Long value) throws Exception {
                return null;
            }
        }).setParallelism(4);
        SingleOutputStreamOperator input3 = env.fromSequence(0L, 0L).map((MapFunction)new MapFunction<Long, Long>(){

            public Long map(Long value) throws Exception {
                return null;
            }
        }).setParallelism(2);
        SingleOutputStreamOperator unionDifferingParallelism = input2.union(new DataStream[]{input3}).map((MapFunction)new MapFunction<Long, Long>(){

            public Long map(Long value) throws Exception {
                return null;
            }
        }).setParallelism(4);
        SingleOutputStreamOperator input4 = env.fromSequence(0L, 0L).map((MapFunction)new MapFunction<Long, Long>(){

            public Long map(Long value) throws Exception {
                return null;
            }
        }).setParallelism(2);
        SingleOutputStreamOperator input5 = env.fromSequence(0L, 0L).map((MapFunction)new MapFunction<Long, Long>(){

            public Long map(Long value) throws Exception {
                return null;
            }
        }).setParallelism(4);
        SingleOutputStreamOperator unionDifferingPartitioning = input4.broadcast().union(new DataStream[]{input5}).map((MapFunction)new MapFunction<Long, Long>(){

            public Long map(Long value) throws Exception {
                return null;
            }
        }).setParallelism(4);
        StreamGraph streamGraph = DataStreamTest.getStreamGraph(env);
        Assertions.assertThat((List)streamGraph.getStreamNode(Integer.valueOf(selfUnion.getId())).getInEdges()).hasSize(2);
        for (StreamEdge edge : streamGraph.getStreamNode(Integer.valueOf(selfUnion.getId())).getInEdges()) {
            Assertions.assertThat((Object)edge.getPartitioner()).isInstanceOf(ForwardPartitioner.class);
        }
        Assertions.assertThat((List)streamGraph.getStreamNode(Integer.valueOf(selfUnionDifferentPartition.getId())).getInEdges()).hasSize(2);
        boolean hasForward = false;
        boolean hasBroadcast = false;
        for (StreamEdge edge : streamGraph.getStreamNode(Integer.valueOf(selfUnionDifferentPartition.getId())).getInEdges()) {
            if (edge.getPartitioner() instanceof ForwardPartitioner) {
                hasForward = true;
            }
            if (!(edge.getPartitioner() instanceof BroadcastPartitioner)) continue;
            hasBroadcast = true;
        }
        Assertions.assertThat((hasForward && hasBroadcast ? 1 : 0) != 0).isTrue();
        Assertions.assertThat((List)streamGraph.getStreamNode(Integer.valueOf(unionDifferingParallelism.getId())).getInEdges()).hasSize(2);
        for (StreamEdge edge : streamGraph.getStreamNode(Integer.valueOf(unionDifferingParallelism.getId())).getInEdges()) {
            if (edge.getSourceId() == input2.getId()) {
                Assertions.assertThat((Object)edge.getPartitioner()).isInstanceOf(ForwardPartitioner.class);
                continue;
            }
            if (edge.getSourceId() == input3.getId()) {
                Assertions.assertThat((Object)edge.getPartitioner()).isInstanceOf(RebalancePartitioner.class);
                continue;
            }
            Assertions.fail((String)"Wrong input edge.");
        }
        Assertions.assertThat((List)streamGraph.getStreamNode(Integer.valueOf(unionDifferingPartitioning.getId())).getInEdges()).hasSize(2);
        for (StreamEdge edge : streamGraph.getStreamNode(Integer.valueOf(unionDifferingPartitioning.getId())).getInEdges()) {
            if (edge.getSourceId() == input4.getId()) {
                Assertions.assertThat((Object)edge.getPartitioner()).isInstanceOf(BroadcastPartitioner.class);
                continue;
            }
            if (edge.getSourceId() == input5.getId()) {
                Assertions.assertThat((Object)edge.getPartitioner()).isInstanceOf(ForwardPartitioner.class);
                continue;
            }
            Assertions.fail((String)"Wrong input edge.");
        }
    }

    @Test
    void testNaming() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        SingleOutputStreamOperator dataStream1 = env.fromSequence(0L, 0L).name("testSource1").map((MapFunction)new MapFunction<Long, Long>(){

            public Long map(Long value) throws Exception {
                return null;
            }
        }).name("testMap");
        SingleOutputStreamOperator dataStream2 = env.fromSequence(0L, 0L).name("testSource2").map((MapFunction)new MapFunction<Long, Long>(){

            public Long map(Long value) throws Exception {
                return null;
            }
        }).name("testMap");
        dataStream1.connect((DataStream)dataStream2).flatMap((CoFlatMapFunction)new CoFlatMapFunction<Long, Long, Long>(){

            public void flatMap1(Long value, Collector<Long> out) throws Exception {
            }

            public void flatMap2(Long value, Collector<Long> out) throws Exception {
            }
        }).name("testCoFlatMap").windowAll((WindowAssigner)GlobalWindows.create()).trigger((Trigger)PurgingTrigger.of((Trigger)CountTrigger.of((long)10L))).reduce((ReduceFunction)new ReduceFunction<Long>(){
            private static final long serialVersionUID = 1L;

            public Long reduce(Long value1, Long value2) throws Exception {
                return null;
            }
        }).name("testWindowReduce").print();
        String plan = env.getExecutionPlan();
        Assertions.assertThat((String)plan).contains(new CharSequence[]{"testSource1"});
        Assertions.assertThat((String)plan).contains(new CharSequence[]{"testSource2"});
        Assertions.assertThat((String)plan).contains(new CharSequence[]{"testMap"});
        Assertions.assertThat((String)plan).contains(new CharSequence[]{"testMap"});
        Assertions.assertThat((String)plan).contains(new CharSequence[]{"testCoFlatMap"});
        Assertions.assertThat((String)plan).contains(new CharSequence[]{"testWindowReduce"});
    }

    @Test
    void testPartitioning() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource src1 = env.fromData((Object[])new Tuple2[]{new Tuple2((Object)0L, (Object)0L)});
        DataStreamSource src2 = env.fromData((Object[])new Tuple2[]{new Tuple2((Object)0L, (Object)0L)});
        ConnectedStreams connected = src1.connect((DataStream)src2);
        KeyedStream group1 = src1.keyBy((KeySelector & Serializable)x -> (Long)x.f0);
        KeyedStream group2 = src1.keyBy((KeySelector & Serializable)x -> Tuple2.of((Object)((Long)x.f1), (Object)((Long)x.f0)), Types.TUPLE((TypeInformation[])new TypeInformation[]{Types.LONG, Types.LONG}));
        KeyedStream group3 = src1.keyBy((KeySelector)new FirstSelector());
        int id1 = DataStreamTest.createDownStreamId(group1);
        int id2 = DataStreamTest.createDownStreamId(group2);
        int id3 = DataStreamTest.createDownStreamId(group3);
        Assertions.assertThat((boolean)DataStreamTest.isPartitioned(DataStreamTest.getStreamGraph(env).getStreamEdgesOrThrow(src1.getId(), id1))).isTrue();
        Assertions.assertThat((boolean)DataStreamTest.isPartitioned(DataStreamTest.getStreamGraph(env).getStreamEdgesOrThrow(src1.getId(), id2))).isTrue();
        Assertions.assertThat((boolean)DataStreamTest.isPartitioned(DataStreamTest.getStreamGraph(env).getStreamEdgesOrThrow(src1.getId(), id3))).isTrue();
        Assertions.assertThat((boolean)DataStreamTest.isKeyed(group1)).isTrue();
        Assertions.assertThat((boolean)DataStreamTest.isKeyed(group2)).isTrue();
        Assertions.assertThat((boolean)DataStreamTest.isKeyed(group3)).isTrue();
        KeyedStream partition1 = src1.keyBy((KeySelector & Serializable)x -> (Long)x.f0);
        KeyedStream partition2 = src1.keyBy((KeySelector & Serializable)x -> Tuple2.of((Object)((Long)x.f1), (Object)((Long)x.f0)), Types.TUPLE((TypeInformation[])new TypeInformation[]{Types.LONG, Types.LONG}));
        KeyedStream partition3 = src1.keyBy((KeySelector)new FirstSelector());
        int pid1 = DataStreamTest.createDownStreamId(partition1);
        int pid2 = DataStreamTest.createDownStreamId(partition2);
        int pid3 = DataStreamTest.createDownStreamId(partition3);
        Assertions.assertThat((boolean)DataStreamTest.isPartitioned(DataStreamTest.getStreamGraph(env).getStreamEdgesOrThrow(src1.getId(), pid1))).isTrue();
        Assertions.assertThat((boolean)DataStreamTest.isPartitioned(DataStreamTest.getStreamGraph(env).getStreamEdgesOrThrow(src1.getId(), pid2))).isTrue();
        Assertions.assertThat((boolean)DataStreamTest.isPartitioned(DataStreamTest.getStreamGraph(env).getStreamEdgesOrThrow(src1.getId(), pid3))).isTrue();
        Assertions.assertThat((boolean)DataStreamTest.isKeyed(partition1)).isTrue();
        Assertions.assertThat((boolean)DataStreamTest.isKeyed(partition2)).isTrue();
        Assertions.assertThat((boolean)DataStreamTest.isKeyed(partition3)).isTrue();
        Partitioner<Long> longPartitioner = new Partitioner<Long>(){

            public int partition(Long key, int numPartitions) {
                return 100;
            }
        };
        DataStream customPartition1 = src1.partitionCustom((Partitioner)longPartitioner, (KeySelector & Serializable)x -> (Long)x.f0);
        DataStream customPartition3 = src1.partitionCustom((Partitioner)longPartitioner, (KeySelector & Serializable)x -> (Long)x.f0);
        DataStream customPartition4 = src1.partitionCustom((Partitioner)longPartitioner, (KeySelector)new FirstSelector());
        int cid1 = DataStreamTest.createDownStreamId(customPartition1);
        int cid2 = DataStreamTest.createDownStreamId(customPartition3);
        int cid3 = DataStreamTest.createDownStreamId(customPartition4);
        Assertions.assertThat((boolean)DataStreamTest.isCustomPartitioned(DataStreamTest.getStreamGraph(env).getStreamEdgesOrThrow(src1.getId(), cid1))).isTrue();
        Assertions.assertThat((boolean)DataStreamTest.isCustomPartitioned(DataStreamTest.getStreamGraph(env).getStreamEdgesOrThrow(src1.getId(), cid2))).isTrue();
        Assertions.assertThat((boolean)DataStreamTest.isCustomPartitioned(DataStreamTest.getStreamGraph(env).getStreamEdgesOrThrow(src1.getId(), cid3))).isTrue();
        Assertions.assertThat((boolean)DataStreamTest.isKeyed(customPartition1)).isFalse();
        Assertions.assertThat((boolean)DataStreamTest.isKeyed(customPartition3)).isFalse();
        Assertions.assertThat((boolean)DataStreamTest.isKeyed(customPartition4)).isFalse();
        ConnectedStreams connectedGroup1 = connected.keyBy(0, 0);
        Integer downStreamId1 = DataStreamTest.createDownStreamId(connectedGroup1);
        ConnectedStreams connectedGroup2 = connected.keyBy(new int[]{0}, new int[]{0});
        Integer downStreamId2 = DataStreamTest.createDownStreamId(connectedGroup2);
        ConnectedStreams connectedGroup3 = connected.keyBy("f0", "f0");
        Integer downStreamId3 = DataStreamTest.createDownStreamId(connectedGroup3);
        ConnectedStreams connectedGroup4 = connected.keyBy(new String[]{"f0"}, new String[]{"f0"});
        Integer downStreamId4 = DataStreamTest.createDownStreamId(connectedGroup4);
        ConnectedStreams connectedGroup5 = connected.keyBy((KeySelector)new FirstSelector(), (KeySelector)new FirstSelector());
        Integer downStreamId5 = DataStreamTest.createDownStreamId(connectedGroup5);
        Assertions.assertThat((boolean)DataStreamTest.isPartitioned(DataStreamTest.getStreamGraph(env).getStreamEdgesOrThrow(src1.getId(), downStreamId1.intValue()))).isTrue();
        Assertions.assertThat((boolean)DataStreamTest.isPartitioned(DataStreamTest.getStreamGraph(env).getStreamEdgesOrThrow(src2.getId(), downStreamId1.intValue()))).isTrue();
        Assertions.assertThat((boolean)DataStreamTest.isPartitioned(DataStreamTest.getStreamGraph(env).getStreamEdgesOrThrow(src1.getId(), downStreamId2.intValue()))).isTrue();
        Assertions.assertThat((boolean)DataStreamTest.isPartitioned(DataStreamTest.getStreamGraph(env).getStreamEdgesOrThrow(src2.getId(), downStreamId2.intValue()))).isTrue();
        Assertions.assertThat((boolean)DataStreamTest.isPartitioned(DataStreamTest.getStreamGraph(env).getStreamEdgesOrThrow(src1.getId(), downStreamId3.intValue()))).isTrue();
        Assertions.assertThat((boolean)DataStreamTest.isPartitioned(DataStreamTest.getStreamGraph(env).getStreamEdgesOrThrow(src2.getId(), downStreamId3.intValue()))).isTrue();
        Assertions.assertThat((boolean)DataStreamTest.isPartitioned(DataStreamTest.getStreamGraph(env).getStreamEdgesOrThrow(src1.getId(), downStreamId4.intValue()))).isTrue();
        Assertions.assertThat((boolean)DataStreamTest.isPartitioned(DataStreamTest.getStreamGraph(env).getStreamEdgesOrThrow(src2.getId(), downStreamId4.intValue()))).isTrue();
        Assertions.assertThat((boolean)DataStreamTest.isPartitioned(DataStreamTest.getStreamGraph(env).getStreamEdgesOrThrow(src1.getId(), downStreamId5.intValue()))).isTrue();
        Assertions.assertThat((boolean)DataStreamTest.isPartitioned(DataStreamTest.getStreamGraph(env).getStreamEdgesOrThrow(src2.getId(), downStreamId5.intValue()))).isTrue();
        Assertions.assertThat((boolean)DataStreamTest.isKeyed(connectedGroup1)).isTrue();
        Assertions.assertThat((boolean)DataStreamTest.isKeyed(connectedGroup2)).isTrue();
        Assertions.assertThat((boolean)DataStreamTest.isKeyed(connectedGroup3)).isTrue();
        Assertions.assertThat((boolean)DataStreamTest.isKeyed(connectedGroup4)).isTrue();
        Assertions.assertThat((boolean)DataStreamTest.isKeyed(connectedGroup5)).isTrue();
        ConnectedStreams connectedPartition1 = connected.keyBy(0, 0);
        Integer connectDownStreamId1 = DataStreamTest.createDownStreamId(connectedPartition1);
        ConnectedStreams connectedPartition2 = connected.keyBy(new int[]{0}, new int[]{0});
        Integer connectDownStreamId2 = DataStreamTest.createDownStreamId(connectedPartition2);
        ConnectedStreams connectedPartition3 = connected.keyBy("f0", "f0");
        Integer connectDownStreamId3 = DataStreamTest.createDownStreamId(connectedPartition3);
        ConnectedStreams connectedPartition4 = connected.keyBy(new String[]{"f0"}, new String[]{"f0"});
        Integer connectDownStreamId4 = DataStreamTest.createDownStreamId(connectedPartition4);
        ConnectedStreams connectedPartition5 = connected.keyBy((KeySelector)new FirstSelector(), (KeySelector)new FirstSelector());
        Integer connectDownStreamId5 = DataStreamTest.createDownStreamId(connectedPartition5);
        Assertions.assertThat((boolean)DataStreamTest.isPartitioned(DataStreamTest.getStreamGraph(env).getStreamEdgesOrThrow(src1.getId(), connectDownStreamId1.intValue()))).isTrue();
        Assertions.assertThat((boolean)DataStreamTest.isPartitioned(DataStreamTest.getStreamGraph(env).getStreamEdgesOrThrow(src2.getId(), connectDownStreamId1.intValue()))).isTrue();
        Assertions.assertThat((boolean)DataStreamTest.isPartitioned(DataStreamTest.getStreamGraph(env).getStreamEdgesOrThrow(src1.getId(), connectDownStreamId2.intValue()))).isTrue();
        Assertions.assertThat((boolean)DataStreamTest.isPartitioned(DataStreamTest.getStreamGraph(env).getStreamEdgesOrThrow(src2.getId(), connectDownStreamId2.intValue()))).isTrue();
        Assertions.assertThat((boolean)DataStreamTest.isPartitioned(DataStreamTest.getStreamGraph(env).getStreamEdgesOrThrow(src1.getId(), connectDownStreamId3.intValue()))).isTrue();
        Assertions.assertThat((boolean)DataStreamTest.isPartitioned(DataStreamTest.getStreamGraph(env).getStreamEdgesOrThrow(src2.getId(), connectDownStreamId3.intValue()))).isTrue();
        Assertions.assertThat((boolean)DataStreamTest.isPartitioned(DataStreamTest.getStreamGraph(env).getStreamEdgesOrThrow(src1.getId(), connectDownStreamId4.intValue()))).isTrue();
        Assertions.assertThat((boolean)DataStreamTest.isPartitioned(DataStreamTest.getStreamGraph(env).getStreamEdgesOrThrow(src2.getId(), connectDownStreamId4.intValue()))).isTrue();
        Assertions.assertThat((boolean)DataStreamTest.isPartitioned(DataStreamTest.getStreamGraph(env).getStreamEdgesOrThrow(src1.getId(), connectDownStreamId5.intValue()))).isTrue();
        Assertions.assertThat((boolean)DataStreamTest.isPartitioned(DataStreamTest.getStreamGraph(env).getStreamEdgesOrThrow(src2.getId(), connectDownStreamId5.intValue()))).isTrue();
        Assertions.assertThat((boolean)DataStreamTest.isKeyed(connectedPartition1)).isTrue();
        Assertions.assertThat((boolean)DataStreamTest.isKeyed(connectedPartition2)).isTrue();
        Assertions.assertThat((boolean)DataStreamTest.isKeyed(connectedPartition3)).isTrue();
        Assertions.assertThat((boolean)DataStreamTest.isKeyed(connectedPartition4)).isTrue();
        Assertions.assertThat((boolean)DataStreamTest.isKeyed(connectedPartition5)).isTrue();
    }

    @Test
    void testParallelism() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource src = env.fromData((Object[])new Tuple2[]{new Tuple2((Object)0L, (Object)0L)});
        env.setParallelism(10);
        SingleOutputStreamOperator map = src.map((MapFunction)new MapFunction<Tuple2<Long, Long>, Long>(){

            public Long map(Tuple2<Long, Long> value) throws Exception {
                return null;
            }
        }).name("MyMap");
        SingleOutputStreamOperator windowed = map.windowAll((WindowAssigner)GlobalWindows.create()).trigger((Trigger)PurgingTrigger.of((Trigger)CountTrigger.of((long)10L))).reduce((ReduceFunction)new ReduceFunction<Long>(){

            public Long reduce(Long value1, Long value2) throws Exception {
                return null;
            }
        });
        windowed.sinkTo((Sink)new DiscardingSink());
        DataStreamSink sink = map.addSink((SinkFunction)new SinkFunction<Long>(){
            private static final long serialVersionUID = 1L;

            public void invoke(Long value) throws Exception {
            }
        });
        Assertions.assertThat((int)DataStreamTest.getStreamGraph(env).getStreamNode(Integer.valueOf(src.getId())).getParallelism()).isOne();
        Assertions.assertThat((int)DataStreamTest.getStreamGraph(env).getStreamNode(Integer.valueOf(map.getId())).getParallelism()).isEqualTo(10);
        Assertions.assertThat((int)DataStreamTest.getStreamGraph(env).getStreamNode(Integer.valueOf(windowed.getId())).getParallelism()).isOne();
        Assertions.assertThat((int)DataStreamTest.getStreamGraph(env).getStreamNode(Integer.valueOf(sink.getTransformation().getId())).getParallelism()).isEqualTo(10);
        env.setParallelism(7);
        Assertions.assertThat((int)DataStreamTest.getStreamGraph(env).getStreamNode(Integer.valueOf(src.getId())).getParallelism()).isOne();
        Assertions.assertThat((int)DataStreamTest.getStreamGraph(env).getStreamNode(Integer.valueOf(map.getId())).getParallelism()).isEqualTo(10);
        Assertions.assertThat((int)DataStreamTest.getStreamGraph(env).getStreamNode(Integer.valueOf(windowed.getId())).getParallelism()).isOne();
        Assertions.assertThat((int)DataStreamTest.getStreamGraph(env).getStreamNode(Integer.valueOf(sink.getTransformation().getId())).getParallelism()).isEqualTo(10);
        DataStreamSource parallelSource = env.fromSequence(0L, 0L);
        parallelSource.sinkTo((Sink)new DiscardingSink());
        Assertions.assertThat((int)DataStreamTest.getStreamGraph(env).getStreamNode(Integer.valueOf(parallelSource.getId())).getParallelism()).isEqualTo(7);
        parallelSource.setParallelism(3);
        Assertions.assertThat((int)DataStreamTest.getStreamGraph(env).getStreamNode(Integer.valueOf(parallelSource.getId())).getParallelism()).isEqualTo(3);
        map.setParallelism(2);
        Assertions.assertThat((int)DataStreamTest.getStreamGraph(env).getStreamNode(Integer.valueOf(map.getId())).getParallelism()).isEqualTo(2);
        sink.setParallelism(4);
        Assertions.assertThat((int)DataStreamTest.getStreamGraph(env).getStreamNode(Integer.valueOf(sink.getTransformation().getId())).getParallelism()).isEqualTo(4);
    }

    @Test
    void testResources() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        ResourceSpec minResource1 = ResourceSpec.newBuilder((double)1.0, (int)100).build();
        ResourceSpec preferredResource1 = ResourceSpec.newBuilder((double)2.0, (int)200).build();
        ResourceSpec minResource2 = ResourceSpec.newBuilder((double)1.0, (int)200).build();
        ResourceSpec preferredResource2 = ResourceSpec.newBuilder((double)2.0, (int)300).build();
        ResourceSpec minResource3 = ResourceSpec.newBuilder((double)1.0, (int)300).build();
        ResourceSpec preferredResource3 = ResourceSpec.newBuilder((double)2.0, (int)400).build();
        ResourceSpec minResource4 = ResourceSpec.newBuilder((double)1.0, (int)400).build();
        ResourceSpec preferredResource4 = ResourceSpec.newBuilder((double)2.0, (int)500).build();
        ResourceSpec minResource5 = ResourceSpec.newBuilder((double)1.0, (int)500).build();
        ResourceSpec preferredResource5 = ResourceSpec.newBuilder((double)2.0, (int)600).build();
        ResourceSpec minResource6 = ResourceSpec.newBuilder((double)1.0, (int)600).build();
        ResourceSpec preferredResource6 = ResourceSpec.newBuilder((double)2.0, (int)700).build();
        ResourceSpec minResource7 = ResourceSpec.newBuilder((double)1.0, (int)700).build();
        ResourceSpec preferredResource7 = ResourceSpec.newBuilder((double)2.0, (int)800).build();
        Method opMethod = SingleOutputStreamOperator.class.getDeclaredMethod("setResources", ResourceSpec.class, ResourceSpec.class);
        opMethod.setAccessible(true);
        Method sinkMethod = DataStreamSink.class.getDeclaredMethod("setResources", ResourceSpec.class, ResourceSpec.class);
        sinkMethod.setAccessible(true);
        DataStreamSource source1 = env.fromSequence(0L, 0L);
        opMethod.invoke((Object)source1, minResource1, preferredResource1);
        SingleOutputStreamOperator map1 = source1.map((MapFunction)new MapFunction<Long, Long>(){

            public Long map(Long value) throws Exception {
                return null;
            }
        });
        opMethod.invoke((Object)map1, minResource2, preferredResource2);
        DataStreamSource source2 = env.fromSequence(0L, 0L);
        opMethod.invoke((Object)source2, minResource3, preferredResource3);
        SingleOutputStreamOperator map2 = source2.map((MapFunction)new MapFunction<Long, Long>(){

            public Long map(Long value) throws Exception {
                return null;
            }
        });
        opMethod.invoke((Object)map2, minResource4, preferredResource4);
        SingleOutputStreamOperator connected = map1.connect((DataStream)map2).flatMap((CoFlatMapFunction)new CoFlatMapFunction<Long, Long, Long>(){

            public void flatMap1(Long value, Collector<Long> out) throws Exception {
            }

            public void flatMap2(Long value, Collector<Long> out) throws Exception {
            }
        });
        opMethod.invoke((Object)connected, minResource5, preferredResource5);
        SingleOutputStreamOperator windowed = connected.windowAll((WindowAssigner)GlobalWindows.create()).trigger((Trigger)PurgingTrigger.of((Trigger)CountTrigger.of((long)10L))).reduce((ReduceFunction)new ReduceFunction<Long>(){
            private static final long serialVersionUID = 1L;

            public Long reduce(Long value1, Long value2) throws Exception {
                return null;
            }
        });
        opMethod.invoke((Object)windowed, minResource6, preferredResource6);
        DataStreamSink sink = windowed.print();
        sinkMethod.invoke((Object)sink, minResource7, preferredResource7);
        Assertions.assertThat((Object)DataStreamTest.getStreamGraph(env).getStreamNode(Integer.valueOf(source1.getId())).getMinResources()).isEqualTo((Object)minResource1);
        Assertions.assertThat((Object)DataStreamTest.getStreamGraph(env).getStreamNode(Integer.valueOf(source1.getId())).getPreferredResources()).isEqualTo((Object)preferredResource1);
        Assertions.assertThat((Object)DataStreamTest.getStreamGraph(env).getStreamNode(Integer.valueOf(map1.getId())).getMinResources()).isEqualTo((Object)minResource2);
        Assertions.assertThat((Object)DataStreamTest.getStreamGraph(env).getStreamNode(Integer.valueOf(map1.getId())).getPreferredResources()).isEqualTo((Object)preferredResource2);
        Assertions.assertThat((Object)DataStreamTest.getStreamGraph(env).getStreamNode(Integer.valueOf(source2.getId())).getMinResources()).isEqualTo((Object)minResource3);
        Assertions.assertThat((Object)DataStreamTest.getStreamGraph(env).getStreamNode(Integer.valueOf(source2.getId())).getPreferredResources()).isEqualTo((Object)preferredResource3);
        Assertions.assertThat((Object)DataStreamTest.getStreamGraph(env).getStreamNode(Integer.valueOf(map2.getId())).getMinResources()).isEqualTo((Object)minResource4);
        Assertions.assertThat((Object)DataStreamTest.getStreamGraph(env).getStreamNode(Integer.valueOf(map2.getId())).getPreferredResources()).isEqualTo((Object)preferredResource4);
        Assertions.assertThat((Object)DataStreamTest.getStreamGraph(env).getStreamNode(Integer.valueOf(connected.getId())).getMinResources()).isEqualTo((Object)minResource5);
        Assertions.assertThat((Object)DataStreamTest.getStreamGraph(env).getStreamNode(Integer.valueOf(connected.getId())).getPreferredResources()).isEqualTo((Object)preferredResource5);
        Assertions.assertThat((Object)DataStreamTest.getStreamGraph(env).getStreamNode(Integer.valueOf(windowed.getId())).getMinResources()).isEqualTo((Object)minResource6);
        Assertions.assertThat((Object)DataStreamTest.getStreamGraph(env).getStreamNode(Integer.valueOf(windowed.getId())).getPreferredResources()).isEqualTo((Object)preferredResource6);
        Assertions.assertThat((Object)DataStreamTest.getStreamGraph(env).getStreamNode(Integer.valueOf(sink.getTransformation().getId())).getMinResources()).isEqualTo((Object)minResource7);
        Assertions.assertThat((Object)DataStreamTest.getStreamGraph(env).getStreamNode(Integer.valueOf(sink.getTransformation().getId())).getPreferredResources()).isEqualTo((Object)preferredResource7);
    }

    @Test
    void testTypeInfo() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource src1 = env.fromSequence(0L, 0L);
        Assertions.assertThat((Object)src1.getType()).isEqualTo((Object)TypeExtractor.getForClass(Long.class));
        SingleOutputStreamOperator map = src1.map((MapFunction)new MapFunction<Long, Tuple2<Integer, String>>(){

            public Tuple2<Integer, String> map(Long value) throws Exception {
                return null;
            }
        });
        Assertions.assertThat((Object)map.getType()).isEqualTo((Object)TypeExtractor.getForObject((Object)new Tuple2((Object)0, (Object)"")));
        SingleOutputStreamOperator window = map.windowAll((WindowAssigner)GlobalWindows.create()).trigger((Trigger)PurgingTrigger.of((Trigger)CountTrigger.of((long)5L))).apply((AllWindowFunction)new AllWindowFunction<Tuple2<Integer, String>, String, GlobalWindow>(){

            public void apply(GlobalWindow window, Iterable<Tuple2<Integer, String>> values, Collector<String> out) throws Exception {
            }
        });
        Assertions.assertThat((Object)window.getType()).isEqualTo((Object)TypeExtractor.getForClass(String.class));
        SingleOutputStreamOperator flatten = window.windowAll((WindowAssigner)GlobalWindows.create()).trigger((Trigger)PurgingTrigger.of((Trigger)CountTrigger.of((long)5L))).aggregate((AggregateFunction)new AggregateFunction<String, CustomPOJO, CustomPOJO>(){
            private static final long serialVersionUID = 1L;

            public CustomPOJO createAccumulator() {
                return null;
            }

            public CustomPOJO add(String value, CustomPOJO accumulator) {
                return null;
            }

            public CustomPOJO getResult(CustomPOJO accumulator) {
                return null;
            }

            public CustomPOJO merge(CustomPOJO a, CustomPOJO b) {
                return null;
            }
        });
        Assertions.assertThat((Object)flatten.getType()).isEqualTo((Object)TypeExtractor.getForClass(CustomPOJO.class));
    }

    @Test
    void testKeyedStreamKeyedProcessTranslation() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource src = env.fromSequence(0L, 0L);
        KeyedProcessFunction<Long, Long, Integer> keyedProcessFunction = new KeyedProcessFunction<Long, Long, Integer>(){
            private static final long serialVersionUID = 1L;

            public void processElement(Long value, KeyedProcessFunction.Context ctx, Collector<Integer> out) throws Exception {
            }

            public void onTimer(long timestamp, KeyedProcessFunction.OnTimerContext ctx, Collector<Integer> out) throws Exception {
            }
        };
        SingleOutputStreamOperator processed = src.keyBy(new IdentityKeySelector()).process((KeyedProcessFunction)keyedProcessFunction);
        processed.sinkTo((Sink)new DiscardingSink());
        Assertions.assertThat((Object)DataStreamTest.getFunctionForDataStream(processed)).isEqualTo((Object)keyedProcessFunction);
        Assertions.assertThat(DataStreamTest.getOperatorForDataStream(processed)).isInstanceOf(KeyedProcessOperator.class);
    }

    @Test
    void testAsyncKeyedStreamKeyedProcessTranslation() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource src = env.fromSequence(0L, 0L);
        KeyedProcessFunction<Long, Long, Integer> keyedProcessFunction = new KeyedProcessFunction<Long, Long, Integer>(){
            private static final long serialVersionUID = 1L;

            public void processElement(Long value, KeyedProcessFunction.Context ctx, Collector<Integer> out) throws Exception {
            }

            public void onTimer(long timestamp, KeyedProcessFunction.OnTimerContext ctx, Collector<Integer> out) throws Exception {
            }
        };
        SingleOutputStreamOperator processed = src.keyBy(new IdentityKeySelector()).enableAsyncState().process((KeyedProcessFunction)keyedProcessFunction);
        processed.sinkTo((Sink)new DiscardingSink());
        Assertions.assertThat((Object)((AbstractAsyncStateUdfStreamOperator)DataStreamTest.getOperatorForDataStream(processed)).getUserFunction()).isEqualTo((Object)keyedProcessFunction);
        Assertions.assertThat(DataStreamTest.getOperatorForDataStream(processed)).isInstanceOf(AsyncKeyedProcessOperator.class);
    }

    @Test
    void testProcessTranslation() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource src = env.fromSequence(0L, 0L);
        ProcessFunction<Long, Integer> processFunction = new ProcessFunction<Long, Integer>(){
            private static final long serialVersionUID = 1L;

            public void processElement(Long value, ProcessFunction.Context ctx, Collector<Integer> out) throws Exception {
            }

            public void onTimer(long timestamp, ProcessFunction.OnTimerContext ctx, Collector<Integer> out) throws Exception {
            }
        };
        SingleOutputStreamOperator processed = src.process((ProcessFunction)processFunction);
        processed.sinkTo((Sink)new DiscardingSink());
        Assertions.assertThat((Object)DataStreamTest.getFunctionForDataStream(processed)).isEqualTo((Object)processFunction);
        Assertions.assertThat(DataStreamTest.getOperatorForDataStream(processed)).isInstanceOf(ProcessOperator.class);
    }

    @Test
    void testFailedTranslationOnKeyed() {
        MapStateDescriptor descriptor = new MapStateDescriptor("broadcast", (TypeInformation)BasicTypeInfo.LONG_TYPE_INFO, (TypeInformation)BasicTypeInfo.STRING_TYPE_INFO);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        KeyedStream srcOne = env.fromSequence(0L, 5L).assignTimestampsAndWatermarks((WatermarkStrategy)new CustomWmEmitter<Long>(){

            public long extractTimestamp(Long element, long previousElementTimestamp) {
                return element;
            }
        }).keyBy((KeySelector & Serializable)value -> value);
        SingleOutputStreamOperator srcTwo = env.fromData((Object[])new String[]{"Test:0", "Test:1", "Test:2", "Test:3", "Test:4", "Test:5"}).assignTimestampsAndWatermarks((WatermarkStrategy)new CustomWmEmitter<String>(){

            public long extractTimestamp(String element, long previousElementTimestamp) {
                return Long.parseLong(element.split(":")[1]);
            }
        });
        BroadcastStream broadcast = srcTwo.broadcast(new MapStateDescriptor[]{descriptor});
        BroadcastConnectedStream bcStream = srcOne.connect(broadcast);
        Assertions.assertThatThrownBy(() -> bcStream.process((BroadcastProcessFunction)new BroadcastProcessFunction<Long, String, String>(){

            public void processBroadcastElement(String value, BroadcastProcessFunction.Context ctx, Collector<String> out) {
            }

            public void processElement(Long value, BroadcastProcessFunction.ReadOnlyContext ctx, Collector<String> out) {
            }
        })).isInstanceOf(IllegalArgumentException.class);
    }

    @Test
    void testFailedTranslationOnNonKeyed() {
        MapStateDescriptor descriptor = new MapStateDescriptor("broadcast", (TypeInformation)BasicTypeInfo.LONG_TYPE_INFO, (TypeInformation)BasicTypeInfo.STRING_TYPE_INFO);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        SingleOutputStreamOperator srcOne = env.fromSequence(0L, 5L).assignTimestampsAndWatermarks((WatermarkStrategy)new CustomWmEmitter<Long>(){

            public long extractTimestamp(Long element, long previousElementTimestamp) {
                return element;
            }
        });
        SingleOutputStreamOperator srcTwo = env.fromData((Object[])new String[]{"Test:0", "Test:1", "Test:2", "Test:3", "Test:4", "Test:5"}).assignTimestampsAndWatermarks((WatermarkStrategy)new CustomWmEmitter<String>(){

            public long extractTimestamp(String element, long previousElementTimestamp) {
                return Long.parseLong(element.split(":")[1]);
            }
        });
        BroadcastStream broadcast = srcTwo.broadcast(new MapStateDescriptor[]{descriptor});
        BroadcastConnectedStream bcStream = srcOne.connect(broadcast);
        Assertions.assertThatThrownBy(() -> bcStream.process((KeyedBroadcastProcessFunction)new KeyedBroadcastProcessFunction<String, Long, String, String>(){

            public void processBroadcastElement(String value, KeyedBroadcastProcessFunction.Context ctx, Collector<String> out) {
            }

            public void processElement(Long value, KeyedBroadcastProcessFunction.ReadOnlyContext ctx, Collector<String> out) {
            }
        })).isInstanceOf(IllegalArgumentException.class);
    }

    @Test
    void testWindowOperatorDescription() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        SingleOutputStreamOperator dataStream1 = env.fromSequence(0L, 0L).windowAll((WindowAssigner)GlobalWindows.create()).trigger((Trigger)PurgingTrigger.of((Trigger)CountTrigger.of((long)10L))).reduce((ReduceFunction)new ReduceFunction<Long>(){
            private static final long serialVersionUID = 1L;

            public Long reduce(Long value1, Long value2) throws Exception {
                return null;
            }
        });
        Assertions.assertThat((String)dataStream1.getTransformation().getName()).isEqualTo("GlobalWindows");
        Assertions.assertThat((String)dataStream1.getTransformation().getDescription()).contains(new CharSequence[]{"PurgingTrigger"});
        SingleOutputStreamOperator dataStream2 = env.fromSequence(0L, 0L).keyBy((KeySelector & Serializable)value -> value).window((WindowAssigner)TumblingEventTimeWindows.of((Duration)Duration.ofMillis(1000L))).trigger((Trigger)PurgingTrigger.of((Trigger)CountTrigger.of((long)10L))).reduce((ReduceFunction)new ReduceFunction<Long>(){
            private static final long serialVersionUID = 1L;

            public Long reduce(Long value1, Long value2) throws Exception {
                return null;
            }
        });
        Assertions.assertThat((String)dataStream2.getTransformation().getName()).isEqualTo("TumblingEventTimeWindows");
        Assertions.assertThat((String)dataStream2.getTransformation().getDescription()).contains(new CharSequence[]{"PurgingTrigger"});
    }

    @Test
    void testUserDefinedDescription() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        SingleOutputStreamOperator dataStream1 = env.fromSequence(0L, 0L).name("testSource1").setDescription("this is test source 1").map((MapFunction)new MapFunction<Long, Long>(){

            public Long map(Long value) throws Exception {
                return null;
            }
        }).name("testMap").setDescription("this is test map 1");
        SingleOutputStreamOperator dataStream2 = env.fromSequence(0L, 0L).name("testSource2").setDescription("this is test source 2").map((MapFunction)new MapFunction<Long, Long>(){

            public Long map(Long value) throws Exception {
                return null;
            }
        }).name("testMap").setDescription("this is test map 2");
        dataStream1.connect((DataStream)dataStream2).flatMap((CoFlatMapFunction)new CoFlatMapFunction<Long, Long, Long>(){

            public void flatMap1(Long value, Collector<Long> out) throws Exception {
            }

            public void flatMap2(Long value, Collector<Long> out) throws Exception {
            }
        }).name("testCoFlatMap").setDescription("this is test co flat map").windowAll((WindowAssigner)GlobalWindows.create()).trigger((Trigger)PurgingTrigger.of((Trigger)CountTrigger.of((long)10L))).reduce((ReduceFunction)new ReduceFunction<Long>(){
            private static final long serialVersionUID = 1L;

            public Long reduce(Long value1, Long value2) throws Exception {
                return null;
            }
        }).name("testWindowReduce").setDescription("this is test window reduce").print();
        String plan = env.getExecutionPlan();
        Assertions.assertThat((String)plan).contains(new CharSequence[]{"this is test source 1", "this is test map 1", "this is test map 2", "this is test co flat map", "this is test window reduce"});
    }

    @Test
    void operatorTest() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource src = env.fromSequence(0L, 0L);
        MapFunction<Long, Integer> mapFunction = new MapFunction<Long, Integer>(){

            public Integer map(Long value) throws Exception {
                return null;
            }
        };
        SingleOutputStreamOperator map = src.map((MapFunction)mapFunction);
        map.sinkTo((Sink)new DiscardingSink());
        Assertions.assertThat((Object)DataStreamTest.getFunctionForDataStream(map)).isEqualTo((Object)mapFunction);
        FlatMapFunction<Long, Integer> flatMapFunction = new FlatMapFunction<Long, Integer>(){
            private static final long serialVersionUID = 1L;

            public void flatMap(Long value, Collector<Integer> out) throws Exception {
            }
        };
        SingleOutputStreamOperator flatMap = src.flatMap((FlatMapFunction)flatMapFunction);
        flatMap.sinkTo((Sink)new DiscardingSink());
        Assertions.assertThat((Object)DataStreamTest.getFunctionForDataStream(flatMap)).isEqualTo((Object)flatMapFunction);
        FilterFunction<Integer> filterFunction = new FilterFunction<Integer>(){

            public boolean filter(Integer value) throws Exception {
                return false;
            }
        };
        SingleOutputStreamOperator unionFilter = map.union(new DataStream[]{flatMap}).filter((FilterFunction)filterFunction);
        unionFilter.sinkTo((Sink)new DiscardingSink());
        Assertions.assertThat((Object)DataStreamTest.getFunctionForDataStream(unionFilter)).isEqualTo((Object)filterFunction);
        DataStreamTest.getStreamGraph(env).getStreamEdgesOrThrow(map.getId(), unionFilter.getId());
        DataStreamTest.getStreamGraph(env).getStreamEdgesOrThrow(flatMap.getId(), unionFilter.getId());
        ConnectedStreams connect = map.connect((DataStream)flatMap);
        CoMapFunction<Integer, Integer, String> coMapper = new CoMapFunction<Integer, Integer, String>(){
            private static final long serialVersionUID = 1L;

            public String map1(Integer value) {
                return null;
            }

            public String map2(Integer value) {
                return null;
            }
        };
        SingleOutputStreamOperator coMap = connect.map((CoMapFunction)coMapper);
        coMap.sinkTo((Sink)new DiscardingSink());
        Assertions.assertThat((Object)DataStreamTest.getFunctionForDataStream(coMap)).isEqualTo((Object)coMapper);
        DataStreamTest.getStreamGraph(env).getStreamEdgesOrThrow(map.getId(), coMap.getId());
        DataStreamTest.getStreamGraph(env).getStreamEdgesOrThrow(flatMap.getId(), coMap.getId());
    }

    @Test
    void testKeyedConnectedStreamsType() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource stream1 = env.fromData((Object[])new Integer[]{1, 2});
        DataStreamSource stream2 = env.fromData((Object[])new Integer[]{1, 2});
        ConnectedStreams connectedStreams = stream1.connect((DataStream)stream2).keyBy((KeySelector & Serializable)v -> v, (KeySelector & Serializable)v -> v);
        KeyedStream firstKeyedInput = (KeyedStream)connectedStreams.getFirstInput();
        KeyedStream secondKeyedInput = (KeyedStream)connectedStreams.getSecondInput();
        Assertions.assertThat((Object)firstKeyedInput.getKeyType()).isEqualTo((Object)Types.INT);
        Assertions.assertThat((Object)secondKeyedInput.getKeyType()).isEqualTo((Object)Types.INT);
    }

    @Test
    void sinkKeyTest() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSink sink = env.fromSequence(1L, 100L).print();
        Assertions.assertThat((int)DataStreamTest.getStreamGraph(env).getStreamNode(Integer.valueOf(sink.getTransformation().getId())).getStatePartitioners().length).isZero();
        Assertions.assertThat((Object)((StreamEdge)DataStreamTest.getStreamGraph(env).getStreamNode(Integer.valueOf(sink.getTransformation().getId())).getInEdges().get(0)).getPartitioner()).isInstanceOf(ForwardPartitioner.class);
        KeySelector<Long, Long> key1 = new KeySelector<Long, Long>(){
            private static final long serialVersionUID = 1L;

            public Long getKey(Long value) throws Exception {
                return 0L;
            }
        };
        DataStreamSink sink2 = env.fromSequence(1L, 100L).keyBy((KeySelector)key1).print();
        Assertions.assertThat((int)DataStreamTest.getStreamGraph(env).getStreamNode(Integer.valueOf(sink2.getTransformation().getId())).getStatePartitioners().length).isOne();
        Assertions.assertThat((Object)DataStreamTest.getStreamGraph(env).getStreamNode(Integer.valueOf(sink2.getTransformation().getId())).getStateKeySerializer()).isNotNull();
        Assertions.assertThat((Object)DataStreamTest.getStreamGraph(env).getStreamNode(Integer.valueOf(sink2.getTransformation().getId())).getStateKeySerializer()).isNotNull();
        Assertions.assertThat((Object)DataStreamTest.getStreamGraph(env).getStreamNode(Integer.valueOf(sink2.getTransformation().getId())).getStatePartitioners()[0]).isEqualTo((Object)key1);
        Assertions.assertThat((Object)((StreamEdge)DataStreamTest.getStreamGraph(env).getStreamNode(Integer.valueOf(sink2.getTransformation().getId())).getInEdges().get(0)).getPartitioner()).isInstanceOf(KeyGroupStreamPartitioner.class);
        KeySelector<Long, Long> key2 = new KeySelector<Long, Long>(){
            private static final long serialVersionUID = 1L;

            public Long getKey(Long value) throws Exception {
                return 0L;
            }
        };
        DataStreamSink sink3 = env.fromSequence(1L, 100L).keyBy((KeySelector)key2).print();
        Assertions.assertThat((int)DataStreamTest.getStreamGraph(env).getStreamNode(Integer.valueOf(sink3.getTransformation().getId())).getStatePartitioners().length).isOne();
        Assertions.assertThat((Object)DataStreamTest.getStreamGraph(env).getStreamNode(Integer.valueOf(sink3.getTransformation().getId())).getStatePartitioners()[0]).isEqualTo((Object)key2);
        Assertions.assertThat((Object)((StreamEdge)DataStreamTest.getStreamGraph(env).getStreamNode(Integer.valueOf(sink3.getTransformation().getId())).getInEdges().get(0)).getPartitioner()).isInstanceOf(KeyGroupStreamPartitioner.class);
    }

    @Test
    void testChannelSelectors() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource src = env.fromSequence(0L, 0L);
        DataStream broadcast = src.broadcast();
        DataStreamSink broadcastSink = broadcast.print();
        StreamPartitioner broadcastPartitioner = ((StreamEdge)DataStreamTest.getStreamGraph(env).getStreamEdges(src.getId(), broadcastSink.getTransformation().getId()).get(0)).getPartitioner();
        Assertions.assertThat((Object)broadcastPartitioner).isInstanceOf(BroadcastPartitioner.class);
        DataStream shuffle = src.shuffle();
        DataStreamSink shuffleSink = shuffle.print();
        StreamPartitioner shufflePartitioner = ((StreamEdge)DataStreamTest.getStreamGraph(env).getStreamEdges(src.getId(), shuffleSink.getTransformation().getId()).get(0)).getPartitioner();
        Assertions.assertThat((Object)shufflePartitioner).isInstanceOf(ShufflePartitioner.class);
        DataStream forward = src.forward();
        DataStreamSink forwardSink = forward.print();
        StreamPartitioner forwardPartitioner = ((StreamEdge)DataStreamTest.getStreamGraph(env).getStreamEdges(src.getId(), forwardSink.getTransformation().getId()).get(0)).getPartitioner();
        Assertions.assertThat((Object)forwardPartitioner).isInstanceOf(ForwardPartitioner.class);
        DataStream rebalance = src.rebalance();
        DataStreamSink rebalanceSink = rebalance.print();
        StreamPartitioner rebalancePartitioner = ((StreamEdge)DataStreamTest.getStreamGraph(env).getStreamEdges(src.getId(), rebalanceSink.getTransformation().getId()).get(0)).getPartitioner();
        Assertions.assertThat((Object)rebalancePartitioner).isInstanceOf(RebalancePartitioner.class);
        DataStream global = src.global();
        DataStreamSink globalSink = global.print();
        StreamPartitioner globalPartitioner = ((StreamEdge)DataStreamTest.getStreamGraph(env).getStreamEdges(src.getId(), globalSink.getTransformation().getId()).get(0)).getPartitioner();
        Assertions.assertThat((Object)globalPartitioner).isInstanceOf(GlobalPartitioner.class);
    }

    @Test
    void testPrimitiveArrayKeyRejection() {
        KeySelector<Tuple2<Integer[], String>, int[]> keySelector = new KeySelector<Tuple2<Integer[], String>, int[]>(){

            public int[] getKey(Tuple2<Integer[], String> value) throws Exception {
                int[] ks = new int[((Integer[])value.f0).length];
                for (int i = 0; i < ks.length; ++i) {
                    ks[i] = ((Integer[])value.f0)[i];
                }
                return ks;
            }
        };
        this.assertArrayKeyRejection((KeySelector)keySelector, (TypeInformation)PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO);
    }

    @Test
    void testBasicArrayKeyRejection() {
        KeySelector<Tuple2<Integer[], String>, Integer[]> keySelector = new KeySelector<Tuple2<Integer[], String>, Integer[]>(){

            public Integer[] getKey(Tuple2<Integer[], String> value) throws Exception {
                return (Integer[])value.f0;
            }
        };
        this.assertArrayKeyRejection((KeySelector)keySelector, (TypeInformation)BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO);
    }

    @Test
    void testObjectArrayKeyRejection() {
        KeySelector<Tuple2<Integer[], String>, Object[]> keySelector = new KeySelector<Tuple2<Integer[], String>, Object[]>(){

            public Object[] getKey(Tuple2<Integer[], String> value) throws Exception {
                Object[] ks = new Object[((Integer[])value.f0).length];
                for (int i = 0; i < ks.length; ++i) {
                    ks[i] = new Object();
                }
                return ks;
            }
        };
        ObjectArrayTypeInfo keyTypeInfo = ObjectArrayTypeInfo.getInfoFor(Object[].class, (TypeInformation)new GenericTypeInfo(Object.class));
        this.assertArrayKeyRejection((KeySelector)keySelector, (TypeInformation)keyTypeInfo);
    }

    private <K> void assertArrayKeyRejection(KeySelector<Tuple2<Integer[], String>, K> keySelector, TypeInformation<K> expectedKeyType) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource input = env.fromData((Object[])new Tuple2[]{new Tuple2((Object)new Integer[]{1, 2}, (Object)"barfoo")});
        Assertions.assertThat((Object)TypeExtractor.getKeySelectorTypes(keySelector, (TypeInformation)input.getType())).isEqualTo(expectedKeyType);
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> DataStreamTest.lambda$assertArrayKeyRejection$2((DataStream)input, keySelector)).isInstanceOf(InvalidProgramException.class)).hasMessageStartingWith("Type " + expectedKeyType + " cannot be used as key.");
    }

    @Test
    void testEnumKeyRejection() {
        KeySelector & Serializable keySelector = (KeySelector & Serializable)value -> (TestEnum)((Object)((Object)value.f0));
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource input = env.fromData((Object[])new Tuple2[]{Tuple2.of((Object)((Object)TestEnum.FOO), (Object)"Foo"), Tuple2.of((Object)((Object)TestEnum.BAR), (Object)"Bar")});
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> DataStreamTest.lambda$testEnumKeyRejection$3((DataStream)input, keySelector)).isInstanceOf(InvalidProgramException.class)).hasMessageStartingWith("Type " + EnumTypeInfo.of(TestEnum.class) + " cannot be used as key.");
    }

    @Test
    void testPOJOWithNestedArrayNoHashCodeKeyRejection() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource input = env.fromData((Object[])new POJOWithHashCode[]{new POJOWithHashCode(new int[]{1, 2})});
        PrimitiveArrayTypeInfo expectedTypeInfo = PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO;
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> DataStreamTest.lambda$testPOJOWithNestedArrayNoHashCodeKeyRejection$4((DataStream)input)).isInstanceOf(InvalidProgramException.class)).hasMessageStartingWith("Type " + (TypeInformation)expectedTypeInfo + " cannot be used as key.");
    }

    @Test
    void testPOJOWithNestedArrayAndHashCodeWorkAround() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource input = env.fromData((Object[])new POJOWithHashCode[]{new POJOWithHashCode(new int[]{1, 2})});
        input.keyBy((KeySelector)new KeySelector<POJOWithHashCode, POJOWithHashCode>(){

            public POJOWithHashCode getKey(POJOWithHashCode value) throws Exception {
                return value;
            }
        }).addSink((SinkFunction)new SinkFunction<POJOWithHashCode>(){

            public void invoke(POJOWithHashCode value) {
                Assertions.assertThat((int[])value.getId()).containsExactly(new int[]{1, 2});
            }
        });
    }

    @Test
    void testPOJOnoHashCodeKeyRejection() {
        KeySelector<POJOWithoutHashCode, POJOWithoutHashCode> keySelector = new KeySelector<POJOWithoutHashCode, POJOWithoutHashCode>(){

            public POJOWithoutHashCode getKey(POJOWithoutHashCode value) throws Exception {
                return value;
            }
        };
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource input = env.fromData((Object[])new POJOWithoutHashCode[]{new POJOWithoutHashCode(new int[]{1, 2})});
        Assertions.assertThatThrownBy(() -> DataStreamTest.lambda$testPOJOnoHashCodeKeyRejection$5((DataStream)input, (KeySelector)keySelector)).isInstanceOf(InvalidProgramException.class);
    }

    @Test
    void testTupleNestedArrayKeyRejection() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource input = env.fromData((Object[])new Tuple2[]{new Tuple2((Object)new Integer[]{1, 2}, (Object)"test-test")});
        TupleTypeInfo expectedTypeInfo = new TupleTypeInfo(new TypeInformation[]{BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO});
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> this.lambda$testTupleNestedArrayKeyRejection$6((DataStream)input)).isInstanceOf(InvalidProgramException.class)).hasMessageStartingWith("Type " + (TypeInformation)expectedTypeInfo + " cannot be used as key.");
    }

    @Test
    void testPrimitiveKeyAcceptance() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setMaxParallelism(1);
        DataStreamSource input = env.fromData((Object[])new Integer[]{new Integer(10000)});
        KeyedStream keyedStream = input.keyBy((KeySelector)new KeySelector<Integer, Object>(){

            public Object getKey(Integer value) throws Exception {
                return value;
            }
        });
        keyedStream.addSink((SinkFunction)new SinkFunction<Integer>(){

            public void invoke(Integer value) throws Exception {
                Assertions.assertThat((Integer)value).isEqualTo(10000);
            }
        });
    }

    private static StreamOperator<?> getOperatorForDataStream(DataStream<?> dataStream) {
        StreamExecutionEnvironment env = dataStream.getExecutionEnvironment();
        StreamGraph streamGraph = DataStreamTest.getStreamGraph(env);
        return streamGraph.getStreamNode(Integer.valueOf(dataStream.getId())).getOperator();
    }

    private static Function getFunctionForDataStream(DataStream<?> dataStream) {
        AbstractUdfStreamOperator operator = (AbstractUdfStreamOperator)DataStreamTest.getOperatorForDataStream(dataStream);
        return operator.getUserFunction();
    }

    private static StreamGraph getStreamGraph(StreamExecutionEnvironment sEnv) {
        return sEnv.getStreamGraph(false);
    }

    private static Integer createDownStreamId(DataStream<?> dataStream) {
        return dataStream.print().getTransformation().getId();
    }

    private static boolean isKeyed(DataStream<?> dataStream) {
        return dataStream instanceof KeyedStream;
    }

    private static Integer createDownStreamId(ConnectedStreams dataStream) {
        SingleOutputStreamOperator coMap = dataStream.map((CoMapFunction)new CoMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Object>(){
            private static final long serialVersionUID = 1L;

            public Object map1(Tuple2<Long, Long> value) {
                return null;
            }

            public Object map2(Tuple2<Long, Long> value) {
                return null;
            }
        });
        coMap.sinkTo((Sink)new DiscardingSink());
        return coMap.getId();
    }

    private static boolean isKeyed(ConnectedStreams<?, ?> dataStream) {
        return dataStream.getFirstInput() instanceof KeyedStream && dataStream.getSecondInput() instanceof KeyedStream;
    }

    private static boolean isPartitioned(List<StreamEdge> edges) {
        boolean result = true;
        for (StreamEdge edge : edges) {
            if (edge.getPartitioner() instanceof KeyGroupStreamPartitioner) continue;
            result = false;
        }
        return result;
    }

    private static boolean isCustomPartitioned(List<StreamEdge> edges) {
        boolean result = true;
        for (StreamEdge edge : edges) {
            if (edge.getPartitioner() instanceof CustomPartitionerWrapper) continue;
            result = false;
        }
        return result;
    }

    private /* synthetic */ void lambda$testTupleNestedArrayKeyRejection$6(DataStream input) throws Throwable {
        input.keyBy((KeySelector)new KeySelector<Tuple2<Integer[], String>, Tuple2<Integer[], String>>(){

            public Tuple2<Integer[], String> getKey(Tuple2<Integer[], String> value) throws Exception {
                return value;
            }
        });
    }

    private static /* synthetic */ void lambda$testPOJOnoHashCodeKeyRejection$5(DataStream input, KeySelector keySelector) throws Throwable {
        input.keyBy(keySelector);
    }

    private static /* synthetic */ void lambda$testPOJOWithNestedArrayNoHashCodeKeyRejection$4(DataStream input) throws Throwable {
        input.keyBy(POJOWithoutHashCode::getId);
    }

    private static /* synthetic */ void lambda$testEnumKeyRejection$3(DataStream input, KeySelector keySelector) throws Throwable {
        input.keyBy(keySelector);
    }

    private static /* synthetic */ void lambda$assertArrayKeyRejection$2(DataStream input, KeySelector keySelector) throws Throwable {
        input.keyBy(keySelector);
    }

    private static enum TestEnum {
        FOO,
        BAR;

    }

    private static class CustomPOJO {
        private String s;
        private int i;

        public void setS(String s) {
            this.s = s;
        }

        public void setI(int i) {
            this.i = i;
        }

        public String getS() {
            return this.s;
        }

        public int getI() {
            return this.i;
        }
    }

    private static class IdentityKeySelector<T>
    implements KeySelector<T, T> {
        private static final long serialVersionUID = 1L;

        private IdentityKeySelector() {
        }

        public T getKey(T value) throws Exception {
            return value;
        }
    }

    private static class FirstSelector
    implements KeySelector<Tuple2<Long, Long>, Long> {
        private static final long serialVersionUID = 1L;

        private FirstSelector() {
        }

        public Long getKey(Tuple2<Long, Long> value) throws Exception {
            return (Long)value.f0;
        }
    }

    public static class POJOWithHashCode
    extends POJOWithoutHashCode {
        public POJOWithHashCode() {
        }

        public POJOWithHashCode(int[] id) {
            super(id);
        }

        public int hashCode() {
            int hash = 31;
            for (int i : this.getId()) {
                hash = 37 * hash + i;
            }
            return hash;
        }
    }

    public static class POJOWithoutHashCode {
        private int[] id;

        public POJOWithoutHashCode() {
        }

        public POJOWithoutHashCode(int[] id) {
            this.id = id;
        }

        public int[] getId() {
            return this.id;
        }

        public void setId(int[] id) {
            this.id = id;
        }
    }

    private static abstract class CustomWmEmitter<T>
    implements WatermarkStrategyWithPunctuatedWatermarks<T> {
        private CustomWmEmitter() {
        }

        @Nullable
        public Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp) {
            return new Watermark(extractedTimestamp);
        }
    }
}

