/*
 * Decompiled with CFR 0.152.
 */
package org.sdase.commons.server.kafka.producer;

import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Headers;
import org.sdase.commons.server.kafka.producer.MessageProducer;
import org.sdase.commons.server.kafka.prometheus.ProducerTopicMessageCounter;

/**
 * {@link MessageProducer} implementation that publishes every message to one fixed topic
 * through a wrapped {@link KafkaProducer} and counts each send attempt in a
 * {@link ProducerTopicMessageCounter}.
 *
 * @param <K> type of the record key
 * @param <V> type of the record value
 */
public class KafkaMessageProducer<K, V>
implements MessageProducer<K, V> {
    // All collaborators are assigned once in the constructor and never reassigned.
    private final String topic;
    private final KafkaProducer<K, V> producer;
    private final ProducerTopicMessageCounter msgCounter;
    private final String producerName;

    /**
     * Creates a producer bound to a single topic.
     *
     * @param topic        name of the topic every record is sent to
     * @param producer     Kafka producer that records are delegated to
     * @param msgCounter   counter that is increased once per send call
     * @param producerName name passed to the counter together with the topic
     */
    public KafkaMessageProducer(String topic, KafkaProducer<K, V> producer, ProducerTopicMessageCounter msgCounter, String producerName) {
        this.producer = producer;
        this.topic = topic;
        this.msgCounter = msgCounter;
        this.producerName = producerName;
    }

    /**
     * Sends a key/value pair to the configured topic.
     *
     * <p>The counter is increased before the record is handed to the producer, so it
     * reflects send attempts rather than acknowledged deliveries.
     *
     * @param key   record key
     * @param value record value
     * @return the future returned by the underlying producer's {@code send}
     */
    @Override
    public Future<RecordMetadata> send(K key, V value) {
        ProducerRecord<K, V> producerRecord = new ProducerRecord<>(this.topic, key, value);
        this.msgCounter.increase(this.producerName, producerRecord.topic());
        return this.producer.send(producerRecord);
    }

    /**
     * Sends a key/value pair with the given headers to the configured topic.
     *
     * <p>No partition is specified for the record ({@code null} is passed as partition).
     * The counter is increased before the record is handed to the producer, so it
     * reflects send attempts rather than acknowledged deliveries.
     *
     * @param key     record key
     * @param value   record value
     * @param headers headers to attach to the record
     * @return the future returned by the underlying producer's {@code send}
     */
    @Override
    public Future<RecordMetadata> send(K key, V value, Headers headers) {
        // Headers is passed directly as the record's header iterable; no raw cast is needed.
        ProducerRecord<K, V> producerRecord = new ProducerRecord<>(this.topic, null, key, value, headers);
        this.msgCounter.increase(this.producerName, producerRecord.topic());
        return this.producer.send(producerRecord);
    }

    /**
     * Closes the underlying Kafka producer.
     */
    public void close() {
        this.producer.close();
    }

    /**
     * Flushes the underlying Kafka producer.
     */
    @Override
    public void flush() {
        this.producer.flush();
    }
}

