/*
 * Decompiled with CFR 0.152.
 */
package org.sdase.commons.server.kafka.consumer;

import java.time.Duration;
import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.sdase.commons.server.kafka.config.ListenerConfig;
import org.sdase.commons.server.kafka.consumer.StopListenerException;
import org.sdase.commons.server.kafka.consumer.strategies.MessageListenerStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MessageListener<K, V>
implements Runnable {
    private static final Logger LOGGER = LoggerFactory.getLogger(MessageListener.class);
    private final long configuredPollIntervalMillis;
    private final long maxPollIntervalMillis;
    private final AtomicLong currentPollIntervalMillis;
    private final long pollIntervalFactorOnError;
    private final long topicMissingRetryMs;
    private final MessageListenerStrategy<K, V> strategy;
    private final Collection<String> topics;
    private final String joinedTopics;
    private final AtomicBoolean shouldStop = new AtomicBoolean(false);
    private final KafkaConsumer<K, V> consumer;

    public MessageListener(Collection<String> topics, KafkaConsumer<K, V> consumer, ListenerConfig listenerConfig, MessageListenerStrategy<K, V> strategy) {
        this.topics = topics;
        this.joinedTopics = String.join((CharSequence)",", topics);
        this.consumer = consumer;
        consumer.subscribe(topics);
        this.strategy = strategy;
        this.configuredPollIntervalMillis = listenerConfig.getPollInterval();
        this.topicMissingRetryMs = listenerConfig.getTopicMissingRetryMs();
        this.currentPollIntervalMillis = new AtomicLong(this.configuredPollIntervalMillis);
        this.pollIntervalFactorOnError = listenerConfig.getPollIntervalFactorOnError();
        this.maxPollIntervalMillis = listenerConfig.getMaxPollInterval();
    }

    @Override
    public void run() {
        this.waitForTopic(this.joinedTopics);
        while (!this.shouldStop.get()) {
            try {
                ConsumerRecords records = this.consumer.poll(Duration.ofMillis(this.currentPollIntervalMillis.get()));
                if (records.count() > 0) {
                    LOGGER.debug("Received {} messages from topics [{}]", (Object)records.count(), (Object)this.joinedTopics);
                } else {
                    LOGGER.trace("Received {} messages from topics [{}]", (Object)records.count(), (Object)this.joinedTopics);
                }
                this.strategy.resetOffsetsToCommitOnClose();
                this.strategy.processRecords(records, this.consumer);
                this.configureAfterSuccess();
            }
            catch (WakeupException w) {
                if (this.shouldStop.get()) {
                    LOGGER.info("Woke up to stop consuming.");
                    continue;
                }
                LOGGER.warn("Woke up before polling returned but shouldStop is {}.", (Object)this.shouldStop.get(), (Object)w);
            }
            catch (StopListenerException e) {
                LOGGER.error("Stopping listener for topics [{}] due to exception", (Object)this.joinedTopics, (Object)e);
                break;
            }
            catch (RuntimeException re) {
                LOGGER.error("Unauthorized or other runtime exception.", (Throwable)re);
                this.configureAfterError();
            }
        }
        LOGGER.info("MessageListener closing Consumer for [{}]", (Object)this.joinedTopics);
        try {
            this.strategy.commitOnClose(this.consumer);
        }
        finally {
            this.consumer.close();
        }
    }

    private void waitForTopic(String joinedTopics) {
        if (this.topicMissingRetryMs > 0L) {
            while (!this.shouldStop.get() && !this.topicsReady()) {
                LOGGER.warn("Topics {} are not ready yet. Waiting {} ms for retry", (Object)joinedTopics, (Object)this.topicMissingRetryMs);
                try {
                    Thread.sleep(this.topicMissingRetryMs);
                }
                catch (InterruptedException e) {
                    LOGGER.error("Thread interrupted when waiting for topic to come up");
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    boolean topicsReady() {
        return !this.topics.stream().map(arg_0 -> this.consumer.partitionsFor(arg_0)).filter(Objects::nonNull).flatMap(Collection::stream).filter(Objects::nonNull).collect(Collectors.toSet()).isEmpty();
    }

    public void stopConsumer() {
        this.shouldStop.set(true);
        if (this.consumer != null) {
            this.consumer.wakeup();
        }
    }

    public KafkaConsumer<K, V> getConsumer() {
        return this.consumer;
    }

    public String toString() {
        return "ML ".concat(String.join((CharSequence)"", this.topics));
    }

    private void configureAfterSuccess() {
        long oldValue = this.currentPollIntervalMillis.getAndSet(this.configuredPollIntervalMillis);
        if (oldValue != this.configuredPollIntervalMillis) {
            LOGGER.info("Resetting poll interval to {}ms after success", (Object)this.currentPollIntervalMillis);
        }
    }

    private void configureAfterError() {
        long nextPollIntervalAfterError = this.currentPollIntervalMillis.get() * this.pollIntervalFactorOnError;
        this.currentPollIntervalMillis.set(Math.min(this.maxPollIntervalMillis, nextPollIntervalAfterError));
        LOGGER.info("Setting poll interval to {}ms after error", (Object)this.currentPollIntervalMillis);
    }
}

