/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.core.protocol.openwire.amq;

import java.lang.invoke.MethodHandles;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConstants;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.artemis.utils.SelectorTranslator;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AMQConsumer {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private final AMQSession session;
    private final ActiveMQDestination openwireDestination;
    private final ConsumerInfo info;
    private final ScheduledExecutorService scheduledPool;
    private ServerConsumer serverConsumer;
    private int prefetchSize;
    private final AtomicInteger currentWindow;
    private int deliveredAcksCreditExtension = 0;
    private long messagePullSequence = 0L;
    private final AtomicReference<MessagePullHandler> messagePullHandler = new AtomicReference<Object>(null);
    private boolean internalAddress = false;
    private volatile Set<MessageReference> rolledbackMessageRefs;
    private ScheduledFuture<?> delayedDispatchPrompter;
    private AtomicLong deliveredSequenceId = new AtomicLong(0L);

    public AMQConsumer(AMQSession amqSession, ActiveMQDestination d, ConsumerInfo info, ScheduledExecutorService scheduledPool, boolean internalAddress) {
        this.session = amqSession;
        this.openwireDestination = d;
        this.info = info;
        this.scheduledPool = scheduledPool;
        this.prefetchSize = info.getPrefetchSize();
        this.currentWindow = new AtomicInteger(this.prefetchSize);
        if (this.prefetchSize == 0) {
            this.messagePullHandler.set(new MessagePullHandler());
        }
        this.internalAddress = internalAddress;
        this.rolledbackMessageRefs = null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Set<MessageReference> guardedInitializationOfRolledBackMessageRefs() {
        AMQConsumer aMQConsumer = this;
        synchronized (aMQConsumer) {
            Set<MessageReference> rollbackedMessageRefs = this.rolledbackMessageRefs;
            if (rollbackedMessageRefs == null) {
                this.rolledbackMessageRefs = rollbackedMessageRefs = new ConcurrentSkipListSet<MessageReference>(Comparator.comparingLong(MessageReference::getMessageID));
            }
            return rollbackedMessageRefs;
        }
    }

    private Set<MessageReference> getRolledbackMessageRefsOrCreate() {
        Set<MessageReference> rolledbackMessageRefs = this.rolledbackMessageRefs;
        if (rolledbackMessageRefs == null) {
            rolledbackMessageRefs = this.guardedInitializationOfRolledBackMessageRefs();
        }
        return rolledbackMessageRefs;
    }

    protected Set<MessageReference> getRolledbackMessageRefs() {
        return this.rolledbackMessageRefs;
    }

    private static String convertOpenWireToActiveMQFilterString(String selectorString) {
        if (selectorString == null) {
            return null;
        }
        String filterString = SelectorTranslator.convertToActiveMQFilterString((String)selectorString);
        filterString = SelectorTranslator.parse((String)filterString, (String)"AMQUserID", (String)OpenWireConstants.AMQ_MSG_MESSAGE_ID.toString());
        return filterString;
    }

    public void start() {
        if (this.serverConsumer == null) {
            throw new IllegalStateException("Cannot start the AMQConsumer until it has been initialized");
        }
        this.serverConsumer.setStarted(true);
    }

    public void init(SlowConsumerDetectionListener slowConsumerDetectionListener, long nativeId) throws Exception {
        Long delayBeforeDispatch;
        SimpleString selector = this.info.getSelector() == null ? null : SimpleString.of((String)AMQConsumer.convertOpenWireToActiveMQFilterString(this.info.getSelector()));
        boolean preAck = false;
        if (this.info.isNoLocal()) {
            if (!AdvisorySupport.isAdvisoryTopic((ActiveMQDestination)this.openwireDestination)) {
                this.session.getConnection().setNoLocal(true);
            } else {
                preAck = true;
            }
            String id = this.info.getClientId() != null ? this.info.getClientId() : this.getId().getConnectionId();
            String noLocalSelector = String.valueOf(MessageUtil.CONNECTION_ID_PROPERTY_NAME) + "<>'" + id + "'";
            selector = selector == null ? SimpleString.of((String)noLocalSelector) : SimpleString.of((String)(this.info.getSelector() + " AND " + noLocalSelector));
        }
        SimpleString destinationName = SimpleString.of((String)this.session.convertWildcard(this.openwireDestination));
        if (this.openwireDestination.isTopic()) {
            SimpleString queueName = this.createTopicSubscription(this.info.isDurable(), this.info.getClientId(), destinationName.toString(), this.info.getSubscriptionName(), selector, destinationName);
            this.serverConsumer = this.session.getCoreSession().createConsumer(nativeId, queueName, (SimpleString)(CompositeAddress.isFullyQualified((String)destinationName.toString()) ? selector : null), (int)this.info.getPriority(), this.info.isBrowser(), false, Integer.valueOf(-1));
            this.serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener);
            ((ServerConsumerImpl)this.serverConsumer).setPreAcknowledge(preAck);
        } else {
            this.serverConsumer = this.session.getCoreSession().createConsumer(nativeId, destinationName, selector, (int)this.info.getPriority(), this.info.isBrowser(), false, Integer.valueOf(-1));
            this.serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener);
            AddressSettings addrSettings = (AddressSettings)this.session.getCoreServer().getAddressSettingsRepository().getMatch(destinationName.toString());
            if (addrSettings != null && this.info.getPrefetchSize() != 0 && addrSettings.getQueuePrefetch() == 0) {
                ConsumerControl cc = new ConsumerControl();
                cc.setConsumerId(this.info.getConsumerId());
                cc.setPrefetch(0);
                this.session.getConnection().dispatch((Command)cc);
            }
        }
        if (this.serverConsumer != null && this.serverConsumer.getQueue() != null && this.serverConsumer.getQueue().getQueueConfiguration() != null && (delayBeforeDispatch = this.serverConsumer.getQueue().getQueueConfiguration().getDelayBeforeDispatch()) != null && delayBeforeDispatch > 0L) {
            Long schedule = delayBeforeDispatch / 2L;
            this.delayedDispatchPrompter = this.scheduledPool.scheduleAtFixedRate(() -> this.serverConsumer.promptDelivery(), schedule, schedule, TimeUnit.MILLISECONDS);
        }
        this.serverConsumer.setProtocolData((Object)this);
    }

    private SimpleString createTopicSubscription(boolean isDurable, String clientID, String physicalName, String subscriptionName, SimpleString selector, SimpleString address) throws Exception {
        SimpleString queueName;
        if (isDurable) {
            queueName = org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForSubscription((boolean)true, (String)clientID, (String)subscriptionName);
            if (this.info.getDestination().isComposite()) {
                queueName = queueName.concat(physicalName);
            }
            QueueConfiguration queueConfiguration = QueueConfiguration.of((SimpleString)queueName).setAddress(address).setRoutingType(RoutingType.MULTICAST).setFilterString(selector).setInternal(Boolean.valueOf(this.internalAddress));
            QueueQueryResult result = this.session.getCoreSession().executeQueueQuery(queueName);
            if (result.isExists()) {
                boolean topicChanged;
                if (result.getConsumerCount() > 0) {
                    throw new IllegalStateException("Cannot create a subscriber on the durable subscription since it already has subscriber(s)");
                }
                SimpleString oldFilterString = result.getFilterString();
                boolean selectorChanged = selector == null && oldFilterString != null || oldFilterString == null && selector != null || oldFilterString != null && selector != null && !oldFilterString.equals((Object)selector);
                SimpleString oldTopicName = result.getAddress();
                boolean bl = topicChanged = !oldTopicName.equals((Object)address);
                if ((selectorChanged || topicChanged) && !result.isConfigurationManaged().booleanValue()) {
                    this.session.getCoreSession().deleteQueue(queueName);
                    this.session.getCoreSession().createQueue(queueConfiguration);
                }
            } else {
                this.session.getCoreSession().createQueue(queueConfiguration);
            }
        } else {
            if (CompositeAddress.isFullyQualified((String)physicalName)) {
                queueName = CompositeAddress.extractQueueName((SimpleString)SimpleString.of((String)physicalName));
                if (this.session.getCoreServer().locateQueue(queueName) != null) {
                    return queueName;
                }
            } else {
                queueName = SimpleString.of((String)UUID.randomUUID().toString());
            }
            this.session.getCoreSession().createQueue(QueueConfiguration.of((SimpleString)queueName).setAddress(address).setRoutingType(RoutingType.MULTICAST).setFilterString(selector).setDurable(Boolean.valueOf(false)).setTemporary(Boolean.valueOf(true)).setInternal(Boolean.valueOf(this.internalAddress)));
        }
        return queueName;
    }

    public ConsumerId getId() {
        return this.info.getConsumerId();
    }

    public void acquireCredit(int n, boolean delivered) {
        boolean promptDelivery;
        if (this.messagePullHandler.get() != null) {
            return;
        }
        if (delivered) {
            this.deliveredAcksCreditExtension += n;
        } else if (this.deliveredAcksCreditExtension > 0) {
            if (this.deliveredAcksCreditExtension < n) {
                n -= this.deliveredAcksCreditExtension;
                this.deliveredAcksCreditExtension = 0;
            } else {
                this.deliveredAcksCreditExtension -= n;
                return;
            }
        }
        int oldwindow = this.currentWindow.getAndAdd(n);
        boolean bl = promptDelivery = oldwindow < this.prefetchSize;
        if (promptDelivery) {
            this.serverConsumer.promptDelivery();
        }
    }

    public int handleDeliver(MessageReference reference, ICoreMessage message) {
        try {
            MessagePullHandler pullHandler = this.messagePullHandler.get();
            if (pullHandler != null && !pullHandler.checkForcedConsumer((Message)message)) {
                return 0;
            }
            if (this.session.getConnection().isNoLocal() || this.session.isInternal() && AdvisorySupport.isAdvisoryTopic((ActiveMQDestination)this.openwireDestination)) {
                message.removeProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME);
            }
            MessageDispatch dispatch = OpenWireMessageConverter.createMessageDispatch(reference, message, (WireFormat)this.session.wireFormat(), this, this.session.getCoreServer().getNodeManager().getUUID(), this.deliveredSequenceId.getAndIncrement());
            int size = dispatch.getMessage().getSize();
            reference.setProtocolData(MessageId.class, (Object)dispatch.getMessage().getMessageId());
            this.session.deliverMessage(dispatch);
            this.currentWindow.decrementAndGet();
            return size;
        }
        catch (Throwable t) {
            logger.warn("Error during message dispatch", t);
            return 0;
        }
    }

    public void handleDeliverNullDispatch() {
        MessageDispatch md = new MessageDispatch();
        md.setConsumerId(this.getId());
        md.setDestination(this.openwireDestination);
        this.session.deliverMessage(md);
    }

    public void acknowledge(MessageAck ack) throws Exception {
        if (ack.isRedeliveredAck()) {
            return;
        }
        int ackMessageCount = ack.getMessageCount();
        if (ack.isDeliveredAck()) {
            this.acquireCredit(ackMessageCount, true);
            return;
        }
        MessageId lastID = ack.getLastMessageId();
        MessageId startID = ack.getFirstMessageId() == null ? lastID : ack.getFirstMessageId();
        boolean removeReferences = !this.serverConsumer.isBrowseOnly() && !this.serverConsumer.getQueue().isNonDestructive();
        List ackList = this.serverConsumer.scanDeliveringReferences(removeReferences, reference -> startID.equals(reference.getProtocolData(MessageId.class)), reference -> lastID.equals(reference.getProtocolData(MessageId.class)));
        if (!ackList.isEmpty() || !removeReferences || this.serverConsumer.getQueue().isTemporary()) {
            this.acquireCredit(ackMessageCount, false);
            if (ack.isExpiredAck()) {
                for (MessageReference ref : ackList) {
                    ref.getQueue().expire(ref, this.serverConsumer, true);
                }
            } else if (removeReferences) {
                Transaction originalTX = this.session.getCoreSession().getCurrentTransaction();
                Transaction transaction = originalTX == null ? this.session.getCoreSession().newTransaction() : originalTX;
                if (ack.isIndividualAck() || ack.isStandardAck()) {
                    for (MessageReference ref : ackList) {
                        ref.acknowledge(transaction, this.serverConsumer);
                        this.serverConsumer.metricsAcknowledge(ref, transaction);
                        this.removeRolledback(ref);
                    }
                } else if (ack.isPoisonAck()) {
                    for (MessageReference ref : ackList) {
                        Throwable poisonCause = ack.getPoisonCause();
                        if (poisonCause != null) {
                            ((QueueImpl)ref.getQueue()).decDelivering(ref);
                            ref.getMessage().putStringProperty(OpenWireConstants.AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, SimpleString.of((String)poisonCause.toString()));
                            ((QueueImpl)ref.getQueue()).incDelivering(ref);
                        }
                        ref.getQueue().sendToDeadLetterAddress(transaction, ref);
                        this.removeRolledback(ref);
                    }
                }
                if (originalTX == null) {
                    transaction.commit(true);
                }
            }
        }
    }

    public void browseFinished() {
        MessageDispatch md = new MessageDispatch();
        md.setConsumerId(this.info.getConsumerId());
        md.setMessage(null);
        md.setDestination(null);
        this.session.deliverMessage(md);
    }

    public ConsumerInfo getInfo() {
        return this.info;
    }

    public boolean hasCredits() {
        return this.currentWindow.get() > 0;
    }

    public void processMessagePull(MessagePull messagePull) throws Exception {
        this.currentWindow.incrementAndGet();
        MessagePullHandler pullHandler = this.messagePullHandler.get();
        if (pullHandler != null) {
            pullHandler.nextSequence(this.messagePullSequence++, messagePull.getTimeout());
        }
    }

    public void removeConsumer() throws Exception {
        this.serverConsumer.close(false);
        if (this.delayedDispatchPrompter != null) {
            this.delayedDispatchPrompter.cancel(false);
        }
        if (this.info.getPrefetchSize() > 1) {
            this.session.getCoreSession().getSessionContext().waitCompletion();
        }
    }

    public ActiveMQDestination getOpenwireDestination() {
        return this.openwireDestination;
    }

    public void setPrefetchSize(int prefetchSize) {
        this.prefetchSize = prefetchSize;
        this.currentWindow.set(prefetchSize);
        this.info.setPrefetchSize(prefetchSize);
        if (this.prefetchSize == 0) {
            this.messagePullHandler.compareAndSet(null, new MessagePullHandler());
        } else {
            this.messagePullHandler.set(null);
        }
        if (this.prefetchSize > 0) {
            this.serverConsumer.promptDelivery();
        }
    }

    public boolean updateDeliveryCountAfterCancel(MessageReference ref) {
        if (-2L == this.info.getLastDeliveredSequenceId()) {
            return true;
        }
        return ((MessageId)ref.getProtocolData(MessageId.class)).getBrokerSequenceId() <= this.info.getLastDeliveredSequenceId() && !this.isRolledBack(ref);
    }

    public void removeRolledback(MessageReference messageReference) {
        Set<MessageReference> rolledbackMessageRefs = this.getRolledbackMessageRefs();
        if (rolledbackMessageRefs != null) {
            rolledbackMessageRefs.remove(messageReference);
        }
    }

    public void addRolledback(MessageReference messageReference) {
        this.currentWindow.decrementAndGet();
        this.getRolledbackMessageRefsOrCreate().add(messageReference);
    }

    private boolean isRolledBack(MessageReference messageReference) {
        Set<MessageReference> rollbackedMessageRefs = this.getRolledbackMessageRefs();
        if (rollbackedMessageRefs == null) {
            return false;
        }
        return rollbackedMessageRefs.contains(messageReference);
    }

    private class MessagePullHandler {
        private long next = -1L;
        private long timeout;
        private CountDownLatch latch = new CountDownLatch(1);
        private ScheduledFuture<?> messagePullFuture;

        private MessagePullHandler() {
        }

        public void nextSequence(long next, long timeout) throws Exception {
            this.next = next;
            this.timeout = timeout;
            this.latch = new CountDownLatch(1);
            AMQConsumer.this.serverConsumer.forceDelivery(AMQConsumer.this.messagePullSequence);
            if (timeout <= 0L) {
                this.latch.await(10L, TimeUnit.SECONDS);
                if (this.next >= 0L) {
                    AMQConsumer.this.handleDeliverNullDispatch();
                }
            }
        }

        public boolean checkForcedConsumer(Message message) {
            if (message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE)) {
                if (this.next >= 0L) {
                    if (this.timeout <= 0L) {
                        this.latch.countDown();
                    } else {
                        this.messagePullFuture = AMQConsumer.this.scheduledPool.schedule(() -> {
                            if (this.next >= 0L) {
                                AMQConsumer.this.handleDeliverNullDispatch();
                            }
                        }, this.timeout, TimeUnit.MILLISECONDS);
                    }
                }
                return false;
            }
            this.next = -1L;
            if (this.messagePullFuture != null) {
                this.messagePullFuture.cancel(true);
            }
            this.latch.countDown();
            return true;
        }
    }
}

