/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.datastream;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Stack;
import java.util.UUID;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.EnumTypeInfo;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.runtime.asyncprocessing.operators.AsyncIntervalJoinOperator;
import org.apache.flink.runtime.asyncprocessing.operators.AsyncKeyedProcessOperator;
import org.apache.flink.runtime.asyncprocessing.operators.AsyncStreamFlatMap;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.KeyedPartitionWindowedStream;
import org.apache.flink.streaming.api.datastream.PartitionWindowedStream;
import org.apache.flink.streaming.api.datastream.QueryableStateStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.UnsupportedTimeCharacteristicException;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.functions.query.QueryableAppendingStateOperator;
import org.apache.flink.streaming.api.functions.query.QueryableValueStateOperator;
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.operators.StreamFlatMap;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.co.IntervalJoinOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.transformations.ReduceTransformation;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.Utils;

@Public
public class KeyedStream<T, KEY>
extends DataStream<T> {
    private final KeySelector<T, KEY> keySelector;
    private final TypeInformation<KEY> keyType;
    private boolean isEnableAsyncState = false;

    public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector) {
        this(dataStream, keySelector, TypeExtractor.getKeySelectorTypes(keySelector, dataStream.getType()));
    }

    public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector, TypeInformation<KEY> keyType) {
        this(dataStream, new PartitionTransformation<T>(dataStream.getTransformation(), new KeyGroupStreamPartitioner<T, KEY>(keySelector, 128)), keySelector, keyType);
    }

    @Internal
    KeyedStream(DataStream<T> stream, PartitionTransformation<T> partitionTransformation, KeySelector<T, KEY> keySelector, TypeInformation<KEY> keyType) {
        super(stream.getExecutionEnvironment(), partitionTransformation);
        this.keySelector = this.clean(keySelector);
        this.keyType = this.validateKeyType(keyType);
    }

    private TypeInformation<KEY> validateKeyType(TypeInformation<KEY> keyType) {
        Stack<TypeInformation<Object>> stack = new Stack<TypeInformation<Object>>();
        stack.push(keyType);
        ArrayList<TypeInformation> unsupportedTypes = new ArrayList<TypeInformation>();
        while (!stack.isEmpty()) {
            TypeInformation typeInfo = (TypeInformation)stack.pop();
            if (!this.validateKeyTypeIsHashable(typeInfo)) {
                unsupportedTypes.add(typeInfo);
            }
            if (!(typeInfo instanceof TupleTypeInfoBase)) continue;
            for (int i = 0; i < typeInfo.getArity(); ++i) {
                stack.push(((TupleTypeInfoBase)typeInfo).getTypeAt(i));
            }
        }
        if (!unsupportedTypes.isEmpty()) {
            throw new InvalidProgramException("Type " + keyType + " cannot be used as key. Contained UNSUPPORTED key types: " + StringUtils.join(unsupportedTypes, (String)", ") + ". Look at the keyBy() documentation for the conditions a type has to satisfy in order to be eligible for a key.");
        }
        return keyType;
    }

    private boolean validateKeyTypeIsHashable(TypeInformation<?> type) {
        try {
            return type instanceof PojoTypeInfo ? !type.getTypeClass().getMethod("hashCode", new Class[0]).getDeclaringClass().equals(Object.class) : !KeyedStream.isArrayType(type) && !KeyedStream.isEnumType(type);
        }
        catch (NoSuchMethodException noSuchMethodException) {
            return false;
        }
    }

    private static boolean isArrayType(TypeInformation<?> type) {
        return type instanceof PrimitiveArrayTypeInfo || type instanceof BasicArrayTypeInfo || type instanceof ObjectArrayTypeInfo;
    }

    private static boolean isEnumType(TypeInformation<?> type) {
        return type instanceof EnumTypeInfo;
    }

    @Internal
    public KeySelector<T, KEY> getKeySelector() {
        return this.keySelector;
    }

    @Internal
    public TypeInformation<KEY> getKeyType() {
        return this.keyType;
    }

    @Override
    protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
        throw new UnsupportedOperationException("Cannot override partitioning for KeyedStream.");
    }

    @Override
    protected <R> SingleOutputStreamOperator<R> doTransform(String operatorName, TypeInformation<R> outTypeInfo, StreamOperatorFactory<R> operatorFactory) {
        SingleOutputStreamOperator<R> returnStream = super.doTransform(operatorName, outTypeInfo, operatorFactory);
        OneInputTransformation transform = (OneInputTransformation)returnStream.getTransformation();
        transform.setStateKeySelector(this.keySelector);
        transform.setStateKeyType(this.keyType);
        if (this.isEnableAsyncState) {
            transform.enableAsyncState();
        }
        return returnStream;
    }

    @Override
    public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
        DataStreamSink<T> result = super.addSink(sinkFunction);
        result.getLegacyTransformation().setStateKeySelector(this.keySelector);
        result.getLegacyTransformation().setStateKeyType(this.keyType);
        return result;
    }

    @PublicEvolving
    public <R> SingleOutputStreamOperator<R> process(KeyedProcessFunction<KEY, T, R> keyedProcessFunction) {
        TypeInformation outType = TypeExtractor.getUnaryOperatorReturnType(keyedProcessFunction, KeyedProcessFunction.class, 1, 2, TypeExtractor.NO_INDEX, this.getType(), Utils.getCallLocationName(), true);
        return this.process(keyedProcessFunction, outType);
    }

    @Internal
    public <R> SingleOutputStreamOperator<R> process(KeyedProcessFunction<KEY, T, R> keyedProcessFunction, TypeInformation<R> outputType) {
        AbstractStreamOperator operator = this.isEnableAsyncState() ? new AsyncKeyedProcessOperator<KEY, T, R>(this.clean(keyedProcessFunction)) : new KeyedProcessOperator<KEY, T, R>(this.clean(keyedProcessFunction));
        return this.transform("KeyedProcess", outputType, operator);
    }

    @Override
    public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper, TypeInformation<R> outputType) {
        AbstractStreamOperator operator = this.isEnableAsyncState() ? new AsyncStreamFlatMap<T, R>(this.clean(flatMapper)) : new StreamFlatMap<T, R>(this.clean(flatMapper));
        return this.transform("Flat Map", outputType, operator);
    }

    @PublicEvolving
    public <T1> IntervalJoin<T, T1, KEY> intervalJoin(KeyedStream<T1, KEY> otherStream) {
        return new IntervalJoin(this, otherStream);
    }

    public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {
        return this.window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
    }

    public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
        return this.window(GlobalWindows.create()).evictor(CountEvictor.of(size)).trigger(CountTrigger.of(slide));
    }

    @PublicEvolving
    public <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {
        return new WindowedStream(this, assigner);
    }

    public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> reducer) {
        ReduceTransformation<T, KEY> reduce = new ReduceTransformation<T, KEY>("Keyed Reduce", this.environment.getParallelism(), this.transformation, this.clean(reducer), this.keySelector, this.getKeyType(), false);
        if (this.isEnableAsyncState) {
            reduce.enableAsyncState();
        }
        this.getExecutionEnvironment().addOperator(reduce);
        return new SingleOutputStreamOperator(this.getExecutionEnvironment(), reduce);
    }

    public SingleOutputStreamOperator<T> sum(int positionToSum) {
        return this.aggregate(new SumAggregator(positionToSum, this.getType(), this.getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> sum(String field) {
        return this.aggregate(new SumAggregator(field, this.getType(), this.getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> min(int positionToMin) {
        return this.aggregate(new ComparableAggregator(positionToMin, this.getType(), AggregationFunction.AggregationType.MIN, this.getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> min(String field) {
        return this.aggregate(new ComparableAggregator(field, this.getType(), AggregationFunction.AggregationType.MIN, false, this.getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> max(int positionToMax) {
        return this.aggregate(new ComparableAggregator(positionToMax, this.getType(), AggregationFunction.AggregationType.MAX, this.getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> max(String field) {
        return this.aggregate(new ComparableAggregator(field, this.getType(), AggregationFunction.AggregationType.MAX, false, this.getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> minBy(String field, boolean first) {
        return this.aggregate(new ComparableAggregator(field, this.getType(), AggregationFunction.AggregationType.MINBY, first, this.getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> maxBy(String field, boolean first) {
        return this.aggregate(new ComparableAggregator(field, this.getType(), AggregationFunction.AggregationType.MAXBY, first, this.getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> minBy(int positionToMinBy) {
        return this.minBy(positionToMinBy, true);
    }

    public SingleOutputStreamOperator<T> minBy(String positionToMinBy) {
        return this.minBy(positionToMinBy, true);
    }

    public SingleOutputStreamOperator<T> minBy(int positionToMinBy, boolean first) {
        return this.aggregate(new ComparableAggregator(positionToMinBy, this.getType(), AggregationFunction.AggregationType.MINBY, first, this.getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> maxBy(int positionToMaxBy) {
        return this.maxBy(positionToMaxBy, true);
    }

    public SingleOutputStreamOperator<T> maxBy(String positionToMaxBy) {
        return this.maxBy(positionToMaxBy, true);
    }

    public SingleOutputStreamOperator<T> maxBy(int positionToMaxBy, boolean first) {
        return this.aggregate(new ComparableAggregator(positionToMaxBy, this.getType(), AggregationFunction.AggregationType.MAXBY, first, this.getExecutionConfig()));
    }

    protected SingleOutputStreamOperator<T> aggregate(AggregationFunction<T> aggregate) {
        return this.reduce(aggregate).name("Keyed Aggregation");
    }

    @Override
    @PublicEvolving
    public PartitionWindowedStream<T> fullWindowPartition() {
        return new KeyedPartitionWindowedStream(this.environment, this);
    }

    @Deprecated
    @PublicEvolving
    public QueryableStateStream<KEY, T> asQueryableState(String queryableStateName) {
        ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor(UUID.randomUUID().toString(), this.getType());
        return this.asQueryableState(queryableStateName, valueStateDescriptor);
    }

    @Deprecated
    @PublicEvolving
    public QueryableStateStream<KEY, T> asQueryableState(String queryableStateName, ValueStateDescriptor<T> stateDescriptor) {
        this.transform("Queryable state: " + queryableStateName, this.getType(), new QueryableValueStateOperator(queryableStateName, stateDescriptor));
        stateDescriptor.initializeSerializerUnlessSet(this.getExecutionConfig());
        return new QueryableStateStream(queryableStateName, stateDescriptor, this.getKeyType().createSerializer(this.getExecutionConfig().getSerializerConfig()));
    }

    @Deprecated
    @PublicEvolving
    public QueryableStateStream<KEY, T> asQueryableState(String queryableStateName, ReducingStateDescriptor<T> stateDescriptor) {
        this.transform("Queryable state: " + queryableStateName, this.getType(), new QueryableAppendingStateOperator(queryableStateName, stateDescriptor));
        stateDescriptor.initializeSerializerUnlessSet(this.getExecutionConfig());
        return new QueryableStateStream(queryableStateName, stateDescriptor, this.getKeyType().createSerializer(this.getExecutionConfig().getSerializerConfig()));
    }

    @Experimental
    public KeyedStream<T, KEY> enableAsyncState() {
        this.isEnableAsyncState = true;
        return this;
    }

    @Internal
    boolean isEnableAsyncState() {
        return this.isEnableAsyncState;
    }

    @PublicEvolving
    public static class IntervalJoined<IN1, IN2, KEY> {
        private final KeyedStream<IN1, KEY> left;
        private final KeyedStream<IN2, KEY> right;
        private final long lowerBound;
        private final long upperBound;
        private final KeySelector<IN1, KEY> keySelector1;
        private final KeySelector<IN2, KEY> keySelector2;
        private boolean lowerBoundInclusive;
        private boolean upperBoundInclusive;
        private OutputTag<IN1> leftLateDataOutputTag;
        private OutputTag<IN2> rightLateDataOutputTag;
        private boolean isEnableAsyncState = false;

        public IntervalJoined(KeyedStream<IN1, KEY> left, KeyedStream<IN2, KEY> right, long lowerBound, long upperBound, boolean lowerBoundInclusive, boolean upperBoundInclusive) {
            this.left = Preconditions.checkNotNull(left);
            this.right = Preconditions.checkNotNull(right);
            this.lowerBound = lowerBound;
            this.upperBound = upperBound;
            this.lowerBoundInclusive = lowerBoundInclusive;
            this.upperBoundInclusive = upperBoundInclusive;
            this.keySelector1 = left.getKeySelector();
            this.keySelector2 = right.getKeySelector();
            if (left.isEnableAsyncState() || right.isEnableAsyncState()) {
                this.enableAsyncState();
            }
        }

        @PublicEvolving
        public IntervalJoined<IN1, IN2, KEY> upperBoundExclusive() {
            this.upperBoundInclusive = false;
            return this;
        }

        @PublicEvolving
        public IntervalJoined<IN1, IN2, KEY> lowerBoundExclusive() {
            this.lowerBoundInclusive = false;
            return this;
        }

        @PublicEvolving
        public IntervalJoined<IN1, IN2, KEY> sideOutputLeftLateData(OutputTag<IN1> outputTag) {
            outputTag = this.left.getExecutionEnvironment().clean(outputTag);
            this.leftLateDataOutputTag = outputTag;
            return this;
        }

        @PublicEvolving
        public IntervalJoined<IN1, IN2, KEY> sideOutputRightLateData(OutputTag<IN2> outputTag) {
            outputTag = this.right.getExecutionEnvironment().clean(outputTag);
            this.rightLateDataOutputTag = outputTag;
            return this;
        }

        @PublicEvolving
        public <OUT> SingleOutputStreamOperator<OUT> process(ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction) {
            Preconditions.checkNotNull(processJoinFunction);
            TypeInformation outputType = TypeExtractor.getBinaryOperatorReturnType(processJoinFunction, ProcessJoinFunction.class, 0, 1, 2, TypeExtractor.NO_INDEX, this.left.getType(), this.right.getType(), Utils.getCallLocationName(), true);
            return this.process(processJoinFunction, outputType);
        }

        @PublicEvolving
        public <OUT> SingleOutputStreamOperator<OUT> process(ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction, TypeInformation<OUT> outputType) {
            Preconditions.checkNotNull(processJoinFunction);
            Preconditions.checkNotNull(outputType);
            ProcessJoinFunction<IN1, IN2, OUT> cleanedUdf = this.left.getExecutionEnvironment().clean(processJoinFunction);
            if (this.isEnableAsyncState) {
                AsyncIntervalJoinOperator operator = new AsyncIntervalJoinOperator(this.lowerBound, this.upperBound, this.lowerBoundInclusive, this.upperBoundInclusive, this.leftLateDataOutputTag, this.rightLateDataOutputTag, this.left.getType().createSerializer(this.left.getExecutionConfig().getSerializerConfig()), this.right.getType().createSerializer(this.right.getExecutionConfig().getSerializerConfig()), cleanedUdf);
                return this.left.connect(this.right).keyBy(this.keySelector1, this.keySelector2).transform("Interval Join [Async]", outputType, operator);
            }
            IntervalJoinOperator operator = new IntervalJoinOperator(this.lowerBound, this.upperBound, this.lowerBoundInclusive, this.upperBoundInclusive, this.leftLateDataOutputTag, this.rightLateDataOutputTag, this.left.getType().createSerializer(this.left.getExecutionConfig().getSerializerConfig()), this.right.getType().createSerializer(this.right.getExecutionConfig().getSerializerConfig()), cleanedUdf);
            return this.left.connect(this.right).keyBy(this.keySelector1, this.keySelector2).transform("Interval Join", outputType, operator);
        }

        @Experimental
        public IntervalJoined<IN1, IN2, KEY> enableAsyncState() {
            this.isEnableAsyncState = true;
            return this;
        }
    }

    @PublicEvolving
    public static class IntervalJoin<T1, T2, KEY> {
        private final KeyedStream<T1, KEY> streamOne;
        private final KeyedStream<T2, KEY> streamTwo;
        private TimeBehaviour timeBehaviour = TimeBehaviour.EventTime;

        IntervalJoin(KeyedStream<T1, KEY> streamOne, KeyedStream<T2, KEY> streamTwo) {
            this.streamOne = Preconditions.checkNotNull(streamOne);
            this.streamTwo = Preconditions.checkNotNull(streamTwo);
        }

        public IntervalJoin<T1, T2, KEY> inEventTime() {
            this.timeBehaviour = TimeBehaviour.EventTime;
            return this;
        }

        public IntervalJoin<T1, T2, KEY> inProcessingTime() {
            this.timeBehaviour = TimeBehaviour.ProcessingTime;
            return this;
        }

        @PublicEvolving
        public IntervalJoined<T1, T2, KEY> between(Duration lowerBound, Duration upperBound) {
            if (this.timeBehaviour != TimeBehaviour.EventTime) {
                throw new UnsupportedTimeCharacteristicException("Time-bounded stream joins are only supported in event time");
            }
            Preconditions.checkNotNull(lowerBound, "A lower bound needs to be provided for a time-bounded join");
            Preconditions.checkNotNull(upperBound, "An upper bound needs to be provided for a time-bounded join");
            return new IntervalJoined<T1, T2, KEY>(this.streamOne, this.streamTwo, lowerBound.toMillis(), upperBound.toMillis(), true, true);
        }

        static enum TimeBehaviour {
            ProcessingTime,
            EventTime;

        }
    }
}

