/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer.internal;

import java.time.Duration;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniMaps;

/**
 * Wrapper around a Kafka {@link Consumer} that delegates the usual consumer operations and adds:
 * <ul>
 *   <li>a flag recording whether a broker poll is currently in flight, so that {@link #wakeup()}
 *       only wakes the consumer while it is polling;</li>
 *   <li>a cached copy of the consumer group metadata, refreshed around every poll;</li>
 *   <li>commit calls that are retried when they are interrupted by a {@link WakeupException}.</li>
 * </ul>
 *
 * @param <K> key type of the consumed records
 * @param <V> value type of the consumed records
 */
public class ConsumerManager<K, V> {
    private static final Logger log = LoggerFactory.getLogger(ConsumerManager.class);

    /** Poll timeout used instead of the requested long-poll timeout when a commit has been requested. */
    private static final Duration COMMIT_PENDING_POLL_TIMEOUT = Duration.ofMillis(1L);

    private final Consumer<K, V> consumer;

    /** {@code true} from just before the delegate poll starts until {@link #poll(Duration)} returns. */
    private final AtomicBoolean pollingBroker = new AtomicBoolean(false);

    // NOTE(review): plain (non-volatile) field; if groupMetadata() is read from a thread other than
    // the polling thread, visibility is not guaranteed - confirm against callers.
    private ConsumerGroupMetadata metaCache;

    /** Number of WakeupExceptions caught while committing (i.e. wakeups that missed the poll). */
    private int erroneousWakeups = 0;

    /** Number of WakeupExceptions caught while polling. */
    private int correctPollWakeups = 0;

    /** Incremented once per commit call, before the first attempt. */
    private int noWakeups = 0;

    // NOTE(review): plain (non-volatile) field set by onCommitRequested() and read/reset by poll();
    // presumably these run on different threads - confirm and consider volatile if so.
    private boolean commitRequested;

    /**
     * Creates a manager that delegates to the given consumer.
     *
     * @param consumer the Kafka consumer to wrap
     */
    public ConsumerManager(Consumer<K, V> consumer) {
        this.consumer = consumer;
    }

    /**
     * Polls the broker, refreshing the cached group metadata before and after the poll.
     * <p>
     * If a commit was requested via {@link #onCommitRequested()}, the requested timeout is replaced
     * by a 1 ms timeout and the request flag is cleared. A {@link WakeupException} thrown during the
     * poll is swallowed and an empty record set is returned instead.
     *
     * @param requestedLongPollTimeout timeout to pass to the consumer when no commit is pending
     * @return the polled records, or an empty {@link ConsumerRecords} if the poll was woken up
     */
    ConsumerRecords<K, V> poll(Duration requestedLongPollTimeout) {
        Duration timeoutToUse = requestedLongPollTimeout;
        try {
            if (this.commitRequested) {
                log.debug("Commit requested, so will not long poll as need to perform the commit");
                timeoutToUse = COMMIT_PENDING_POLL_TIMEOUT;
                this.commitRequested = false;
            }
            this.pollingBroker.set(true);
            this.updateMetadataCache();
            log.debug("Poll starting with timeout: {}", timeoutToUse);
            ConsumerRecords<K, V> records = this.consumer.poll(timeoutToUse);
            log.debug("Poll completed normally (after timeout of {}) and returned {}...", timeoutToUse, records.count());
            this.updateMetadataCache();
            return records;
        } catch (WakeupException w) {
            ++this.correctPollWakeups;
            log.debug("Awoken from broker poll");
            log.trace("Wakeup caller is:", w);
            return new ConsumerRecords<>(UniMaps.of());
        } finally {
            // Always clear the flag so wakeup() stops targeting the consumer once polling is over.
            this.pollingBroker.set(false);
        }
    }

    /** Refreshes the cached group metadata from the underlying consumer. */
    protected void updateMetadataCache() {
        this.metaCache = this.consumer.groupMetadata();
    }

    /**
     * Wakes up the underlying consumer, but only if a poll is currently flagged as in progress.
     * <p>
     * NOTE(review): the flag check and the wakeup call are not atomic, so a wakeup can presumably
     * still land outside of a poll; the commit methods retry on {@link WakeupException}.
     */
    public void wakeup() {
        if (this.pollingBroker.get()) {
            log.debug("Waking up consumer");
            this.consumer.wakeup();
        }
    }

    /**
     * Synchronously commits the given offsets, retrying for as long as the commit is interrupted by
     * a {@link WakeupException}.
     *
     * @param offsetsToSend offsets to commit, per partition
     */
    public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsetsToSend) {
        this.retryOnWakeup(() -> this.consumer.commitSync(offsetsToSend));
    }

    /**
     * Asynchronously commits the given offsets, retrying the submission for as long as it is
     * interrupted by a {@link WakeupException}.
     *
     * @param offsets  offsets to commit, per partition
     * @param callback callback passed through to the consumer
     */
    public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
        this.retryOnWakeup(() -> this.consumer.commitAsync(offsets, callback));
    }

    /**
     * Runs the commit action until it completes without a {@link WakeupException}. Any other
     * exception propagates to the caller unchanged.
     */
    private void retryOnWakeup(Runnable commitAction) {
        ++this.noWakeups;
        while (true) {
            try {
                commitAction.run();
                return;
            } catch (WakeupException w) {
                // Trailing Throwable without a placeholder is logged by SLF4J as the exception.
                log.debug("Got woken up, retry. errors: {} none: {} correct:{}",
                        this.erroneousWakeups, this.noWakeups, this.correctPollWakeups, w);
                ++this.erroneousWakeups;
            }
        }
    }

    /**
     * Returns the cached group metadata without calling the consumer.
     *
     * @return the metadata captured by the most recent {@link #updateMetadataCache()} call, or
     *         {@code null} if the cache has not been populated yet
     */
    public ConsumerGroupMetadata groupMetadata() {
        return this.metaCache;
    }

    /**
     * Closes the underlying consumer.
     *
     * @param defaultTimeout timeout passed to the consumer's close call
     */
    public void close(Duration defaultTimeout) {
        this.consumer.close(defaultTimeout);
    }

    /** @return the partitions reported by the underlying consumer's {@code assignment()} */
    public Set<TopicPartition> assignment() {
        return this.consumer.assignment();
    }

    /**
     * Pauses the given partitions on the underlying consumer.
     *
     * @param assignment partitions to pause
     */
    public void pause(Set<TopicPartition> assignment) {
        this.consumer.pause(assignment);
    }

    /** @return the partitions reported by the underlying consumer's {@code paused()} */
    public Set<TopicPartition> paused() {
        return this.consumer.paused();
    }

    /**
     * Resumes the given partitions on the underlying consumer.
     *
     * @param pausedTopics partitions to resume
     */
    public void resume(Set<TopicPartition> pausedTopics) {
        this.consumer.resume(pausedTopics);
    }

    /** Flags that a commit is pending, so the next {@link #poll(Duration)} uses a 1 ms timeout. */
    public void onCommitRequested() {
        this.commitRequested = true;
    }
}

