/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.kafka.service.producer;

import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.nifi.kafka.service.api.common.PartitionState;
import org.apache.nifi.kafka.service.api.common.ServiceConfiguration;
import org.apache.nifi.kafka.service.api.producer.KafkaProducerService;
import org.apache.nifi.kafka.service.api.producer.ProducerConfiguration;
import org.apache.nifi.kafka.service.api.producer.PublishContext;
import org.apache.nifi.kafka.service.api.producer.RecordSummary;
import org.apache.nifi.kafka.service.api.record.KafkaRecord;
import org.apache.nifi.kafka.service.producer.ProducerCallback;
import org.apache.nifi.kafka.service.producer.transaction.KafkaNonTransactionalProducerWrapper;
import org.apache.nifi.kafka.service.producer.transaction.KafkaProducerWrapper;
import org.apache.nifi.kafka.service.producer.transaction.KafkaTransactionalProducerWrapper;
import org.apache.nifi.processor.exception.ProcessException;

public class Kafka3ProducerService
implements KafkaProducerService {
    private final Producer<byte[], byte[]> producer;
    private final List<ProducerCallback> callbacks;
    private final ServiceConfiguration serviceConfiguration;
    private final KafkaProducerWrapper wrapper;
    private volatile boolean closed = false;

    public Kafka3ProducerService(Properties properties, ServiceConfiguration serviceConfiguration, ProducerConfiguration producerConfiguration) {
        ByteArraySerializer serializer = new ByteArraySerializer();
        try {
            this.producer = new KafkaProducer(properties, (Serializer)serializer, (Serializer)serializer);
        }
        catch (Exception e) {
            throw new ProcessException("Failed to create Kafka Producer", (Throwable)e);
        }
        this.callbacks = new ArrayList<ProducerCallback>();
        this.serviceConfiguration = serviceConfiguration;
        try {
            this.wrapper = producerConfiguration.getTransactionsEnabled() ? new KafkaTransactionalProducerWrapper(this.producer) : new KafkaNonTransactionalProducerWrapper(this.producer);
        }
        catch (Exception e) {
            try {
                this.close();
            }
            catch (Exception closeEx) {
                e.addSuppressed(closeEx);
            }
            throw new ProcessException("Failed to initialize Kafka Producer", (Throwable)e);
        }
    }

    public void close() {
        this.closed = true;
        this.producer.close(Duration.ofSeconds(30L));
    }

    public boolean isClosed() {
        return this.closed;
    }

    public void send(Iterator<KafkaRecord> kafkaRecords, PublishContext publishContext) {
        ProducerCallback callback = new ProducerCallback(publishContext.getFlowFile());
        this.callbacks.add(callback);
        List<Exception> callbackExceptions = callback.getExceptions();
        Exception publishException = publishContext.getException();
        if (publishException != null) {
            callbackExceptions.add(publishException);
        }
        if (callbackExceptions.isEmpty()) {
            try {
                this.wrapper.send(kafkaRecords, publishContext, callback);
            }
            catch (UncheckedIOException e) {
                callbackExceptions.add(e);
            }
            catch (Exception e) {
                callbackExceptions.add(e);
                throw e;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public RecordSummary complete() {
        try {
            boolean shouldCommit = this.callbacks.stream().noneMatch(ProducerCallback::isFailure);
            if (shouldCommit) {
                this.producer.flush();
                this.wrapper.commit();
            } else {
                this.wrapper.abort();
            }
            RecordSummary recordSummary = new RecordSummary();
            List flowFileResults = recordSummary.getFlowFileResults();
            for (ProducerCallback callback : this.callbacks) {
                if (callback.isFailure()) {
                    flowFileResults.add(callback.toFailureResult());
                    continue;
                }
                flowFileResults.add(callback.waitComplete(this.serviceConfiguration.getMaxAckWait().toMillis()));
            }
            RecordSummary recordSummary2 = recordSummary;
            return recordSummary2;
        }
        finally {
            this.callbacks.clear();
        }
    }

    public List<PartitionState> getPartitionStates(String topic) {
        List partitionInfos = this.producer.partitionsFor(topic);
        return partitionInfos.stream().map(p -> new PartitionState(p.topic(), p.partition())).collect(Collectors.toList());
    }
}

