/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer.internal;

import io.confluent.csid.utils.StringUtils;
import io.confluent.parallelconsumer.ParallelConsumer;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.PollContextInternal;
import io.confluent.parallelconsumer.internal.AbstractOffsetCommitter;
import io.confluent.parallelconsumer.internal.ConsumerManager;
import io.confluent.parallelconsumer.internal.InternalRuntimeException;
import io.confluent.parallelconsumer.internal.OffsetCommitter;
import io.confluent.parallelconsumer.internal.ProducerWrapper;
import io.confluent.parallelconsumer.state.WorkManager;
import java.time.Duration;
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import lombok.NonNull;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Coordinates use of a {@link ProducerWrapper} between threads that produce records and the
 * thread that commits offsets through the producer's transaction.
 *
 * <p>A fair {@link ReentrantReadWriteLock} is used for the coordination: producing threads hold
 * the read lock (the "produce lock", see {@link #beginProducing} / {@link #finishProducing}),
 * while the committing thread holds the write lock (the "commit lock").
 *
 * @param <K> record key type
 * @param <V> record value type
 */
public class ProducerManager<K, V> extends AbstractOffsetCommitter<K, V> implements OffsetCommitter {
    private static final Logger log = LoggerFactory.getLogger(ProducerManager.class);

    /** Maximum number of commit retries tolerated before {@link #commitOffsets} gives up. */
    private static final int COMMIT_RETRY_LIMIT = 200;

    /** Message fragment looked for in the last commit error when deciding how to proceed on a retry. */
    private static final String READY_TO_COMMITTING_ERROR =
            "Invalid transition attempted from state READY to state COMMITTING_TRANSACTION";

    protected final ProducerWrapper<K, V> producerWrapper;
    private final ParallelConsumerOptions<K, V> options;

    /** Fair lock: read side is the produce lock, write side is the commit lock. */
    private final ReentrantReadWriteLock producerTransactionLock = new ReentrantReadWriteLock(true);

    /**
     * Creates the manager and validates / initialises the producer's transactional state.
     *
     * @param newProducer the wrapped producer to send with
     * @param newConsumer passed through to the super class
     * @param wm passed through to the super class
     * @param options options consulted for transactional mode and lock timeouts
     * @throws IllegalArgumentException if the options and the producer disagree about transactions
     * @throws KafkaException if initialising transactions on the producer fails
     */
    public ProducerManager(ProducerWrapper<K, V> newProducer, ConsumerManager<K, V> newConsumer, WorkManager<K, V> wm, ParallelConsumerOptions<K, V> options) {
        super(newConsumer, wm);
        this.producerWrapper = newProducer;
        this.options = options;
        this.initProducer();
    }

    /**
     * Checks that the producer configuration matches the transactional option and, when
     * transactions are in use, initialises them on the producer.
     */
    private void initProducer() {
        boolean producerIsTransactional = this.producerWrapper.isConfiguredForTransactions();
        if (this.options.isUsingTransactionalProducer()) {
            if (!producerIsTransactional) {
                throw new IllegalArgumentException("Using transactional option, yet Producer doesn't have a transaction ID - Producer needs a transaction id");
            }
            try {
                log.debug("Initialising producer transaction session...");
                this.producerWrapper.initTransactions();
            } catch (KafkaException e) {
                log.error("Make sure your producer is setup for transactions - specifically make sure it's {} is set.", "transactional.id", e);
                throw e;
            }
        } else if (producerIsTransactional) {
            throw new IllegalArgumentException("Using non-transactional producer option, but Producer has a transaction ID - the Producer must not have a transaction ID for this option. This is because having such an ID forces the Producer into transactional mode - i.e. you cannot use it without using transactions.");
        }
    }

    /**
     * Sends every record in {@code outMsgs} through the wrapped producer, beginning a transaction
     * first if one is required and not yet open.
     *
     * @param outMsgs the records to send
     * @return one tuple per input record, pairing the record with the future returned by the send
     * @throws InternalRuntimeException if transaction commit mode is used and the calling thread
     *         does not hold the produce lock
     */
    public List<ParallelConsumer.Tuple<ProducerRecord<K, V>, Future<RecordMetadata>>> produceMessages(List<ProducerRecord<K, V>> outMsgs) {
        this.ensureProduceStarted();
        this.lazyMaybeBeginTransaction();
        Callback callback = (metadata, exception) -> {
            if (exception != null) {
                log.error("Error producing result message", exception);
                throw new InternalRuntimeException("Error producing result message", exception);
            }
        };
        List<ParallelConsumer.Tuple<ProducerRecord<K, V>, Future<RecordMetadata>>> futures = new ArrayList<>(outMsgs.size());
        for (ProducerRecord<K, V> rec : outMsgs) {
            log.trace("Producing {}", rec);
            Future<RecordMetadata> future = this.producerWrapper.send(rec, callback);
            futures.add(ParallelConsumer.Tuple.pairOf(rec, future));
        }
        return futures;
    }

    /**
     * Begins a transaction if transaction commit mode is enabled and none is open. The unsynchronised
     * check avoids taking the monitor when a transaction is already open; the state is re-checked
     * under the monitor in {@link #syncBeginTransaction()}.
     */
    private void lazyMaybeBeginTransaction() {
        if (this.options.isUsingTransactionCommitMode() && !this.producerWrapper.isTransactionOpen()) {
            this.syncBeginTransaction();
        }
    }

    /** Begins a transaction unless one is already open, serialised on this instance's monitor. */
    private synchronized void syncBeginTransaction() {
        if (!this.producerWrapper.isTransactionOpen()) {
            this.beginTransaction();
        }
    }

    /**
     * Releases a produce lock previously returned by {@link #acquireProduceLock}.
     *
     * @param lock the lock handle to release
     */
    protected void releaseProduceLock(ProducingLock lock) {
        lock.unlock();
    }

    /**
     * Acquires the produce (read) lock, waiting at most the configured produce lock timeout.
     *
     * @param context the poll context the lock is taken for; only used for logging and stored in the handle
     * @return a handle that must later be released
     * @throws java.util.concurrent.TimeoutException if the lock could not be acquired in time
     * @throws InternalRuntimeException if the thread is interrupted while waiting; the thread's
     *         interrupt status is restored before throwing
     */
    protected ProducingLock acquireProduceLock(PollContextInternal<K, V> context) throws java.util.concurrent.TimeoutException {
        ReentrantReadWriteLock.ReadLock readLock = this.producerTransactionLock.readLock();
        Duration produceLockTimeout = this.options.getProduceLockAcquisitionTimeout();
        log.debug("Acquiring produce lock (timeout: {})...", produceLockTimeout);
        boolean lockAcquired;
        try {
            lockAcquired = readLock.tryLock(produceLockTimeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // tryLock cleared the interrupt flag; restore it so callers further up can observe it
            Thread.currentThread().interrupt();
            throw new InternalRuntimeException("Interrupted while waiting to get produce lock (timeout was set to {})", (Throwable) e, produceLockTimeout);
        }
        if (!lockAcquired) {
            throw new java.util.concurrent.TimeoutException(StringUtils.msg("Timeout while waiting to get produce lock (was set to {}). Commit taking too long? Try increasing the produce lock timeout.", produceLockTimeout));
        }
        log.debug("Produce lock acquired (context: {}).", context.getOffsets());
        log.trace("Produce lock acquired.");
        return new ProducingLock(context, readLock);
    }

    /**
     * Takes the commit lock and then flushes the producer.
     *
     * @throws java.util.concurrent.TimeoutException if the commit lock could not be acquired in time
     * @throws InterruptedException if interrupted while waiting for the commit lock
     */
    @Override
    protected void preAcquireOffsetsToCommit() throws java.util.concurrent.TimeoutException, InterruptedException {
        this.acquireCommitLock();
        this.flush();
    }

    private void flush() {
        this.producerWrapper.flush();
    }

    /**
     * Releases the commit lock.
     *
     * @throws ConcurrentModificationException if the current thread holds the write lock more than once
     * @throws IllegalStateException if the current thread does not hold the write lock
     */
    @Override
    protected void postCommit() {
        if (this.producerTransactionLock.getWriteHoldCount() > 1) {
            throw new ConcurrentModificationException("Lock held too many times, won't be released problem and will cause deadlock");
        }
        this.releaseCommitLock();
    }

    /**
     * Sends the offsets to the producer's transaction and commits it, retrying the commit when it
     * fails with Kafka's {@link InterruptException} or {@link TimeoutException}.
     *
     * @param offsetsToSend offsets to add to the transaction
     * @param groupMetadata consumer group metadata passed along with the offsets
     * @throws NullPointerException if either argument is null
     * @throws IllegalStateException if not using a transactional producer, or the commit lock is not held
     * @throws InternalRuntimeException if the producer is fenced, or the retry limit is exceeded
     */
    @Override
    protected void commitOffsets(@NonNull Map<TopicPartition, OffsetAndMetadata> offsetsToSend, @NonNull ConsumerGroupMetadata groupMetadata) {
        if (offsetsToSend == null) {
            throw new NullPointerException("offsetsToSend is marked non-null but is null");
        }
        if (groupMetadata == null) {
            throw new NullPointerException("groupMetadata is marked non-null but is null");
        }
        log.debug("Transactional offset commit starting");
        if (!this.options.isUsingTransactionalProducer()) {
            throw new IllegalStateException("Bug: cannot use if not using transactional producer");
        }
        this.ensureCommitLockHeld();
        this.lazyMaybeBeginTransaction();
        try {
            this.producerWrapper.sendOffsetsToTransaction(offsetsToSend, groupMetadata);
        } catch (ProducerFencedException e) {
            throw new InternalRuntimeException(e);
        }

        int retryCount = 0;
        Throwable lastErrorSavedForRethrow = null;
        boolean committed = false;
        while (!committed) {
            if (retryCount > COMMIT_RETRY_LIMIT) {
                String msg = StringUtils.msg("Retired too many times ({} > limit of {}), giving up. See error above.", retryCount, COMMIT_RETRY_LIMIT);
                log.error(msg, lastErrorSavedForRethrow);
                throw new InternalRuntimeException(msg, lastErrorSavedForRethrow);
            }
            try {
                this.attemptCommit(retryCount, lastErrorSavedForRethrow);
                committed = true;
                if (retryCount > 0) {
                    log.warn("Commit success, but took {} tries.", retryCount);
                }
            } catch (InterruptException | TimeoutException e) {
                log.warn("Commit exception, will retry, have tried {} times (see KafkaProducer#commit)", retryCount, e);
                lastErrorSavedForRethrow = e;
                ++retryCount;
            }
        }
    }

    /**
     * Performs one commit attempt. On the first attempt (or always, for a mock producer) the
     * transaction is committed directly. On a retry, the commit is only re-issued while the
     * producer reports the transaction as still completing.
     *
     * @param retryCount number of failed attempts so far
     * @param lastError the error from the previous attempt, or null on the first attempt
     */
    private void attemptCommit(int retryCount, Throwable lastError) {
        if (this.producerWrapper.isMockProducer() || retryCount == 0) {
            this.commitTransaction();
            return;
        }
        if (this.producerWrapper.isTransactionCompleting()) {
            this.commitTransaction();
        }
        // getMessage() may legitimately be null; treat that the same as "fragment not present"
        String lastMessage = lastError == null ? null : lastError.getMessage();
        // NOTE(review): the flag is true when the message does NOT contain the fragment - kept as found
        boolean transactionModeIsReady = lastMessage == null || !lastMessage.contains(READY_TO_COMMITTING_ERROR);
        if (transactionModeIsReady) {
            log.error("Transaction was already in READY state - tx completed between interrupt and retry");
        }
    }

    private void commitTransaction() {
        this.producerWrapper.commitTransaction();
    }

    private void beginTransaction() {
        this.producerWrapper.beginTransaction();
    }

    /**
     * Aborts the current transaction when a transactional producer is in use and not in the ready
     * state, then closes the producer.
     *
     * <p>If the commit lock cannot be acquired (timeout or interrupt) the abort is attempted
     * anyway; the lock is only released if this thread actually holds it. An interrupt received
     * while waiting for the lock is re-asserted on the thread once closing has finished.
     *
     * @param timeout passed to the producer's close
     */
    public void close(Duration timeout) {
        log.debug("Closing producer, assuming no more in flight...");
        boolean interrupted = false;
        try {
            if (this.options.isUsingTransactionalProducer() && !this.producerWrapper.isTransactionReady()) {
                try {
                    this.acquireCommitLock();
                } catch (InterruptedException e) {
                    // defer restoring the flag so the abort and close below are still attempted
                    interrupted = true;
                    log.error("Exception acquiring commit lock, will try to abort anyway", e);
                } catch (java.util.concurrent.TimeoutException e) {
                    log.error("Exception acquiring commit lock, will try to abort anyway", e);
                }
                try {
                    this.abortTransaction();
                } finally {
                    // acquisition may have failed above; releasing a lock we do not hold would throw
                    if (this.producerTransactionLock.isWriteLockedByCurrentThread()) {
                        this.releaseCommitLock();
                    }
                }
            }
            this.closeProducer(timeout);
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    private void closeProducer(Duration timeout) {
        this.producerWrapper.close(timeout);
    }

    private void abortTransaction() {
        this.producerWrapper.abortTransaction();
    }

    /**
     * Acquires the commit (write) lock, waiting at most the configured commit lock timeout.
     * Returns immediately, without re-entering, when the current thread already holds it.
     *
     * @throws ConcurrentModificationException if another thread holds the write lock
     * @throws java.util.concurrent.TimeoutException if the lock could not be acquired in time
     * @throws InterruptedException if interrupted while waiting
     */
    private void acquireCommitLock() throws java.util.concurrent.TimeoutException, InterruptedException {
        log.debug("Acquiring commit - checking lock state...");
        if (this.producerTransactionLock.isWriteLockedByCurrentThread()) {
            log.debug("Lock already held, returning with-out reentering to avoid write lock layers...");
            return;
        }
        if (this.producerTransactionLock.isWriteLocked()) {
            // not held by us (checked above), so some other thread is committing
            throw new ConcurrentModificationException(this.getClass().getSimpleName() + " is not safe for multi-threaded access - write lock already held by another thread");
        }
        Duration commitLockTimeout = this.options.getCommitLockAcquisitionTimeout();
        log.debug("Acquiring commit lock (timeout: {})...", commitLockTimeout);
        boolean gotLock = this.producerTransactionLock.writeLock().tryLock(commitLockTimeout.toMillis(), TimeUnit.MILLISECONDS);
        if (!gotLock) {
            String msg = StringUtils.msg("Timeout getting commit lock (which was set to {}). Slow processing or too many records being ack'd? Try increasing the commit lock timeout ({}), or reduce your record processing time.", commitLockTimeout, "commitLockAcquisitionTimeout");
            throw new java.util.concurrent.TimeoutException(msg);
        }
        log.debug("Commit lock acquired.");
    }

    /**
     * Releases one hold of the commit (write) lock.
     *
     * @throws IllegalStateException if the current thread does not hold the write lock
     */
    private void releaseCommitLock() {
        log.debug("Releasing commit lock...");
        if (!this.producerTransactionLock.isWriteLockedByCurrentThread()) {
            throw new IllegalStateException("Not held be me");
        }
        this.producerTransactionLock.writeLock().unlock();
        log.debug("Commit lock released.");
    }

    /** @throws IllegalStateException if the current thread does not hold the commit lock */
    private void ensureCommitLockHeld() {
        if (!this.producerTransactionLock.isWriteLockedByCurrentThread()) {
            throw new IllegalStateException("Expected commit lock to be held");
        }
    }

    /**
     * @return true while any thread holds the commit (write) lock
     */
    public boolean isTransactionCommittingInProgress() {
        return this.producerTransactionLock.isWriteLocked();
    }

    /**
     * Acquires the produce lock for the given context; pair with {@link #finishProducing}.
     *
     * @param context the poll context the lock is taken for
     * @return the lock handle to pass to {@link #finishProducing}
     * @throws java.util.concurrent.TimeoutException if the lock could not be acquired in time
     */
    public ProducingLock beginProducing(PollContextInternal<K, V> context) throws java.util.concurrent.TimeoutException {
        return this.acquireProduceLock(context);
    }

    /**
     * Releases a produce lock obtained from {@link #beginProducing}.
     *
     * @param produceLock the handle to release
     * @throws NullPointerException if {@code produceLock} is null
     * @throws InternalRuntimeException if transaction commit mode is used and the calling thread
     *         holds no produce lock
     */
    public void finishProducing(@NonNull ProducingLock produceLock) {
        if (produceLock == null) {
            throw new NullPointerException("produceLock is marked non-null but is null");
        }
        this.ensureProduceStarted();
        this.releaseProduceLock(produceLock);
    }

    /** In transaction commit mode, verifies the calling thread holds at least one produce (read) lock. */
    private void ensureProduceStarted() {
        if (this.options.isUsingTransactionCommitMode() && this.producerTransactionLock.getReadHoldCount() < 1) {
            throw new InternalRuntimeException("Need to call #beginProducing first");
        }
    }

    @Override
    public String toString() {
        return "ProducerManager()";
    }

    public ProducerWrapper<K, V> getProducerWrapper() {
        return this.producerWrapper;
    }

    public ReentrantReadWriteLock getProducerTransactionLock() {
        return this.producerTransactionLock;
    }

    /**
     * Handle for a held produce (read) lock, remembering the poll context it was taken for so the
     * release can be logged against it.
     */
    public class ProducingLock {
        private final PollContextInternal<K, V> context;
        private final ReentrantReadWriteLock.ReadLock produceLock;

        /**
         * @param context the poll context this lock was acquired for
         * @param produceLock the already-acquired read lock
         */
        public ProducingLock(PollContextInternal<K, V> context, ReentrantReadWriteLock.ReadLock produceLock) {
            this.context = context;
            this.produceLock = produceLock;
        }

        /** Releases the underlying read lock and logs the context's offsets. */
        protected void unlock() {
            this.produceLock.unlock();
            log.debug("Unlocking produce lock (context: {}).", this.context.getOffsets());
        }
    }
}

