package io.eventuate.messaging.kafka.producer;

import io.eventuate.messaging.kafka.common.EventuateBinaryMessageEncoding;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.StringSerializer;

/* loaded from: input_file:io/eventuate/messaging/kafka/producer/EventuateKafkaProducer.class */
public class EventuateKafkaProducer {
    private Producer<String, byte[]> producer;
    private StringSerializer stringSerializer = new StringSerializer();
    private EventuateKafkaPartitioner eventuateKafkaPartitioner = new EventuateKafkaPartitioner();
    private Properties producerProps = new Properties();

    public EventuateKafkaProducer(String str, EventuateKafkaProducerConfigurationProperties eventuateKafkaProducerConfigurationProperties) {
        this.producerProps.put("bootstrap.servers", str);
        this.producerProps.put("acks", "all");
        this.producerProps.put("retries", 0);
        this.producerProps.put("batch.size", 16384);
        this.producerProps.put("linger.ms", 1);
        this.producerProps.put("buffer.memory", 33554432);
        this.producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        this.producerProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        this.producerProps.putAll(eventuateKafkaProducerConfigurationProperties.getProperties());
        this.producer = new KafkaProducer(this.producerProps);
    }

    public CompletableFuture<?> send(String str, String str2, String str3) {
        return send(str, str2, EventuateBinaryMessageEncoding.stringToBytes(str3));
    }

    public CompletableFuture<?> send(String str, int i, String str2, String str3) {
        return send(str, i, str2, EventuateBinaryMessageEncoding.stringToBytes(str3));
    }

    public CompletableFuture<?> send(String str, String str2, byte[] bArr) {
        return send(new ProducerRecord<>(str, str2, bArr));
    }

    public CompletableFuture<?> send(String str, int i, String str2, byte[] bArr) {
        return send(new ProducerRecord<>(str, Integer.valueOf(i), str2, bArr));
    }

    private CompletableFuture<?> send(ProducerRecord<String, byte[]> producerRecord) {
        CompletableFuture<?> completableFuture = new CompletableFuture<>();
        this.producer.send(producerRecord, (recordMetadata, exc) -> {
            if (exc == null) {
                completableFuture.complete(recordMetadata);
            } else {
                completableFuture.completeExceptionally(exc);
            }
        });
        return completableFuture;
    }

    public int partitionFor(String str, String str2) {
        return this.eventuateKafkaPartitioner.partition(str, this.stringSerializer.serialize(str, str2), partitionsFor(str));
    }

    public List<PartitionInfo> partitionsFor(String str) {
        return this.producer.partitionsFor(str);
    }

    public void close() {
        this.producer.close(1L, TimeUnit.SECONDS);
    }
}
