package com.microsoft.azure.servicebus;

import com.microsoft.azure.servicebus.primitives.ExceptionUtil;
import com.microsoft.azure.servicebus.primitives.MessageLockLostException;
import com.microsoft.azure.servicebus.primitives.MessagingEntityType;
import com.microsoft.azure.servicebus.primitives.MessagingFactory;
import com.microsoft.azure.servicebus.primitives.OperationCancelledException;
import com.microsoft.azure.servicebus.primitives.ServiceBusException;
import com.microsoft.azure.servicebus.primitives.SessionLockLostException;
import com.microsoft.azure.servicebus.primitives.StringUtil;
import com.microsoft.azure.servicebus.primitives.TimeoutException;
import com.microsoft.azure.servicebus.primitives.Timer;
import com.microsoft.azure.servicebus.primitives.TimerType;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledFuture;
import java.util.function.BiFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/microsoft/azure/servicebus/MessageAndSessionPump.class */
public class MessageAndSessionPump extends InitializableEntity implements IMessageAndSessionPump {
    private static final int UNSET_PREFETCH_COUNT = -1;
    private final ConcurrentHashMap<String, IMessageSession> openSessions;
    private final MessagingFactory factory;
    private final String entityPath;
    private final ReceiveMode receiveMode;
    private final MessagingEntityType entityType;
    private IMessageReceiver innerReceiver;
    private boolean handlerRegistered;
    private IMessageHandler messageHandler;
    private ISessionHandler sessionHandler;
    private MessageHandlerOptions messageHandlerOptions;
    private SessionHandlerOptions sessionHandlerOptions;
    private int prefetchCount;
    private ExecutorService customCodeExecutor;
    private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(MessageAndSessionPump.class);
    private static final Duration MINIMUM_MESSAGE_LOCK_VALIDITY = Duration.ofSeconds(4);
    private static final Duration MAXIMUM_RENEW_LOCK_BUFFER = Duration.ofSeconds(10);
    private static final Duration SLEEP_DURATION_ON_ACCEPT_SESSION_EXCEPTION = Duration.ofMinutes(1);
    private static final CompletableFuture<Void> COMPLETED_FUTURE = CompletableFuture.completedFuture(null);

    /* loaded from: input_file:com/microsoft/azure/servicebus/MessageAndSessionPump$MessgeRenewLockLoop.class */
    private static class MessgeRenewLockLoop extends RenewLockLoop {
        private IMessageReceiver innerReceiver;
        private MessageAndSessionPump messageAndSessionPump;
        private IMessage message;
        private Instant stopRenewalAt;
        private String messageIdentifier;
        ScheduledFuture<?> timerFuture;

        MessgeRenewLockLoop(IMessageReceiver iMessageReceiver, MessageAndSessionPump messageAndSessionPump, IMessage iMessage, Instant instant) {
            this.innerReceiver = iMessageReceiver;
            this.messageAndSessionPump = messageAndSessionPump;
            this.message = iMessage;
            this.stopRenewalAt = instant;
            this.messageIdentifier = String.format("message with locktoken : %s, sequence number : %s", this.message.getLockToken(), Long.valueOf(this.message.getSequenceNumber()));
        }

        @Override // com.microsoft.azure.servicebus.MessageAndSessionPump.RenewLockLoop
        protected ScheduledFuture<?> getTimerFuture() {
            return this.timerFuture;
        }

        @Override // com.microsoft.azure.servicebus.MessageAndSessionPump.RenewLockLoop
        protected void loop() {
            Duration nextRenewInterval;
            if (isCancelled() || (nextRenewInterval = getNextRenewInterval()) == null || nextRenewInterval.isNegative()) {
                return;
            }
            this.timerFuture = Timer.schedule(() -> {
                MessageAndSessionPump.TRACE_LOGGER.debug("Renewing lock on '{}'", this.messageIdentifier);
                this.innerReceiver.renewMessageLockAsync(this.message).handleAsync((instant, th) -> {
                    if (th == null) {
                        MessageAndSessionPump.TRACE_LOGGER.debug("Renewed lock on '{}'", this.messageIdentifier);
                        loop();
                        return null;
                    }
                    Throwable extractAsyncCompletionCause = ExceptionUtil.extractAsyncCompletionCause(th);
                    MessageAndSessionPump.TRACE_LOGGER.error("Renewing lock on '{}' failed", this.messageIdentifier, extractAsyncCompletionCause);
                    this.messageAndSessionPump.notifyExceptionToMessageHandler(extractAsyncCompletionCause, ExceptionPhase.RENEWMESSAGELOCK);
                    if ((extractAsyncCompletionCause instanceof MessageLockLostException) || (extractAsyncCompletionCause instanceof OperationCancelledException)) {
                        return null;
                    }
                    loop();
                    return null;
                }, (Executor) MessagingFactory.INTERNAL_THREAD_POOL);
            }, nextRenewInterval, TimerType.OneTimeRun);
        }

