/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.datastream;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.WrappingFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.PartitionWindowedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.operators.sortpartition.KeyedSortPartitionOperator;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;

@Internal
public class KeyedPartitionWindowedStream<T, KEY>
implements PartitionWindowedStream<T> {
    private final StreamExecutionEnvironment environment;
    private final KeyedStream<T, KEY> input;

    public KeyedPartitionWindowedStream(StreamExecutionEnvironment environment, KeyedStream<T, KEY> input) {
        this.environment = environment;
        this.input = input;
    }

    @Override
    public <R> SingleOutputStreamOperator<R> mapPartition(MapPartitionFunction<T, R> mapPartitionFunction) {
        Preconditions.checkNotNull(mapPartitionFunction, "The map partition function must not be null.");
        mapPartitionFunction = this.environment.clean(mapPartitionFunction);
        String opName = "MapPartition";
        TypeInformation<R> resultType = TypeExtractor.getMapPartitionReturnTypes(mapPartitionFunction, this.input.getType(), opName, true);
        return this.input.window(GlobalWindows.createWithEndOfStreamTrigger()).apply(new KeyedMapPartitionWindowFunction(mapPartitionFunction), resultType);
    }

    @Override
    public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> reduceFunction) {
        Preconditions.checkNotNull(reduceFunction, "The reduce function must not be null.");
        reduceFunction = this.environment.clean(reduceFunction);
        return this.input.window(GlobalWindows.createWithEndOfStreamTrigger()).reduce(reduceFunction);
    }

    @Override
    public <ACC, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, R> aggregateFunction) {
        Preconditions.checkNotNull(aggregateFunction, "The aggregate function must not be null.");
        aggregateFunction = this.environment.clean(aggregateFunction);
        return this.input.window(GlobalWindows.createWithEndOfStreamTrigger()).aggregate(aggregateFunction);
    }

    @Override
    public SingleOutputStreamOperator<T> sortPartition(int field, Order order) {
        Preconditions.checkNotNull(order, "The order must not be null.");
        Preconditions.checkArgument(field > 0, "The field mustn't be less than zero.");
        TypeInformation inputType = this.input.getType();
        KeyedSortPartitionOperator operator = new KeyedSortPartitionOperator(inputType, field, order);
        String opName = "KeyedSortPartition";
        SingleOutputStreamOperator result = this.input.transform("KeyedSortPartition", inputType, operator).setParallelism(this.input.getParallelism());
        int managedMemoryWeight = Math.max(1, this.environment.getConfiguration().get(ExecutionOptions.SORT_KEYED_PARTITION_MEMORY).getMebiBytes());
        result.getTransformation().declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, managedMemoryWeight);
        return result;
    }

    @Override
    public SingleOutputStreamOperator<T> sortPartition(String field, Order order) {
        Preconditions.checkNotNull(field, "The field must not be null.");
        Preconditions.checkNotNull(order, "The order must not be null.");
        TypeInformation inputType = this.input.getType();
        KeyedSortPartitionOperator operator = new KeyedSortPartitionOperator(inputType, field, order);
        String opName = "KeyedSortPartition";
        SingleOutputStreamOperator result = this.input.transform("KeyedSortPartition", inputType, operator).setParallelism(this.input.getParallelism());
        int managedMemoryWeight = Math.max(1, this.environment.getConfiguration().get(ExecutionOptions.SORT_KEYED_PARTITION_MEMORY).getMebiBytes());
        result.getTransformation().declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, managedMemoryWeight);
        return result;
    }

    @Override
    public <K> SingleOutputStreamOperator<T> sortPartition(KeySelector<T, K> keySelector, Order order) {
        Preconditions.checkNotNull(keySelector, "The field must not be null.");
        Preconditions.checkNotNull(order, "The order must not be null.");
        TypeInformation inputType = this.input.getType();
        KeyedSortPartitionOperator operator = new KeyedSortPartitionOperator(inputType, this.environment.clean(keySelector), order);
        String opName = "KeyedSortPartition";
        SingleOutputStreamOperator result = this.input.transform("KeyedSortPartition", inputType, operator).setParallelism(this.input.getParallelism());
        int managedMemoryWeight = Math.max(1, this.environment.getConfiguration().get(ExecutionOptions.SORT_KEYED_PARTITION_MEMORY).getMebiBytes());
        result.getTransformation().declareManagedMemoryUseCaseAtOperatorScope(ManagedMemoryUseCase.OPERATOR, managedMemoryWeight);
        return result;
    }

    private static class KeyedMapPartitionWindowFunction<T, R, KEY>
    extends WrappingFunction<MapPartitionFunction<T, R>>
    implements WindowFunction<T, R, KEY, GlobalWindow> {
        public KeyedMapPartitionWindowFunction(MapPartitionFunction<T, R> function) {
            super(function);
        }

        @Override
        public void apply(KEY key, GlobalWindow window, Iterable<T> input, Collector<R> out) throws Exception {
            ((MapPartitionFunction)this.wrappedFunction).mapPartition(input, out);
        }
    }
}

