/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.kafka.service.producer.transaction;

import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.kafka.service.api.producer.KafkaRecordPartitioner;
import org.apache.nifi.kafka.service.api.producer.PublishContext;
import org.apache.nifi.kafka.service.api.record.KafkaRecord;
import org.apache.nifi.kafka.service.producer.ProducerCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class KafkaProducerWrapper {
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());
    private static final long MAX_PARTITION_CACHE_MILLIS = Duration.ofMinutes(5L).toMillis();
    protected final Producer<byte[], byte[]> producer;
    private final ConcurrentMap<String, CachedInteger> partitionCountCache = new ConcurrentHashMap<String, CachedInteger>();

    public KafkaProducerWrapper(Producer<byte[], byte[]> producer) {
        this.producer = producer;
    }

    public void send(Iterator<KafkaRecord> kafkaRecords, PublishContext publishContext, ProducerCallback callback) {
        while (kafkaRecords.hasNext()) {
            KafkaRecord kafkaRecord = kafkaRecords.next();
            ProducerRecord<byte[], byte[]> producerRecord = this.toProducerRecord(kafkaRecord, publishContext);
            this.producer.send(producerRecord, (Callback)callback);
            callback.send(producerRecord.topic());
        }
    }

    public abstract void commit();

    public abstract void abort();

    private ProducerRecord<byte[], byte[]> toProducerRecord(KafkaRecord kafkaRecord, PublishContext publishContext) {
        String topic = Optional.ofNullable(kafkaRecord.getTopic()).orElse(publishContext.getTopic());
        Integer partition = this.getPartition(kafkaRecord, publishContext.getFlowFile(), topic, publishContext.getPartitioner());
        return new ProducerRecord(topic, partition, kafkaRecord.getTimestamp(), (Object)kafkaRecord.getKey(), (Object)kafkaRecord.getValue(), this.toKafkaHeadersNative(kafkaRecord));
    }

    private Integer getPartition(KafkaRecord kafkaRecord, FlowFile flowFile, String topic, KafkaRecordPartitioner partitioner) {
        Integer explicitPartition = kafkaRecord.getPartition();
        if (explicitPartition != null) {
            return explicitPartition;
        }
        if (partitioner == null) {
            return null;
        }
        long partitionerValue = partitioner.partition(topic, flowFile);
        int numPartitions = this.getPartitionCount(topic);
        return (int)(Math.abs(partitionerValue) % (long)numPartitions);
    }

    private int getPartitionCount(String topicName) {
        CachedInteger cachedValue = (CachedInteger)this.partitionCountCache.get(topicName);
        if (cachedValue == null || System.currentTimeMillis() - cachedValue.timestamp() > MAX_PARTITION_CACHE_MILLIS) {
            int partitionCount = this.fetchPartitionCount(topicName);
            this.partitionCountCache.put(topicName, new CachedInteger(partitionCount, System.currentTimeMillis()));
            return partitionCount;
        }
        return cachedValue.value();
    }

    private int fetchPartitionCount(String topicName) {
        return this.producer.partitionsFor(topicName).size();
    }

    private List<Header> toKafkaHeadersNative(KafkaRecord kafkaRecord) {
        return kafkaRecord.getHeaders().stream().map(h -> new RecordHeader(h.key(), h.value())).collect(Collectors.toList());
    }

    private record CachedInteger(int value, long timestamp) {
    }
}

