/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer.internal;

import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniMaps;

/**
 * Wraps a Kafka {@link Consumer} and funnels poll, commit, pause/resume and close calls through
 * one place.
 *
 * <p>On top of plain delegation it adds:
 * <ul>
 *   <li>wakeup handling: {@link #wakeup()} only wakes the consumer while a poll is flagged as in
 *       progress, and commits that get woken up anyway are retried;</li>
 *   <li>bounded retries (with back-off) on {@link SaslAuthenticationException} for polls and
 *       synchronous commits, and bounded retries on {@link TimeoutException} for synchronous
 *       commits;</li>
 *   <li>a count of in-flight poll / synchronous-commit requests, which {@link #close(Duration)}
 *       waits on before closing the consumer;</li>
 *   <li>cached copies of the group metadata and the paused-partition count, refreshed around each
 *       poll.</li>
 * </ul>
 *
 * @param <K> record key type of the wrapped consumer
 * @param <V> record value type of the wrapped consumer
 */
public class ConsumerManager<K, V> {
    private static final Logger log = LoggerFactory.getLogger(ConsumerManager.class);

    /** Granularity, in milliseconds, at which waiting loops re-check their exit condition. */
    private static final long WAIT_CHECK_INTERVAL_MS = 100L;

    private final Consumer<K, V> consumer;
    /** Total time budget for retrying a synchronous commit that fails with a timeout. */
    private final Duration offsetCommitTimeout;
    /** Total time budget for retrying an operation that fails with a SASL authentication error. */
    private final Duration saslAuthenticationRetryTimeout;
    /** Pause between two attempts after a SASL authentication error. */
    private final Duration saslAuthenticationRetryBackOff;
    /** True while {@link #poll(Duration)} is executing; gates {@link #wakeup()}. */
    private final AtomicBoolean pollingBroker = new AtomicBoolean(false);
    private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
    /** Number of polls / synchronous commits currently in flight. */
    private final AtomicLong pendingRequests = new AtomicLong(0L);
    // NOTE(review): metaCache and commitRequested are plain (non-volatile) fields, yet look like they
    // are meant to be written and read from different threads - confirm against the callers.
    private ConsumerGroupMetadata metaCache;
    private volatile int pausedPartitionSizeCache = 0;
    // Diagnostic counters, only reported in the "Got woken up" debug log line.
    private int erroneousWakups = 0;
    private int correctPollWakeups = 0;
    private int noWakeups = 0;
    private boolean commitRequested;

    /**
     * Creates a manager around the given consumer.
     *
     * @param consumer                       the consumer every call is delegated to
     * @param offsetCommitTimeout            retry budget for {@link TimeoutException} during
     *                                       {@link #commitSync(Map)}
     * @param saslAuthenticationRetryTimeout retry budget for {@link SaslAuthenticationException}
     * @param saslAuthenticationRetryBackOff pause between attempts after a
     *                                       {@link SaslAuthenticationException}
     */
    public ConsumerManager(Consumer<K, V> consumer, Duration offsetCommitTimeout, Duration saslAuthenticationRetryTimeout, Duration saslAuthenticationRetryBackOff) {
        this.consumer = consumer;
        this.offsetCommitTimeout = offsetCommitTimeout;
        this.saslAuthenticationRetryTimeout = saslAuthenticationRetryTimeout;
        this.saslAuthenticationRetryBackOff = saslAuthenticationRetryBackOff;
    }

    /**
     * Polls the wrapped consumer, refreshing the caches before and after the poll.
     *
     * <p>If a commit was requested via {@link #onCommitRequested()}, the poll uses a 1 ms timeout
     * instead of the requested one and the request flag is cleared. A {@link WakeupException} is
     * treated as an expected interruption of the poll and yields an empty result.
     *
     * @param requestedLongPollTimeout timeout handed to the consumer unless a commit is pending
     * @return the polled records; never {@code null} - an empty instance is returned when the poll
     *         was woken up or did not run because shutdown was requested
     * @throws SaslAuthenticationException if authentication keeps failing past the retry budget
     * @throws RuntimeException            if the thread is interrupted during the retry back-off
     *                                     (the interrupt status is restored first)
     */
    ConsumerRecords<K, V> poll(Duration requestedLongPollTimeout) {
        Duration timeoutToUse = requestedLongPollTimeout;
        ConsumerRecords<K, V> records = null;
        try {
            if (commitRequested) {
                log.debug("Commit requested, so will not long poll as need to perform the commit");
                timeoutToUse = Duration.ofMillis(1L);
                commitRequested = false;
            }
            pollingBroker.set(true);
            updateCache();
            log.debug("Poll starting with timeout: {}", timeoutToUse);
            records = pollWithSaslRetry(timeoutToUse);
            updateCache();
        } catch (WakeupException w) {
            ++correctPollWakeups;
            log.debug("Awoken from broker poll");
            log.trace("Wakeup caller is:", w);
            records = emptyRecords();
        } finally {
            pollingBroker.set(false);
        }
        return records != null ? records : emptyRecords();
    }

