/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.examples.statemachine.generator;

import java.io.IOException;
import org.apache.flink.streaming.examples.statemachine.event.Event;
import org.apache.flink.streaming.examples.statemachine.generator.EventsGenerator;
import org.apache.flink.util.Collector;

/**
 * Runs one event-generating thread per supplied {@link Collector} and lets the user steer the
 * generators interactively through single-character commands read from standard input.
 *
 * <p>A background logger thread prints the aggregate number of generator iterations per second
 * to standard output.
 */
public class StandaloneThreadedGenerator {
    /**
     * Starts one generator thread per collector, then blocks reading commands from
     * {@code System.in} until the user quits or the input stream ends.
     *
     * <p>Supported commands: {@code q} quit, {@code +} double the per-event delay,
     * {@code -} halve the per-event delay, {@code e} make the next generator (round-robin)
     * emit an invalid state transition. Any other input byte is ignored.
     *
     * <p>All started threads are shut down and joined before this method returns, including
     * when reading from standard input fails.
     *
     * @param collectors the sinks to feed; one generator thread is created per element
     * @throws IOException if reading from standard input fails
     * @throws ArithmeticException if {@code collectors} is empty (the address range is divided
     *     by the number of collectors)
     */
    public static void runGenerator(Collector<Event>[] collectors) throws IOException {
        GeneratorThread[] threads = new GeneratorThread[collectors.length];
        // Split the non-negative int range into consecutive, equally sized slices, one per
        // generator; the slice bounds are handed to the generator as min/max address.
        int range = Integer.MAX_VALUE / collectors.length;
        for (int i = 0; i < threads.length; ++i) {
            int min = range * i;
            int max = min + range;
            GeneratorThread thread = new GeneratorThread(collectors[i], min, max);
            thread.setName("Generator " + i);
            threads[i] = thread;
        }

        long delay = 2L;
        int nextErroneous = 0;
        boolean running = true;
        for (GeneratorThread t : threads) {
            t.setDelay(delay);
            t.start();
        }

        ThroughputLogger throughputLogger = new ThroughputLogger(threads);
        throughputLogger.start();

        System.out.println("Commands:");
        System.out.println(" -> q : Quit");
        System.out.println(" -> + : increase latency");
        System.out.println(" -> - : decrease latency");
        System.out.println(" -> e : inject invalid state transition");

        try {
            while (running) {
                int next = System.in.read();
                switch (next) {
                    case -1: // end of input: no further commands can arrive, so quit
                    case 'q': {
                        System.out.println("Quitting...");
                        running = false;
                        break;
                    }
                    case 'e': {
                        System.out.println("Injecting erroneous transition ...");
                        threads[nextErroneous].sendInvalidStateTransition();
                        // Rotate so consecutive 'e' commands hit different generators.
                        nextErroneous = (nextErroneous + 1) % threads.length;
                        break;
                    }
                    case '+': {
                        // max(..., 1) lets the delay grow again after it was halved down to 0.
                        delay = Math.max(delay * 2L, 1L);
                        System.out.println("Delay is " + delay);
                        for (GeneratorThread t : threads) {
                            t.setDelay(delay);
                        }
                        break;
                    }
                    case '-': {
                        delay /= 2L;
                        System.out.println("Delay is " + delay);
                        for (GeneratorThread t : threads) {
                            t.setDelay(delay);
                        }
                        break;
                    }
                    default:
                        // Unknown input (including the newline after a command) is ignored.
                        break;
                }
            }
        } finally {
            // Always stop the worker threads, even if reading stdin threw, so that the
            // non-daemon threads do not outlive this call.
            throughputLogger.shutdown();
            for (GeneratorThread t : threads) {
                t.shutdown();
                try {
                    t.join();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Thread that, roughly once per second, sums the iteration counts of all generators and
     * prints the rate of change per second to standard output.
     */
    private static class ThroughputLogger
    extends Thread {
        private final GeneratorThread[] generators;
        private volatile boolean running;

        ThroughputLogger(GeneratorThread[] generators) {
            this.generators = generators;
            this.running = true;
        }

        @Override
        public void run() {
            long lastCount = 0L;
            long lastTimeStamp = System.currentTimeMillis();
            while (this.running) {
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    // Interrupted (e.g. by shutdown()): stop logging.
                    break;
                }
                long ts = System.currentTimeMillis();
                long currCount = 0L;
                for (GeneratorThread generator : this.generators) {
                    currCount += generator.currentCount();
                }
                // Normalize by the actually elapsed time rather than assuming exactly 1s.
                double factor = (double) (ts - lastTimeStamp) / 1000.0;
                double perSec = (double) (currCount - lastCount) / factor;
                lastTimeStamp = ts;
                lastCount = currCount;
                System.out.println(perSec + " / sec");
            }
        }

        /** Asks the logger to stop and interrupts it so a pending sleep ends immediately. */
        public void shutdown() {
            this.running = false;
            this.interrupt();
        }
    }

    /**
     * Thread that repeatedly pulls events from its own {@link EventsGenerator} and forwards
     * them to a collector, optionally sleeping between iterations.
     */
    private static class GeneratorThread
    extends Thread {
        private final Collector<Event> out;
        private final int minAddress;
        private final int maxAddress;
        // Written by the controlling thread, read by this thread: volatile for visibility.
        private volatile long delay;
        // Written only by this thread, read by the throughput logger: volatile makes the
        // single-writer increment visible and keeps 64-bit reads from tearing.
        private volatile long count;
        private volatile boolean running;
        private volatile boolean injectInvalidNext;

        GeneratorThread(Collector<Event> out, int minAddress, int maxAddress) {
            this.out = out;
            this.minAddress = minAddress;
            this.maxAddress = maxAddress;
            this.running = true;
        }

        @Override
        public void run() {
            EventsGenerator generator = new EventsGenerator();
            while (this.running) {
                if (this.injectInvalidNext) {
                    this.injectInvalidNext = false;
                    Event next = generator.nextInvalid();
                    // A null result emits nothing; the iteration is still counted below.
                    if (next != null) {
                        this.out.collect(next);
                    }
                } else {
                    this.out.collect(generator.next(this.minAddress, this.maxAddress));
                }
                ++this.count;

                long currentDelay = this.delay;
                if (currentDelay <= 0L) {
                    continue;
                }
                try {
                    Thread.sleep(currentDelay);
                } catch (InterruptedException e) {
                    // shutdown() clears 'running' before interrupting, so the loop condition
                    // ends the thread; keep the interrupt status set for any other observer.
                    Thread.currentThread().interrupt();
                }
            }
        }

        /** Returns the number of loop iterations completed so far. */
        public long currentCount() {
            return this.count;
        }

        /** Asks the generator to stop and interrupts it so a pending sleep ends immediately. */
        public void shutdown() {
            this.running = false;
            this.interrupt();
        }

        /**
         * Sets the pause between iterations.
         *
         * @param delay sleep time in milliseconds; values {@code <= 0} disable the pause
         */
        public void setDelay(long delay) {
            this.delay = delay;
        }

        /** Requests that the next iteration emit an invalid state transition. */
        public void sendInvalidStateTransition() {
            this.injectInvalidNext = true;
        }
    }
}

