package com.mulesoft.connectors.kafka.internal.connection;

import com.mulesoft.connectors.commons.template.connection.ConnectorConnection;
import com.mulesoft.connectors.kafka.api.KafkaRecordAttributes;
import com.mulesoft.connectors.kafka.api.source.AckMode;
import com.mulesoft.connectors.kafka.api.source.TopicPartition;
import com.mulesoft.connectors.kafka.internal.error.KafkaErrorType;
import com.mulesoft.connectors.kafka.internal.error.exception.InvalidAckModeException;
import com.mulesoft.connectors.kafka.internal.error.exception.InvalidOffsetException;
import com.mulesoft.connectors.kafka.internal.error.exception.InvalidTopicNameException;
import com.mulesoft.connectors.kafka.internal.error.exception.NegativeDurationException;
import com.mulesoft.connectors.kafka.internal.error.exception.NotFoundException;
import com.mulesoft.connectors.kafka.internal.error.exception.OperationInterruptedException;
import com.mulesoft.connectors.kafka.internal.error.exception.OperationTimeoutException;
import com.mulesoft.connectors.kafka.internal.error.exception.SessionNotFoundException;
import com.mulesoft.connectors.kafka.internal.error.exception.TimeoutTooLargeException;
import com.mulesoft.connectors.kafka.internal.error.exception.UnassignedConsumerException;
import com.mulesoft.connectors.kafka.internal.error.exception.UnexpectedException;
import com.mulesoft.connectors.kafka.internal.model.consumer.ConsumerPool;
import com.mulesoft.connectors.kafka.internal.model.consumer.ConsumerPoolClosedException;
import com.mulesoft.connectors.kafka.internal.model.consumer.MuleConsumer;
import com.mulesoft.connectors.kafka.internal.model.consumer.Session;
import com.mulesoft.connectors.kafka.internal.source.PollingTask;
import java.io.InputStream;
import java.time.Duration;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.TimeoutException;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.connection.ConnectionValidationResult;
import org.mule.runtime.api.message.ErrorType;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.core.api.util.IOUtils;
import org.mule.runtime.core.api.util.StringUtils;
import org.mule.runtime.extension.api.exception.ModuleException;
import org.mule.runtime.extension.api.runtime.operation.FlowListener;
import org.mule.runtime.extension.api.runtime.operation.Result;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/mulesoft/connectors/kafka/internal/connection/ConsumerConnection.class */
public class ConsumerConnection implements ConnectorConnection {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerConnection.class);

    /** Format of the keys used in {@link #openSessions}: {@code <ackMode>-<sessionId>}. */
    private static final String SESSION_KEY_TEMPLATE = "%s-%s";

    /** Pool that consumers are checked out of and checked back into. */
    private final ConsumerPool consumerPool;

    /** Scheduler that polling tasks are submitted to. */
    private final Scheduler workerScheduler;

    /** Sessions that outlive a single call, keyed per {@link #SESSION_KEY_TEMPLATE}. */
    private final Map<String, Session<MuleConsumer>> openSessions = new ConcurrentHashMap<>();

    /**
     * Creates a connection backed by the given consumer pool.
     *
     * @param consumerPool pool that consumers are checked out of and returned to
     * @param scheduler    scheduler that polling tasks are submitted to
     */
    public ConsumerConnection(ConsumerPool consumerPool, Scheduler scheduler) {
        this.workerScheduler = scheduler;
        this.consumerPool = consumerPool;
    }

    /**
     * Checks out the consumer for the given topic/partition and seeks it to the given offset.
     * The consumer is checked back into the pool when the session closes, whether or not
     * the seek succeeded.
     *
     * @param str      topic name
     * @param i        partition number
     * @param j        offset to seek to
     * @param duration timeout handed to the pool when checking out the consumer
     * @throws ConnectionException    if the consumer pool is closed
     * @throws InvalidOffsetException if an {@link IllegalArgumentException} is raised (invalid offset)
     * @throws NotFoundException      if an {@link IllegalStateException} is raised
     *                                (topic/partition not assigned to the consumer)
     */
    public void seek(String str, int i, long j, Duration duration) throws ConnectionException {
        // try-with-resources closes the session exactly once and records a close failure as a
        // suppressed exception instead of letting it replace the primary one.
        try (Session<MuleConsumer> session =
                new Session<>(this.consumerPool.checkOut(str, i, duration), this.consumerPool::checkIn)) {
            session.run(consumer -> consumer.seek(str, i, j));
        } catch (ConsumerPoolClosedException e) {
            throw new ConnectionException("The consumer Pool was closed when trying to execute the seek operation", e, (ErrorType) null, this);
        } catch (IllegalArgumentException e) {
            throw new InvalidOffsetException(String.format("The seek operation used an invalid offset %s", j), j, e);
        } catch (IllegalStateException e) {
            throw new NotFoundException(String.format("The topic:%s partition:%d is not currently assigned to this consumer", str, i), e);
        }
    }

    /**
     * Polls a single record from a pooled consumer, applies the ack-mode handling and maps the
     * record to a {@link Result}.
     *
     * <p>The session is closed before returning when no record was polled, when the ack mode is
     * {@code IMMEDIATE} or {@code DUPS_OK}, or when the commit failed. Otherwise it is left open
     * for the flow-listener callbacks registered by
     * {@link #handleConsumeResultAccordingToAckMode} to close.
     *
     * @param biFunction   maps the session id and the polled record (may be {@code null}) to the result
     * @param duration     timeout passed to {@code singleElementPoll}
     * @param duration2    timeout passed to {@link #createSession(Duration)}
     * @param ackMode      acknowledgement mode to apply to the polled record
     * @param flowListener listener on which commit/rollback callbacks are registered
     * @return the value produced by {@code biFunction}
     * @throws ConnectionException if the consumer pool is closed or the commit failed
     */
    public Result<InputStream, KafkaRecordAttributes> consume(BiFunction<String, ConsumerRecord<InputStream, InputStream>, Result<InputStream, KafkaRecordAttributes>> biFunction, Duration duration, Duration duration2, AckMode ackMode, FlowListener flowListener) throws ConnectionException {
        Session<MuleConsumer> session = createSession(duration2);
        AtomicReference<ConsumerRecord<InputStream, InputStream>> polledRecord = new AtomicReference<>();
        boolean commitFailed = false;
        try {
            return (Result<InputStream, KafkaRecordAttributes>) session.apply(consumer -> {
                try {
                    polledRecord.set(consumer.singleElementPoll(duration));
                    String sessionId = handleConsumeResultAccordingToAckMode(ackMode, flowListener, session, polledRecord, consumer);
                    return biFunction.apply(sessionId, polledRecord.get());
                } catch (org.apache.kafka.clients.consumer.InvalidOffsetException e) {
                    throw new InvalidOffsetException(e);
                } catch (IllegalArgumentException e) {
                    throw new NegativeDurationException(duration, e);
                } catch (IllegalStateException e) {
                    throw new UnassignedConsumerException(e);
                } catch (ArithmeticException e) {
                    throw new TimeoutTooLargeException(duration, e);
                } catch (InterruptException e) {
                    throw new OperationInterruptedException((Throwable) e);
                } catch (AuthenticationException e) {
                    throw new com.mulesoft.connectors.kafka.internal.error.exception.AuthenticationException(e, this);
                } catch (AuthorizationException e) {
                    throw new com.mulesoft.connectors.kafka.internal.error.exception.AuthorizationException(e);
                } catch (CommitFailedException e) {
                    throw new com.mulesoft.connectors.kafka.internal.error.exception.CommitFailedException(e.getMessage(), KafkaErrorType.COMMIT_FAILED, e.getCause());
                } catch (InvalidTopicException e) {
                    throw new InvalidTopicNameException(e);
                } catch (KafkaException e) {
                    // Must stay last: every Kafka exception caught above is a KafkaException subtype.
                    throw new UnexpectedException(e);
                }
            });
        } catch (com.mulesoft.connectors.kafka.internal.error.exception.CommitFailedException e) {
            commitFailed = true;
            // Connection fills the placeholder; the trailing exception is logged with its stack trace.
            logger.debug("Throwing ConnectionException because commit failed for connection: {}.", this, e);
            throw new ConnectionException(e, this);
        } finally {
            // Single close point, so the session is never closed twice on the commit-failure path.
            if (commitFailed || polledRecord.get() == null || AckMode.IMMEDIATE == ackMode || AckMode.DUPS_OK == ackMode) {
                session.close();
            }
        }
    }

    /**
     * Applies the acknowledgement strategy for a polled record. Does nothing besides returning
     * the session id when no record was polled.
     *
     * <ul>
     *   <li>{@code AUTO}: commits on flow success, resets the consumer buffer on flow error;
     *       the session is closed in both cases.</li>
     *   <li>{@code IMMEDIATE}: commits synchronously right away.</li>
     *   <li>{@code DUPS_OK}: commits asynchronously right away.</li>
     *   <li>{@code MANUAL}: registers the session so it can be looked up by id; on flow error
     *       resets the buffer and unregisters it; closes the session on flow completion.</li>
     * </ul>
     *
     * @param ackMode         acknowledgement mode
     * @param flowListener    listener on which the callbacks are registered
     * @param session         session owning {@code muleConsumer}
     * @param atomicReference holder of the polled record ({@code null} content means nothing polled)
     * @param muleConsumer    consumer that polled the record
     * @return the session id as a string
     */
    private String handleConsumeResultAccordingToAckMode(AckMode ackMode, FlowListener flowListener, Session<MuleConsumer> session, AtomicReference<ConsumerRecord<InputStream, InputStream>> atomicReference, MuleConsumer muleConsumer) {
        if (atomicReference.get() != null) {
            switch (ackMode) {
                case AUTO:
                    flowListener.onSuccess(message -> {
                        try {
                            logger.debug("Flow finished with success, about to commit offset for processed messages.");
                            muleConsumer.commit();
                            logger.debug("Successfully committed offset for processed messages.");
                        } finally {
                            // Close even if the commit throws, so the session is not left open.
                            session.close();
                        }
                    });
                    flowListener.onError(exc -> {
                        try {
                            logger.debug("Flow execution resulted in error. Same message will be consumed again.");
                            muleConsumer.resetBuffer();
                        } finally {
                            session.close();
                        }
                    });
                    break;
                case IMMEDIATE:
                    muleConsumer.commit();
                    break;
                case DUPS_OK:
                    muleConsumer.asyncCommit();
                    break;
                case MANUAL: {
                    String sessionId = session.getId().toString();
                    addSession(ackMode, sessionId, session);
                    flowListener.onError(exc -> {
                        try {
                            logger.debug("Flow execution resulted in error. Same message will be consumed again.");
                            muleConsumer.resetBuffer();
                        } finally {
                            // Unregister even if the buffer reset throws.
                            removeSession(ackMode, sessionId);
                        }
                    });
                    // NOTE(review): on a successful flow the session is closed here but stays
                    // registered in openSessions unless release() is called - confirm intended.
                    flowListener.onComplete(() -> {
                        logger.debug("Flow has finished, closing consumer session.");
                        session.close();
                    });
                    break;
                }
            }
        }
        return session.getId().toString();
    }

    /**
     * Commits on the consumer of the open session identified by the given ack mode and id.
     *
     * @param ackMode ack mode the session was registered under
     * @param str     session id
     * @throws ConnectionException declared for callers; not raised directly by this method
     * @throws SessionNotFoundException if no matching session is registered (raised by the lookup)
     */
    public void commit(AckMode ackMode, String str) throws ConnectionException {
        try {
            getSession(ackMode, str).run(MuleConsumer::commit);
        } catch (TimeoutException e) {
            logger.warn("The commit timeout for sessionId {}", str);
            throw new UnexpectedException(e);
        } catch (InterruptException e) {
            // Caught before the generic KafkaException handler, which would otherwise shadow it.
            throw new OperationInterruptedException((Throwable) e);
        } catch (AuthenticationException e) {
            throw new com.mulesoft.connectors.kafka.internal.error.exception.AuthenticationException(e, this);
        } catch (AuthorizationException e) {
            throw new com.mulesoft.connectors.kafka.internal.error.exception.AuthorizationException(e);
        } catch (CommitFailedException e) {
            throw new com.mulesoft.connectors.kafka.internal.error.exception.CommitFailedException(e.getMessage(), KafkaErrorType.COMMIT_FAILED, e.getCause());
        } catch (IllegalArgumentException | KafkaException e) {
            logger.debug("There was an unexpected exception while doing a commit of the sessionId {}", str);
            throw new UnexpectedException(e);
        }
    }

    /**
     * Checks out every consumer of the pool and subscribes each of them to the given topics.
     * All consumers are checked back in when the session closes.
     *
     * @param duration timeout handed to the pool when checking out the consumers
     * @param list     topics to subscribe to
     * @throws ConnectionException if the consumer pool is closed
     * @throws OperationTimeoutException if a Kafka {@code TimeoutException} is raised
     */
    public void subscribe(Duration duration, List<String> list) throws ConnectionException {
        // try-with-resources closes the session exactly once and keeps the primary exception.
        try (Session<Set<MuleConsumer>> session = new Session<>(
                this.consumerPool.checkoutAll(duration),
                consumers -> consumers.forEach(this.consumerPool::checkIn))) {
            session.run(consumers -> consumers.forEach(consumer -> consumer.subscribe(list)));
        } catch (ConsumerPoolClosedException e) {
            throw new ConnectionException("The consumer Pool is closed, can't subscribe", e, (ErrorType) null, this);
        } catch (TimeoutException e) {
            throw new OperationTimeoutException("subscribe", (Throwable) e);
        } catch (AuthenticationException e) {
            throw new com.mulesoft.connectors.kafka.internal.error.exception.AuthenticationException(e, this);
        } catch (AuthorizationException e) {
            throw new com.mulesoft.connectors.kafka.internal.error.exception.AuthorizationException(e);
        }
    }

    /**
     * Checks out every consumer of the pool and spreads the given partitions across them,
     * using {@link #dividePartitions} to build one share per consumer. All consumers are
     * checked back in when the session closes.
     *
     * @param duration timeout handed to the pool when checking out the consumers
     * @param list     partitions to assign
     * @throws ConnectionException if the consumer pool is closed
     * @throws OperationTimeoutException if a Kafka {@code TimeoutException} is raised
     */
    public void assign(Duration duration, List<TopicPartition> list) throws ConnectionException {
        // try-with-resources closes the session exactly once and keeps the primary exception.
        try (Session<Set<MuleConsumer>> session = new Session<>(
                this.consumerPool.checkoutAll(duration),
                consumers -> consumers.forEach(this.consumerPool::checkIn))) {
            session.run(consumers -> {
                // One share per consumer, handed out in the set's iteration order.
                Iterator<List<TopicPartition>> shares = dividePartitions(list, consumers.size()).iterator();
                consumers.forEach(consumer -> consumer.assign(shares.next()));
            });
        } catch (TimeoutException e) {
            throw new OperationTimeoutException("assignment", (Throwable) e);
        } catch (AuthorizationException e) {
            throw new com.mulesoft.connectors.kafka.internal.error.exception.AuthorizationException(e);
        } catch (AuthenticationException e) {
            throw new com.mulesoft.connectors.kafka.internal.error.exception.AuthenticationException(e, this);
        } catch (ConsumerPoolClosedException e) {
            throw new ConnectionException("The consumer Pool was closed when trying to execute the assign operation", e, (ErrorType) null, this);
        }
    }

    /**
     * Hands the polling task to the worker scheduler.
     *
     * @param pollingTask task to run
     * @return the future returned by the scheduler for the submitted task
     */
    public Future<?> startPolling(PollingTask<?, ?, ?> pollingTask) {
        Future<?> submitted = this.workerScheduler.submit(pollingTask);
        return submitted;
    }

    /**
     * Polls through a pooled consumer and applies the ack-mode handling of
     * {@link #handlePollResultAccordingToAckMode} to the outcome.
     *
     * <p>Every failure path closes the session before propagating.
     *
     * @param ackMode    acknowledgement mode to apply to the polled value
     * @param duration   timeout passed to {@code biFunction}
     * @param biFunction performs the actual poll on the consumer
     * @param <T>        type of the polled value
     * @return an entry of session id (may be {@code null}) and polled value, or {@code null}
     *         when nothing was polled or a {@link NotFoundException} was raised
     * @throws ConnectionException if the consumer pool is closed
     */
    public <T> Map.Entry<String, T> poll(AckMode ackMode, Duration duration, BiFunction<MuleConsumer, Duration, T> biFunction) throws ConnectionException {
        // NOTE(review): the -1 ms check-out timeout is passed through as-is; its meaning is
        // defined by ConsumerPool.checkOut - confirm.
        Session<MuleConsumer> session = createSession(Duration.ofMillis(-1L));
        return (Map.Entry<String, T>) session.apply(consumer -> {
            try {
                return handlePollResultAccordingToAckMode(ackMode, session, consumer, biFunction.apply(consumer, duration));
            } catch (IllegalStateException e) {
                throw ensureClosedSession(new UnassignedConsumerException(e), session);
            } catch (InterruptException e) {
                // Close here as well: the session may not be registered anywhere yet, so nothing
                // else would return the consumer to the pool.
                throw ensureClosedSession(new OperationInterruptedException((Throwable) e), session);
            } catch (org.apache.kafka.clients.consumer.InvalidOffsetException e) {
                throw ensureClosedSession(new InvalidOffsetException(e), session);
            } catch (AuthenticationException e) {
                throw ensureClosedSession(new com.mulesoft.connectors.kafka.internal.error.exception.AuthenticationException(e, this), session);
            } catch (NotFoundException e) {
                logger.trace("No messages found.");
                session.close();
                return null;
            } catch (InvalidTopicException e) {
                throw ensureClosedSession(new com.mulesoft.connectors.kafka.internal.error.exception.InvalidTopicException("An invalid topic name was provided ", e), session);
            } catch (AuthorizationException e) {
                throw ensureClosedSession(new com.mulesoft.connectors.kafka.internal.error.exception.AuthorizationException(e), session);
            } catch (RuntimeException e) {
                // Must stay last: it is a supertype of the exceptions handled above.
                throw ensureClosedSession(e, session);
            }
        });
    }

    /**
     * Closes the session and hands back the first argument untouched, so the call can be used
     * inline in a {@code throw} or {@code return}.
     *
     * @param e       value to pass through (may be {@code null})
     * @param session session to close
     * @return {@code e}
     */
    private <T, E> E ensureClosedSession(E e, Session<T> session) {
        final E passthrough = e;
        session.close();
        return passthrough;
    }

    /**
     * Applies the acknowledgement strategy to a poll outcome.
     *
     * <p>A {@code null} outcome closes the session and yields {@code null}. For {@code AUTO} and
     * {@code MANUAL} the session is registered and its id becomes the entry key. For
     * {@code IMMEDIATE} and {@code DUPS_OK} the offset is committed (synchronously and
     * asynchronously respectively), the session is closed and the key stays {@code null}.
     *
     * @param ackMode      acknowledgement mode
     * @param session      session owning {@code muleConsumer}
     * @param muleConsumer consumer the value was polled from
     * @param t            polled value, possibly {@code null}
     * @return entry of session id and polled value, or {@code null} when {@code t} is {@code null}
     */
    private <T> AbstractMap.SimpleEntry<String, T> handlePollResultAccordingToAckMode(AckMode ackMode, Session<MuleConsumer> session, MuleConsumer muleConsumer, T t) {
        if (t == null) {
            session.close();
            return null;
        }
        String sessionId = null;
        switch (ackMode) {
            case IMMEDIATE:
                muleConsumer.commit();
                session.close();
                break;
            case DUPS_OK:
                muleConsumer.asyncCommit();
                session.close();
                break;
            case AUTO:
            case MANUAL:
                sessionId = session.getId().toString();
                addSession(ackMode, sessionId, session);
                break;
            default:
                break;
        }
        return new AbstractMap.SimpleEntry<>(sessionId, t);
    }

    /** Quietly closes the consumer pool and logs the disconnection at debug level. */
    public void disconnect() {
        IOUtils.closeQuietly(this.consumerPool);
        // Parameterized logging already skips formatting when debug is disabled.
        logger.debug("Disconnected connection {}!", this);
    }

    /**
     * Validates the connection by asking the consumer pool whether it is valid.
     *
     * @throws ModuleException with {@code INVALID_CONNECTION} when the pool reports itself invalid
     */
    @Deprecated
    public void validate() {
        logger.debug("Validating connection {}!", this);
        if (this.consumerPool.isValid()) {
            return;
        }
        ConnectionException cause = new ConnectionException((Throwable) null, this);
        throw new ModuleException(KafkaErrorType.INVALID_CONNECTION, cause);
    }

    /**
     * Distributes the partitions round-robin over {@code i} groups: the k-th partition goes to
     * group {@code k % i}.
     *
     * @param list partitions to distribute
     * @param i    number of groups to create
     * @return {@code i} lists (some possibly empty) that together hold every partition; an
     *         empty list when {@code i <= 0} and there is nothing to distribute
     * @throws IllegalArgumentException if there are partitions to distribute but {@code i <= 0}
     */
    private List<List<TopicPartition>> dividePartitions(List<TopicPartition> list, int i) {
        if (i <= 0) {
            if (list.isEmpty()) {
                return new ArrayList<>();
            }
            // Without this guard the distribution loop would never make progress.
            throw new IllegalArgumentException(
                    String.format("Cannot divide %d partitions into %d groups", list.size(), i));
        }
        List<List<TopicPartition>> groups = new ArrayList<>(i);
        for (int g = 0; g < i; g++) {
            groups.add(new ArrayList<>());
        }
        int target = 0;
        for (TopicPartition partition : list) {
            groups.get(target).add(partition);
            target = (target + 1) % i;
        }
        return groups;
    }

    /**
     * Closes the open session identified by the given ack mode and id, then unregisters it.
     *
     * @param ackMode ack mode the session was registered under
     * @param str     session id
     */
    public void release(AckMode ackMode, String str) {
        Session<MuleConsumer> session = getSession(ackMode, str);
        session.close();
        removeSession(ackMode, str);
    }

    /** Unregisters the session stored under the {@code <ackMode>-<id>} key, if any. */
    private void removeSession(AckMode ackMode, String str) {
        String key = String.format(SESSION_KEY_TEMPLATE, ackMode, str);
        this.openSessions.remove(key);
    }

    /** Registers the session under the {@code <ackMode>-<id>} key, replacing any previous entry. */
    private void addSession(AckMode ackMode, String str, Session<MuleConsumer> session) {
        String key = String.format(SESSION_KEY_TEMPLATE, ackMode, str);
        this.openSessions.put(key, session);
    }

    /**
     * Resets the buffer of the consumer held by the open session identified by the given
     * ack mode and id.
     *
     * @param ackMode ack mode the session was registered under
     * @param str     session id
     */
    public void refreshBuffer(AckMode ackMode, String str) {
        Session<MuleConsumer> session = getSession(ackMode, str);
        session.apply(consumer -> {
            consumer.resetBuffer();
            return null;
        });
    }

    /**
     * Looks up the open session registered under the given ack mode and id.
     *
     * <p>The lookup uses the exact {@code <ackMode>-<id>} key written by {@link #addSession},
     * so a partial id can no longer match an unrelated session.
     *
     * @param ackMode ack mode the session is expected to be registered under
     * @param str     session id
     * @return the matching session
     * @throws SessionNotFoundException if the id is blank or no session with that id is registered
     * @throws InvalidAckModeException  if the id is registered, but under a different ack mode
     */
    private Session<MuleConsumer> getSession(AckMode ackMode, String str) {
        if (StringUtils.isBlank(str)) {
            throw new SessionNotFoundException(ackMode, str);
        }
        Session<MuleConsumer> session = this.openSessions.get(String.format(SESSION_KEY_TEMPLATE, ackMode, str));
        if (session != null) {
            return session;
        }
        // Tell "unknown id" apart from "known id, wrong ack mode".
        for (AckMode otherMode : AckMode.values()) {
            if (otherMode != ackMode
                    && this.openSessions.containsKey(String.format(SESSION_KEY_TEMPLATE, otherMode, str))) {
                throw new InvalidAckModeException(ackMode);
            }
        }
        throw new SessionNotFoundException(ackMode, str);
    }

    /**
     * Checks a consumer out of the pool and wraps it in a session that checks it back in
     * through {@code ConsumerPool.checkIn}.
     *
     * @param duration timeout handed to the pool's check-out
     * @return a new session around the checked-out consumer
     * @throws ConnectionException if the consumer pool is closed
     */
    private Session<MuleConsumer> createSession(Duration duration) throws ConnectionException {
        try {
            MuleConsumer borrowed = this.consumerPool.checkOut(duration);
            return new Session<>(borrowed, this.consumerPool::checkIn);
        } catch (ConsumerPoolClosedException poolClosed) {
            throw new ConnectionException(poolClosed, this);
        }
    }

    /**
     * Validates the connection by asking the consumer pool whether it is valid.
     *
     * @return a success result when the pool is valid, otherwise a failure result carrying a
     *         {@link ConnectionException}
     */
    public ConnectionValidationResult validateWithResult() {
        logger.debug("Validating connection {}!", this);
        if (this.consumerPool.isValid()) {
            return ConnectionValidationResult.success();
        }
        ConnectionException cause = new ConnectionException("Invalid Connection", (Throwable) null, (ErrorType) null, this);
        return ConnectionValidationResult.failure("Invalid Connection", cause);
    }
}
