/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.storm;

import java.util.ArrayList;
import java.util.BitSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;
import org.elasticsearch.hadoop.EsHadoopException;
import org.elasticsearch.hadoop.rest.InitializationUtils;
import org.elasticsearch.hadoop.rest.RestService;
import org.elasticsearch.hadoop.rest.bulk.BulkResponse;
import org.elasticsearch.storm.TupleUtils;
import org.elasticsearch.storm.cfg.StormSettings;
import org.elasticsearch.storm.serialization.StormTupleBytesConverter;
import org.elasticsearch.storm.serialization.StormTupleFieldExtractor;
import org.elasticsearch.storm.serialization.StormValueWriter;

public class EsBolt
implements IRichBolt {
    private static transient Log log = LogFactory.getLog(EsBolt.class);
    private Map boltConfig = new LinkedHashMap();
    private transient RestService.PartitionWriter writer;
    private transient boolean flushOnTickTuple = true;
    private transient boolean ackWrites = false;
    private transient List<Tuple> inflightTuples = null;
    private transient int numberOfEntries = 0;
    private transient OutputCollector collector;

    public EsBolt(String target) {
        this.boltConfig.put("es.resource.write", target);
    }

    public EsBolt(String target, boolean writeAck) {
        this.boltConfig.put("es.resource.write", target);
        this.boltConfig.put("es.storm.bolt.write.ack", Boolean.toString(writeAck));
    }

    public EsBolt(String target, Map configuration) {
        this.boltConfig.putAll(configuration);
        this.boltConfig.put("es.resource.write", target);
    }

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        LinkedHashMap copy = new LinkedHashMap(conf);
        copy.putAll(this.boltConfig);
        StormSettings settings = new StormSettings(copy);
        this.flushOnTickTuple = settings.getStormTickTupleFlush();
        this.ackWrites = settings.getStormBoltAck();
        if (this.ackWrites) {
            settings.setProperty("es.batch.flush.manual", Boolean.TRUE.toString());
            this.numberOfEntries = settings.getStormBulkSize();
            settings.setProperty("es.batch.size.entries", String.valueOf(this.numberOfEntries));
            this.inflightTuples = new ArrayList<Tuple>(this.numberOfEntries + 1);
        }
        int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
        InitializationUtils.setValueWriterIfNotSet(settings, StormValueWriter.class, log);
        InitializationUtils.setBytesConverterIfNeeded(settings, StormTupleBytesConverter.class, log);
        InitializationUtils.setFieldExtractorIfNotSet(settings, StormTupleFieldExtractor.class, log);
        this.writer = RestService.createWriter(settings, context.getThisTaskIndex(), totalTasks, log);
    }

    public void execute(Tuple input) {
        if (this.flushOnTickTuple && TupleUtils.isTickTuple(input)) {
            this.flush();
            return;
        }
        if (this.ackWrites) {
            this.inflightTuples.add(input);
        }
        try {
            this.writer.repository.writeToIndex(input);
            if (this.numberOfEntries > 0 && this.inflightTuples.size() >= this.numberOfEntries) {
                this.flush();
            }
            if (!this.ackWrites) {
                this.collector.ack(input);
            }
        }
        catch (RuntimeException ex) {
            if (!this.ackWrites) {
                this.collector.fail(input);
            }
            throw ex;
        }
    }

    private void flush() {
        if (this.ackWrites) {
            this.flushWithAck();
        } else {
            this.flushNoAck();
        }
    }

    private void flushWithAck() {
        BitSet flush = new BitSet();
        try {
            List<BulkResponse.BulkError> documentErrors = this.writer.repository.tryFlush().getDocumentErrors();
            for (BulkResponse.BulkError documentError : documentErrors) {
                flush.set(documentError.getOriginalPosition());
            }
        }
        catch (EsHadoopException ex) {
            for (Tuple input : this.inflightTuples) {
                this.collector.fail(input);
            }
            this.inflightTuples.clear();
            throw ex;
        }
        for (int index = 0; index < this.inflightTuples.size(); ++index) {
            Tuple tuple = this.inflightTuples.get(index);
            if (flush.get(index)) {
                this.collector.fail(tuple);
                continue;
            }
            this.collector.ack(tuple);
        }
        this.inflightTuples.clear();
    }

    private void flushNoAck() {
        this.writer.repository.flush();
    }

    public void cleanup() {
        if (this.writer != null) {
            try {
                this.flush();
            }
            finally {
                this.writer.close();
                this.writer = null;
            }
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

