/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.loadgen;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.loadgen.ExecAndProcessLatencyEngine;
import org.apache.storm.loadgen.InputStream;
import org.apache.storm.loadgen.LoadCompConf;
import org.apache.storm.loadgen.OutputStream;
import org.apache.storm.loadgen.OutputStreamEngine;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoadBolt
extends BaseRichBolt {
    private static final Logger LOG = LoggerFactory.getLogger(LoadBolt.class);
    private final List<OutputStream> outputStreamStats;
    private List<OutputStreamEngine> outputStreams;
    private final Map<GlobalStreamId, InputStream> inputStreams = new HashMap<GlobalStreamId, InputStream>();
    private OutputCollector collector;
    private final ExecAndProcessLatencyEngine sleep;
    private int executorIndex;

    public LoadBolt(LoadCompConf conf) {
        this.outputStreamStats = Collections.unmodifiableList(new ArrayList<OutputStream>(conf.streams));
        this.sleep = new ExecAndProcessLatencyEngine(conf.slp);
    }

    public void add(InputStream inputStream) {
        GlobalStreamId id = inputStream.gsid();
        this.inputStreams.put(id, inputStream);
    }

    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        this.outputStreams = Collections.unmodifiableList(this.outputStreamStats.stream().map(ss -> new OutputStreamEngine((OutputStream)ss)).collect(Collectors.toList()));
        this.collector = collector;
        this.executorIndex = context.getThisTaskIndex();
        this.sleep.prepare();
    }

    private void emitTuples(Tuple input) {
        for (OutputStreamEngine se : this.outputStreams) {
            while (se.shouldEmit() != null) {
                this.collector.emit(se.streamName, input, (List)new Values(new Object[]{se.nextKey(), "SOME-BOLT-VALUE"}));
            }
        }
    }

    public void execute(Tuple input) {
        long startTimeNs = System.nanoTime();
        InputStream in = this.inputStreams.get(input.getSourceGlobalStreamId());
        this.sleep.simulateProcessAndExecTime(this.executorIndex, startTimeNs, in, () -> {
            this.emitTuples(input);
            this.collector.ack(input);
        });
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        for (OutputStream s : this.outputStreamStats) {
            declarer.declareStream(s.id, new Fields(new String[]{"key", "value"}));
        }
    }
}

