/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.examples.rollingaggregation;

import com.hazelcast.jet.examples.rollingaggregation.Trade;
import com.hazelcast.jet.function.FunctionEx;
import com.hazelcast.jet.pipeline.SourceBuilder;
import com.hazelcast.jet.pipeline.StreamSource;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

public final class TradeGenerator {
    private static final long NASDAQLISTED_ROWCOUNT = 3170L;
    private final List<String> tickers;
    private final long emitPeriodNanos;
    private final long startTimeMillis;
    private final long startTimeNanos;
    private final long endTimeNanos;
    private long scheduledTimeNanos;

    private TradeGenerator(long numTickers, int tradesPerSec, long timeoutSeconds) {
        this.tickers = TradeGenerator.loadTickers(numTickers);
        this.emitPeriodNanos = TimeUnit.SECONDS.toNanos(1L) / (long)tradesPerSec;
        this.startTimeNanos = this.scheduledTimeNanos = System.nanoTime();
        this.endTimeNanos = this.startTimeNanos + TimeUnit.SECONDS.toNanos(timeoutSeconds);
        this.startTimeMillis = System.currentTimeMillis();
    }

    public static StreamSource<Trade> tradeSource(int numTickers, int tradesPerSec, long timeoutSeconds) {
        return SourceBuilder.timestampedStream((String)"trade-source", (FunctionEx & Serializable)x -> new TradeGenerator(numTickers, tradesPerSec, timeoutSeconds)).fillBufferFn(TradeGenerator::generateTrades).build();
    }

    private void generateTrades(SourceBuilder.TimestampedSourceBuffer<Trade> buf) {
        if (this.scheduledTimeNanos >= this.endTimeNanos) {
            buf.close();
            return;
        }
        ThreadLocalRandom rnd = ThreadLocalRandom.current();
        long nowNanos = System.nanoTime();
        while (this.scheduledTimeNanos <= nowNanos) {
            String ticker = this.tickers.get(rnd.nextInt(this.tickers.size()));
            long tradeTimeMillis = this.startTimeMillis + TimeUnit.NANOSECONDS.toMillis(this.scheduledTimeNanos - this.startTimeNanos);
            Trade trade = new Trade(tradeTimeMillis, ticker, 1, rnd.nextInt(5000));
            buf.add((Object)trade, tradeTimeMillis);
            this.scheduledTimeNanos += this.emitPeriodNanos;
            if (this.scheduledTimeNanos <= nowNanos) continue;
            nowNanos = System.nanoTime();
        }
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private static List<String> loadTickers(long numTickers) {
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(TradeGenerator.class.getResourceAsStream("/nasdaqlisted.txt"), StandardCharsets.UTF_8));){
            String line;
            ArrayList<String> result = new ArrayList<String>();
            long strideLength = 3170L / numTickers;
            int rowCount = 0;
            while ((line = reader.readLine()) != null) {
                if ((long)(++rowCount) % strideLength != 0L) continue;
                result.add(line.substring(0, line.indexOf(124)));
                if ((long)result.size() != numTickers) continue;
            }
            ArrayList<String> arrayList = result;
            return arrayList;
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

