/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.loadgen;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.stream.Collectors;
import org.apache.storm.loadgen.LoadCompConf;
import org.apache.storm.loadgen.NormalDistStats;
import org.apache.storm.loadgen.OutputStream;
import org.apache.storm.loadgen.OutputStreamEngine;
import org.apache.storm.metric.api.IMetric;
import org.apache.storm.metrics.hdrhistogram.HistogramMetric;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class LoadSpout
extends BaseRichSpout {
    private final List<OutputStream> streamStats;
    private List<OutputStreamEngineWithHisto> streams;
    private SpoutOutputCollector collector;
    private long nextStreamCounter = 0L;
    private final int numStreams;
    private final Queue<SentWithTime> replays = new ArrayDeque<SentWithTime>();

    public LoadSpout(double ratePerSecond) {
        OutputStream test = new OutputStream.Builder().withId("default").withRate(new NormalDistStats(ratePerSecond, 0.0, ratePerSecond, ratePerSecond)).build();
        this.streamStats = Arrays.asList(test);
        this.numStreams = 1;
    }

    public LoadSpout(LoadCompConf conf) {
        this.streamStats = Collections.unmodifiableList(new ArrayList<OutputStream>(conf.streams));
        this.numStreams = this.streamStats.size();
    }

    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
        this.streams = Collections.unmodifiableList(this.streamStats.stream().map(ss -> new OutputStreamEngineWithHisto((OutputStream)ss, context)).collect(Collectors.toList()));
        this.collector = collector;
    }

    public void nextTuple() {
        if (!this.replays.isEmpty()) {
            SentWithTime swt = this.replays.poll();
            this.collector.emit(swt.streamName, (List)swt.keyValue, (Object)swt);
            return;
        }
        int size = this.numStreams;
        for (int tries = 0; tries < size; ++tries) {
            int index;
            OutputStreamEngineWithHisto se;
            Long emitTupleTime;
            if ((emitTupleTime = (se = this.streams.get(index = Math.abs((int)(this.nextStreamCounter++ % (long)size)))).shouldEmit()) == null) continue;
            SentWithTime swt = new SentWithTime(se.streamName, this.getNextValues(se), emitTupleTime, se.histogram);
            this.collector.emit(swt.streamName, (List)swt.keyValue, (Object)swt);
            break;
        }
    }

    protected Values getNextValues(OutputStreamEngine se) {
        return new Values(new Object[]{se.nextKey(), "JUST_SOME_VALUE"});
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        for (OutputStream s : this.streamStats) {
            declarer.declareStream(s.id, new Fields(new String[]{"key", "value"}));
        }
    }

    public void ack(Object id) {
        ((SentWithTime)id).done();
    }

    public void fail(Object id) {
        this.replays.add((SentWithTime)id);
    }

    private static class SentWithTime {
        public final String streamName;
        public final Values keyValue;
        public final long time;
        public final HistogramMetric histogram;

        SentWithTime(String streamName, Values keyValue, long time, HistogramMetric histogram) {
            this.streamName = streamName;
            this.keyValue = keyValue;
            this.time = time;
            this.histogram = histogram;
        }

        public void done() {
            this.histogram.recordValue(Math.max(0L, System.nanoTime() - this.time));
        }
    }

    private static class OutputStreamEngineWithHisto
    extends OutputStreamEngine {
        public final HistogramMetric histogram = new HistogramMetric(3600000000000L, 3);

        OutputStreamEngineWithHisto(OutputStream stats, TopologyContext context) {
            super(stats);
            context.registerMetric("comp-lat-histo-" + stats.id, (IMetric)this.histogram, 10);
        }
    }
}

