/*
 * Decompiled with CFR 0.152.
 */
package com.amazon.sqs.javamessaging;

import com.amazon.sqs.javamessaging.SQSMessageConsumer;
import com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch;
import com.amazon.sqs.javamessaging.SQSSession;
import com.amazon.sqs.javamessaging.acknowledge.AcknowledgeMode;
import com.amazon.sqs.javamessaging.acknowledge.Acknowledger;
import com.amazon.sqs.javamessaging.acknowledge.NegativeAcknowledger;
import com.amazon.sqs.javamessaging.acknowledge.SQSMessageIdentifier;
import com.amazon.sqs.javamessaging.message.SQSMessage;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SQSSessionCallbackScheduler
implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(SQSSessionCallbackScheduler.class);
    protected ArrayDeque<SQSSession.CallbackEntry> callbackQueue;
    private AcknowledgeMode acknowledgeMode;
    private SQSSession session;
    private NegativeAcknowledger negativeAcknowledger;
    private final Acknowledger acknowledger;
    private SQSMessageConsumer consumerCloseAfterCallback;
    private volatile boolean closed = false;

    SQSSessionCallbackScheduler(SQSSession session, AcknowledgeMode acknowledgeMode, Acknowledger acknowledger, NegativeAcknowledger negativeAcknowledger) {
        this.session = session;
        this.acknowledgeMode = acknowledgeMode;
        this.acknowledger = acknowledger;
        this.negativeAcknowledger = negativeAcknowledger;
        this.callbackQueue = new ArrayDeque();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void close() {
        this.closed = true;
        ArrayDeque<SQSSession.CallbackEntry> arrayDeque = this.callbackQueue;
        synchronized (arrayDeque) {
            this.callbackQueue.notify();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Unable to fully structure code
     */
    @Override
    public void run() {
        block36: {
            callbackEntry = null;
            try {
                while (true) {
                    try {
                        while (true) lbl-1000:
                        // 6 sources

                        {
                            if (this.closed) {
                                break block36;
                            }
                            var2_2 = this.callbackQueue;
                            synchronized (var2_2) {
                                callbackEntry = this.callbackQueue.pollFirst();
                                if (callbackEntry == null) {
                                    block37: {
                                        try {
                                            this.callbackQueue.wait();
                                        }
                                        catch (InterruptedException e) {
                                            if (!SQSSessionCallbackScheduler.LOG.isDebugEnabled()) break block37;
                                            SQSSessionCallbackScheduler.LOG.debug("wait on empty callback queue interrupted: " + e.getMessage());
                                        }
                                    }
                                    continue;
                                }
                            }
                            messageListener = callbackEntry.getMessageListener();
                            messageManager = callbackEntry.getMessageManager();
                            message = (SQSMessage)messageManager.getMessage();
                            messageConsumer = messageManager.getPrefetchManager().getMessageConsumer();
                            if (messageConsumer.isClosed()) {
                                this.nackReceivedMessage(message);
                                continue;
                            }
                            try {
                                this.session.startingCallback(messageConsumer);
                            }
                            catch (JMSException e) {
                                if (SQSSessionCallbackScheduler.LOG.isDebugEnabled()) {
                                    SQSSessionCallbackScheduler.LOG.debug("Not running callback: " + e.getMessage());
                                }
                                break block36;
                            }
                            try {
                                block38: {
                                    messageManager.getPrefetchManager().messageDispatched();
                                    ackMode = this.acknowledgeMode.getOriginalAcknowledgeMode();
                                    tryNack = true;
                                    try {
                                        block39: {
                                            if (messageListener == null) break block38;
                                            if (ackMode != 1) {
                                                this.acknowledger.notifyMessageReceived(message);
                                            }
                                            callbackFailed = false;
                                            try {
                                                messageListener.onMessage((Message)message);
                                                if (callbackFailed) break block38;
                                                if (ackMode != 1) break block39;
                                            }
                                            catch (Throwable ex) {
                                                try {
                                                    SQSSessionCallbackScheduler.LOG.info("Exception thrown from onMessage callback for message " + message.getSQSMessageId(), ex);
                                                    callbackFailed = true;
                                                    break block38;
                                                }
                                                catch (Throwable var10_15) {
                                                    throw var10_15;
                                                }
                                                finally {
                                                    if (!callbackFailed) {
                                                        if (ackMode == 1) {
                                                            message.acknowledge();
                                                        }
                                                        tryNack = false;
                                                    }
                                                }
                                            }
                                            message.acknowledge();
                                        }
                                        tryNack = false;
                                    }
                                    catch (JMSException ex) {
                                        SQSSessionCallbackScheduler.LOG.warn("Unable to complete message dispatch for the message " + message.getSQSMessageId(), (Throwable)ex);
                                    }
                                    finally {
                                        if (tryNack) {
                                            this.nackReceivedMessage(message);
                                        }
                                    }
                                }
                                if (this.consumerCloseAfterCallback == null) ** GOTO lbl-1000
                                this.consumerCloseAfterCallback.doClose();
                                this.consumerCloseAfterCallback = null;
                            }
                            finally {
                                this.session.finishedCallback();
                                messageManager.getPrefetchManager().messageListenerReady();
                                continue;
                            }
                            break;
                        }
                    }
                    catch (Throwable ex) {
                        SQSSessionCallbackScheduler.LOG.error("Unexpected exception thrown during the run of the scheduled callback", ex);
                        continue;
                    }
                    ** GOTO lbl-1000
                    break;
                }
            }
            finally {
                if (callbackEntry != null) {
                    this.nackReceivedMessage((SQSMessage)callbackEntry.getMessageManager().getMessage());
                }
                this.nackQueuedMessages();
            }
        }
    }

    void setConsumerCloseAfterCallback(SQSMessageConsumer messageConsumer) {
        this.consumerCloseAfterCallback = messageConsumer;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void scheduleCallBacks(MessageListener messageListener, List<SQSMessageConsumerPrefetch.MessageManager> messageManagers) {
        ArrayDeque<SQSSession.CallbackEntry> arrayDeque = this.callbackQueue;
        synchronized (arrayDeque) {
            try {
                for (SQSMessageConsumerPrefetch.MessageManager messageManager : messageManagers) {
                    SQSSession.CallbackEntry callbackEntry = new SQSSession.CallbackEntry(messageListener, messageManager);
                    this.callbackQueue.addLast(callbackEntry);
                }
            }
            finally {
                this.callbackQueue.notify();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void nackQueuedMessages() {
        ArrayDeque<SQSSession.CallbackEntry> arrayDeque = this.callbackQueue;
        synchronized (arrayDeque) {
            try {
                ArrayList<SQSMessageIdentifier> nackMessageIdentifiers = new ArrayList<SQSMessageIdentifier>();
                while (!this.callbackQueue.isEmpty()) {
                    SQSMessage nackMessage = (SQSMessage)this.callbackQueue.pollFirst().getMessageManager().getMessage();
                    nackMessageIdentifiers.add(SQSMessageIdentifier.fromSQSMessage(nackMessage));
                }
                if (!nackMessageIdentifiers.isEmpty()) {
                    this.negativeAcknowledger.bulkAction(nackMessageIdentifiers, nackMessageIdentifiers.size());
                }
            }
            catch (JMSException e) {
                LOG.warn("Caught exception while nacking the remaining messages on session callback queue", (Throwable)e);
            }
        }
    }

    private void nackReceivedMessage(SQSMessage message) {
        try {
            SQSMessageIdentifier messageIdentifier = SQSMessageIdentifier.fromSQSMessage(message);
            ArrayList<SQSMessageIdentifier> nackMessageIdentifiers = new ArrayList<SQSMessageIdentifier>();
            nackMessageIdentifiers.add(messageIdentifier);
            if (messageIdentifier.getGroupId() != null) {
                Map<String, Set<String>> queueToGroupsMapping = Collections.singletonMap(messageIdentifier.getQueueUrl(), Collections.singleton(messageIdentifier.getGroupId()));
                nackMessageIdentifiers.addAll(this.purgeScheduledCallbacksForQueuesAndGroups(queueToGroupsMapping));
            }
            this.negativeAcknowledger.bulkAction(nackMessageIdentifiers, nackMessageIdentifiers.size());
        }
        catch (JMSException e) {
            LOG.warn("Unable to nack the message " + message.getSQSMessageId(), (Throwable)e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    List<SQSMessageIdentifier> purgeScheduledCallbacksForQueuesAndGroups(Map<String, Set<String>> queueToGroupsMapping) throws JMSException {
        ArrayList<SQSMessageIdentifier> purgedCallbacks = new ArrayList<SQSMessageIdentifier>();
        ArrayDeque<SQSSession.CallbackEntry> arrayDeque = this.callbackQueue;
        synchronized (arrayDeque) {
            Iterator<SQSSession.CallbackEntry> callbackIterator = this.callbackQueue.iterator();
            while (callbackIterator.hasNext()) {
                SQSSession.CallbackEntry callbackEntry = callbackIterator.next();
                SQSMessageIdentifier pendingCallbackIdentifier = SQSMessageIdentifier.fromSQSMessage((SQSMessage)callbackEntry.getMessageManager().getMessage());
                Set<String> affectedGroupsInQueue = queueToGroupsMapping.get(pendingCallbackIdentifier.getQueueUrl());
                if (affectedGroupsInQueue == null || !affectedGroupsInQueue.contains(pendingCallbackIdentifier.getGroupId())) continue;
                purgedCallbacks.add(pendingCallbackIdentifier);
                callbackIterator.remove();
                callbackEntry.getMessageManager().getPrefetchManager().messageDispatched();
            }
        }
        return purgedCallbacks;
    }
}

