/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.examples.windowing;

import java.io.Serializable;
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ParameterTool;

public class GroupedProcessingTimeWindowExample {
    /**
     * Builds and runs the example job.
     *
     * <p>The pipeline generates {@code (key, 1)} tuples with a {@link DataGeneratorSource}, keys
     * them by the first tuple field, sums the second field per key over sliding processing-time
     * windows (2500 ms size, 500 ms slide), and discards the results.
     *
     * @param args command-line arguments; if the {@code async-state} flag is present, async state
     *     is enabled on the keyed stream
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);
        final boolean asyncState = params.has("async-state");

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        final long numElementsPerParallel = 20000000L;
        final long numKeys = 10000L;

        final GeneratorFunction<Long, Tuple2<Long, Long>> generatorFunction =
                new DataGeneratorFunction(numElementsPerParallel, numKeys);

        // The count handed to the source is scaled by the environment's parallelism.
        final DataGeneratorSource<Tuple2<Long, Long>> generatorSource =
                new DataGeneratorSource<>(
                        generatorFunction,
                        numElementsPerParallel * env.getParallelism(),
                        Types.TUPLE(Types.LONG, Types.LONG));

        final DataStreamSource<Tuple2<Long, Long>> stream =
                env.fromSource(generatorSource, WatermarkStrategy.noWatermarks(), "Data Generator");

        // Key by the first tuple field; typed generics let the lambda access f0 directly.
        KeyedStream<Tuple2<Long, Long>, Long> keyedStream = stream.keyBy(value -> value.f0);
        if (asyncState) {
            keyedStream = keyedStream.enableAsyncState();
        }

        keyedStream
                .window(
                        SlidingProcessingTimeWindows.of(
                                Duration.ofMillis(2500L), Duration.ofMillis(500L)))
                .reduce(new SummingReducer())
                .sinkTo(new DiscardingSink<>());

        env.execute();
    }

    private static class DataGeneratorFunction
    implements GeneratorFunction<Long, Tuple2<Long, Long>> {
        private final long numElements;
        private final long numKeys;
        private long startTime;

        public DataGeneratorFunction(long numElements, long numKeys) {
            this.numElements = numElements;
            this.numKeys = numKeys;
        }

        public Tuple2<Long, Long> map(Long value) throws Exception {
            if (value % this.numElements == 0L) {
                this.startTime = System.currentTimeMillis();
            }
            if (value % this.numElements + 1L == this.numElements) {
                long endTime = System.currentTimeMillis();
                System.out.println(String.valueOf(Thread.currentThread()) + ": Took " + (endTime - this.startTime) + " msecs for " + this.numElements + " values");
            }
            return new Tuple2((Object)(value % this.numKeys), (Object)1L);
        }
    }

    private static class SummingReducer
    implements ReduceFunction<Tuple2<Long, Long>> {
        private SummingReducer() {
        }

        public Tuple2<Long, Long> reduce(Tuple2<Long, Long> value1, Tuple2<Long, Long> value2) {
            return new Tuple2((Object)((Long)value1.f0), (Object)((Long)value1.f1 + (Long)value2.f1));
        }
    }

    private static class SummingWindowFunction
    implements WindowFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Long, Window> {
        private SummingWindowFunction() {
        }

        public void apply(Long key, Window window, Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) {
            long sum = 0L;
            for (Tuple2<Long, Long> value : values) {
                sum += ((Long)value.f1).longValue();
            }
            out.collect((Object)new Tuple2((Object)key, (Object)sum));
        }
    }

    private static class FirstFieldKeyExtractor<Type extends Tuple, Key>
    implements KeySelector<Type, Key> {
        private FirstFieldKeyExtractor() {
        }

        public Key getKey(Type value) {
            return (Key)value.getField(0);
        }
    }
}

