/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.examples.slidingwindow;

import com.hazelcast.jet.accumulator.LongLongAccumulator;
import com.hazelcast.jet.core.EventTimePolicy;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.WatermarkPolicy;
import com.hazelcast.jet.core.function.ObjLongBiFunction;
import com.hazelcast.jet.core.processor.SourceProcessors;
import com.hazelcast.jet.examples.slidingwindow.Trade;
import com.hazelcast.jet.function.BiConsumerEx;
import com.hazelcast.jet.function.ConsumerEx;
import com.hazelcast.jet.function.FunctionEx;
import com.hazelcast.jet.function.SupplierEx;
import com.hazelcast.jet.pipeline.SourceBuilder;
import com.hazelcast.jet.pipeline.StreamSource;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public final class TradeGenerator {
    private static final int MAX_LAG = 1000;
    private static final int QUANTITY = 100;
    private static final int PRICE_UNITS_PER_CENT = 100;
    private final List<String> tickers;
    private final long emitPeriodNanos;
    private final long startTimeMillis;
    private final long startTimeNanos;
    private final Map<String, LongLongAccumulator> pricesAndTrends;
    private long scheduledTimeNanos;

    private TradeGenerator(long numTickers, int tradesPerSec) {
        this.tickers = TradeGenerator.loadTickers(numTickers);
        this.pricesAndTrends = this.tickers.stream().collect(Collectors.toMap(t -> t, t -> new LongLongAccumulator(10000L, 10L)));
        this.emitPeriodNanos = TimeUnit.SECONDS.toNanos(1L) / (long)tradesPerSec;
        this.startTimeNanos = this.scheduledTimeNanos = System.nanoTime();
        this.startTimeMillis = System.currentTimeMillis();
    }

    public static StreamSource<Trade> tradeSource(int numTickers, int tradesPerSec) {
        return SourceBuilder.timestampedStream((String)"trade-source", (FunctionEx & Serializable)x -> new TradeGenerator(numTickers, tradesPerSec)).fillBufferFn(TradeGenerator::generateTrades).build();
    }

    public static ProcessorMetaSupplier tradeSourceP(int numTickers, int tradesPerSec, long watermarkStride) {
        return SourceProcessors.convenientTimestampedSourceP((FunctionEx & Serializable)procCtx -> new TradeGenerator(numTickers, tradesPerSec), TradeGenerator::generateTrades, (EventTimePolicy)EventTimePolicy.eventTimePolicy(Trade::getTime, (ObjLongBiFunction & Serializable)(trade, x) -> trade, (SupplierEx)WatermarkPolicy.limitingLag((long)1000L), (long)watermarkStride, (long)0L, (long)60000L), (FunctionEx & Serializable)ctx -> null, (BiConsumerEx & Serializable)(ctx, states) -> {}, (ConsumerEx)ConsumerEx.noop(), (int)0);
    }

    private void generateTrades(SourceBuilder.TimestampedSourceBuffer<Trade> buf) {
        ThreadLocalRandom rnd = ThreadLocalRandom.current();
        long nowNanos = System.nanoTime();
        while (this.scheduledTimeNanos <= nowNanos) {
            String ticker = this.tickers.get(rnd.nextInt(this.tickers.size()));
            LongLongAccumulator priceAndDelta = this.pricesAndTrends.get(ticker);
            int price = (int)(priceAndDelta.get1() / 100L);
            priceAndDelta.add1(priceAndDelta.get2());
            priceAndDelta.add2(rnd.nextLong(101L) - 50L);
            long tradeTimeNanos = this.scheduledTimeNanos - rnd.nextLong(1000L);
            long tradeTimeMillis = this.startTimeMillis + TimeUnit.NANOSECONDS.toMillis(tradeTimeNanos - this.startTimeNanos);
            Trade trade = new Trade(tradeTimeMillis, ticker, 100, price);
            buf.add((Object)trade, tradeTimeMillis);
            this.scheduledTimeNanos += this.emitPeriodNanos;
            if (this.scheduledTimeNanos <= nowNanos) continue;
            nowNanos = System.nanoTime();
        }
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private static List<String> loadTickers(long numTickers) {
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(TradeGenerator.class.getResourceAsStream("/nasdaqlisted.txt"), StandardCharsets.UTF_8));){
            List<String> list = reader.lines().skip(1L).limit(numTickers).map(l -> l.split("\\|")[0]).collect(Collectors.toList());
            return list;
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

