/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.examples.slidingwindow;

import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.aggregate.AggregateOperation1;
import com.hazelcast.jet.aggregate.AggregateOperations;
import com.hazelcast.jet.datamodel.KeyedWindowResult;
import com.hazelcast.jet.datamodel.WindowResult;
import com.hazelcast.jet.examples.slidingwindow.Trade;
import com.hazelcast.jet.examples.slidingwindow.TradeGenerator;
import com.hazelcast.jet.function.ComparatorEx;
import com.hazelcast.jet.function.FunctionEx;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.WindowDefinition;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class TopNStocks {
    private static final int JOB_DURATION = 15;

    private static Pipeline buildPipeline() {
        Pipeline p = Pipeline.create();
        ComparatorEx comparingValue = ComparatorEx.comparing(WindowResult::result);
        AggregateOperation1 aggrOpTopN = AggregateOperations.allOf((AggregateOperation1)AggregateOperations.topN((int)5, (ComparatorEx)comparingValue), (AggregateOperation1)AggregateOperations.topN((int)5, (ComparatorEx)comparingValue.reversed()), TopNResult::new);
        p.drawFrom(TradeGenerator.tradeSource(500, 6000)).withNativeTimestamps(1000L).groupingKey(Trade::getTicker).window((WindowDefinition)WindowDefinition.sliding((long)10000L, (long)1000L)).aggregate(AggregateOperations.linearTrend(Trade::getTime, Trade::getPrice)).window((WindowDefinition)WindowDefinition.tumbling((long)1000L)).aggregate(aggrOpTopN).drainTo(Sinks.logger((FunctionEx & Serializable)wr -> String.format("%nAt %s...%n%s", Util.toLocalTime((long)wr.end()), wr.result())));
        return p;
    }

    public static void main(String[] args) throws InterruptedException {
        JetInstance[] instances = new JetInstance[2];
        Arrays.parallelSetAll(instances, i -> Jet.newJetInstance());
        try {
            Job job = instances[0].newJob(TopNStocks.buildPipeline());
            TimeUnit.SECONDS.sleep(15L);
            job.cancel();
            job.join();
        }
        catch (CancellationException cancellationException) {
        }
        finally {
            Jet.shutdownAll();
        }
    }

    static final class TopNResult {
        final List<KeyedWindowResult<String, Double>> topIncrease;
        final List<KeyedWindowResult<String, Double>> topDecrease;

        TopNResult(List<KeyedWindowResult<String, Double>> topIncrease, List<KeyedWindowResult<String, Double>> topDecrease) {
            this.topIncrease = topIncrease;
            this.topDecrease = topDecrease;
        }

        public String toString() {
            return String.format("Top rising stocks:%n%s\nTop falling stocks:%n%s", this.topIncrease.stream().map(kwr -> String.format("   %s by %.2f", kwr.key(), kwr.result())).collect(Collectors.joining("\n")), this.topDecrease.stream().map(kwr -> String.format("   %s by %.2f", kwr.key(), kwr.result())).collect(Collectors.joining("\n")));
        }
    }
}

