package com.mulesoft.connectors.kafka.internal.source;

import com.mulesoft.connectors.kafka.api.source.AckMode;
import com.mulesoft.connectors.kafka.internal.connection.ConsumerConnection;
import com.mulesoft.connectors.kafka.internal.error.exception.NotFoundException;
import com.mulesoft.connectors.kafka.internal.error.exception.OperationTimeoutException;
import com.mulesoft.connectors.kafka.internal.model.consumer.MuleConsumer;
import java.io.Closeable;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import org.apache.kafka.common.errors.WakeupException;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.message.ErrorType;
import org.mule.runtime.extension.api.runtime.operation.Result;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.mule.runtime.extension.api.runtime.source.SourceCallbackContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/mulesoft/connectors/kafka/internal/source/PollingTask.class */
public class PollingTask<P, A, I> implements Runnable, Closeable {
    private static final Logger logger = LoggerFactory.getLogger(PollingTask.class);
    public static final String SESSION_KEY = "sessionKey";
    public static final String UNRECOVERABLE_EXCEPTION_MESSAGE = "Got an unrecoverable exception while running the polling task";
    public static final String RECOVERABLE_EXCEPTION_MESSAGE = "Got a recoverable exception while running the polling task ({}})";
    private final SourceCallback<P, A> sourceCallback;
    private final BiFunction<String, I, Result<P, A>> parser;
    private final BiFunction<MuleConsumer, Duration, I> pollOperation;
    private final ConsumerConnection consumerConnection;
    private final AckMode ackMode;
    private final Duration pollTimeout;
    private boolean running = true;
    private CountDownLatch closeCountDown = new CountDownLatch(1);

    public PollingTask(ConsumerConnection consumerConnection, AckMode ackMode, Duration duration, BiFunction<MuleConsumer, Duration, I> biFunction, BiFunction<String, I, Result<P, A>> biFunction2, SourceCallback<P, A> sourceCallback) {
        this.consumerConnection = consumerConnection;
        this.ackMode = ackMode;
        this.pollTimeout = duration;
        this.parser = biFunction2;
        this.pollOperation = biFunction;
        this.sourceCallback = sourceCallback;
    }

    @Override // java.lang.Runnable
    public void run() {
        logger.info("Starting the PollingTask with ackMode={}, pollTimeout={}", this.ackMode, this.pollTimeout);
        while (this.running) {
            try {
                try {
                    try {
                        try {
                            logger.trace("Listening for messages.");
                            if (logger.isDebugEnabled()) {
                                logger.debug("Using source connection {}.", this.consumerConnection);
                            }
                            Map.Entry poll = this.consumerConnection.poll(this.ackMode, this.pollTimeout, this.pollOperation);
                            if (poll != null) {
                                if (logger.isDebugEnabled()) {
                                    logger.debug("Polled returned a value {}", poll);
                                }
                                String str = (String) poll.getKey();
                                SourceCallbackContext createContext = this.sourceCallback.createContext();
                                if (str != null) {
                                    createContext.addVariable(SESSION_KEY, str);
                                }
                                this.sourceCallback.handle((Result) this.parser.apply(str, poll.getValue()), createContext);
                                if (str == null) {
                                    logger.trace("Message(s) sent to flow.");
                                } else {
                                    logger.trace("Message(s) with key '{}' sent to flow.", str);
                                }
                            }
                        } catch (ConnectionException e) {
                            logger.info(UNRECOVERABLE_EXCEPTION_MESSAGE, e);
                            this.running = false;
                            this.sourceCallback.onConnectionException(e);
                        }
                    } catch (OperationTimeoutException e2) {
                        if (logger.isDebugEnabled()) {
                            logger.debug(RECOVERABLE_EXCEPTION_MESSAGE, e2, "Operation Timed out");
                        }
                    } catch (RuntimeException e3) {
                        logger.info(UNRECOVERABLE_EXCEPTION_MESSAGE, e3);
                        this.running = false;
                        this.sourceCallback.onConnectionException(new ConnectionException(UNRECOVERABLE_EXCEPTION_MESSAGE, e3, (ErrorType) null, this.consumerConnection));
                    }
                } catch (WakeupException e4) {
                    if (logger.isDebugEnabled()) {
                        logger.debug(RECOVERABLE_EXCEPTION_MESSAGE, e4, "Wakeup");
                    }
                } catch (NotFoundException e5) {
                    if (logger.isTraceEnabled()) {
                        logger.trace("Did not get any results from Kafka for the last poll invocation");
                    }
                }
            } catch (Throwable th) {
                this.closeCountDown.countDown();
                throw th;
            }
        }
        logger.info("Finished the PollingTask normally with ackMode={}, pollTimeout={}", this.ackMode, this.pollTimeout);
        this.closeCountDown.countDown();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        logger.debug("Stopping running state {}", Boolean.valueOf(this.running));
        this.running = false;
        try {
            if (!this.closeCountDown.await(30L, TimeUnit.SECONDS)) {
                logger.warn("Polling task timeout while waiting to be closed");
            }
        } catch (InterruptedException e) {
            logger.warn("PollingTask was interrupted while closing");
        }
        logger.debug("Stopped");
    }
}