    /**
     * Runs the actual consumer poll, retrying on SASL authentication failures until the retry
     * budget (measured from the first attempt) is used up or shutdown is requested.
     *
     * @return the polled records, or {@code null} if no poll succeeded before shutdown was requested
     */
    private ConsumerRecords<K, V> pollWithSaslRetry(Duration timeout) {
        Instant pollStarted = Instant.now();
        long tryCount = 0L;
        ConsumerRecords<K, V> records = null;
        boolean polledSuccessfully = false;
        try {
            pendingRequests.incrementAndGet();
            while (!shutdownRequested.get()) {
                ++tryCount;
                try {
                    records = consumer.poll(timeout);
                    polledSuccessfully = true;
                    break;
                } catch (SaslAuthenticationException authenticationException) {
                    Duration elapsed = Duration.between(pollStarted, Instant.now());
                    boolean shouldRetry = elapsed.toMillis() < saslAuthenticationRetryTimeout.toMillis();
                    if (!shouldRetry) {
                        log.error("Poll error: SaslAuthenticationException. {} tries attempted, since {}", tryCount, pollStarted, authenticationException);
                        throw authenticationException;
                    }
                    log.warn("Poll error: SaslAuthenticationException. Retrying ({})", tryCount);
                    try {
                        // The loop condition re-checks shutdown, so the return value is not needed.
                        retryBackOff(saslAuthenticationRetryBackOff.toMillis());
                    } catch (InterruptedException ex) {
                        Thread.currentThread().interrupt();
                        throw new RuntimeException("Poll interrupted", ex);
                    }
                }
            }
        } finally {
            if (polledSuccessfully) {
                log.debug("Poll completed normally (after timeout of {} on try {}) and returned {}...", timeout, tryCount, records.count());
            } else {
                log.debug("Poll did not completed (after timeout of {} and tries {}), shutdownRequested {}", timeout, tryCount, shutdownRequested.get());
            }
            pendingRequests.decrementAndGet();
        }
        return records;
    }

    /** Creates a record batch backed by an empty map. */
    private ConsumerRecords<K, V> emptyRecords() {
        return new ConsumerRecords<>(UniMaps.of());
    }

    /** Refreshes the cached group metadata and paused-partition count from the consumer. */
    protected void updateCache() {
        metaCache = consumer.groupMetadata();
        pausedPartitionSizeCache = consumer.paused().size();
    }

    /**
     * Wakes the consumer up, but only while a poll is flagged as in progress.
     *
     * <p>The flag check and the wakeup are not atomic, so a wakeup can still land on a commit;
     * {@link #commitSync(Map)} and {@link #commitAsync(Map, OffsetCommitCallback)} retry in that
     * case.
     */
    public void wakeup() {
        if (pollingBroker.get()) {
            log.debug("Waking up consumer");
            consumer.wakeup();
        }
    }

