/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer;

import io.confluent.csid.utils.BackportUtils;
import io.confluent.parallelconsumer.ConsumerManager;
import io.confluent.parallelconsumer.ConsumerOffsetCommitter;
import io.confluent.parallelconsumer.DrainingCloseable;
import io.confluent.parallelconsumer.InternalRuntimeError;
import io.confluent.parallelconsumer.OffsetCommitter;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.RateLimiter;
import io.confluent.parallelconsumer.WorkManager;
import java.time.Duration;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

public class BrokerPollSystem<K, V>
implements OffsetCommitter {
    private static final Logger log = LoggerFactory.getLogger(BrokerPollSystem.class);
    private final ConsumerManager<K, V> consumerManager;
    private ParallelEoSStreamProcessor.State state = ParallelEoSStreamProcessor.State.running;
    private Optional<Future<Boolean>> pollControlThreadFuture;
    private volatile boolean paused = false;
    private final ParallelEoSStreamProcessor<K, V> pc;
    private Optional<ConsumerOffsetCommitter<K, V>> committer = Optional.empty();
    private static Duration longPollTimeout = Duration.ofMillis(2000L);
    private final WorkManager<K, V> wm;
    private final RateLimiter pauseLimiter = new RateLimiter(1);

    public BrokerPollSystem(ConsumerManager<K, V> consumerMgr, WorkManager<K, V> wm, ParallelEoSStreamProcessor<K, V> pc, ParallelConsumerOptions options) {
        this.wm = wm;
        this.pc = pc;
        this.consumerManager = consumerMgr;
        switch (options.getCommitMode()) {
            case PERIODIC_CONSUMER_SYNC: 
            case PERIODIC_CONSUMER_ASYNCHRONOUS: {
                ConsumerOffsetCommitter<K, V> consumerCommitter = new ConsumerOffsetCommitter<K, V>(consumerMgr, wm, options);
                this.committer = Optional.of(consumerCommitter);
            }
        }
    }

    public void start() {
        Future<Boolean> submit = Executors.newSingleThreadExecutor().submit(this::controlLoop);
        this.pollControlThreadFuture = Optional.of(submit);
    }

    public void supervise() {
        Future<Boolean> booleanFuture;
        if (this.pollControlThreadFuture.isPresent() && ((booleanFuture = this.pollControlThreadFuture.get()).isCancelled() || booleanFuture.isDone())) {
            try {
                booleanFuture.get();
            }
            catch (Exception e) {
                throw new InternalRuntimeError("Error in " + BrokerPollSystem.class.getSimpleName() + " system.", e);
            }
        }
    }

    private boolean controlLoop() {
        Thread.currentThread().setName("pc-broker-poll");
        this.pc.myId.ifPresent(id -> MDC.put((String)"pcId", (String)id));
        log.trace("Broker poll control loop start");
        this.committer.ifPresent(x -> x.claim());
        try {
            while (this.state != ParallelEoSStreamProcessor.State.closed) {
                log.trace("Loop: Broker poller: ({})", (Object)this.state);
                if (this.state == ParallelEoSStreamProcessor.State.running) {
                    ConsumerRecords<K, V> polledRecords = this.pollBrokerForRecords();
                    log.debug("Got {} records in poll result", (Object)polledRecords.count());
                    if (!polledRecords.isEmpty()) {
                        log.trace("Loop: Register work");
                        this.wm.registerWork(polledRecords);
                        if (!this.wm.hasWorkInFlight()) {
                            log.trace("Apparently no work is being done, make sure Control is awake to receive messages");
                            this.pc.notifyNewWorkRegistered();
                        }
                    }
                }
                this.maybeDoCommit();
                switch (this.state) {
                    case draining: {
                        this.doPause();
                        this.transitionToCloseMaybe();
                        break;
                    }
                    case closing: {
                        this.doClose();
                    }
                }
            }
            log.debug("Broker poll thread finished, returning true to future");
            return true;
        }
        catch (Exception e) {
            log.error("Unknown error", (Throwable)e);
            throw e;
        }
    }

    private void transitionToCloseMaybe() {
        if (this.isResponsibleForCommits() && !this.wm.isRecordsAwaitingToBeCommitted()) {
            this.state = ParallelEoSStreamProcessor.State.closing;
        } else {
            log.trace("Draining, but work still needs to be committed. Yielding thread to avoid busy wait.");
            try {
                Thread.sleep(1L);
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private void doClose() {
        this.doPause();
        this.maybeCloseConsumer();
        this.state = ParallelEoSStreamProcessor.State.closed;
    }

    private void maybeCloseConsumer() {
        if (this.isResponsibleForCommits()) {
            log.debug("Closing {}, first closing consumer...", (Object)this.getClass().getSimpleName());
            this.consumerManager.close(DrainingCloseable.DEFAULT_TIMEOUT);
            log.debug("Consumer closed.");
        }
    }

    private boolean isResponsibleForCommits() {
        return this.committer.isPresent();
    }

    private ConsumerRecords<K, V> pollBrokerForRecords() {
        this.managePauseOfSubscription();
        log.debug("Subscriptions are paused: {}", (Object)this.paused);
        Duration thisLongPollTimeout = this.state == ParallelEoSStreamProcessor.State.running ? longPollTimeout : Duration.ofMillis(1L);
        log.debug("Long polling broker with timeout {} seconds, might appear to sleep here if subs are paused, or no data available on broker.", (Object)BackportUtils.toSeconds(thisLongPollTimeout));
        return this.consumerManager.poll(thisLongPollTimeout);
    }

    public void drain() {
        if (this.state != ParallelEoSStreamProcessor.State.draining) {
            log.debug("Signaling poll system to drain, waking up consumer...");
            this.state = ParallelEoSStreamProcessor.State.draining;
            this.consumerManager.wakeup();
        }
    }

    private void doPause() {
        if (this.paused) {
            log.trace("Already paused");
        } else if (this.pauseLimiter.couldPerform()) {
            this.pauseLimiter.performIfNotLimited(() -> {
                this.paused = true;
                log.debug("Pausing subs");
                Set<TopicPartition> assignment = this.consumerManager.assignment();
                this.consumerManager.pause(assignment);
            });
        } else if (log.isDebugEnabled()) {
            log.debug("Should pause but pause rate limit exceeded {} vs {}. Queued: {}", new Object[]{this.pauseLimiter.getElapsedDuration(), this.pauseLimiter.getRate(), this.wm.getWorkQueuedInMailboxCount()});
        }
    }

    public void closeAndWait() throws TimeoutException, ExecutionException {
        log.debug("Requesting broker polling system to close...");
        this.transitionToClosing();
        if (this.pollControlThreadFuture.isPresent()) {
            log.debug("Wait for loop to finish ending...");
            Future<Boolean> pollControlResult = this.pollControlThreadFuture.get();
            boolean interrupted = true;
            while (interrupted) {
                try {
                    Boolean pollShutdownSuccess = pollControlResult.get(DrainingCloseable.DEFAULT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
                    interrupted = false;
                    if (pollShutdownSuccess.booleanValue()) continue;
                    log.warn("Broker poll control thread not closed cleanly.");
                }
                catch (InterruptedException e) {
                    log.debug("Interrupted waiting for broker poller thread to finish", (Throwable)e);
                }
                catch (ExecutionException | TimeoutException e) {
                    log.error("Execution or timeout exception waiting for broker poller thread to finish", (Throwable)e);
                    throw e;
                }
            }
        }
        log.debug("Broker poll system finished closing");
    }

    private void transitionToClosing() {
        log.debug("Poller transitioning to closing, waking up consumer");
        this.state = ParallelEoSStreamProcessor.State.closing;
        this.consumerManager.wakeup();
    }

    private void managePauseOfSubscription() {
        boolean throttle = this.shouldThrottle();
        log.trace("Need to throttle: {}", (Object)throttle);
        if (throttle) {
            this.doPause();
        } else {
            this.resumeIfPaused();
        }
    }

    private void resumeIfPaused() {
        if (this.paused) {
            log.debug("Resuming consumer, waking up");
            Set<TopicPartition> pausedTopics = this.consumerManager.paused();
            this.consumerManager.resume(pausedTopics);
            this.consumerManager.wakeup();
            this.paused = false;
        }
    }

    private boolean shouldThrottle() {
        return this.wm.shouldThrottle();
    }

    @Override
    public void retrieveOffsetsAndCommit() {
        ConsumerOffsetCommitter<K, V> committer = this.committer.orElseThrow(() -> {
            throw new IllegalStateException("No committer configured");
        });
        committer.commit();
    }

    private void maybeDoCommit() {
        this.committer.ifPresent(ConsumerOffsetCommitter::maybeDoCommit);
    }

    public void wakeupIfPaused() {
        if (this.paused) {
            this.consumerManager.wakeup();
        }
    }

    public boolean isPaused() {
        return this.paused;
    }

    public static void setLongPollTimeout(Duration longPollTimeout) {
        BrokerPollSystem.longPollTimeout = longPollTimeout;
    }

    public static Duration getLongPollTimeout() {
        return longPollTimeout;
    }
}