        private Duration getNextRenewInterval() {
            if (this.message.getLockedUntilUtc().isBefore(this.stopRenewalAt)) {
                return RenewLockLoop.getNextRenewInterval(this.message.getLockedUntilUtc(), this.messageIdentifier);
            }
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/microsoft/azure/servicebus/MessageAndSessionPump$RenewLockLoop.class */
    public static abstract class RenewLockLoop {
        private boolean cancelled = false;

        protected RenewLockLoop() {
        }

        protected abstract void loop();

        protected abstract ScheduledFuture<?> getTimerFuture();

        protected boolean isCancelled() {
            return this.cancelled;
        }

        public void startLoop() {
            loop();
        }

        public void cancelLoop() {
            if (this.cancelled) {
                return;
            }
            this.cancelled = true;
            ScheduledFuture<?> timerFuture = getTimerFuture();
            if (timerFuture == null || timerFuture.isDone()) {
                return;
            }
            timerFuture.cancel(true);
        }

        protected static Duration getNextRenewInterval(Instant instant, String str) {
            Duration between = Duration.between(Instant.now(), instant);
            if (between.isNegative()) {
                between = MessageAndSessionPump.MINIMUM_MESSAGE_LOCK_VALIDITY;
                MessageAndSessionPump.TRACE_LOGGER.warn("Lock of '{}' already expired. May be there is clock skew. Still trying to renew lock", str);
            }
            Duration dividedBy = between.dividedBy(2L).compareTo(MessageAndSessionPump.MAXIMUM_RENEW_LOCK_BUFFER) > 0 ? MessageAndSessionPump.MAXIMUM_RENEW_LOCK_BUFFER : between.dividedBy(2L);
            MessageAndSessionPump.TRACE_LOGGER.debug("Lock of '{}' is valid for '{}'. It will be renewed '{}' before it expires.", new Object[]{str, between, dividedBy});
            return between.minus(dividedBy);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/microsoft/azure/servicebus/MessageAndSessionPump$SessionRenewLockLoop.class */
    public static class SessionRenewLockLoop extends RenewLockLoop {
        private IMessageSession session;
        private MessageAndSessionPump messageAndSessionPump;
        private String sessionIdentifier;
        ScheduledFuture<?> timerFuture;

        SessionRenewLockLoop(IMessageSession iMessageSession, MessageAndSessionPump messageAndSessionPump) {
            this.session = iMessageSession;
            this.messageAndSessionPump = messageAndSessionPump;
            this.sessionIdentifier = String.format("session with id:%s", this.session.getSessionId());
        }

        @Override // com.microsoft.azure.servicebus.MessageAndSessionPump.RenewLockLoop
        protected ScheduledFuture<?> getTimerFuture() {
            return this.timerFuture;
        }

        @Override // com.microsoft.azure.servicebus.MessageAndSessionPump.RenewLockLoop
        protected void loop() {
            Duration nextRenewInterval;
            if (isCancelled() || (nextRenewInterval = RenewLockLoop.getNextRenewInterval(this.session.getLockedUntilUtc(), this.sessionIdentifier)) == null || nextRenewInterval.isNegative()) {
                return;
            }
            this.timerFuture = Timer.schedule(() -> {
                MessageAndSessionPump.TRACE_LOGGER.debug("Renewing lock on '{}'", this.sessionIdentifier);
                this.session.renewSessionLockAsync().handleAsync((r6, th) -> {
                    if (th == null) {
                        MessageAndSessionPump.TRACE_LOGGER.debug("Renewed lock on '{}'", this.sessionIdentifier);
                        loop();
                        return null;
                    }
                    Throwable extractAsyncCompletionCause = ExceptionUtil.extractAsyncCompletionCause(th);
                    MessageAndSessionPump.TRACE_LOGGER.error("Renewing lock on '{}' failed", this.sessionIdentifier, extractAsyncCompletionCause);
                    this.messageAndSessionPump.notifyExceptionToSessionHandler(extractAsyncCompletionCause, ExceptionPhase.RENEWSESSIONLOCK);
                    if ((extractAsyncCompletionCause instanceof SessionLockLostException) || (extractAsyncCompletionCause instanceof OperationCancelledException)) {
                        return null;
                    }
                    loop();
                    return null;
                }, (Executor) MessagingFactory.INTERNAL_THREAD_POOL);
            }, nextRenewInterval, TimerType.OneTimeRun);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/microsoft/azure/servicebus/MessageAndSessionPump$SessionTracker.class */
    public static class SessionTracker {
        private final int numberReceivingThreads;
        private final IMessageSession session;
        private final MessageAndSessionPump messageAndSessionPump;
        private final SessionRenewLockLoop sessionRenewLockLoop;
        private int waitingRetryThreads = 0;
        private CompletableFuture<Boolean> retryFuture;

        SessionTracker(MessageAndSessionPump messageAndSessionPump, IMessageSession iMessageSession, SessionRenewLockLoop sessionRenewLockLoop) {
            this.messageAndSessionPump = messageAndSessionPump;
            this.session = iMessageSession;
            this.sessionRenewLockLoop = sessionRenewLockLoop;
            this.numberReceivingThreads = messageAndSessionPump.sessionHandlerOptions.getMaxConcurrentCallsPerSession();
        }

        public IMessageSession getSession() {
            return this.session;
        }

        synchronized void notifyMessageReceived() {
            MessageAndSessionPump.TRACE_LOGGER.trace("Message received from session '{}'", this.session.getSessionId());
            if (this.retryFuture == null || this.retryFuture.isDone()) {
                return;
            }
            this.waitingRetryThreads = 0;
            this.retryFuture.complete(true);
        }

        synchronized CompletableFuture<Boolean> shouldRetryOnNoMessageOrException() {
            CompletableFuture completableFuture;
            if (this.retryFuture == null || this.retryFuture.isDone()) {
                this.retryFuture = new CompletableFuture<>();
            }
            this.waitingRetryThreads++;
            if (this.waitingRetryThreads == this.numberReceivingThreads) {
                MessageAndSessionPump.TRACE_LOGGER.info("No messages recevied by any receive call from session '{}'. Closing the session.", this.session.getSessionId());
                this.retryFuture.complete(false);
                ScheduledFuture<?> schedule = Timer.schedule(() -> {
                    MessageAndSessionPump.TRACE_LOGGER.warn("Closing session timed out. Cancelling loop to renew lock on session '{}'", this.session.getSessionId());
                    this.sessionRenewLockLoop.cancelLoop();
                }, this.messageAndSessionPump.sessionHandlerOptions.getMaxAutoRenewDuration(), TimerType.OneTimeRun);
                try {
                    completableFuture = MessageAndSessionPump.COMPLETED_FUTURE.thenComposeAsync(r4 -> {
                        return this.messageAndSessionPump.sessionHandler.OnCloseSessionAsync(this.session);
                    }, (Executor) this.messageAndSessionPump.customCodeExecutor);
                } catch (Exception e) {
                    MessageAndSessionPump.TRACE_LOGGER.error("Invocation of onCloseSession on session '{}' threw unexpected exception", this.session.getSessionId(), e);
                    completableFuture = new CompletableFuture();
                    completableFuture.completeExceptionally(e);
                }
                if (completableFuture == null) {
                    completableFuture = MessageAndSessionPump.COMPLETED_FUTURE;
                }
                completableFuture.handleAsync((r7, th) -> {
                    schedule.cancel(true);
                    if (th != null) {
                        Throwable extractAsyncCompletionCause = ExceptionUtil.extractAsyncCompletionCause(th);
                        MessageAndSessionPump.TRACE_LOGGER.error("onCloseSession on session '{}' threw exception", this.session.getSessionId(), extractAsyncCompletionCause);
                        this.messageAndSessionPump.notifyExceptionToSessionHandler(extractAsyncCompletionCause, ExceptionPhase.USERCALLBACK);
                    }
                    this.sessionRenewLockLoop.cancelLoop();
                    MessageAndSessionPump.TRACE_LOGGER.debug("Cancelled loop to renew lock on session '{}'", this.session.getSessionId());
                    this.session.closeAsync().handleAsync((r8, th) -> {
                        if (th != null) {
                            Throwable extractAsyncCompletionCause2 = ExceptionUtil.extractAsyncCompletionCause(th);
                            MessageAndSessionPump.TRACE_LOGGER.info("Closing session '{}' from entity '{}' failed", new Object[]{this.session.getSessionId(), this.messageAndSessionPump.entityPath, extractAsyncCompletionCause2});
                            this.messageAndSessionPump.notifyExceptionToSessionHandler(extractAsyncCompletionCause2, ExceptionPhase.SESSIONCLOSE);
                        } else {
                            MessageAndSessionPump.TRACE_LOGGER.info("Closed session '{}' from entity '{}'", this.session.getSessionId(), this.messageAndSessionPump.entityPath);
                        }
                        this.messageAndSessionPump.openSessions.remove(this.session.getSessionId());
                        this.messageAndSessionPump.acceptSessionAndPumpMessages();
                        return null;
                    }, (Executor) MessagingFactory.INTERNAL_THREAD_POOL);
                    return null;
                }, (Executor) MessagingFactory.INTERNAL_THREAD_POOL);
            }
            return this.retryFuture;
        }
    }

    public MessageAndSessionPump(MessagingFactory messagingFactory, String str, MessagingEntityType messagingEntityType, ReceiveMode receiveMode) {
        super(StringUtil.getShortRandomString());
        this.handlerRegistered = false;
        this.factory = messagingFactory;
        this.entityPath = str;
        this.entityType = messagingEntityType;
        this.receiveMode = receiveMode;
        this.openSessions = new ConcurrentHashMap<>();
        this.prefetchCount = -1;
    }

    @Override // com.microsoft.azure.servicebus.IMessageAndSessionPump
    @Deprecated
    public void registerMessageHandler(IMessageHandler iMessageHandler) throws InterruptedException, ServiceBusException {
        registerMessageHandler(iMessageHandler, new MessageHandlerOptions());
    }

    @Override // com.microsoft.azure.servicebus.IMessageAndSessionPump
    public void registerMessageHandler(IMessageHandler iMessageHandler, ExecutorService executorService) throws InterruptedException, ServiceBusException {
        registerMessageHandler(iMessageHandler, new MessageHandlerOptions(), executorService);
    }

    @Override // com.microsoft.azure.servicebus.IMessageAndSessionPump
    @Deprecated
    public void registerMessageHandler(IMessageHandler iMessageHandler, MessageHandlerOptions messageHandlerOptions) throws InterruptedException, ServiceBusException {
        registerMessageHandler(iMessageHandler, messageHandlerOptions, ForkJoinPool.commonPool());
    }

    @Override // com.microsoft.azure.servicebus.IMessageAndSessionPump
    public void registerMessageHandler(IMessageHandler iMessageHandler, MessageHandlerOptions messageHandlerOptions, ExecutorService executorService) throws InterruptedException, ServiceBusException {
        assertNonNulls(iMessageHandler, messageHandlerOptions, executorService);
        TRACE_LOGGER.info("Registering message handler on entity '{}' with '{}'", this.entityPath, messageHandlerOptions);
        setHandlerRegistered();
        this.messageHandler = iMessageHandler;
        this.messageHandlerOptions = messageHandlerOptions;
        this.customCodeExecutor = executorService;
        this.innerReceiver = ClientFactory.createMessageReceiverFromEntityPath(this.factory, this.entityPath, this.entityType, this.receiveMode);
        TRACE_LOGGER.info("Created MessageReceiver to entity '{}'", this.entityPath);
        if (this.prefetchCount != -1) {
            this.innerReceiver.setPrefetchCount(this.prefetchCount);
        }
        for (int i = 0; i < messageHandlerOptions.getMaxConcurrentCalls(); i++) {
            receiveAndPumpMessage();
        }
    }

    @Override // com.microsoft.azure.servicebus.IMessageAndSessionPump
    @Deprecated
    public void registerSessionHandler(ISessionHandler iSessionHandler) throws InterruptedException, ServiceBusException {
        registerSessionHandler(iSessionHandler, new SessionHandlerOptions());
    }

    @Override // com.microsoft.azure.servicebus.IMessageAndSessionPump
    public void registerSessionHandler(ISessionHandler iSessionHandler, ExecutorService executorService) throws InterruptedException, ServiceBusException {
        registerSessionHandler(iSessionHandler, new SessionHandlerOptions(), executorService);
    }

    @Override // com.microsoft.azure.servicebus.IMessageAndSessionPump
    @Deprecated
    public void registerSessionHandler(ISessionHandler iSessionHandler, SessionHandlerOptions sessionHandlerOptions) throws InterruptedException, ServiceBusException {
        registerSessionHandler(iSessionHandler, sessionHandlerOptions, ForkJoinPool.commonPool());
    }

    @Override // com.microsoft.azure.servicebus.IMessageAndSessionPump
    public void registerSessionHandler(ISessionHandler iSessionHandler, SessionHandlerOptions sessionHandlerOptions, ExecutorService executorService) throws InterruptedException, ServiceBusException {
        assertNonNulls(iSessionHandler, sessionHandlerOptions, executorService);
        TRACE_LOGGER.info("Registering session handler on entity '{}' with '{}'", this.entityPath, sessionHandlerOptions);
        setHandlerRegistered();
        this.sessionHandler = iSessionHandler;
        this.sessionHandlerOptions = sessionHandlerOptions;
        this.customCodeExecutor = executorService;
        for (int i = 0; i < sessionHandlerOptions.getMaxConcurrentSessions(); i++) {
            acceptSessionAndPumpMessages();
        }
    }

    private static void assertNonNulls(Object obj, Object obj2, ExecutorService executorService) {
        if (obj == null || obj2 == null || executorService == null) {
            throw new IllegalArgumentException("None of the arguments can be null.");
        }
    }

    private synchronized void setHandlerRegistered() {
        throwIfClosed(null);
        if (this.handlerRegistered) {
            throw new UnsupportedOperationException("MessageHandler or SessionHandler already registered.");
        }
        this.handlerRegistered = true;
    }

    private void receiveAndPumpMessage() {
        if (getIsClosingOrClosed()) {
            return;
        }
        this.innerReceiver.receiveAsync(this.messageHandlerOptions.getMessageWaitDuration()).handleAsync((iMessage, th) -> {
            MessgeRenewLockLoop messgeRenewLockLoop;
            CompletableFuture<Void> completableFuture;
            if (th != null) {
                Throwable extractAsyncCompletionCause = ExceptionUtil.extractAsyncCompletionCause(th);
                TRACE_LOGGER.error("Receiving message from entity '{}' failed.", this.entityPath, extractAsyncCompletionCause);
                notifyExceptionToMessageHandler(extractAsyncCompletionCause, ExceptionPhase.RECEIVE);
                receiveAndPumpMessage();
                return null;
            }
            if (iMessage == null) {
                TRACE_LOGGER.debug("Receive from entity '{}' returned no messages.", this.entityPath);
                receiveAndPumpMessage();
                return null;
            }
            TRACE_LOGGER.trace("Message with sequence number '{}' received from entity '{}'.", Long.valueOf(iMessage.getSequenceNumber()), this.entityPath);
            if (this.innerReceiver.getReceiveMode() == ReceiveMode.PEEKLOCK) {
                Instant plus = Instant.now().plus((TemporalAmount) this.messageHandlerOptions.getMaxAutoRenewDuration());
                messgeRenewLockLoop = new MessgeRenewLockLoop(this.innerReceiver, this, iMessage, plus);
                messgeRenewLockLoop.startLoop();
                TRACE_LOGGER.trace("Started loop to renew lock on message with sequence number '{}' until '{}'", Long.valueOf(iMessage.getSequenceNumber()), plus);
            } else {
                messgeRenewLockLoop = null;
            }
            try {
                TRACE_LOGGER.debug("Invoking onMessage with message containing sequence number '{}'", Long.valueOf(iMessage.getSequenceNumber()));
                completableFuture = COMPLETED_FUTURE.thenComposeAsync(r5 -> {
                    return this.messageHandler.onMessageAsync(iMessage);
                }, (Executor) this.customCodeExecutor);
            } catch (Exception e) {
                TRACE_LOGGER.error("Invocation of onMessage with message containing sequence number '{}' threw unexpected exception", Long.valueOf(iMessage.getSequenceNumber()), e);
                completableFuture = new CompletableFuture<>();
                completableFuture.completeExceptionally(e);
            }
            if (completableFuture == null) {
                completableFuture = COMPLETED_FUTURE;
            }
            MessgeRenewLockLoop messgeRenewLockLoop2 = messgeRenewLockLoop;
            completableFuture.handleAsync((BiFunction<? super Void, Throwable, ? extends U>) (r8, th) -> {
                ExceptionPhase exceptionPhase;
                CompletableFuture<Void> completedFuture;
                if (th != null) {
                    th = ExceptionUtil.extractAsyncCompletionCause(th);
                    TRACE_LOGGER.error("onMessage with message containing sequence number '{}' threw exception", Long.valueOf(iMessage.getSequenceNumber()), th);
                    notifyExceptionToMessageHandler(th, ExceptionPhase.USERCALLBACK);
                }
                if (this.innerReceiver.getReceiveMode() != ReceiveMode.PEEKLOCK) {
                    receiveAndPumpMessage();
                    return null;
                }
                if (messgeRenewLockLoop2 != null) {
                    messgeRenewLockLoop2.cancelLoop();
                    TRACE_LOGGER.trace("Cancelled loop to renew lock on message with sequence number '{}'", Long.valueOf(iMessage.getSequenceNumber()));
                }
                if (th == null) {
                    exceptionPhase = ExceptionPhase.COMPLETE;
                    if (this.messageHandlerOptions.isAutoComplete()) {
                        TRACE_LOGGER.debug("Completing message with sequence number '{}'", Long.valueOf(iMessage.getSequenceNumber()));
                        completedFuture = this.innerReceiver.completeAsync(iMessage.getLockToken());
                    } else {
                        completedFuture = CompletableFuture.completedFuture(null);
                    }
                } else {
                    exceptionPhase = ExceptionPhase.ABANDON;
                    if (this.messageHandlerOptions.isAutoComplete()) {
                        TRACE_LOGGER.debug("Abandoning message with sequence number '{}'", Long.valueOf(iMessage.getSequenceNumber()));
                        completedFuture = this.innerReceiver.abandonAsync(iMessage.getLockToken());
                    } else {
                        completedFuture = CompletableFuture.completedFuture(null);
                    }
                }
                ExceptionPhase exceptionPhase2 = exceptionPhase;
                completedFuture.handleAsync((r11, th) -> {
                    if (th != null) {
                        Throwable extractAsyncCompletionCause2 = ExceptionUtil.extractAsyncCompletionCause(th);
                        Logger logger = TRACE_LOGGER;
                        Object[] objArr = new Object[3];
                        objArr[0] = exceptionPhase2 == ExceptionPhase.COMPLETE ? "Completing" : "Abandoning";
                        objArr[1] = Long.valueOf(iMessage.getSequenceNumber());
                        objArr[2] = extractAsyncCompletionCause2;
                        logger.error("{} message with sequence number '{}' failed", objArr);
                        notifyExceptionToMessageHandler(extractAsyncCompletionCause2, exceptionPhase2);
                    }
                    receiveAndPumpMessage();
                    return null;
                }, (Executor) MessagingFactory.INTERNAL_THREAD_POOL);
                return null;
            }, (Executor) MessagingFactory.INTERNAL_THREAD_POOL);
            return null;
        }, (Executor) MessagingFactory.INTERNAL_THREAD_POOL);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void acceptSessionAndPumpMessages() {
        if (getIsClosingOrClosed()) {
            return;
        }
        TRACE_LOGGER.debug("Accepting a session from entity '{}'", this.entityPath);
        ClientFactory.acceptSessionFromEntityPathAsync(this.factory, this.entityPath, this.entityType, (String) null, this.receiveMode).handleAsync((iMessageSession, th) -> {
            if (th != null) {
                Throwable extractAsyncCompletionCause = ExceptionUtil.extractAsyncCompletionCause(th);
                if (!(extractAsyncCompletionCause instanceof TimeoutException)) {
                    TRACE_LOGGER.error("Accepting a session from entity '{}' failed.", this.entityPath, extractAsyncCompletionCause);
                    notifyExceptionToSessionHandler(extractAsyncCompletionCause, ExceptionPhase.ACCEPTSESSION);
                }
                if (extractAsyncCompletionCause instanceof OperationCancelledException) {
                    return null;
                }
                TRACE_LOGGER.debug("AcceptSession from entity '{}' will be retried after '{}'.", this.entityPath, SLEEP_DURATION_ON_ACCEPT_SESSION_EXCEPTION);
                Timer.schedule(() -> {
                    acceptSessionAndPumpMessages();
                }, SLEEP_DURATION_ON_ACCEPT_SESSION_EXCEPTION, TimerType.OneTimeRun);
                return null;
            }
            TRACE_LOGGER.debug("Accepted a session '{}' from entity '{}'", iMessageSession.getSessionId(), this.entityPath);
            if (this.prefetchCount != -1) {
                try {
                    iMessageSession.setPrefetchCount(this.prefetchCount);
                } catch (ServiceBusException e) {
                }
            }
            this.openSessions.put(iMessageSession.getSessionId(), iMessageSession);
            SessionRenewLockLoop sessionRenewLockLoop = new SessionRenewLockLoop(iMessageSession, this);
            sessionRenewLockLoop.startLoop();
            TRACE_LOGGER.debug("Started loop to renew lock on session '{}'", iMessageSession.getSessionId());
            SessionTracker sessionTracker = new SessionTracker(this, iMessageSession, sessionRenewLockLoop);
            for (int i = 0; i < this.sessionHandlerOptions.getMaxConcurrentCallsPerSession(); i++) {
                receiveFromSessionAndPumpMessage(sessionTracker);
            }
            return null;
        }, (Executor) MessagingFactory.INTERNAL_THREAD_POOL);
    }

    private void receiveFromSessionAndPumpMessage(SessionTracker sessionTracker) {
        if (getIsClosingOrClosed()) {
            return;
        }
        IMessageSession session = sessionTracker.getSession();
        session.receiveAsync(this.sessionHandlerOptions.getMessageWaitDuration()).handleAsync((iMessage, th) -> {
            CompletableFuture<Void> completableFuture;
            if (th != null) {
                Throwable extractAsyncCompletionCause = ExceptionUtil.extractAsyncCompletionCause(th);
                TRACE_LOGGER.error("Receiving message from session '{}' on entity '{}' failed.", new Object[]{session.getSessionId(), this.entityPath, extractAsyncCompletionCause});
                notifyExceptionToSessionHandler(extractAsyncCompletionCause, ExceptionPhase.RECEIVE);
                sessionTracker.shouldRetryOnNoMessageOrException().thenAcceptAsync(bool -> {
                    if (bool.booleanValue()) {
                        receiveFromSessionAndPumpMessage(sessionTracker);
                    }
                }, (Executor) MessagingFactory.INTERNAL_THREAD_POOL);
                return null;
            }
            if (iMessage == null) {
                TRACE_LOGGER.debug("Receive from from session '{}' on entity '{}' returned no messages.", session.getSessionId(), this.entityPath);
                sessionTracker.shouldRetryOnNoMessageOrException().thenAcceptAsync(bool2 -> {
                    if (bool2.booleanValue()) {
                        receiveFromSessionAndPumpMessage(sessionTracker);
                    }
                }, (Executor) MessagingFactory.INTERNAL_THREAD_POOL);
                return null;
            }
            TRACE_LOGGER.trace("Message with sequence number '{}' received from session '{}' on entity '{}'.", new Object[]{Long.valueOf(iMessage.getSequenceNumber()), session.getSessionId(), this.entityPath});
            sessionTracker.notifyMessageReceived();
            ScheduledFuture<?> schedule = Timer.schedule(() -> {
                TRACE_LOGGER.warn("onMessage task timed out. Cancelling loop to renew lock on session '{}'", session.getSessionId());
                sessionTracker.sessionRenewLockLoop.cancelLoop();
            }, this.sessionHandlerOptions.getMaxAutoRenewDuration(), TimerType.OneTimeRun);
            TRACE_LOGGER.debug("Invoking onMessage with message containing sequence number '{}'", Long.valueOf(iMessage.getSequenceNumber()));
            try {
                completableFuture = COMPLETED_FUTURE.thenComposeAsync(r7 -> {
                    return this.sessionHandler.onMessageAsync(session, iMessage);
                }, (Executor) this.customCodeExecutor);
            } catch (Exception e) {
                TRACE_LOGGER.error("Invocation of onMessage with message containing sequence number '{}' threw unexpected exception", Long.valueOf(iMessage.getSequenceNumber()), e);
                completableFuture = new CompletableFuture<>();
                completableFuture.completeExceptionally(e);
            }
            if (completableFuture == null) {
                completableFuture = COMPLETED_FUTURE;
            }
            completableFuture.handleAsync((BiFunction<? super Void, Throwable, ? extends U>) (r11, th) -> {
                ExceptionPhase exceptionPhase;
                CompletableFuture<Void> completedFuture;
                schedule.cancel(true);
                if (th != null) {
                    th = ExceptionUtil.extractAsyncCompletionCause(th);
                    TRACE_LOGGER.error("onMessage with message containing sequence number '{}' threw exception", Long.valueOf(iMessage.getSequenceNumber()), th);
                    notifyExceptionToSessionHandler(th, ExceptionPhase.USERCALLBACK);
                }
                if (this.receiveMode != ReceiveMode.PEEKLOCK) {
                    receiveFromSessionAndPumpMessage(sessionTracker);
                    return null;
                }
                if (th == null) {
                    exceptionPhase = ExceptionPhase.COMPLETE;
                    if (this.sessionHandlerOptions.isAutoComplete()) {
                        TRACE_LOGGER.debug("Completing message with sequence number '{}'", Long.valueOf(iMessage.getSequenceNumber()));
                        completedFuture = session.completeAsync(iMessage.getLockToken());
                    } else {
                        completedFuture = CompletableFuture.completedFuture(null);
                    }
                } else {
                    exceptionPhase = ExceptionPhase.ABANDON;
                    if (this.sessionHandlerOptions.isAutoComplete()) {
                        TRACE_LOGGER.debug("Abandoning message with sequence number '{}'", Long.valueOf(iMessage.getSequenceNumber()));
                        completedFuture = session.abandonAsync(iMessage.getLockToken());
                    } else {
                        completedFuture = CompletableFuture.completedFuture(null);
                    }
                }
                ExceptionPhase exceptionPhase2 = exceptionPhase;
                completedFuture.handleAsync((r12, th) -> {
                    if (th != null) {
                        Throwable extractAsyncCompletionCause2 = ExceptionUtil.extractAsyncCompletionCause(th);
                        Logger logger = TRACE_LOGGER;
                        Object[] objArr = new Object[3];
                        objArr[0] = exceptionPhase2 == ExceptionPhase.COMPLETE ? "Completing" : "Abandoning";
                        objArr[1] = Long.valueOf(iMessage.getSequenceNumber());
                        objArr[2] = extractAsyncCompletionCause2;
                        logger.error("{} message with sequence number '{}' failed", objArr);
                        notifyExceptionToSessionHandler(extractAsyncCompletionCause2, exceptionPhase2);
                    }
                    receiveFromSessionAndPumpMessage(sessionTracker);
                    return null;
                }, (Executor) MessagingFactory.INTERNAL_THREAD_POOL);
                return null;
            }, (Executor) MessagingFactory.INTERNAL_THREAD_POOL);
            return null;
        }, (Executor) MessagingFactory.INTERNAL_THREAD_POOL);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // com.microsoft.azure.servicebus.InitializableEntity
    public CompletableFuture<Void> initializeAsync() {
        return CompletableFuture.completedFuture(null);
    }

    @Override // com.microsoft.azure.servicebus.primitives.ClientEntity
    protected CompletableFuture<Void> onClose() {
        TRACE_LOGGER.info("Closing message and session pump on entity '{}'", this.entityPath);
        CompletableFuture[] completableFutureArr = new CompletableFuture[this.openSessions.size() + 1];
        int i = 0;
        Iterator<IMessageSession> it = this.openSessions.values().iterator();
        while (it.hasNext()) {
            int i2 = i;
            i++;
            completableFutureArr[i2] = it.next().closeAsync();
        }
        completableFutureArr[i] = this.innerReceiver == null ? CompletableFuture.completedFuture(null) : this.innerReceiver.closeAsync();
        return CompletableFuture.allOf(completableFutureArr);
    }

    @Override // com.microsoft.azure.servicebus.IMessageAndSessionPump
    public void abandon(UUID uuid) throws InterruptedException, ServiceBusException {
        checkInnerReceiveCreated();
        this.innerReceiver.abandon(uuid);
    }

    @Override // com.microsoft.azure.servicebus.IMessageAndSessionPump
    public void abandon(UUID uuid, Map<String, Object> map) throws InterruptedException, ServiceBusException {
        checkInnerReceiveCreated();
        this.innerReceiver.abandon(uuid, map);
    }

    @Override // com.microsoft.azure.servicebus.IMessageAndSessionPump
    public CompletableFuture<Void> abandonAsync(UUID uuid) {
        checkInnerReceiveCreated();
        return this.innerReceiver.abandonAsync(uuid);
    }

    @Override // com.microsoft.azure.servicebus.IMessageAndSessionPump
    public CompletableFuture<Void> abandonAsync(UUID uuid, Map<String, Object> map) {
        checkInnerReceiveCreated();
        return this.innerReceiver.abandonAsync(uuid, map);
    }

    @Override // com.microsoft.azure.servicebus.IMessageAndSessionPump
    public void complete(UUID uuid) throws InterruptedException, ServiceBusException {
        checkInnerReceiveCreated();
        this.innerReceiver.complete(uuid);
    }

    @Override // com.microsoft.azure.servicebus.IMessageAndSessionPump
    public CompletableFuture<Void> completeAsync(UUID uuid) {
        checkInnerReceiveCreated();
        return this.innerReceiver.completeAsync(uuid);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void defer(UUID uuid) throws InterruptedException, ServiceBusException {
        checkInnerReceiveCreated();
        this.innerReceiver.defer(uuid);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void defer(UUID uuid, Map<String, Object> map) throws InterruptedException, ServiceBusException {
        checkInnerReceiveCreated();
        this.innerReceiver.defer(uuid, map);
    }

    @Override // com.microsoft.azure.servicebus.IMessageAndSessionPump
    public void deadLetter(UUID uuid) throws InterruptedException, ServiceBusException {
        this.innerReceiver.deadLetter(uuid);
    }

    @Override // com.microsoft.azure.servicebus.IMessageAndSessionPump
    public void deadLetter(UUID uuid, Map<String, Object> map) throws InterruptedException, ServiceBusException {
        this.innerReceiver.deadLetter(uuid, map);
    }

    @Override // com.microsoft.azure.servicebus.IMessageAndSessionPump
    public void deadLetter(UUID uuid, String str, String str2) throws InterruptedException, ServiceBusException {
        checkInnerReceiveCreated();
        this.innerReceiver.deadLetter(uuid, str, str2);
    }

    @Override // com.microsoft.azure.servicebus.IMessageAndSessionPump
    public void deadLetter(UUID uuid, String str, String str2, Map<String, Object> map) throws InterruptedException, ServiceBusException {
        checkInnerReceiveCreated();
        this.innerReceiver.deadLetter(uuid, str, str2, map);
    }

    @Override // com.microsoft.azure.servicebus.IMessageAndSessionPump
    public CompletableFuture<Void> deadLetterAsync(UUID uuid) {
        checkInnerReceiveCreated();
        return this.innerReceiver.deadLetterAsync(uuid);
    }

    @Override // com.microsoft.azure.servicebus.IMessageAndSessionPump
    public CompletableFuture<Void> deadLetterAsync(UUID uuid, Map<String, Object> map) {
        checkInnerReceiveCreated();
        return this.innerReceiver.deadLetterAsync(uuid, map);
    }

    @Override // com.microsoft.azure.servicebus.IMessageAndSessionPump
    public CompletableFuture<Void> deadLetterAsync(UUID uuid, String str, String str2) {
        checkInnerReceiveCreated();
        return this.innerReceiver.deadLetterAsync(uuid, str, str2);
    }

    @Override // com.microsoft.azure.servicebus.IMessageAndSessionPump
    public CompletableFuture<Void> deadLetterAsync(UUID uuid, String str, String str2, Map<String, Object> map) {
        checkInnerReceiveCreated();
        return this.innerReceiver.deadLetterAsync(uuid, str, str2, map);
    }

    private void checkInnerReceiveCreated() {
        if (this.innerReceiver == null) {
            throw new UnsupportedOperationException("Receiver not created. Registering a MessageHandler creates a receiver.");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void notifyExceptionToSessionHandler(Throwable th, ExceptionPhase exceptionPhase) {
        if ((th instanceof IllegalStateException) && getIsClosingOrClosed()) {
            return;
        }
        this.customCodeExecutor.execute(() -> {
            this.sessionHandler.notifyException(th, exceptionPhase);
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void notifyExceptionToMessageHandler(Throwable th, ExceptionPhase exceptionPhase) {
        if ((th instanceof IllegalStateException) && getIsClosingOrClosed()) {
            return;
        }
        this.customCodeExecutor.execute(() -> {
            this.messageHandler.notifyException(th, exceptionPhase);
        });
    }

    @Override // com.microsoft.azure.servicebus.IMessageAndSessionPump
    public int getPrefetchCount() {
        return this.prefetchCount;
    }

    @Override // com.microsoft.azure.servicebus.IMessageAndSessionPump
    public void setPrefetchCount(int i) throws ServiceBusException {
        if (i < 0) {
            throw new IllegalArgumentException("Prefetch count cannot be negative.");
        }
        this.prefetchCount = i;
        if (this.innerReceiver != null) {
            this.innerReceiver.setPrefetchCount(i);
        }
        for (IMessageSession iMessageSession : (IMessageSession[]) this.openSessions.values().toArray(new IMessageSession[0])) {
            try {
                iMessageSession.setPrefetchCount(i);
            } catch (IllegalStateException e) {
            }
        }
    }
}