    /**
     * Synchronously commits the given offsets, retrying the whole operation whenever it is
     * interrupted by a {@link WakeupException}.
     *
     * <p>A {@link CommitFailedException} is logged and ignored. Timeouts and SASL authentication
     * failures are retried until their respective time budget, measured from the first attempt, is
     * exhausted or shutdown is requested; at least one attempt is always made.
     *
     * @param offsetsToSend offsets to commit, per partition
     * @throws TimeoutException            if the commit keeps timing out past {@code offsetCommitTimeout}
     * @throws SaslAuthenticationException if authentication keeps failing past the retry budget
     * @throws RuntimeException            if the thread is interrupted during the retry back-off
     *                                     (the interrupt status is restored first)
     */
    public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsetsToSend) {
        boolean inProgress = true;
        ++noWakeups;
        while (inProgress) {
            try {
                pendingRequests.incrementAndGet();
                commitSyncWithRetry(offsetsToSend);
                inProgress = false;
            } catch (WakeupException w) {
                log.debug("Got woken up, retry. errors: {} none: {} correct:{}", erroneousWakups, noWakeups, correctPollWakeups, w);
                ++erroneousWakups;
            } finally {
                pendingRequests.decrementAndGet();
            }
        }
    }

    /**
     * Performs one synchronous commit with bounded retries.
     *
     * <p>The start time is taken once, before the first attempt, so the retry budgets cover the
     * whole operation rather than each individual attempt (which would never expire for errors
     * that fail fast).
     */
    private void commitSyncWithRetry(Map<TopicPartition, OffsetAndMetadata> offsetsToSend) {
        Instant startedTime = Instant.now();
        long tryCount = 0L;
        do {
            ++tryCount;
            try {
                consumer.commitSync(offsetsToSend);
                return;
            } catch (CommitFailedException commitFailedException) {
                log.warn("Failed to commit offset due to group rebalancing. Will ignore the error for now.", commitFailedException);
                return;
            } catch (TimeoutException timeoutException) {
                Duration elapsed = Duration.between(startedTime, Instant.now());
                if (elapsed.toMillis() > offsetCommitTimeout.toMillis()) {
                    log.error("Offset commit took too long due to TimeoutException (tried {} times)", tryCount);
                    throw timeoutException;
                }
                log.warn("Encountered timeout while committing offset. Retrying ({})", tryCount);
            } catch (SaslAuthenticationException authenticationException) {
                Duration elapsed = Duration.between(startedTime, Instant.now());
                if (elapsed.toMillis() > saslAuthenticationRetryTimeout.toMillis()) {
                    log.error("Offset commit failed due to SaslAuthenticationException (tried {} times)", tryCount);
                    throw authenticationException;
                }
                log.warn("Encountered SaslAuthenticationException while committing offset. Retrying ({})", tryCount);
                try {
                    // The loop condition re-checks shutdown, so the return value is not needed.
                    retryBackOff(saslAuthenticationRetryBackOff.toMillis());
                } catch (InterruptedException ex) {
                    log.warn("Offset Commit was interrupted", ex);
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("Offset Commit was interrupted", ex);
                }
            }
        } while (!shutdownRequested.get());
    }

    /**
     * Sleeps for the given back-off time in short slices so a shutdown request is noticed early.
     *
     * @param backOffTimeMs total time to wait, in milliseconds
     * @return {@code true} if the full back-off elapsed, {@code false} if shutdown was requested
     *         while waiting
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private boolean retryBackOff(long backOffTimeMs) throws InterruptedException {
        long deadLine = System.currentTimeMillis() + backOffTimeMs;
        long remaining;
        while ((remaining = deadLine - System.currentTimeMillis()) > 0L) {
            // Never sleep past the deadline.
            Thread.sleep(Math.min(WAIT_CHECK_INTERVAL_MS, remaining));
            if (shutdownRequested.get()) {
                return false;
            }
        }
        return true;
    }

    /**
     * Asynchronously commits the given offsets, re-issuing the call whenever it is interrupted by
     * a {@link WakeupException}.
     *
     * @param offsets  offsets to commit, per partition
     * @param callback callback handed to the consumer's asynchronous commit
     */
    public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
        boolean inProgress = true;
        ++noWakeups;
        while (inProgress) {
            try {
                consumer.commitAsync(offsets, callback);
                inProgress = false;
            } catch (WakeupException w) {
                log.debug("Got woken up, retry. errors: {} none: {} correct:{}", erroneousWakups, noWakeups, correctPollWakeups, w);
                ++erroneousWakups;
            }
        }
    }

    /**
     * Returns the group metadata cached by the last {@link #updateCache()} call.
     *
     * @return the cached metadata, or {@code null} if the cache has not been filled yet
     */
    public ConsumerGroupMetadata groupMetadata() {
        return metaCache;
    }

    /** Requests shutdown, which makes the retry loops stop; logs only on the first request. */
    public void signalStop() {
        if (shutdownRequested.compareAndSet(false, true)) {
            log.info("Signaling Consumer Manager to stop");
        }
    }

    /**
     * Requests shutdown, waits for in-flight requests to finish (at most {@code defaultTimeout})
     * and then closes the wrapped consumer with the same timeout.
     *
     * @param defaultTimeout upper bound for the wait, also passed to the consumer's close
     * @throws RuntimeException if the thread is interrupted while waiting (the interrupt status is
     *                          restored first); the consumer is not closed in that case
     */
    public void close(Duration defaultTimeout) {
        long deadline = System.currentTimeMillis() + defaultTimeout.toMillis();
        log.debug("Consumer Manager Closing...");
        shutdownRequested.set(true);
        log.debug("ConsumerManager close waiting for max of {} for pending requests to complete", defaultTimeout);
        while (pendingRequests.get() > 0L && System.currentTimeMillis() < deadline) {
            try {
                Thread.sleep(WAIT_CHECK_INTERVAL_MS);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Wait interrupted", ex);
            }
        }
        log.debug("ConsumerManager close wait completed.");
        consumer.close(defaultTimeout);
        log.debug("ConsumerManager closed");
    }

    /** @return the consumer's current partition assignment */
    public Set<TopicPartition> assignment() {
        return consumer.assignment();
    }

    /**
     * Pauses the given partitions on the consumer.
     *
     * @param assignment partitions to pause
     */
    public void pause(Set<TopicPartition> assignment) {
        consumer.pause(assignment);
    }

    /** @return the partitions the consumer currently reports as paused */
    public Set<TopicPartition> paused() {
        return consumer.paused();
    }

    /** @return the paused-partition count cached by the last {@link #updateCache()} call */
    public int getPausedPartitionSize() {
        return pausedPartitionSizeCache;
    }

    /**
     * Resumes the given partitions on the consumer.
     *
     * @param pausedTopics partitions to resume
     */
    public void resume(Set<TopicPartition> pausedTopics) {
        consumer.resume(pausedTopics);
    }

    /** Flags that a commit is pending, so the next {@link #poll(Duration)} will not long poll. */
    public void onCommitRequested() {
        commitRequested = true;
    }
}

