package uk.co.gresearch.siembol.common.storm;

import java.lang.invoke.MethodHandles;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/siembol-common-2.3.0.jar:uk/co/gresearch/siembol/common/storm/KafkaWriterBoltBase.class */
public abstract class KafkaWriterBoltBase extends BaseRichBolt {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private static final String AUTH_EXCEPTION_MESSAGE = "Authorization exception {} during writing messages to the kafka";
    private static final String KAFKA_EXCEPTION_MESSAGE = "Exception {} during writing messages to the kafka";
    private static final String SENDING_MESSAGE_LOG = "Sending message: {}, key :{} to the topic: {} ";
    private final Properties props;
    private OutputCollector collector;
    private Producer<String, String> producer;

    /* JADX INFO: Access modifiers changed from: protected */
    public KafkaWriterBoltBase(Properties properties) {
        this.props = properties;
    }

    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
        this.producer = new KafkaProducer(this.props, new StringSerializer(), new StringSerializer());
        prepareInternally();
    }

    protected void prepareInternally() {
    }

    public void cleanup() {
        this.producer.close();
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void writeMessages(List<KafkaWriterMessage> list, KafkaWriterAnchor kafkaWriterAnchor) {
        kafkaWriterAnchor.acquire(list.size());
        list.forEach(kafkaWriterMessage -> {
            writeMessage(kafkaWriterMessage, kafkaWriterAnchor);
        });
    }

    private Callback createProducerCallback(KafkaWriterAnchor kafkaWriterAnchor) {
        return (recordMetadata, exc) -> {
            synchronized (this.collector) {
                if (exc != null) {
                    LOG.error(KAFKA_EXCEPTION_MESSAGE, ExceptionUtils.getStackTrace(exc));
                    this.collector.fail(kafkaWriterAnchor.getTuple());
                } else if (kafkaWriterAnchor.release()) {
                    this.collector.ack(kafkaWriterAnchor.getTuple());
                }
            }
        };
    }

    private void writeMessage(KafkaWriterMessage kafkaWriterMessage, KafkaWriterAnchor kafkaWriterAnchor) {
        try {
            Callback createProducerCallback = createProducerCallback(kafkaWriterAnchor);
            LOG.debug(SENDING_MESSAGE_LOG, kafkaWriterMessage.getMessage(), kafkaWriterMessage.getKey(), kafkaWriterMessage.getTopic());
            this.producer.send(kafkaWriterMessage.getProducerRecord(), createProducerCallback);
        } catch (AuthorizationException e) {
            LOG.error(AUTH_EXCEPTION_MESSAGE, ExceptionUtils.getStackTrace(e));
            this.producer.close();
            throw new IllegalStateException((Throwable) e);
        } catch (Exception e2) {
            LOG.error(KAFKA_EXCEPTION_MESSAGE, ExceptionUtils.getStackTrace(e2));
            this.collector.fail(kafkaWriterAnchor.getTuple());
        }
    }
}
