/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.core.protocol.openwire.amq;

import java.io.IOException;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.wireformat.WireFormat;

public class AMQConsumer {
    private static final String AMQ_NOTIFICATIONS_DESTINATION = "activemq.notifications";
    private final AMQSession session;
    private final ActiveMQDestination openwireDestination;
    private final boolean hasNotificationDestination;
    private final ConsumerInfo info;
    private final ScheduledExecutorService scheduledPool;
    private ServerConsumer serverConsumer;
    private int prefetchSize;
    private final AtomicInteger currentWindow;
    private long messagePullSequence = 0L;
    private MessagePullHandler messagePullHandler;
    private boolean internalAddress = false;
    private volatile Set<MessageReference> rolledbackMessageRefs;

    public AMQConsumer(AMQSession amqSession, ActiveMQDestination d, ConsumerInfo info, ScheduledExecutorService scheduledPool, boolean internalAddress) {
        this.session = amqSession;
        this.openwireDestination = d;
        this.hasNotificationDestination = d.toString().contains(AMQ_NOTIFICATIONS_DESTINATION);
        this.info = info;
        this.scheduledPool = scheduledPool;
        this.prefetchSize = info.getPrefetchSize();
        this.currentWindow = new AtomicInteger(this.prefetchSize);
        if (this.prefetchSize == 0) {
            this.messagePullHandler = new MessagePullHandler();
        }
        this.internalAddress = internalAddress;
        this.rolledbackMessageRefs = null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Set<MessageReference> guardedInitializationOfRolledBackMessageRefs() {
        AMQConsumer aMQConsumer = this;
        synchronized (aMQConsumer) {
            Set<MessageReference> rollbackedMessageRefs = this.rolledbackMessageRefs;
            if (rollbackedMessageRefs == null) {
                this.rolledbackMessageRefs = rollbackedMessageRefs = new ConcurrentSkipListSet<MessageReference>(Comparator.comparingLong(MessageReference::getMessageID));
            }
            return rollbackedMessageRefs;
        }
    }

    private Set<MessageReference> getRolledbackMessageRefsOrCreate() {
        Set<MessageReference> rolledbackMessageRefs = this.rolledbackMessageRefs;
        if (rolledbackMessageRefs == null) {
            rolledbackMessageRefs = this.guardedInitializationOfRolledBackMessageRefs();
        }
        return rolledbackMessageRefs;
    }

    private Set<MessageReference> getRolledbackMessageRefs() {
        return this.rolledbackMessageRefs;
    }

    public void init(SlowConsumerDetectionListener slowConsumerDetectionListener, long nativeId) throws Exception {
        SimpleString selector = this.info.getSelector() == null ? null : new SimpleString(this.info.getSelector());
        boolean preAck = false;
        if (this.info.isNoLocal()) {
            if (!AdvisorySupport.isAdvisoryTopic((ActiveMQDestination)this.openwireDestination)) {
                this.session.getConnection().setNoLocal(true);
            } else {
                preAck = true;
            }
            String id = this.info.getClientId() != null ? this.info.getClientId() : this.getId().getConnectionId();
            String noLocalSelector = MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + id + "'";
            selector = selector == null ? new SimpleString(noLocalSelector) : new SimpleString(this.info.getSelector() + " AND " + noLocalSelector);
        }
        SimpleString destinationName = new SimpleString(this.session.convertWildcard(this.openwireDestination.getPhysicalName()));
        if (this.openwireDestination.isTopic()) {
            SimpleString queueName = this.createTopicSubscription(this.info.isDurable(), this.info.getClientId(), destinationName.toString(), this.info.getSubscriptionName(), selector, destinationName);
            this.serverConsumer = this.session.getCoreSession().createConsumer(nativeId, queueName, null, this.info.isBrowser(), false, Integer.valueOf(-1));
            this.serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener);
            ((ServerConsumerImpl)this.serverConsumer).setPreAcknowledge(preAck);
        } else {
            try {
                this.session.getCoreServer().createQueue(destinationName, RoutingType.ANYCAST, destinationName, null, true, false);
            }
            catch (ActiveMQQueueExistsException queueName) {
                // empty catch block
            }
            this.serverConsumer = this.session.getCoreSession().createConsumer(nativeId, destinationName, selector, this.info.isBrowser(), false, Integer.valueOf(-1));
            this.serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener);
            AddressSettings addrSettings = (AddressSettings)this.session.getCoreServer().getAddressSettingsRepository().getMatch(destinationName.toString());
            if (addrSettings != null && this.info.getPrefetchSize() != 0 && addrSettings.getQueuePrefetch() == 0) {
                ConsumerControl cc = new ConsumerControl();
                cc.setConsumerId(this.info.getConsumerId());
                cc.setPrefetch(0);
                this.session.getConnection().dispatch((Command)cc);
            }
        }
        this.serverConsumer.setProtocolData((Object)this);
    }

    private SimpleString createTopicSubscription(boolean isDurable, String clientID, String physicalName, String subscriptionName, SimpleString selector, SimpleString address) throws Exception {
        SimpleString queueName;
        AddressInfo addressInfo = this.session.getCoreServer().getAddressInfo(address);
        if (addressInfo != null) {
            addressInfo.addRoutingType(RoutingType.MULTICAST);
        } else {
            addressInfo = new AddressInfo(address, RoutingType.MULTICAST);
        }
        addressInfo.setInternal(this.internalAddress);
        if (isDurable) {
            QueueQueryResult result;
            queueName = org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForSubscription((boolean)true, (String)clientID, (String)subscriptionName);
            if (this.info.getDestination().isComposite()) {
                queueName = queueName.concat(physicalName);
            }
            if ((result = this.session.getCoreSession().executeQueueQuery(queueName)).isExists()) {
                boolean topicChanged;
                if (result.getConsumerCount() > 0) {
                    throw new IllegalStateException("Cannot create a subscriber on the durable subscription since it already has subscriber(s)");
                }
                SimpleString oldFilterString = result.getFilterString();
                boolean selectorChanged = selector == null && oldFilterString != null || oldFilterString == null && selector != null || oldFilterString != null && selector != null && !oldFilterString.equals((Object)selector);
                SimpleString oldTopicName = result.getAddress();
                boolean bl = topicChanged = !oldTopicName.equals((Object)address);
                if (selectorChanged || topicChanged) {
                    this.session.getCoreSession().deleteQueue(queueName);
                    this.session.getCoreSession().createQueue(addressInfo, queueName, selector, false, true);
                }
            } else {
                this.session.getCoreSession().createQueue(addressInfo, queueName, selector, false, true);
            }
        } else {
            queueName = new SimpleString(UUID.randomUUID().toString());
            this.session.getCoreSession().createQueue(addressInfo, queueName, selector, true, false);
        }
        return queueName;
    }

    public ConsumerId getId() {
        return this.info.getConsumerId();
    }

    public void acquireCredit(int n) throws Exception {
        boolean promptDelivery;
        if (this.messagePullHandler != null) {
            return;
        }
        int oldwindow = this.currentWindow.getAndAdd(n);
        boolean bl = promptDelivery = oldwindow < this.prefetchSize;
        if (promptDelivery) {
            this.serverConsumer.promptDelivery();
        }
    }

    public int handleDeliver(MessageReference reference, ICoreMessage message, int deliveryCount) {
        try {
            if (this.messagePullHandler != null && !this.messagePullHandler.checkForcedConsumer((Message)message)) {
                return 0;
            }
            if (this.session.getConnection().isNoLocal() || this.session.isInternal()) {
                message.removeProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME);
            }
            MessageDispatch dispatch = OpenWireMessageConverter.createMessageDispatch(reference, message, (WireFormat)this.session.wireFormat(), this);
            int size = dispatch.getMessage().getSize();
            reference.setProtocolData((Object)dispatch.getMessage().getMessageId());
            this.session.deliverMessage(dispatch);
            this.currentWindow.decrementAndGet();
            return size;
        }
        catch (IOException e) {
            ActiveMQServerLogger.LOGGER.warn((Object)"Error during message dispatch", (Throwable)e);
            return 0;
        }
        catch (Throwable t) {
            ActiveMQServerLogger.LOGGER.warn((Object)"Error during message dispatch", t);
            return 0;
        }
    }

    public void handleDeliverNullDispatch() {
        MessageDispatch md = new MessageDispatch();
        md.setConsumerId(this.getId());
        md.setDestination(this.openwireDestination);
        this.session.deliverMessage(md);
    }

    public void acknowledge(MessageAck ack) throws Exception {
        boolean removeReferences;
        MessageId first = ack.getFirstMessageId();
        MessageId last = ack.getLastMessageId();
        if (first == null) {
            first = last;
        }
        boolean bl = removeReferences = !this.serverConsumer.isBrowseOnly();
        if (ack.isRedeliveredAck() || ack.isDeliveredAck() || ack.isExpiredAck()) {
            removeReferences = false;
        }
        List ackList = this.serverConsumer.getDeliveringReferencesBasedOnProtocol(removeReferences, (Object)first, (Object)last);
        this.acquireCredit(ack.getMessageCount());
        if (removeReferences) {
            Transaction originalTX = this.session.getCoreSession().getCurrentTransaction();
            Transaction transaction = originalTX == null ? this.session.getCoreSession().newTransaction() : originalTX;
            if (ack.isIndividualAck() || ack.isStandardAck()) {
                for (MessageReference ref : ackList) {
                    ref.acknowledge(transaction, this.serverConsumer);
                }
            } else if (ack.isPoisonAck()) {
                for (MessageReference ref : ackList) {
                    Throwable poisonCause = ack.getPoisonCause();
                    if (poisonCause != null) {
                        ref.getMessage().putStringProperty(OpenWireMessageConverter.AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, new SimpleString(poisonCause.toString()));
                    }
                    ref.getQueue().sendToDeadLetterAddress(transaction, ref);
                }
            }
            if (originalTX == null) {
                transaction.commit(true);
            }
        }
        if (ack.isExpiredAck()) {
            for (MessageReference ref : ackList) {
                ref.getQueue().expire(ref, this.serverConsumer);
            }
        }
    }

    public void browseFinished() {
        MessageDispatch md = new MessageDispatch();
        md.setConsumerId(this.info.getConsumerId());
        md.setMessage(null);
        md.setDestination(null);
        this.session.deliverMessage(md);
    }

    public ConsumerInfo getInfo() {
        return this.info;
    }

    public boolean hasCredits() {
        return this.currentWindow.get() > 0;
    }

    public void processMessagePull(MessagePull messagePull) throws Exception {
        this.currentWindow.incrementAndGet();
        if (this.messagePullHandler != null) {
            this.messagePullHandler.nextSequence(this.messagePullSequence++, messagePull.getTimeout());
        }
    }

    public void removeConsumer() throws Exception {
        this.serverConsumer.close(false);
    }

    public boolean hasNotificationDestination() {
        return this.hasNotificationDestination;
    }

    public ActiveMQDestination getOpenwireDestination() {
        return this.openwireDestination;
    }

    public void setPrefetchSize(int prefetchSize) {
        this.prefetchSize = prefetchSize;
        this.currentWindow.set(prefetchSize);
        this.info.setPrefetchSize(prefetchSize);
        if (this.prefetchSize > 0) {
            this.serverConsumer.promptDelivery();
        }
    }

    public boolean updateDeliveryCountAfterCancel(MessageReference ref) {
        long lastDelSeqId = this.info.getLastDeliveredSequenceId();
        if (this.info.isDurable() && this.getOpenwireDestination().isTopic()) {
            return true;
        }
        ref.decrementDeliveryCount();
        if (lastDelSeqId == -2L) {
            ref.incrementDeliveryCount();
        } else if (lastDelSeqId == -1L && !this.isRolledBack(ref)) {
            ref.incrementDeliveryCount();
        }
        return true;
    }

    public boolean removeRolledback(MessageReference messageReference) {
        Set<MessageReference> rolledbackMessageRefs = this.getRolledbackMessageRefs();
        if (rolledbackMessageRefs == null) {
            return false;
        }
        return rolledbackMessageRefs.remove(messageReference);
    }

    public void addRolledback(MessageReference messageReference) {
        this.getRolledbackMessageRefsOrCreate().add(messageReference);
    }

    private boolean isRolledBack(MessageReference messageReference) {
        Set<MessageReference> rollbackedMessageRefs = this.getRolledbackMessageRefs();
        if (rollbackedMessageRefs == null) {
            return false;
        }
        return rollbackedMessageRefs.contains(messageReference);
    }

    private class MessagePullHandler {
        private long next = -1L;
        private long timeout;
        private CountDownLatch latch = new CountDownLatch(1);
        private ScheduledFuture<?> messagePullFuture;

        private MessagePullHandler() {
        }

        public void nextSequence(long next, long timeout) throws Exception {
            this.next = next;
            this.timeout = timeout;
            this.latch = new CountDownLatch(1);
            AMQConsumer.this.serverConsumer.forceDelivery(AMQConsumer.this.messagePullSequence);
            if (timeout <= 0L) {
                this.latch.await(10L, TimeUnit.SECONDS);
                if (this.next >= 0L) {
                    AMQConsumer.this.handleDeliverNullDispatch();
                }
            }
        }

        public boolean checkForcedConsumer(Message message) {
            if (message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE)) {
                if (this.next >= 0L) {
                    if (this.timeout <= 0L) {
                        this.latch.countDown();
                    } else {
                        this.messagePullFuture = AMQConsumer.this.scheduledPool.schedule(new Runnable(){

                            @Override
                            public void run() {
                                if (MessagePullHandler.this.next >= 0L) {
                                    AMQConsumer.this.handleDeliverNullDispatch();
                                }
                            }
                        }, this.timeout, TimeUnit.MILLISECONDS);
                    }
                }
                return false;
            }
            this.next = -1L;
            if (this.messagePullFuture != null) {
                this.messagePullFuture.cancel(true);
            }
            this.latch.countDown();
            return true;
        }
    }
}

