/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.client;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.Closeable;
import org.apache.qpid.client.JMSAMQException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.CloseConsumerMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.jms.MessageConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
/**
 * Base class for message consumers that buffer incoming deliveries in an in-memory queue and hand
 * them to the application either synchronously (the {@code receive} variants) or asynchronously
 * through a registered {@link MessageListener}.
 *
 * <p>Only one mode may be active at a time: {@link #_receiving} marks a thread that is inside a
 * synchronous receive, and {@link #_messageListener} holds the asynchronous listener.
 *
 * <p>Besides {@link AbstractJMSMessage} instances, the internal queue can also carry control
 * entries: a {@link CloseConsumerMessage} (see {@link #notifyCloseMessage}) or a {@link Throwable}
 * (see {@link #notifyError}).
 *
 * @param <U> type of the raw message frame that
 *            {@link #createJMSMessageFromUnprocessedMessage} converts into an
 *            {@link AbstractJMSMessage}
 */
public abstract class BasicMessageConsumer<U>
extends Closeable
implements MessageConsumer {
    private static final Logger _logger = LoggerFactory.getLogger(BasicMessageConsumer.class);

    // Acknowledge modes. Values 0-3 are the constants defined by javax.jms.Session.
    private static final int ACK_SESSION_TRANSACTED = 0;
    private static final int ACK_AUTO_ACKNOWLEDGE = 1;
    private static final int ACK_CLIENT_ACKNOWLEDGE = 2;
    private static final int ACK_DUPS_OK_ACKNOWLEDGE = 3;
    // NOTE(review): 257/258 presumably mirror the Qpid Session extensions NO_ACKNOWLEDGE and
    // PRE_ACKNOWLEDGE - confirm against org.apache.qpid.jms.Session.
    private static final int ACK_NO_ACKNOWLEDGE = 257;
    private static final int ACK_PRE_ACKNOWLEDGE = 258;

    /** Leading stack frames dropped when recording where the consumer was closed. */
    private static final int CLOSE_STACK_SKIPPED_FRAMES = 3;

    protected final AMQConnection _connection;
    protected final String _messageSelector;
    private final boolean _noLocal;
    protected AMQDestination _destination;
    /** True while a thread is inside a synchronous receive call. */
    private final AtomicBoolean _receiving = new AtomicBoolean(false);
    /** Asynchronous listener, or {@code null} when none is registered. */
    private final AtomicReference<MessageListener> _messageListener = new AtomicReference<MessageListener>();
    protected int _consumerTag;
    protected final int _channelId;
    /** Buffer of pending deliveries and control entries; left raw because it is part of the protected API. */
    protected final BlockingQueue _synchronousQueue;
    protected final MessageFactoryRegistry _messageFactory;
    protected final AMQSession _session;
    protected final AMQProtocolHandler _protocolHandler;
    private final FieldTable _arguments;
    private final int _prefetchHigh;
    private final int _prefetchLow;
    protected boolean _exclusive;
    protected final int _acknowledgeMode;
    // The next two fields are not referenced anywhere in this class.
    private int _outstanding;
    private boolean _dups_ok_acknowledge_send;
    /** Delivery tags awaiting acknowledgement; nothing in this class adds to this queue. */
    private final ConcurrentLinkedQueue<Long> _receivedDeliveryTags = new ConcurrentLinkedQueue<Long>();
    /** Highest contiguous delivery tag acknowledged so far; guarded by {@link #_commitLock}. */
    private long _lastAcked;
    /** Individually acknowledged tags above {@link #_lastAcked}; guarded by {@link #_commitLock}. */
    private final SortedSet<Long> _previouslyAcked = new TreeSet<Long>();
    private final Object _commitLock = new Object();
    /** Thread currently inside a synchronous receive; volatile because {@code close} reads it from another thread. */
    private volatile Thread _receivingThread;
    private AMQShortString _queuename;
    private final boolean _autoClose;
    private final boolean _noConsume;
    /** Stack recorded the first time the consumer was closed while debug logging was enabled. */
    private List<StackTraceElement> _closedStack = null;

    /**
     * Stores the supplied collaborators and settings and creates the internal delivery queue.
     *
     * <p>When {@code noConsume} is true the supplied {@code acknowledgeMode} is ignored and the
     * consumer's acknowledge mode is forced to {@code 257}.
     */
    protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, AMQProtocolHandler protocolHandler, FieldTable arguments, int prefetchHigh, int prefetchLow, boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) {
        this._channelId = channelId;
        this._connection = connection;
        this._messageSelector = messageSelector;
        this._noLocal = noLocal;
        this._destination = destination;
        this._messageFactory = messageFactory;
        this._session = session;
        this._protocolHandler = protocolHandler;
        this._arguments = arguments;
        this._prefetchHigh = prefetchHigh;
        this._prefetchLow = prefetchLow;
        this._exclusive = exclusive;
        this._synchronousQueue = new LinkedBlockingQueue();
        this._autoClose = autoClose;
        this._noConsume = noConsume;
        this._acknowledgeMode = this._noConsume ? ACK_NO_ACKNOWLEDGE : acknowledgeMode;
    }

    /** Returns the destination this consumer was created for. */
    public AMQDestination getDestination() {
        return this._destination;
    }

    /**
     * Returns the message selector supplied at construction.
     *
     * @throws JMSException if the consumer is closed or its session is missing or closed
     */
    public String getMessageSelector() throws JMSException {
        this.checkPreConditions();
        return this._messageSelector;
    }

    /**
     * Returns the registered listener, or {@code null} when none is set.
     *
     * @throws JMSException if the consumer is closed or its session is missing or closed
     */
    public MessageListener getMessageListener() throws JMSException {
        this.checkPreConditions();
        return this._messageListener.get();
    }

    /** Returns the effective acknowledge mode chosen in the constructor. */
    public int getAcknowledgeMode() {
        return this._acknowledgeMode;
    }

    /** Returns true when a listener is registered; unlike {@link #getMessageListener()} it performs no state checks. */
    protected boolean isMessageListenerSet() {
        return this._messageListener.get() != null;
    }

    /**
     * Registers (or, while the connection is stopped, replaces or clears) the asynchronous listener.
     *
     * <p>While the connection is started a listener can only be installed when none is present and
     * no thread is receiving synchronously. Messages already buffered are then replayed to the new
     * listener while holding the session monitor.
     *
     * @param messageListener the listener to install; may be {@code null}
     * @throws JMSException if the consumer or session is closed, if a synchronous receive is in
     *                      progress, or if a listener is already set while the connection is started
     */
    public void setMessageListener(MessageListener messageListener) throws JMSException {
        this.checkPreConditions();
        if (!this._session.getAMQConnection().started()) {
            // The connection has not been started, so the listener is replaced unconditionally.
            this._messageListener.set(messageListener);
            this._session.setHasMessageListeners();
            if (_logger.isDebugEnabled()) {
                _logger.debug("Session stopped : Message listener(" + messageListener + ") set for destination " + this._destination);
            }
            return;
        }
        if (this._receiving.get()) {
            throw new IllegalStateException("Another thread is already receiving synchronously.");
        }
        if (!this._messageListener.compareAndSet(null, messageListener)) {
            throw new IllegalStateException("Attempt to alter listener while session is started.");
        }
        if (_logger.isDebugEnabled()) {
            _logger.debug("Message listener set for destination " + this._destination);
        }
        if (messageListener == null) {
            return;
        }
        synchronized (this._session) {
            this._messageListener.set(messageListener);
            this._session.setHasMessageListeners();
            this._session.startDispatcherIfNecessary();
            Object queued;
            while ((queued = this._synchronousQueue.poll()) != null) {
                if (queued instanceof AbstractJMSMessage) {
                    this.notifyMessage((AbstractJMSMessage)queued);
                } else {
                    // The queue may also hold control entries (close marker or error); casting
                    // them would abort listener registration with a ClassCastException.
                    _logger.warn("Discarding non-message entry of type " + queued.getClass().getName() + " found in receive queue while setting message listener");
                }
            }
        }
    }

    /**
     * Performs the bookkeeping that must happen before a message is handed to the application:
     * records it as unacknowledged when the session uses client acknowledgement, clears the
     * session's in-recovery flag and runs {@link #preDeliver}.
     */
    protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException {
        if (this._session.getAcknowledgeMode() == ACK_CLIENT_ACKNOWLEDGE) {
            this._session.addUnacknowledgedMessage(jmsMsg.getDeliveryTag());
        }
        this._session.setInRecovery(false);
        this.preDeliver(jmsMsg);
    }

    /**
     * Claims the synchronous-receive slot for the calling thread.
     *
     * @param immediate when true, return {@code false} instead of waiting if the connection is
     *                  failing over
     * @return {@code true} once the slot is held; {@code false} only for an immediate attempt
     *         during failover
     * @throws JMSException if another thread is receiving or a listener is registered
     * @throws InterruptedException if interrupted while waiting for failover to finish
     */
    private boolean acquireReceiving(boolean immediate) throws JMSException, InterruptedException {
        if (this._connection.isFailingOver()) {
            if (immediate) {
                return false;
            }
            this._connection.blockUntilNotFailingOver();
        }
        if (!this._receiving.compareAndSet(false, true)) {
            throw new IllegalStateException("Another thread is already receiving.");
        }
        if (this.isMessageListenerSet()) {
            // Give the slot back before failing; otherwise the flag stays set forever and every
            // later receive reports "Another thread is already receiving."
            this._receiving.set(false);
            throw new IllegalStateException("A listener has already been set.");
        }
        this._receivingThread = Thread.currentThread();
        return true;
    }

    /** Releases the synchronous-receive slot taken by {@link #acquireReceiving}. */
    private void releaseReceiving() {
        // Clear the thread reference first: once the flag is false another thread may claim the
        // slot and publish its own reference, which must not be overwritten with null.
        this._receivingThread = null;
        this._receiving.set(false);
    }

    /** Returns the argument table supplied at construction. */
    public FieldTable getArguments() {
        return this._arguments;
    }

    /** Returns the high prefetch mark (same value as {@link #getPrefetchHigh()}). */
    public int getPrefetch() {
        return this._prefetchHigh;
    }

    /** Returns the high prefetch mark supplied at construction. */
    public int getPrefetchHigh() {
        return this._prefetchHigh;
    }

    /** Returns the low prefetch mark supplied at construction. */
    public int getPrefetchLow() {
        return this._prefetchLow;
    }

    /** Returns the no-local flag supplied at construction. */
    public boolean isNoLocal() {
        return this._noLocal;
    }

    /** Returns the current value of the exclusive flag. */
    public boolean isExclusive() {
        return this._exclusive;
    }

    /** Returns true while a thread holds the synchronous-receive slot. */
    public boolean isReceiving() {
        return this._receiving.get();
    }

    /**
     * Blocks until a message is available; equivalent to {@code receive(0)}.
     *
     * @throws JMSException see {@link #receive(long)}
     */
    public Message receive() throws JMSException {
        return this.receive(0L);
    }

    /**
     * Receives the next message synchronously.
     *
     * @param l timeout in milliseconds; {@code 0} blocks indefinitely and a negative value polls
     *          without waiting (see {@link #getMessageFromQueue(long)})
     * @return the message, or {@code null} on timeout, interruption, or when a close marker was
     *         dequeued
     * @throws JMSException if the consumer or session is closed, another receiver or a listener is
     *                      active, or an error was queued for this consumer
     */
    public Message receive(long l) throws JMSException {
        this.checkPreConditions();
        boolean acquired = false;
        try {
            acquired = this.acquireReceiving(false);
        }
        catch (InterruptedException e) {
            _logger.warn("Interrupted acquire: " + e);
            if (this.isClosed()) {
                return null;
            }
            // Existing behaviour: an interrupt while the consumer is still open does not abort the
            // receive. The slot was never taken, so it must not be released afterwards.
        }
        return this.receiveAndDeliver(l, acquired);
    }

    /**
     * Takes one entry from the internal queue.
     *
     * @param l positive: wait up to that many milliseconds; negative: poll without waiting;
     *          zero: block until an entry arrives
     * @return the dequeued entry, or {@code null} if none was available in time
     * @throws InterruptedException if interrupted while waiting
     */
    public Object getMessageFromQueue(long l) throws InterruptedException {
        if (l > 0L) {
            return this._synchronousQueue.poll(l, TimeUnit.MILLISECONDS);
        }
        if (l < 0L) {
            return this._synchronousQueue.poll();
        }
        return this._synchronousQueue.take();
    }

    /** Subclass hook; NOTE(review): presumably receives a message in browse mode - confirm in subclasses. */
    abstract Message receiveBrowse() throws JMSException;

    /**
     * Returns a message only if one is already buffered.
     *
     * @return the message, or {@code null} if none is buffered, the connection is failing over,
     *         or the calling thread was interrupted
     * @throws JMSException if the consumer or session is closed, another receiver or a listener is
     *                      active, or an error was queued for this consumer
     */
    public Message receiveNoWait() throws JMSException {
        this.checkPreConditions();
        try {
            if (!this.acquireReceiving(true)) {
                return null;
            }
        }
        catch (InterruptedException e) {
            return null;
        }
        return this.receiveAndDeliver(-1L, true);
    }

    /**
     * Shared tail of the receive variants: starts the dispatcher, dequeues one entry, runs the
     * pre/post delivery bookkeeping and finally releases the receive slot if this thread holds it.
     *
     * @param timeout       value forwarded to {@link #getMessageFromQueue(long)}
     * @param holdsReceiving whether the calling thread acquired the receive slot
     */
    private Message receiveAndDeliver(long timeout, boolean holdsReceiving) throws JMSException {
        try {
            // Inside the try so that a failure here cannot leave the receive slot claimed.
            this._session.startDispatcherIfNecessary();
            AbstractJMSMessage message = this.returnMessageOrThrow(this.getMessageFromQueue(timeout));
            if (message != null) {
                this.preApplicationProcessing(message);
                this.postDeliver(message);
            }
            return message;
        }
        catch (InterruptedException e) {
            _logger.warn("Interrupted: " + e);
            return null;
        }
        finally {
            if (holdsReceiving) {
                this.releaseReceiving();
            }
        }
    }

    /**
     * Interprets an entry taken from the internal queue.
     *
     * @return the message, or {@code null} when the entry was {@code null} or a close marker (in
     *         which case the consumer is marked closed and deregistered)
     * @throws JMSException wrapping the entry when it is a {@link Throwable}
     */
    private AbstractJMSMessage returnMessageOrThrow(Object o) throws JMSException {
        if (o instanceof Throwable) {
            JMSException e = new JMSException("Message consumer forcibly closed due to error: " + o);
            e.initCause((Throwable)o);
            if (o instanceof Exception) {
                e.setLinkedException((Exception)o);
            }
            throw e;
        }
        if (o instanceof CloseConsumerMessage) {
            this._closed.set(true);
            this.deregisterConsumer();
            return null;
        }
        return (AbstractJMSMessage)o;
    }

    /**
     * Closes the consumer and sends a cancel; equivalent to {@code close(true)}.
     *
     * @throws JMSException see {@link #close(boolean)}
     */
    @Override
    public void close() throws JMSException {
        this.close(true);
    }

    /**
     * Closes the consumer. Only the first call has any effect beyond logging.
     *
     * @param sendClose when true, {@link #sendCancel()} and {@link #cleanupQueue()} are invoked
     *                  under the connection's failover mutex (subject to the session state check)
     * @throws JMSException if the cancel or cleanup fails
     */
    public void close(boolean sendClose) throws JMSException {
        if (_logger.isInfoEnabled()) {
            _logger.info("Closing consumer:" + this.debugIdentity());
        }
        if (this._closed.getAndSet(true)) {
            return;
        }
        this._closing.set(true);
        if (_logger.isDebugEnabled()) {
            StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
            if (this._closedStack != null) {
                _logger.debug(this._consumerTag + " previously:" + this._closedStack.toString());
            } else {
                this._closedStack = trimStackTrace(stackTrace);
            }
        }
        if (sendClose) {
            synchronized (this._connection.getFailoverMutex()) {
                try {
                    if (!this._session.isClosed() || this._session.isClosing()) {
                        this.sendCancel();
                        this.cleanupQueue();
                    }
                }
                catch (AMQException e) {
                    throw new JMSAMQException("Error closing consumer: " + e, e);
                }
                catch (FailoverException e) {
                    throw new JMSAMQException("FailoverException interrupted basic cancel.", e);
                }
            }
        }
        this.deregisterConsumer();
        // Snapshot the reference: the receiver may finish and clear it between the flag check and
        // the interrupt, which previously could cause a NullPointerException here.
        Thread receiver = this._receivingThread;
        if (this._receiving.get() && receiver != null) {
            if (_logger.isInfoEnabled()) {
                _logger.info("Interrupting thread: " + receiver);
            }
            receiver.interrupt();
        }
    }

    /** Subclass hook invoked by {@link #close(boolean)} before {@link #cleanupQueue()}. */
    abstract void sendCancel() throws AMQException, FailoverException;

    /** Subclass hook invoked by {@link #close(boolean)} after {@link #sendCancel()}. */
    abstract void cleanupQueue() throws AMQException, FailoverException;

    /** Marks the consumer closed and deregisters it from the session without sending anything. */
    void markClosed() {
        this._closed.set(true);
        if (_logger.isDebugEnabled()) {
            StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
            if (this._closedStack != null) {
                _logger.debug(this._consumerTag + " markClosed():" + trimStackTrace(stackTrace));
                _logger.debug(this._consumerTag + " previously:" + this._closedStack.toString());
            } else {
                this._closedStack = trimStackTrace(stackTrace);
            }
        }
        this.deregisterConsumer();
    }

    /**
     * Queues a close marker for a synchronous receiver. When a listener is set the marker is
     * dropped and a warning is logged instead.
     */
    public void notifyCloseMessage(CloseConsumerMessage closeMessage) {
        if (this.isMessageListenerSet()) {
            _logger.warn("Using an AutoCloseconsumer with message listener is not supported.");
        } else {
            try {
                this._synchronousQueue.put(closeMessage);
            }
            catch (InterruptedException e) {
                // NOTE(review): the interrupt status is not restored here; kept as found.
                _logger.info(" SynchronousQueue.put interupted. Usually result of connection closing,but we shouldn't have close yet");
            }
        }
    }

    /**
     * Handles a raw frame: close markers go to {@link #notifyCloseMessage}; anything else is
     * converted to a JMS message and passed to {@link #notifyMessage(AbstractJMSMessage)}.
     * Failures are logged and swallowed.
     */
    void notifyMessage(U messageFrame) {
        if (messageFrame instanceof CloseConsumerMessage) {
            this.notifyCloseMessage((CloseConsumerMessage)messageFrame);
            return;
        }
        try {
            AbstractJMSMessage jmsMessage = this.createJMSMessageFromUnprocessedMessage(this._session.getMessageDelegateFactory(), messageFrame);
            if (_logger.isDebugEnabled()) {
                _logger.debug("Message is of type: " + jmsMessage.getClass().getName());
            }
            this.notifyMessage(jmsMessage);
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                // An interrupt is reported at info level only, not additionally as an error.
                _logger.info("SynchronousQueue.put interupted. Usually result of connection closing");
            } else {
                _logger.error("Caught exception (dump follows) - ignoring...", e);
            }
        }
    }

    /** Converts a raw frame into a JMS message using the supplied delegate factory. */
    public abstract AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory var1, U var2) throws Exception;

    /**
     * Delivers a message: straight to the listener (with pre/post delivery bookkeeping) when one
     * is set, otherwise onto the internal queue for a synchronous receiver. Failures, including
     * exceptions thrown by the listener, are logged and swallowed.
     */
    public void notifyMessage(AbstractJMSMessage jmsMessage) {
        try {
            if (this.isMessageListenerSet()) {
                this.preApplicationProcessing(jmsMessage);
                this.getMessageListener().onMessage((Message)jmsMessage);
                this.postDeliver(jmsMessage);
            } else {
                this._synchronousQueue.put(jmsMessage);
            }
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                // An interrupt is reported at info level only, not additionally as an error.
                _logger.info("reNotification : SynchronousQueue.put interupted. Usually result of connection closing");
            } else {
                _logger.error("reNotification : Caught exception (dump follows) - ignoring...", e);
            }
        }
    }

    /** Acknowledge-mode specific work performed before a message reaches the application. */
    void preDeliver(AbstractJMSMessage msg) {
        switch (this._acknowledgeMode) {
            case ACK_PRE_ACKNOWLEDGE: {
                this._session.acknowledgeMessage(msg.getDeliveryTag(), false);
                break;
            }
            case ACK_CLIENT_ACKNOWLEDGE: {
                msg.setAMQSession(this._session);
                break;
            }
            case ACK_SESSION_TRANSACTED: {
                if (this.isNoConsume()) {
                    this._session.acknowledgeMessage(msg.getDeliveryTag(), false);
                    break;
                }
                this._session.addDeliveredMessage(msg.getDeliveryTag());
                this._session.markDirty();
                break;
            }
            default: {
                break;
            }
        }
    }

    /** Acknowledge-mode specific work performed after a message has been handed to the application. */
    void postDeliver(AbstractJMSMessage msg) throws JMSException {
        switch (this._acknowledgeMode) {
            case ACK_CLIENT_ACKNOWLEDGE: {
                if (this.isNoConsume()) {
                    this._session.acknowledgeMessage(msg.getDeliveryTag(), false);
                }
                this._session.markDirty();
                break;
            }
            case ACK_AUTO_ACKNOWLEDGE:
            case ACK_DUPS_OK_ACKNOWLEDGE: {
                // Nothing is acknowledged while the session reports it is in recovery.
                if (!this._session.isInRecovery()) {
                    this._session.acknowledgeMessage(msg.getDeliveryTag(), false);
                }
                break;
            }
            default: {
                break;
            }
        }
    }

    /**
     * Empties the received-delivery-tag queue.
     *
     * @return the last tag removed, or {@code null} if the queue was empty
     */
    Long getLastDelivered() {
        Long lastDeliveryTag = null;
        Long tag;
        while ((tag = this._receivedDeliveryTags.poll()) != null) {
            lastDeliveryTag = tag;
        }
        return lastDeliveryTag;
    }

    /**
     * Acknowledges every tag currently in the received-delivery-tag queue.
     *
     * <p>Tags that extend the contiguous run starting at {@link #_lastAcked} (taking earlier
     * individually acknowledged tags into account) are covered by one acknowledgement with the
     * multiple flag set; the remaining tags are acknowledged one by one and remembered so a later
     * call can fold them into the contiguous run.
     */
    void acknowledgeDelivered() {
        synchronized (this._commitLock) {
            List<Long> tagsToAck = new ArrayList<Long>();
            Long polled;
            while ((polled = this._receivedDeliveryTags.poll()) != null) {
                tagsToAck.add(polled);
            }
            Collections.sort(tagsToAck);
            long prevAcked = this._lastAcked;
            long oldAckPoint = -1L;
            // Alternate between the new tags and the previously acknowledged ones until neither
            // can advance the contiguous acknowledgement point any further.
            while (oldAckPoint != prevAcked) {
                oldAckPoint = prevAcked;
                Iterator<Long> tagsToAckIterator = tagsToAck.iterator();
                while (tagsToAckIterator.hasNext() && tagsToAckIterator.next().longValue() == prevAcked + 1L) {
                    tagsToAckIterator.remove();
                    ++prevAcked;
                }
                Iterator<Long> previousAckIterator = this._previouslyAcked.iterator();
                while (previousAckIterator.hasNext() && previousAckIterator.next().longValue() == prevAcked + 1L) {
                    previousAckIterator.remove();
                    ++prevAcked;
                }
            }
            if (prevAcked != this._lastAcked) {
                this._session.acknowledgeMessage(prevAcked, true);
                this._lastAcked = prevAcked;
            }
            for (Long tag : tagsToAck) {
                this._session.acknowledgeMessage(tag, false);
                this._previouslyAcked.add(tag);
            }
        }
    }

    /**
     * Marks the consumer closed because of an error and deregisters it. When no listener is set
     * the cause is queued so that a synchronous receiver throws it.
     *
     * @param cause the error; a {@code null} cause is not queued
     */
    void notifyError(Throwable cause) {
        this._closed.set(true);
        if (_logger.isDebugEnabled()) {
            StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
            if (this._closedStack != null) {
                _logger.debug(this._consumerTag + " notifyError():" + trimStackTrace(stackTrace));
                _logger.debug(this._consumerTag + " previously" + this._closedStack.toString());
            } else {
                this._closedStack = trimStackTrace(stackTrace);
            }
        }
        // The queue rejects null elements, so a null cause is skipped rather than offered.
        if (cause != null && !this.isMessageListenerSet() && this._synchronousQueue.offer(cause)) {
            _logger.debug("Passed exception to synchronous queue for propagation to receive()");
        }
        this.deregisterConsumer();
    }

    /**
     * Returns the given stack without its first three frames and its last frame, or an empty list
     * when the stack is too short for that (the unguarded {@code subList} call would throw).
     */
    private static List<StackTraceElement> trimStackTrace(StackTraceElement[] stackTrace) {
        int toIndex = stackTrace.length - 1;
        if (toIndex < CLOSE_STACK_SKIPPED_FRAMES) {
            return Collections.<StackTraceElement>emptyList();
        }
        return Arrays.asList(stackTrace).subList(CLOSE_STACK_SKIPPED_FRAMES, toIndex);
    }

    /** Removes this consumer from its session. */
    private void deregisterConsumer() {
        this._session.deregisterConsumer(this);
    }

    /** Returns the consumer tag. */
    public int getConsumerTag() {
        return this._consumerTag;
    }

    /** Sets the consumer tag. */
    public void setConsumerTag(int consumerTag) {
        this._consumerTag = consumerTag;
    }

    /** Returns the session this consumer belongs to. */
    public AMQSession getSession() {
        return this._session;
    }

    /**
     * Verifies the consumer is usable.
     *
     * @throws JMSException if the consumer is closed, or the session is {@code null} or closed
     */
    private void checkPreConditions() throws JMSException {
        this.checkNotClosed();
        if (this._session == null || this._session.isClosed()) {
            throw new IllegalStateException("Invalid Session");
        }
    }

    /** Returns the auto-close flag supplied at construction. */
    public boolean isAutoClose() {
        return this._autoClose;
    }

    /** Returns true when the no-consume flag was set at construction or the destination is browse-only. */
    public boolean isNoConsume() {
        return this._noConsume || this._destination.isBrowseOnly();
    }

    /** Rolls back buffered messages; delegates to {@link #rollbackPendingMessages()}. */
    public void rollback() {
        this.rollbackPendingMessages();
    }

    /**
     * Rejects (with the requeue flag set) every message still buffered in the internal queue and
     * empties the queue. Entries that are not messages are logged and discarded.
     */
    public void rollbackPendingMessages() {
        if (this._synchronousQueue.size() > 0) {
            if (_logger.isDebugEnabled()) {
                _logger.debug("Rejecting the messages(" + this._synchronousQueue.size() + ") in _syncQueue (PRQ)" + "for consumer with tag:" + this._consumerTag);
            }
            Iterator iterator = this._synchronousQueue.iterator();
            int initialSize = this._synchronousQueue.size();
            boolean removed = false;
            while (iterator.hasNext()) {
                Object o = iterator.next();
                if (o instanceof AbstractJMSMessage) {
                    this._session.rejectMessage((AbstractJMSMessage)o, true);
                    if (_logger.isDebugEnabled()) {
                        _logger.debug("Rejected message:" + ((AbstractJMSMessage)o).getDeliveryTag());
                    }
                } else {
                    _logger.error("Queue contained a :" + o.getClass() + " unable to reject as it is not an AbstractJMSMessage. Will be cleared");
                }
                iterator.remove();
                removed = true;
            }
            if (removed && initialSize == this._synchronousQueue.size()) {
                _logger.error("Queue had content removed but didn't change in size." + initialSize);
            }
            if (this._synchronousQueue.size() != 0) {
                // Entries arrived while iterating; run another pass.
                _logger.warn("Queue was not empty after rejecting all messages Remaining:" + this._synchronousQueue.size());
                this.rollback();
            }
            this.clearReceiveQueue();
        }
    }

    /** Returns {@code <consumerTag>[<identityHashCode>]} for use in log messages. */
    public String debugIdentity() {
        return String.valueOf(this._consumerTag) + "[" + System.identityHashCode(this) + "]";
    }

    /** Discards everything in the internal queue. */
    public void clearReceiveQueue() {
        this._synchronousQueue.clear();
    }

    /**
     * Removes every buffered message from the internal queue and returns their delivery tags in
     * queue order.
     *
     * <p>Control entries (close marker or queued error) carry no delivery tag; they are left in
     * the queue so a later receive still sees them, instead of failing the whole drain with a
     * {@link ClassCastException}.
     */
    public List<Long> drainReceiverQueueAndRetrieveDeliveryTags() {
        Iterator iterator = this._synchronousQueue.iterator();
        List<Long> tags = new ArrayList<Long>(this._synchronousQueue.size());
        while (iterator.hasNext()) {
            Object entry = iterator.next();
            if (entry instanceof AbstractJMSMessage) {
                tags.add(((AbstractJMSMessage)entry).getDeliveryTag());
                iterator.remove();
            }
        }
        return tags;
    }

    /** Returns the queue name last stored with {@link #setQueuename}. */
    public AMQShortString getQueuename() {
        return this._queuename;
    }

    /** Stores the queue name. */
    public void setQueuename(AMQShortString queuename) {
        this._queuename = queuename;
    }

    /** Asks the session to add a binding key for this consumer. */
    public void addBindingKey(AMQDestination amqd, String routingKey) throws AMQException {
        this._session.addBindingKey(this, amqd, routingKey);
    }

    /** Failover hook: discards all buffered entries. */
    public void failedOverPre() {
        this.clearReceiveQueue();
    }

    /** Failover hook: intentionally does nothing in this class. */
    public void failedOverPost() {
    }
}

