/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.kafka.service.producer.transaction;

import java.util.Iterator;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.nifi.kafka.service.api.producer.PublishContext;
import org.apache.nifi.kafka.service.api.record.KafkaRecord;
import org.apache.nifi.kafka.service.producer.ProducerCallback;
import org.apache.nifi.kafka.service.producer.transaction.KafkaProducerWrapper;

public class KafkaTransactionalProducerWrapper
extends KafkaProducerWrapper {
    private volatile boolean inTransaction = false;

    public KafkaTransactionalProducerWrapper(Producer<byte[], byte[]> producer) {
        super(producer);
        producer.initTransactions();
    }

    @Override
    public void send(Iterator<KafkaRecord> kafkaRecords, PublishContext publishContext, ProducerCallback callback) {
        if (!this.inTransaction) {
            this.producer.beginTransaction();
            this.inTransaction = true;
        }
        super.send(kafkaRecords, publishContext, callback);
    }

    @Override
    public void commit() {
        try {
            boolean failure = false;
            for (int i = 0; i < 3; ++i) {
                try {
                    this.producer.commitTransaction();
                    if (!failure) break;
                    this.logger.info("Successfully commited producer transaction after {} retries", (Object)i);
                    break;
                }
                catch (TimeoutException te) {
                    failure = true;
                    if (i == 2) {
                        this.logger.warn("Failed to commit producer transaction after 3 attempts, each timing out. Aborting transaction.");
                        throw te;
                    }
                    this.logger.warn("Timed out while committing producer transaction. Retrying...");
                    continue;
                }
            }
            this.inTransaction = false;
        }
        catch (Exception e) {
            this.logger.error("Failed to commit producer transaction", (Throwable)e);
            this.abort();
        }
    }

    @Override
    public void abort() {
        if (!this.inTransaction) {
            return;
        }
        this.inTransaction = false;
        this.producer.abortTransaction();
    }
}

