/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.io.Closeable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
import org.apache.kafka.clients.consumer.internals.RequestManager;
import org.apache.kafka.clients.consumer.internals.RequestManagers;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
import org.apache.kafka.common.internals.IdempotentCloser;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;

public class ConsumerNetworkThread
extends KafkaThread
implements Closeable {
    static final long MAX_POLL_TIMEOUT_MS = 5000L;
    private static final String BACKGROUND_THREAD_NAME = "consumer_background_thread";
    private final Time time;
    private final Logger log;
    private final BlockingQueue<ApplicationEvent> applicationEventQueue;
    private final CompletableEventReaper applicationEventReaper;
    private final Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier;
    private final Supplier<NetworkClientDelegate> networkClientDelegateSupplier;
    private final Supplier<RequestManagers> requestManagersSupplier;
    private final AsyncConsumerMetrics asyncConsumerMetrics;
    private ApplicationEventProcessor applicationEventProcessor;
    private NetworkClientDelegate networkClientDelegate;
    private RequestManagers requestManagers;
    private volatile boolean running;
    private final IdempotentCloser closer = new IdempotentCloser();
    private volatile Duration closeTimeout = Duration.ofMillis(30000L);
    private volatile long cachedMaximumTimeToWait = 5000L;
    private long lastPollTimeMs = 0L;

    public ConsumerNetworkThread(LogContext logContext, Time time, BlockingQueue<ApplicationEvent> applicationEventQueue, CompletableEventReaper applicationEventReaper, Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier, Supplier<NetworkClientDelegate> networkClientDelegateSupplier, Supplier<RequestManagers> requestManagersSupplier, AsyncConsumerMetrics asyncConsumerMetrics) {
        super(BACKGROUND_THREAD_NAME, true);
        this.time = time;
        this.log = logContext.logger(this.getClass());
        this.applicationEventQueue = applicationEventQueue;
        this.applicationEventReaper = applicationEventReaper;
        this.applicationEventProcessorSupplier = applicationEventProcessorSupplier;
        this.networkClientDelegateSupplier = networkClientDelegateSupplier;
        this.requestManagersSupplier = requestManagersSupplier;
        this.running = true;
        this.asyncConsumerMetrics = asyncConsumerMetrics;
    }

    @Override
    public void run() {
        try {
            this.log.debug("Consumer network thread started");
            this.initializeResources();
            while (this.running) {
                try {
                    this.runOnce();
                }
                catch (Throwable e) {
                    this.log.error("Unexpected error caught in consumer network thread", e);
                }
            }
        }
        catch (Throwable e) {
            this.log.error("Failed to initialize resources for consumer network thread", e);
        }
        finally {
            this.cleanup();
        }
    }

    void initializeResources() {
        this.applicationEventProcessor = this.applicationEventProcessorSupplier.get();
        this.networkClientDelegate = this.networkClientDelegateSupplier.get();
        this.requestManagers = this.requestManagersSupplier.get();
    }

    void runOnce() {
        this.processApplicationEvents();
        long currentTimeMs = this.time.milliseconds();
        if (this.lastPollTimeMs != 0L) {
            this.asyncConsumerMetrics.recordTimeBetweenNetworkThreadPoll(currentTimeMs - this.lastPollTimeMs);
        }
        this.lastPollTimeMs = currentTimeMs;
        long pollWaitTimeMs = 5000L;
        for (RequestManager rm : this.requestManagers.entries()) {
            NetworkClientDelegate.PollResult pollResult = rm.poll(currentTimeMs);
            long timeoutMs = this.networkClientDelegate.addAll(pollResult);
            pollWaitTimeMs = Math.min(pollWaitTimeMs, timeoutMs);
        }
        this.networkClientDelegate.poll(pollWaitTimeMs, currentTimeMs);
        long maxTimeToWaitMs = Long.MAX_VALUE;
        for (RequestManager rm : this.requestManagers.entries()) {
            long waitMs = rm.maximumTimeToWait(currentTimeMs);
            maxTimeToWaitMs = Math.min(maxTimeToWaitMs, waitMs);
        }
        this.cachedMaximumTimeToWait = maxTimeToWaitMs;
        this.reapExpiredApplicationEvents(currentTimeMs);
        List<CompletableEvent<?>> uncompletedEvents = this.applicationEventReaper.uncompletedEvents();
        this.maybeFailOnMetadataError(uncompletedEvents);
    }

    private void processApplicationEvents() {
        LinkedList events = new LinkedList();
        this.applicationEventQueue.drainTo(events);
        if (events.isEmpty()) {
            return;
        }
        this.asyncConsumerMetrics.recordApplicationEventQueueSize(0);
        long startMs = this.time.milliseconds();
        for (ApplicationEvent event : events) {
            this.asyncConsumerMetrics.recordApplicationEventQueueTime(this.time.milliseconds() - event.enqueuedMs());
            try {
                if (event instanceof CompletableEvent) {
                    this.applicationEventReaper.add((CompletableEvent)((Object)event));
                    this.maybeFailOnMetadataError(List.of((CompletableEvent)((Object)event)));
                }
                this.applicationEventProcessor.process(event);
            }
            catch (Throwable t) {
                this.log.warn("Error processing event {}", (Object)t.getMessage(), (Object)t);
            }
        }
        this.asyncConsumerMetrics.recordApplicationEventQueueProcessingTime(this.time.milliseconds() - startMs);
    }

    private void reapExpiredApplicationEvents(long currentTimeMs) {
        this.asyncConsumerMetrics.recordApplicationEventExpiredSize(this.applicationEventReaper.reap(currentTimeMs));
    }

    static void runAtClose(Collection<RequestManager> requestManagers, NetworkClientDelegate networkClientDelegate, long currentTimeMs) {
        for (RequestManager rm : requestManagers) {
            NetworkClientDelegate.PollResult pollResult = rm.pollOnClose(currentTimeMs);
            networkClientDelegate.addAll(pollResult);
        }
    }

    public boolean isRunning() {
        return this.running;
    }

    public void wakeup() {
        if (this.networkClientDelegate != null) {
            this.networkClientDelegate.wakeup();
        }
    }

    public long maximumTimeToWait() {
        return this.cachedMaximumTimeToWait;
    }

    @Override
    public void close() {
        this.close(this.closeTimeout);
    }

    public void close(Duration timeout) {
        Objects.requireNonNull(timeout, "Close timeout for consumer network thread must be non-null");
        this.closer.close(() -> this.closeInternal(timeout), () -> this.log.warn("The consumer network thread was already closed"));
    }

    private void closeInternal(Duration timeout) {
        long timeoutMs = timeout.toMillis();
        this.log.trace("Signaling the consumer network thread to close in {}ms", (Object)timeoutMs);
        this.running = false;
        this.closeTimeout = timeout;
        this.wakeup();
        try {
            this.join();
        }
        catch (InterruptedException e) {
            this.log.error("Interrupted while waiting for consumer network thread to complete", e);
        }
    }

    private void sendUnsentRequests(Timer timer) {
        if (!this.networkClientDelegate.hasAnyPendingRequests()) {
            return;
        }
        do {
            this.networkClientDelegate.poll(timer.remainingMs(), timer.currentTimeMs());
            timer.update();
        } while (timer.notExpired() && this.networkClientDelegate.hasAnyPendingRequests());
        if (this.networkClientDelegate.hasAnyPendingRequests()) {
            this.log.warn("Close timeout of {} ms expired before the consumer network thread was able to complete pending requests. Inflight request count: {}, Unsent request count: {}", timer.timeoutMs(), this.networkClientDelegate.inflightRequestCount(), this.networkClientDelegate.unsentRequests().size());
        }
    }

    void cleanup() {
        this.log.trace("Closing the consumer network thread");
        Timer timer = this.time.timer(this.closeTimeout);
        try {
            if (this.requestManagers != null && this.networkClientDelegate != null) {
                ConsumerNetworkThread.runAtClose(this.requestManagers.entries(), this.networkClientDelegate, this.time.milliseconds());
            }
        }
        catch (Exception e) {
            this.log.error("Unexpected error during shutdown. Proceed with closing.", e);
        }
        finally {
            if (this.networkClientDelegate != null) {
                this.sendUnsentRequests(timer);
            }
            this.asyncConsumerMetrics.recordApplicationEventExpiredSize(this.applicationEventReaper.reap(this.applicationEventQueue));
            Utils.closeQuietly(this.requestManagers, "request managers");
            Utils.closeQuietly(this.networkClientDelegate, "network client delegate");
            this.log.debug("Closed the consumer network thread");
        }
    }

    private void maybeFailOnMetadataError(List<CompletableEvent<?>> events) {
        ArrayList<CompletableApplicationEvent> subscriptionMetadataEvent = new ArrayList<CompletableApplicationEvent>();
        for (CompletableEvent<?> ce : events) {
            if (!(ce instanceof CompletableApplicationEvent) || !((CompletableApplicationEvent)ce).requireSubscriptionMetadata()) continue;
            subscriptionMetadataEvent.add((CompletableApplicationEvent)ce);
        }
        if (subscriptionMetadataEvent.isEmpty()) {
            return;
        }
        this.networkClientDelegate.getAndClearMetadataError().ifPresent(metadataError -> subscriptionMetadataEvent.forEach(event -> event.future().completeExceptionally((Throwable)metadataError)));
    }
}

