/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.loadgen;

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.loadgen.ExecAndProcessLatencyEngine;
import org.apache.storm.loadgen.HttpForwardingMetricsConsumer;
import org.apache.storm.loadgen.LoadMetricsServer;
import org.apache.storm.loadgen.LoadSpout;
import org.apache.storm.loadgen.OutputStreamEngine;
import org.apache.storm.loadgen.ScopedTopologySet;
import org.apache.storm.loadgen.SlowExecutorPattern;
import org.apache.storm.metric.LoggingMetricsConsumer;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.IBasicBolt;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.NimbusClient;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ThroughputVsLatency {
    // Class-wide logger; main() uses it to report a failed test run.
    private static final Logger LOG = LoggerFactory.getLogger(ThroughputVsLatency.class);
    // Defaults for the command-line options. Only DEFAULT_TOPO_NAME is referenced by name in
    // this file: the numeric defaults below are repeated as literals in main() and in the
    // option help text, so those copies must be updated too when a value changes here.
    // Test duration in minutes (-t / --test-time).
    private static final int TEST_EXECUTE_TIME_DEFAULT = 5;
    // Target sentences per second across all spouts (--rate).
    private static final long DEFAULT_RATE_PER_SECOND = 500L;
    // Name the topology is submitted under (--name).
    private static final String DEFAULT_TOPO_NAME = "wc-test";
    // Parallelism of the spout (--spouts).
    private static final int DEFAULT_NUM_SPOUTS = 1;
    // Parallelism of the splitter bolt (--splitters).
    private static final int DEFAULT_NUM_SPLITS = 1;
    // Parallelism of the counter bolt (--counters).
    private static final int DEFAULT_NUM_COUNTS = 1;

    /**
     * Runs the word-count load test: parses the command line, starts the metrics server,
     * submits a spout -&gt; split -&gt; count topology and monitors it for the requested time.
     *
     * <p>When the command line is invalid, or {@code -h} is given, the usage text is printed
     * and the method returns before anything is started. Otherwise the JVM is terminated via
     * {@link System#exit(int)}: status 0 after a completed run, -1 when the run failed.
     *
     * @param args command-line arguments
     * @throws Exception if anything fails before the monitored run starts; failures during
     *     the run itself are logged and turned into the -1 exit status instead
     */
    public static void main(String[] args) throws Exception {
        Options options = buildOptions();
        CommandLine cmd = null;
        Exception commandLineException = null;
        SlowExecutorPattern slowness = null;
        double numMins = 5.0;
        double ratePerSecond = 500.0;
        String name = DEFAULT_TOPO_NAME;
        int numSpouts = 1;
        int numSplits = 1;
        int numCounts = 1;
        try {
            cmd = new DefaultParser().parse(options, args);
            if (cmd.hasOption("t")) {
                numMins = Double.parseDouble(cmd.getOptionValue("t"));
            }
            if (cmd.hasOption("rate")) {
                ratePerSecond = Double.parseDouble(cmd.getOptionValue("rate"));
            }
            if (cmd.hasOption("name")) {
                name = cmd.getOptionValue("name");
            }
            numSpouts = positiveIntOption(cmd, "spouts", numSpouts);
            numSplits = positiveIntOption(cmd, "splitters", numSplits);
            numCounts = positiveIntOption(cmd, "counters", numCounts);
            if (cmd.hasOption("splitter-imbalance")) {
                slowness = SlowExecutorPattern.fromString(cmd.getOptionValue("splitter-imbalance"));
            }
        } catch (IllegalArgumentException | ParseException e) {
            // NumberFormatException is an IllegalArgumentException, so malformed numbers and
            // the range checks in positiveIntOption() are all reported through the usage path.
            commandLineException = e;
        }
        // cmd is non-null whenever commandLineException is null, so the second test is safe.
        if (commandLineException != null || cmd.hasOption('h')) {
            if (commandLineException != null) {
                System.err.println("ERROR " + commandLineException.getMessage());
            }
            new HelpFormatter().printHelp("ThroughputVsLatency [options]", options);
            return;
        }

        // Run parameters handed to the metrics server; insertion order is kept.
        Map<String, Object> metrics = new LinkedHashMap<>();
        metrics.put("target_rate", ratePerSecond);
        metrics.put("spout_parallel", numSpouts);
        metrics.put("split_parallel", numSplits);
        metrics.put("count_parallel", numCounts);

        Config conf = new Config();
        Map<String, Object> sysConf = Utils.readStormConfig();
        LoadMetricsServer metricServer = new LoadMetricsServer(sysConf, cmd, metrics);
        metricServer.serve();
        String url = metricServer.getUrl();
        NimbusClient client = NimbusClient.Builder.withConf(sysConf).build();

        conf.registerMetricsConsumer(LoggingMetricsConsumer.class);
        // Topology metrics are forwarded to the metrics server started above.
        conf.registerMetricsConsumer(HttpForwardingMetricsConsumer.class, url, 1L);
        Map<String, String> workerMetrics = new HashMap<>();
        if (!NimbusClient.isLocalOverride()) {
            // The CPU worker metric is only registered outside the local override.
            workerMetrics.put("CPU", "org.apache.storm.metrics.sigar.CPUMetric");
        }
        conf.put("topology.worker.metrics", workerMetrics);
        conf.put("topology.builtin.metrics.bucket.size.secs", 10);

        TopologyBuilder builder = new TopologyBuilder();
        // Integer division: every spout gets floor(rate / numSpouts), so the aggregate rate
        // can be slightly below the requested one. numSpouts >= 1 is guaranteed by parsing.
        long ratePerSpout = (long) ratePerSecond / numSpouts;
        builder.setSpout("spout", new FastRandomSentenceSpout(ratePerSpout), numSpouts);
        builder.setBolt("split", new SplitSentence(slowness), numSplits).shuffleGrouping("spout");
        builder.setBolt("count", new WordCount(), numCounts).fieldsGrouping("split", new Fields("word"));

        int exitStatus = -1;
        // The ScopedTopologySet is closed when the try block exits, before finally runs.
        try (ScopedTopologySet topologyNames = new ScopedTopologySet(client.getClient())) {
            StormSubmitter.submitTopology(name, conf, builder.createTopology());
            topologyNames.add(name);
            metricServer.monitorFor(numMins, client.getClient(), topologyNames);
            exitStatus = 0;
        } catch (Exception e) {
            LOG.error("Error while running test", e);
        } finally {
            // Always terminate explicitly; exitStatus stays -1 unless the run completed.
            System.exit(exitStatus);
        }
    }

    /**
     * Builds the command-line options understood by {@link #main(String[])}, including the
     * ones contributed by {@code LoadMetricsServer.addCommandLineOptions}.
     */
    private static Options buildOptions() {
        Options options = new Options();
        options.addOption(Option.builder("h").longOpt("help")
            .desc("Print a help message").build());
        options.addOption(Option.builder("t").longOpt("test-time").argName("MINS").hasArg()
            .desc("How long to run the tests for in mins (defaults to 5)").build());
        options.addOption(Option.builder().longOpt("rate").argName("SENTENCES/SEC").hasArg()
            .desc("How many sentences per second to run. (defaults to 500)").build());
        options.addOption(Option.builder().longOpt("name").argName("TOPO_NAME").hasArg()
            .desc("Name of the topology to run (defaults to wc-test)").build());
        options.addOption(Option.builder().longOpt("spouts").argName("NUM").hasArg()
            .desc("Number of spouts to use (defaults to 1)").build());
        options.addOption(Option.builder().longOpt("splitters").argName("NUM").hasArg()
            .desc("Number of splitter bolts to use (defaults to 1)").build());
        options.addOption(Option.builder().longOpt("splitter-imbalance").argName("MS(:COUNT)?").hasArg()
            .desc("The number of ms that the first COUNT splitters will wait before processing.  This creates an imbalance that helps test load aware groupings (defaults to 0:1)").build());
        options.addOption(Option.builder().longOpt("counters").argName("NUM").hasArg()
            .desc("Number of counter bolts to use (defaults to 1)").build());
        LoadMetricsServer.addCommandLineOptions(options);
        return options;
    }

    /**
     * Reads an integer option that is used as a component parallelism and must be at least 1.
     *
     * <p>The spout count is later used as a divisor when the rate is shared between spouts, so
     * a value of 0 would otherwise surface as an {@link ArithmeticException} after the metrics
     * server has already been started; the bolt counts follow the same rule for consistency.
     *
     * @param cmd parsed command line
     * @param longOpt long name of the option, without the leading dashes
     * @param defaultValue value returned when the option is absent
     * @return the parsed value, or {@code defaultValue} when the option was not given
     * @throws NumberFormatException if the option value is not an integer
     * @throws IllegalArgumentException if the option value is smaller than 1
     */
    private static int positiveIntOption(CommandLine cmd, String longOpt, int defaultValue) {
        if (!cmd.hasOption(longOpt)) {
            return defaultValue;
        }
        int value = Integer.parseInt(cmd.getOptionValue(longOpt));
        if (value < 1) {
            throw new IllegalArgumentException("--" + longOpt + " must be a positive integer but was " + value);
        }
        return value;
    }

    /**
     * Spout that emits single-field tuples whose value is one sentence taken from a small
     * fixed pool. Rate handling is inherited from {@code LoadSpout}; this class only supplies
     * the payload and the output schema.
     */
    public static class FastRandomSentenceSpout extends LoadSpout {
        /** Pool of sentences the spout draws from. */
        static final String[] SENTENCES = {
            "the cow jumped over the moon",
            "an apple a day keeps the doctor away",
            "four score and seven years ago",
            "snow white and the seven dwarfs",
            "i am at two with nature"
        };

        /**
         * Creates the spout.
         *
         * @param ratePerSecond rate passed unchanged to the {@code LoadSpout} constructor
         */
        public FastRandomSentenceSpout(long ratePerSecond) {
            super(ratePerSecond);
        }

        /**
         * Produces the next tuple payload.
         *
         * @param se engine asked for an index into {@link #SENTENCES} through
         *     {@code nextInt(SENTENCES.length)} (presumably random, per the class name)
         * @return a one-element {@code Values} holding the selected sentence
         */
        @Override
        protected Values getNextValues(OutputStreamEngine se) {
            int pick = se.nextInt(SENTENCES.length);
            return new Values(SENTENCES[pick]);
        }

        /** Declares the single output field, {@code "sentence"}. */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            Fields schema = new Fields("sentence");
            declarer.declare(schema);
        }
    }

    /**
     * Bolt that breaks each incoming sentence on whitespace and emits one {@code (word, 1)}
     * tuple per piece. The actual work is run through an {@code ExecAndProcessLatencyEngine}
     * built from the supplied {@code SlowExecutorPattern}.
     */
    public static class SplitSentence extends BaseBasicBolt {
        /**
         * Compiled once for the class; {@code String.split("\\s+")} would compile the same
         * regular expression again for every tuple.
         */
        private static final Pattern WHITESPACE = Pattern.compile("\\s+");

        private final ExecAndProcessLatencyEngine sleep;
        /** Task index of this bolt instance, captured in {@link #prepare}. */
        private int executorIndex;

        /**
         * Creates the bolt.
         *
         * @param slowness pattern handed to the latency engine; {@code main} passes
         *     {@code null} when no {@code --splitter-imbalance} option was given
         */
        public SplitSentence(SlowExecutorPattern slowness) {
            this.sleep = new ExecAndProcessLatencyEngine(slowness);
        }

        /**
         * Records this task's index and prepares the latency engine.
         *
         * @param stormConf topology configuration (unused here)
         * @param context source of the task index
         */
        public void prepare(Map<String, Object> stormConf, TopologyContext context) {
            this.executorIndex = context.getThisTaskIndex();
            this.sleep.prepare();
        }

        /**
         * Splits the sentence in field 0 of {@code tuple} and emits {@code (word, 1)} for each
         * piece. The splitting runs inside the callback given to the latency engine, which is
         * also passed this task's index and the current {@code Time.nanoTime()}.
         *
         * @param tuple input tuple whose first field is the sentence
         * @param collector collector the word tuples are emitted to
         */
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            this.sleep.simulateProcessAndExecTime(this.executorIndex, Time.nanoTime(), null, () -> {
                String sentence = tuple.getString(0);
                for (String word : WHITESPACE.split(sentence)) {
                    collector.emit(new Values(word, 1));
                }
            });
        }

        /** Declares the output fields {@code "word"} and {@code "count"}. */
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

    /**
     * Bolt that keeps a running count per word and, for every input tuple, emits the word
     * together with its updated count.
     */
    public static class WordCount extends BaseBasicBolt {
        /** Occurrences seen so far by this bolt instance, keyed by word. */
        Map<String, Integer> counts = new HashMap<>();

        /**
         * Increments the count of the word in field 0 of {@code tuple} and emits
         * {@code (word, newCount)}.
         *
         * @param tuple input tuple whose first field is the word
         * @param collector collector the updated count is emitted to
         */
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String word = tuple.getString(0);
            // merge() stores 1 for a word not seen before, otherwise adds 1 to the stored
            // value, and returns the value now in the map.
            Integer count = this.counts.merge(word, 1, Integer::sum);
            collector.emit(new Values(word, count));
        }

        /** Declares the output fields {@code "word"} and {@code "count"}. */
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }
}

