/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.core.server.impl;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import javax.transaction.xa.Xid;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.api.core.management.NotificationType;
import org.apache.activemq.artemis.core.client.impl.ClientMessageImpl;
import org.apache.activemq.artemis.core.exception.ActiveMQXAException;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.message.impl.MessageInternal;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.BindingType;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.remoting.CloseListener;
import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.core.security.CheckType;
import org.apache.activemq.artemis.core.security.SecurityStore;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.BindingQueryResult;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueCreator;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.RefsOperation;
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.transaction.ResourceManager;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionFactory;
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.apache.activemq.artemis.utils.TypedProperties;
import org.apache.activemq.artemis.utils.UUID;
import org.apache.activemq.artemis.utils.json.JSONArray;
import org.apache.activemq.artemis.utils.json.JSONObject;
import org.jboss.logging.Logger;

public class ServerSessionImpl
implements ServerSession,
FailureListener {
    private static final Logger logger = Logger.getLogger(ServerSessionImpl.class);
    // Credentials supplied when the session was created.
    protected final String username;
    protected final String password;
    private final int minLargeMessageSize;
    protected boolean autoCommitSends;
    // When true, acknowledgements are performed outside of the session transaction (see acknowledge()).
    protected boolean autoCommitAcks;
    protected final boolean preAcknowledge;
    protected final boolean strictUpdateDeliveryCount;
    protected final RemotingConnection remotingConnection;
    // Consumers registered on this session, keyed by consumer ID.
    protected final Map<Long, ServerConsumer> consumers = new ConcurrentHashMap<Long, ServerConsumer>();
    // Transaction the session is currently working in; may be null (e.g. an XA session between transactions).
    protected Transaction tx;
    protected boolean xa;
    protected final StorageManager storageManager;
    private final ResourceManager resourceManager;
    public final PostOffice postOffice;
    private final SecurityStore securityStore;
    protected final ManagementService managementService;
    protected volatile boolean started = false;
    // Cleanup listeners registered for temporary queues created through this session, keyed by queue name.
    protected final Map<SimpleString, TempQueueCleanerUpper> tempQueueCleannerUppers = new HashMap<SimpleString, TempQueueCleanerUpper>();
    protected final String name;
    protected final ActiveMQServer server;
    private final SimpleString managementAddress;
    // Large message currently tracked by the session; its file is deleted in doClose() if still set.
    private volatile LargeServerMessage currentLargeMessage;
    protected final RoutingContext routingContext = new RoutingContextImpl(null);
    protected final SessionCallback callback;
    private volatile SimpleString defaultAddress;
    // Timeout handed to every transaction created by newTransaction(); initialised from the resource manager.
    private volatile int timeoutSeconds;
    private Map<String, String> metaData;
    private final OperationContext context;
    private QueueCreator queueCreator;
    protected final Map<SimpleString, Pair<UUID, AtomicLong>> targetAddressInfos = new HashMap<SimpleString, Pair<UUID, AtomicLong>>();
    // Wall-clock time (milliseconds) at which this session object was constructed.
    private final long creationTime = System.currentTimeMillis();
    // Set to true at the very end of doClose(); guards against closing twice.
    private volatile boolean closed = false;
    private final TransactionFactory transactionFactory;

    /**
     * Creates a session without an explicit {@link TransactionFactory}.
     * <p>
     * Delegates to the full constructor passing {@code null} as the transaction factory,
     * which makes that constructor fall back to its default factory.
     *
     * @throws Exception propagated from the delegated constructor
     */
    public ServerSessionImpl(String name, String username, String password, int minLargeMessageSize, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge, boolean strictUpdateDeliveryCount, boolean xa, RemotingConnection remotingConnection, StorageManager storageManager, PostOffice postOffice, ResourceManager resourceManager, SecurityStore securityStore, ManagementService managementService, ActiveMQServer server, SimpleString managementAddress, SimpleString defaultAddress, SessionCallback callback, OperationContext context, QueueCreator queueCreator) throws Exception {
        this(name, username, password, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, strictUpdateDeliveryCount, xa, remotingConnection, storageManager, postOffice, resourceManager, securityStore, managementService, server, managementAddress, defaultAddress, callback, context, null, queueCreator);
    }

    /**
     * Creates a session and wires it to its collaborators.
     * <p>
     * Besides storing the supplied arguments, this constructor reads the transaction timeout
     * from {@code resourceManager}, registers the new session as a failure listener on
     * {@code remotingConnection}, and — for non-XA sessions — immediately opens a local transaction.
     *
     * @param transactionFactory factory used to create transactions; when {@code null} a
     *                           {@code DefaultTransactionFactory} is used instead
     * @throws Exception declared by the signature; nothing in this body throws a checked exception directly
     */
    public ServerSessionImpl(String name, String username, String password, int minLargeMessageSize, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge, boolean strictUpdateDeliveryCount, boolean xa, RemotingConnection remotingConnection, StorageManager storageManager, PostOffice postOffice, ResourceManager resourceManager, SecurityStore securityStore, ManagementService managementService, ActiveMQServer server, SimpleString managementAddress, SimpleString defaultAddress, SessionCallback callback, OperationContext context, TransactionFactory transactionFactory, QueueCreator queueCreator) throws Exception {
        this.username = username;
        this.password = password;
        this.minLargeMessageSize = minLargeMessageSize;
        this.autoCommitSends = autoCommitSends;
        this.autoCommitAcks = autoCommitAcks;
        this.preAcknowledge = preAcknowledge;
        this.remotingConnection = remotingConnection;
        this.storageManager = storageManager;
        this.postOffice = postOffice;
        this.resourceManager = resourceManager;
        this.securityStore = securityStore;
        // Transactions created by this session inherit the resource manager's timeout.
        this.timeoutSeconds = resourceManager.getTimeoutSeconds();
        this.xa = xa;
        this.strictUpdateDeliveryCount = strictUpdateDeliveryCount;
        this.managementService = managementService;
        this.name = name;
        this.server = server;
        this.managementAddress = managementAddress;
        this.callback = callback;
        this.defaultAddress = defaultAddress;
        // NOTE(review): "this" escapes to the connection before construction has finished.
        remotingConnection.addFailureListener((FailureListener)this);
        this.context = context;
        this.queueCreator = queueCreator;
        this.transactionFactory = transactionFactory == null ? new DefaultTransactionFactory() : transactionFactory;
        // Non-XA sessions always have a local transaction available; XA sessions get one on xaStart().
        if (!xa) {
            this.tx = this.newTransaction();
        }
    }

    /**
     * @return the operation context this session was constructed with
     */
    @Override
    public OperationContext getSessionContext() {
        return context;
    }

    /**
     * @return the user name this session was constructed with
     */
    @Override
    public String getUsername() {
        return username;
    }

    /**
     * @return the password this session was constructed with
     */
    @Override
    public String getPassword() {
        return password;
    }

    /**
     * @return the minimum large-message size configured for this session
     */
    @Override
    public int getMinLargeMessageSize() {
        return minLargeMessageSize;
    }

    /**
     * @return the name of this session
     */
    @Override
    public String getName() {
        return name;
    }

    /**
     * @return the ID reported by the underlying remoting connection
     */
    @Override
    public Object getConnectionID() {
        return remotingConnection.getID();
    }

    /**
     * Returns a read-only snapshot of the consumers currently registered on this session.
     * Later changes to the session's consumers are not reflected in the returned set.
     *
     * @return an unmodifiable copy of the registered consumers
     */
    @Override
    public Set<ServerConsumer> getServerConsumers() {
        return Collections.unmodifiableSet(new HashSet<ServerConsumer>(consumers.values()));
    }

    /**
     * Marks the session's current transaction, if any, as rollback-only.
     *
     * @param e the failure; used directly when it already is an {@link ActiveMQException},
     *          otherwise wrapped in one (same message, {@code e} as cause)
     */
    @Override
    public void markTXFailed(Throwable e) {
        // Read the field once so the null check and the call see the same transaction.
        final Transaction activeTx = tx;
        if (activeTx == null) {
            return;
        }
        final ActiveMQException failure;
        if (e instanceof ActiveMQException) {
            failure = (ActiveMQException)e;
        } else {
            failure = new ActiveMQException(e.getMessage());
            failure.initCause(e);
        }
        activeTx.markAsRollbackOnly(failure);
    }

    /**
     * Unregisters a consumer from this session.
     *
     * @param consumerID ID of the consumer to remove
     * @return {@code true} if a consumer was registered under that ID
     */
    @Override
    public boolean removeConsumer(long consumerID) throws Exception {
        final ServerConsumer removed = consumers.remove(consumerID);
        return removed != null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Closes this session: rolls back a pending non-XA transaction, closes every consumer,
     * deletes the file of a pending large message, and finally deregisters the session.
     * <p>
     * Failures while rolling back, closing consumers or deleting the large-message file are
     * logged and do not abort the close. A second call after a completed close returns immediately.
     *
     * @param failed passed on to the rollback and to each consumer's {@code close}
     * @throws Exception if deregistering the session or notifying the callback fails
     */
    protected void doClose(boolean failed) throws Exception {
        ServerSessionImpl serverSessionImpl = this;
        synchronized (serverSessionImpl) {
            if (this.closed) {
                return;
            }
            // Only transactions without an Xid are rolled back here; XA transactions are left untouched.
            if (this.tx != null && this.tx.getXid() == null) {
                try {
                    this.rollback(failed, false);
                }
                catch (Exception e) {
                    ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
                }
            }
        }
        // Iterate over a copy so the live map can change while consumers are being closed.
        HashSet<ServerConsumer> consumersClone = new HashSet<ServerConsumer>(this.consumers.values());
        for (ServerConsumer consumer : consumersClone) {
            try {
                consumer.close(failed);
            }
            catch (Throwable e) {
                ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
                // close() failed: fall back to removeItself() as a best-effort cleanup.
                try {
                    consumer.removeItself();
                }
                catch (Throwable e2) {
                    ActiveMQServerLogger.LOGGER.warn(e2.getMessage(), e2);
                }
            }
        }
        this.consumers.clear();
        if (this.currentLargeMessage != null) {
            try {
                this.currentLargeMessage.deleteFile();
            }
            catch (Throwable error) {
                ActiveMQServerLogger.LOGGER.errorDeletingLargeMessageFile(error);
            }
        }
        ServerSessionImpl serverSessionImpl2 = this;
        synchronized (serverSessionImpl2) {
            this.server.removeSession(this.name);
            this.remotingConnection.removeFailureListener((FailureListener)this);
            this.callback.closed();
            // Flag is set last, so it stays false if any of the steps above throws.
            this.closed = true;
        }
    }

    /**
     * @return the queue creator this session was constructed with
     */
    @Override
    public QueueCreator getQueueCreator() {
        return queueCreator;
    }

    /**
     * Creates a consumer with large-message support enabled and no explicit credits.
     *
     * @see #createConsumer(long, SimpleString, SimpleString, boolean, boolean, Integer)
     */
    @Override
    public ServerConsumer createConsumer(long consumerID, SimpleString queueName, SimpleString filterString, boolean browseOnly) throws Exception {
        final boolean supportLargeMessage = true;
        final Integer credits = null;
        return createConsumer(consumerID, queueName, filterString, browseOnly, supportLargeMessage, credits);
    }

    /**
     * Creates a consumer on a local queue and registers it with this session.
     * <p>
     * The queue must be bound as a {@code LOCAL_QUEUE} and the session must pass the
     * {@code CONSUME} security check on the queue's address. For non-browsing consumers a
     * {@code CONSUMER_CREATED} management notification is sent.
     *
     * @param consumerID          ID passed to the new consumer
     * @param queueName           name of the queue to consume from
     * @param filterString        optional filter expression; may be {@code null}
     * @param browseOnly          when {@code true} no notification is sent
     * @param supportLargeMessage forwarded to the consumer
     * @param credits             forwarded to the consumer; may be {@code null}
     * @return the newly created consumer
     * @throws Exception if the queue does not exist, the security check fails, or the filter is invalid
     */
    public ServerConsumer createConsumer(long consumerID, SimpleString queueName, SimpleString filterString, boolean browseOnly, boolean supportLargeMessage, Integer credits) throws Exception {
        final Binding binding = postOffice.getBinding(queueName);
        final boolean localQueue = binding != null && binding.getType() == BindingType.LOCAL_QUEUE;
        if (!localQueue) {
            throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(queueName);
        }
        securityStore.check(binding.getAddress(), CheckType.CONSUME, this);

        final Filter filter = FilterImpl.createFilter(filterString);
        final ServerConsumer consumer = newConsumer(consumerID, this, (QueueBinding)binding, filter, started, browseOnly, storageManager, callback, preAcknowledge, strictUpdateDeliveryCount, managementService, supportLargeMessage, credits);
        consumers.put(consumer.getID(), consumer);

        // Browsers are not announced through the management service.
        if (browseOnly) {
            return consumer;
        }

        final TypedProperties props = new TypedProperties();
        props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, binding.getAddress());
        props.putSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME, binding.getClusterName());
        props.putSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME, binding.getRoutingName());
        props.putIntProperty(ManagementHelper.HDR_DISTANCE, binding.getDistance());
        final Queue theQueue = (Queue)binding.getBindable();
        props.putIntProperty(ManagementHelper.HDR_CONSUMER_COUNT, theQueue.getConsumerCount());
        props.putSimpleStringProperty(ManagementHelper.HDR_USER, SimpleString.toSimpleString(username));
        props.putSimpleStringProperty(ManagementHelper.HDR_REMOTE_ADDRESS, SimpleString.toSimpleString((String)remotingConnection.getRemoteAddress()));
        props.putSimpleStringProperty(ManagementHelper.HDR_SESSION_NAME, SimpleString.toSimpleString(name));
        if (filterString != null) {
            props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING, filterString);
        }
        final Notification notification = new Notification(null, (NotificationType)CoreNotificationType.CONSUMER_CREATED, props);
        if (logger.isDebugEnabled()) {
            logger.debug("Session with user=" + username + ", connection=" + remotingConnection + " created a consumer on queue " + queueName + ", filter = " + filterString);
        }
        managementService.sendNotification(notification);
        return consumer;
    }

    /**
     * Factory hook that instantiates the consumer used by {@code createConsumer}.
     * <p>
     * NOTE(review): this implementation only uses {@code consumerID}, {@code binding}, {@code filter},
     * {@code browseOnly}, {@code supportLargeMessage} and {@code credits} from its arguments. The
     * remaining parameters ({@code serverSessionImpl}, {@code started2}, {@code storageManager2},
     * {@code callback2}, {@code preAcknowledge2}, {@code strictUpdateDeliveryCount2},
     * {@code managementService2}) are ignored in favour of {@code this} and the corresponding fields.
     *
     * @return a new {@link ServerConsumerImpl} bound to this session
     * @throws Exception if the consumer cannot be constructed
     */
    protected ServerConsumer newConsumer(long consumerID, ServerSessionImpl serverSessionImpl, QueueBinding binding, Filter filter, boolean started2, boolean browseOnly, StorageManager storageManager2, SessionCallback callback2, boolean preAcknowledge2, boolean strictUpdateDeliveryCount2, ManagementService managementService2, boolean supportLargeMessage, Integer credits) throws Exception {
        return new ServerConsumerImpl(consumerID, this, binding, filter, this.started, browseOnly, this.storageManager, this.callback, this.preAcknowledge, this.strictUpdateDeliveryCount, this.managementService, supportLargeMessage, credits);
    }

    /**
     * Creates a queue on behalf of this session's user.
     * <p>
     * The session must hold the durable or non-durable queue-creation permission on
     * {@code address}, matching {@code durable}, and the user's queue-creation limit is checked.
     * For a temporary queue a cleanup listener is registered on the remoting connection
     * (both as close and failure listener) and remembered under the queue name.
     *
     * @param address      address the queue is bound to
     * @param name         name of the queue
     * @param filterString optional filter; may be {@code null}
     * @param temporary    whether the queue is temporary
     * @param durable      whether the queue is durable
     * @return the created queue
     * @throws Exception if a security or limit check fails, or the server cannot create the queue
     */
    @Override
    public Queue createQueue(SimpleString address, SimpleString name, SimpleString filterString, boolean temporary, boolean durable) throws Exception {
        final CheckType requiredPermission = durable ? CheckType.CREATE_DURABLE_QUEUE : CheckType.CREATE_NON_DURABLE_QUEUE;
        this.securityStore.check(address, requiredPermission, this);
        this.server.checkQueueCreationLimit(getUsername());

        // Non-temporary queues under the "jms.queue." prefix whose name equals their address
        // go through the server overload that takes an extra boolean flag.
        final boolean jmsQueueNamedAfterAddress = !temporary && address.toString().startsWith("jms.queue.") && address.equals((Object)name);
        final Queue queue;
        if (jmsQueueNamedAfterAddress) {
            queue = this.server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, true);
        } else {
            queue = this.server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary);
        }

        if (temporary) {
            final TempQueueCleanerUpper cleaner = new TempQueueCleanerUpper(this.server, name);
            this.remotingConnection.addCloseListener((CloseListener)cleaner);
            this.remotingConnection.addFailureListener((FailureListener)cleaner);
            this.tempQueueCleannerUppers.put(name, cleaner);
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Queue " + name + " created on address " + address + " with filter=" + filterString + " temporary = " + temporary + " durable=" + durable + " on session user=" + this.username + ", connection=" + this.remotingConnection);
        }
        return queue;
    }

    /**
     * Creates a shared queue on behalf of this session's user.
     * <p>
     * The required permission follows {@code durable}, exactly as in {@code createQueue}:
     * {@code CREATE_DURABLE_QUEUE} for durable queues, {@code CREATE_NON_DURABLE_QUEUE} otherwise.
     * Previously the non-durable permission was checked unconditionally, which let a user
     * without the durable permission create a durable shared queue.
     *
     * @param address      address the queue is bound to
     * @param name         name of the queue
     * @param durable      whether the queue is durable
     * @param filterString optional filter; may be {@code null}
     * @throws Exception if a security or limit check fails, or the server cannot create the queue
     */
    @Override
    public void createSharedQueue(SimpleString address, SimpleString name, boolean durable, SimpleString filterString) throws Exception {
        CheckType requiredPermission = durable ? CheckType.CREATE_DURABLE_QUEUE : CheckType.CREATE_NON_DURABLE_QUEUE;
        this.securityStore.check(address, requiredPermission, this);
        this.server.checkQueueCreationLimit(this.getUsername());
        this.server.createSharedQueue(address, name, filterString, SimpleString.toSimpleString((String)this.getUsername()), durable);
    }

    /**
     * @return the remoting connection this session is attached to
     */
    @Override
    public RemotingConnection getRemotingConnection() {
        return remotingConnection;
    }

    /**
     * Destroys a local queue and, if it was a temporary queue created through this session,
     * unregisters its cleanup listener from the remoting connection.
     *
     * @param queueToDelete name of the queue
     * @throws ActiveMQNonExistentQueueException if no local queue is bound under that name
     * @throws Exception if the server fails to destroy the queue
     */
    @Override
    public void deleteQueue(SimpleString queueToDelete) throws Exception {
        final Binding binding = postOffice.getBinding(queueToDelete);
        final boolean localQueue = binding != null && binding.getType() == BindingType.LOCAL_QUEUE;
        if (!localQueue) {
            throw new ActiveMQNonExistentQueueException();
        }
        server.destroyQueue(queueToDelete, this, true);

        final TempQueueCleanerUpper cleaner = tempQueueCleannerUppers.remove(queueToDelete);
        if (cleaner == null) {
            return;
        }
        remotingConnection.removeCloseListener((CloseListener)cleaner);
        remotingConnection.removeFailureListener((FailureListener)cleaner);
    }

    /**
     * Describes the queue bound under {@code name}.
     * <p>
     * Resolution order:
     * <ol>
     *   <li>a local queue binding: its real attributes are reported;</li>
     *   <li>the management address: reported with consumer/message counts of -1;</li>
     *   <li>a {@code jms.queue.}-prefixed name whose address settings enable JMS queue
     *       auto-creation: a placeholder result built with the name as address;</li>
     *   <li>otherwise: a result built with {@code null} name and address.</li>
     * </ol>
     *
     * @param name queue name; must not be {@code null}
     * @return the query result, never {@code null}
     * @throws Exception the bundle's "queue name is null" exception when {@code name} is {@code null}
     */
    @Override
    public QueueQueryResult executeQueueQuery(SimpleString name) throws Exception {
        // Validate before the first dereference; the original checked only after calling
        // name.toString(), so a null name surfaced as a NullPointerException.
        if (name == null) {
            throw ActiveMQMessageBundle.BUNDLE.queueNameIsNull();
        }
        String queueName = name.toString();
        boolean autoCreateJmsQueues = queueName.startsWith("jms.queue.") && this.server.getAddressSettingsRepository().getMatch(queueName).isAutoCreateJmsQueues();

        Binding binding = this.postOffice.getBinding(name);
        if (binding != null && binding.getType() == BindingType.LOCAL_QUEUE) {
            Queue queue = (Queue)binding.getBindable();
            Filter filter = queue.getFilter();
            SimpleString filterString = filter == null ? null : filter.getFilterString();
            return new QueueQueryResult(name, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateJmsQueues);
        }
        if (name.equals((Object)this.managementAddress)) {
            return new QueueQueryResult(name, this.managementAddress, true, false, null, -1, -1L, autoCreateJmsQueues);
        }
        if (autoCreateJmsQueues) {
            return new QueueQueryResult(name, name, true, false, null, 0, 0L, true, false);
        }
        return new QueueQueryResult(null, null, false, false, null, 0, 0L, false, false);
    }

    /**
     * Lists the queues (local and remote) bound to {@code address}.
     * <p>
     * The management address is always reported as existing with an empty queue list.
     * For any other address the result's first argument is {@code true} only when at
     * least one queue binding matched.
     *
     * @param address address to look up; must not be {@code null}
     * @return the query result, never {@code null}
     * @throws Exception the bundle's "address is null" exception when {@code address} is {@code null}
     */
    @Override
    public BindingQueryResult executeBindingQuery(SimpleString address) throws Exception {
        // Validate before the first dereference; the original checked only after calling
        // address.toString(), so a null address surfaced as a NullPointerException.
        if (address == null) {
            throw ActiveMQMessageBundle.BUNDLE.addressIsNull();
        }
        String addressName = address.toString();
        boolean autoCreateJmsQueues = addressName.startsWith("jms.queue.") && this.server.getAddressSettingsRepository().getMatch(addressName).isAutoCreateJmsQueues();

        ArrayList<SimpleString> names = new ArrayList<SimpleString>();
        if (address.equals((Object)this.managementAddress)) {
            return new BindingQueryResult(true, names, autoCreateJmsQueues);
        }
        Bindings bindings = this.postOffice.getMatchingBindings(address);
        for (Binding binding : bindings.getBindings()) {
            // Only queue bindings are reported; other binding types are skipped.
            if (binding.getType() != BindingType.LOCAL_QUEUE && binding.getType() != BindingType.REMOTE_QUEUE) {
                continue;
            }
            names.add(binding.getUniqueName());
        }
        return new BindingQueryResult(!names.isEmpty(), names, autoCreateJmsQueues);
    }

    /**
     * Forwards a force-delivery request to the given consumer; does nothing when no consumer
     * is registered under {@code consumerID}.
     *
     * @param consumerID ID of the target consumer
     * @param sequence   sequence number passed to the consumer
     */
    @Override
    public void forceConsumerDelivery(long consumerID, long sequence) throws Exception {
        final ServerConsumer target = consumers.get(consumerID);
        if (target == null) {
            return;
        }
        target.forceDelivery(sequence);
    }

    /**
     * Asks the given consumer to prompt delivery; does nothing when no consumer is registered
     * under {@code consumerID}.
     *
     * @param consumerID ID of the target consumer
     */
    public void promptDelivery(long consumerID) {
        final ServerConsumer target = consumers.get(consumerID);
        if (target == null) {
            return;
        }
        target.promptDelivery();
    }

    /**
     * Acknowledges a message on the given consumer.
     * <p>
     * Normally the acknowledgement runs in the session transaction, or outside any transaction
     * when {@code autoCommitAcks} is set. If the session transaction is already in state
     * {@code ROLLEDBACK}, the acknowledgement is instead performed in a throw-away transaction
     * that is rolled back straight away; an exception from that acknowledgement is logged at
     * debug level and ignored.
     *
     * @param consumerID ID of the consumer that received the message
     * @param messageID  ID of the message to acknowledge
     * @throws Exception if the consumer does not exist or the acknowledgement fails
     */
    @Override
    public void acknowledge(long consumerID, long messageID) throws Exception {
        final ServerConsumer consumer = findConsumer(consumerID);
        final boolean sessionTxRolledBack = tx != null && tx.getState() == Transaction.State.ROLLEDBACK;
        if (!sessionTxRolledBack) {
            consumer.acknowledge(autoCommitAcks ? null : tx, messageID);
            return;
        }

        final Transaction newTX = newTransaction();
        try {
            consumer.acknowledge(newTX, messageID);
        }
        catch (Exception e) {
            logger.debug("Ignored exception while acking messageID " + messageID + " on a rolledback TX", e);
        }
        newTX.rollback();
    }

    /**
     * Looks up a consumer that must exist.
     * <p>
     * When no consumer is registered under {@code consumerID}, the current transaction (if any)
     * is marked rollback-only with the same exception that is then thrown.
     *
     * @param consumerID ID of the consumer
     * @return the registered consumer, never {@code null}
     * @throws Exception the bundle's "consumer doesn't exist" exception when the lookup fails
     */
    private ServerConsumer findConsumer(long consumerID) throws Exception {
        final ServerConsumer found = consumers.get(consumerID);
        if (found != null) {
            return found;
        }
        final Transaction activeTx = tx;
        final ActiveMQIllegalStateException missing = ActiveMQMessageBundle.BUNDLE.consumerDoesntExist(consumerID);
        if (activeTx != null) {
            activeTx.markAsRollbackOnly((ActiveMQException)missing);
        }
        throw missing;
    }

    /**
     * Acknowledges a single message on the given consumer.
     * <p>
     * Normally the acknowledgement runs in the session transaction, or outside any transaction
     * when {@code autoCommitAcks} is set. If the session transaction is already in state
     * {@code ROLLEDBACK}, the acknowledgement is performed in a fresh transaction that is
     * rolled back immediately afterwards, mirroring {@code acknowledge}.
     *
     * @param consumerID ID of the consumer that received the message
     * @param messageID  ID of the message to acknowledge
     * @throws Exception if the consumer does not exist or the acknowledgement fails
     */
    @Override
    public void individualAcknowledge(long consumerID, long messageID) throws Exception {
        ServerConsumer consumer = this.findConsumer(consumerID);
        if (this.tx != null && this.tx.getState() == Transaction.State.ROLLEDBACK) {
            Transaction newTX = this.newTransaction();
            // Use the fresh transaction: the original passed this.tx (the rolled-back one),
            // leaving newTX created and rolled back without ever being used.
            consumer.individualAcknowledge(newTX, messageID);
            newTX.rollback();
        } else {
            consumer.individualAcknowledge(this.autoCommitAcks ? null : this.tx, messageID);
        }
    }

    /**
     * Cancels a single message on the given consumer; does nothing when no consumer is
     * registered under {@code consumerID}.
     *
     * @param consumerID ID of the consumer
     * @param messageID  ID of the message to cancel
     * @param failed     forwarded to the consumer
     */
    @Override
    public void individualCancel(long consumerID, long messageID, boolean failed) throws Exception {
        final ServerConsumer target = consumers.get(consumerID);
        if (target == null) {
            return;
        }
        target.individualCancel(messageID, failed);
    }

    /**
     * Expires a message that is currently held by the given consumer.
     * <p>
     * Does nothing when no consumer is registered under {@code consumerID} (for example because
     * it was closed concurrently) or when the consumer no longer holds the message.
     * Previously a missing consumer caused a {@link NullPointerException}.
     *
     * @param consumerID ID of the consumer holding the message
     * @param messageID  ID of the message to expire
     * @throws Exception if the queue fails to expire the reference
     */
    @Override
    public void expire(long consumerID, long messageID) throws Exception {
        ServerConsumer consumer = this.consumers.get(consumerID);
        if (consumer == null) {
            // Same lenient handling as individualCancel(): nothing to expire.
            return;
        }
        MessageReference ref = consumer.removeReferenceByID(messageID);
        if (ref != null) {
            ref.getQueue().expire(ref);
        }
    }

    /**
     * Commits the current transaction, if any.
     * <p>
     * Whether or not the commit succeeds, the session afterwards holds no transaction (XA)
     * or a fresh one (non-XA).
     *
     * @throws Exception if the commit fails
     */
    @Override
    public synchronized void commit() throws Exception {
        if (logger.isTraceEnabled()) {
            logger.trace("Calling commit");
        }
        try {
            if (tx != null) {
                tx.commit();
            }
        }
        finally {
            if (xa) {
                tx = null;
            } else {
                tx = newTransaction();
            }
        }
    }

    /**
     * Rolls back the current transaction, treating the client as not failed.
     *
     * @param considerLastMessageAsDelivered forwarded to the rollback
     */
    @Override
    public void rollback(boolean considerLastMessageAsDelivered) throws Exception {
        final boolean clientFailed = false;
        rollback(clientFailed, considerLastMessageAsDelivered);
    }

    /**
     * Rolls back the current transaction, creating one first if the session has none, and then
     * resets the session to no transaction (XA) or a fresh one (non-XA).
     *
     * @param clientFailed                   forwarded to {@code doRollback}
     * @param considerLastMessageAsDelivered forwarded to {@code doRollback}
     * @throws Exception if the rollback fails; in that case the session transaction is not reset
     */
    private synchronized void rollback(boolean clientFailed, boolean considerLastMessageAsDelivered) throws Exception {
        if (tx == null) {
            tx = newTransaction();
        }
        doRollback(clientFailed, considerLastMessageAsDelivered, tx);
        if (xa) {
            tx = null;
        } else {
            tx = newTransaction();
        }
    }

    /**
     * Creates a transaction without an Xid via the session's transaction factory.
     *
     * @return a new transaction using the session's storage manager and timeout
     */
    protected Transaction newTransaction() {
        return transactionFactory.newTransaction(null, storageManager, timeoutSeconds);
    }

    /**
     * Creates a transaction for the given Xid via the session's transaction factory.
     *
     * @param xid the XA transaction ID
     * @return a new transaction using the session's storage manager and timeout
     */
    private Transaction newTransaction(Xid xid) {
        return transactionFactory.newTransaction(xid, storageManager, timeoutSeconds);
    }

    /**
     * Commits the XA transaction identified by {@code xid}.
     * <p>
     * The numeric codes passed to {@link ActiveMQXAException} match the constants of
     * {@code javax.transaction.xa.XAException}: -6 {@code XAER_PROTO}, -4 {@code XAER_NOTA},
     * 7 {@code XA_HEURCOM}, 6 {@code XA_HEURRB}.
     *
     * @param xid      the transaction to commit
     * @param onePhase forwarded to the transaction's commit
     * @throws ActiveMQXAException if the session is still working in that transaction, the Xid is
     *                             unknown or heuristically completed, or the transaction is suspended
     * @throws Exception           if the commit itself fails
     */
    @Override
    public synchronized void xaCommit(Xid xid, boolean onePhase) throws Exception {
        if (tx != null && tx.getXid().equals(xid)) {
            throw new ActiveMQXAException(-6, "Cannot commit, session is currently doing work in transaction " + tx.getXid());
        }

        final Transaction theTx = resourceManager.removeTransaction(xid);
        if (logger.isTraceEnabled()) {
            logger.trace("XAcommit into " + theTx + ", xid=" + xid);
        }

        if (theTx != null) {
            if (theTx.getState() == Transaction.State.SUSPENDED) {
                // A suspended transaction cannot be committed: re-register it and report the protocol error.
                resourceManager.putTransaction(xid, theTx);
                throw new ActiveMQXAException(-6, "Cannot commit transaction, it is suspended " + xid);
            }
            theTx.commit(onePhase);
            return;
        }

        // Unknown Xid: distinguish heuristic completions from a plain "not found".
        if (resourceManager.getHeuristicCommittedTransactions().contains(xid)) {
            throw new ActiveMQXAException(7, "transaction has been heuristically committed: " + xid);
        }
        if (resourceManager.getHeuristicRolledbackTransactions().contains(xid)) {
            throw new ActiveMQXAException(6, "transaction has been heuristically rolled back: " + xid);
        }
        if (logger.isTraceEnabled()) {
            logger.trace("XAcommit into " + theTx + ", xid=" + xid + " cannot find it");
        }
        throw new ActiveMQXAException(-4, "Cannot find xid in resource manager: " + xid);
    }

    /**
     * Ends the association with the XA transaction identified by {@code xid}.
     * <p>
     * If {@code xid} is the session's current transaction, the session is detached from it
     * (unless it is suspended, in which case the call fails and the session stays attached).
     * Otherwise the transaction is looked up in the resource manager and, if suspended, resumed.
     *
     * @param xid the transaction to end
     * @throws ActiveMQXAException with code -6 for state violations and -4 when the Xid is unknown
     */
    @Override
    public synchronized void xaEnd(Xid xid) throws Exception {
        final boolean endingCurrentTx = tx != null && tx.getXid().equals(xid);
        if (endingCurrentTx) {
            if (tx.getState() == Transaction.State.SUSPENDED) {
                throw new ActiveMQXAException(-6, "Cannot end, transaction is suspended");
            }
            final boolean rolledBack = tx.getState() == Transaction.State.ROLLEDBACK;
            // The session is detached in both remaining cases; a rolled-back tx additionally fails the call.
            tx = null;
            if (rolledBack) {
                throw new ActiveMQXAException(-6, "Cannot end, transaction is rolled back");
            }
            return;
        }

        final Transaction theTx = resourceManager.getTransaction(xid);
        if (theTx == null) {
            throw new ActiveMQXAException(-4, "Cannot find suspended transaction to end " + xid);
        }
        if (theTx.getState() != Transaction.State.SUSPENDED) {
            throw new ActiveMQXAException(-6, "Transaction is not suspended " + xid);
        }
        theTx.resume();
    }

    /**
     * Forgets a heuristically completed transaction.
     * <p>
     * Removes the heuristic completion from the resource manager and deletes its record from
     * storage.
     *
     * @param xid the heuristically completed transaction
     * @throws ActiveMQXAException with code -4 when no heuristic completion exists for {@code xid},
     *                             or -7 when deleting the stored record fails (the cause is logged)
     */
    @Override
    public synchronized void xaForget(Xid xid) throws Exception {
        final long id = resourceManager.removeHeuristicCompletion(xid);
        if (id == -1L) {
            throw new ActiveMQXAException(-4);
        }
        try {
            storageManager.deleteHeuristicCompletion(id);
        }
        catch (Exception e) {
            ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
            throw new ActiveMQXAException(-7);
        }
    }

    /**
     * Makes an existing, non-suspended XA transaction the session's current transaction.
     *
     * @param xid the transaction to join
     * @throws ActiveMQXAException with code -4 when the Xid is unknown, or -6 when the
     *                             transaction is suspended
     */
    @Override
    public synchronized void xaJoin(Xid xid) throws Exception {
        final Transaction joined = resourceManager.getTransaction(xid);
        if (joined == null) {
            throw new ActiveMQXAException(-4, "Cannot find xid in resource manager: " + xid);
        }
        if (joined.getState() == Transaction.State.SUSPENDED) {
            throw new ActiveMQXAException(-6, "Cannot join tx, it is suspended " + xid);
        }
        tx = joined;
    }

    /**
     * Resumes a suspended XA transaction and makes it the session's current transaction.
     *
     * @param xid the transaction to resume
     * @throws ActiveMQXAException with code -6 when the session already has a current transaction
     *                             or the target is not suspended, or -4 when the Xid is unknown
     */
    @Override
    public synchronized void xaResume(Xid xid) throws Exception {
        if (tx != null) {
            throw new ActiveMQXAException(-6, "Cannot resume, session is currently doing work in a transaction " + tx.getXid());
        }
        final Transaction suspended = resourceManager.getTransaction(xid);
        if (suspended == null) {
            throw new ActiveMQXAException(-4, "Cannot find xid in resource manager: " + xid);
        }
        if (suspended.getState() != Transaction.State.SUSPENDED) {
            throw new ActiveMQXAException(-6, "Cannot resume transaction, it is not suspended " + xid);
        }
        tx = suspended;
        suspended.resume();
    }

    /**
     * Rolls back the XA transaction identified by {@code xid}.
     * <p>
     * The numeric codes passed to {@link ActiveMQXAException} match the constants of
     * {@code javax.transaction.xa.XAException}: -6 {@code XAER_PROTO}, -4 {@code XAER_NOTA},
     * 7 {@code XA_HEURCOM}, 6 {@code XA_HEURRB}.
     * <p>
     * When the Xid is unknown and not heuristically completed, the session's current work is
     * rolled back on a best-effort basis before the "not found" error is raised.
     *
     * @param xid the transaction to roll back
     * @throws ActiveMQXAException if the session is still working in that transaction, the Xid is
     *                             unknown or heuristically completed, or the transaction is suspended
     * @throws Exception           if the rollback itself fails
     */
    @Override
    public synchronized void xaRollback(Xid xid) throws Exception {
        if (this.tx != null && this.tx.getXid().equals(xid)) {
            String msg = "Cannot roll back, session is currently doing work in a transaction " + this.tx.getXid();
            throw new ActiveMQXAException(-6, msg);
        }
        Transaction theTx = this.resourceManager.removeTransaction(xid);
        if (logger.isTraceEnabled()) {
            logger.trace((Object)("xarollback into " + theTx));
        }
        if (theTx == null) {
            if (this.resourceManager.getHeuristicCommittedTransactions().contains(xid)) {
                throw new ActiveMQXAException(7, "transaction has ben heuristically committed: " + xid);
            }
            if (this.resourceManager.getHeuristicRolledbackTransactions().contains(xid)) {
                throw new ActiveMQXAException(6, "transaction has ben heuristically rolled back: " + xid);
            }
            if (logger.isTraceEnabled()) {
                logger.trace((Object)("xarollback into " + theTx + ", xid=" + xid + " forcing a rollback regular"));
            }
            try {
                this.rollback(false);
            }
            catch (Exception e) {
                // Best effort only: the caller still gets the "not found" error below.
                ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
            }
            throw new ActiveMQXAException(-4, "Cannot find xid in resource manager: " + xid);
        }
        if (theTx.getState() == Transaction.State.SUSPENDED) {
            if (logger.isTraceEnabled()) {
                logger.trace((Object)("xarollback into " + theTx + " sending tx back as it was suspended"));
            }
            // Re-register the transaction that was just removed. The original put this.tx here,
            // i.e. the session's current transaction (possibly null), losing the suspended one.
            this.resourceManager.putTransaction(xid, theTx);
            throw new ActiveMQXAException(-6, "Cannot rollback transaction, it is suspended " + xid);
        }
        this.doRollback(false, false, theTx);
    }

    /**
     * Starts a new XA transaction for {@code xid} and makes it the session's current transaction.
     * <p>
     * If the session still has a current transaction, the replacement is logged and — unless
     * that transaction is already {@code PREPARED} — it is removed from the resource manager
     * (when it has an Xid) and rolled back. Failures during that cleanup are logged at debug
     * level and ignored.
     *
     * @param xid the Xid of the transaction to start
     * @throws ActiveMQXAException with code -8 ({@code XAException.XAER_DUPID}) when the resource
     *                             manager already holds a transaction for {@code xid}
     */
    @Override
    public synchronized void xaStart(Xid xid) throws Exception {
        if (this.tx != null) {
            // The current tx may have no Xid (it is null-checked below), so format it null-safely
            // instead of calling toString() on it directly.
            ActiveMQServerLogger.LOGGER.xidReplacedOnXStart(String.valueOf(this.tx.getXid()), xid.toString());
            try {
                // Prepared transactions must survive; anything else is discarded.
                if (this.tx.getState() != Transaction.State.PREPARED) {
                    if (this.tx.getXid() != null) {
                        this.resourceManager.removeTransaction(this.tx.getXid());
                    }
                    this.tx.rollback();
                }
            }
            catch (Exception e) {
                logger.debug((Object)"An exception happened while we tried to debug the previous tx, we can ignore this exception", (Throwable)e);
            }
        }
        this.tx = this.newTransaction(xid);
        if (logger.isTraceEnabled()) {
            logger.trace((Object)("xastart into tx= " + this.tx));
        }
        boolean added = this.resourceManager.putTransaction(xid, this.tx);
        if (!added) {
            String msg = "Cannot start, there is already a xid " + this.tx.getXid();
            throw new ActiveMQXAException(-8, msg);
        }
    }

    /**
     * Handles a client-side failure reported for the given XA transaction branch.
     * <p>
     * The transaction is looked up in the resource manager and created and registered there when it
     * is missing. If the transaction reports itself as effective, the session detaches from it
     * ({@code tx} becomes {@code null}); otherwise it is marked rollback-only and becomes the
     * session's current transaction.
     *
     * @param xid identifier of the branch the client failed on
     * @throws Exception if looking up, creating or marking the transaction fails
     */
    @Override
    public synchronized void xaFailed(Xid xid) throws Exception {
        Transaction theTX = this.resourceManager.getTransaction(xid);
        if (theTX == null) {
            theTX = this.newTransaction(xid);
            this.resourceManager.putTransaction(xid, theTX);
        }
        if (theTX.isEffective()) {
            logger.debug((Object)("Client failed with Xid " + xid + " but the server already had it " + (Object)((Object)theTX.getState())));
            this.tx = null;
        } else {
            theTX.markAsRollbackOnly(new ActiveMQException("Can't commit as a Failover happened during the operation"));
            this.tx = theTX;
        }
        if (logger.isTraceEnabled()) {
            // Previously logged as "xastart", which made this path indistinguishable from xaStart in traces.
            logger.trace((Object)("xafailed into tx= " + this.tx));
        }
    }

    /**
     * Suspends the transaction the session is currently working in and detaches the session from it.
     *
     * @throws Exception an {@link ActiveMQXAException} with code -6 (numerically equal to
     *                   {@code XAException.XAER_PROTO}) when the session has no current transaction or
     *                   that transaction is already suspended
     */
    @Override
    public synchronized void xaSuspend() throws Exception {
        if (logger.isTraceEnabled()) {
            logger.trace((Object)("xasuspend on " + this.tx));
        }
        final Transaction current = this.tx;
        if (current == null) {
            throw new ActiveMQXAException(-6, "Cannot suspend, session is not doing work in a transaction ");
        }
        if (current.getState() == Transaction.State.SUSPENDED) {
            throw new ActiveMQXAException(-6, "Cannot suspend, transaction is already suspended " + current.getXid());
        }
        current.suspend();
        // The session no longer works in this transaction once it has been suspended.
        this.tx = null;
    }

    /**
     * Prepares the XA transaction branch identified by {@code xid}.
     * <p>
     * The branch must be registered with the resource manager, must not be suspended, and must not be
     * the transaction the session is currently working in. Preparing an already prepared branch is
     * logged and otherwise ignored.
     *
     * @param xid identifier of the branch to prepare
     * @throws Exception an {@link ActiveMQXAException} with code -6 (numerically equal to
     *                   {@code XAException.XAER_PROTO}) when the session is still working in that
     *                   branch or the branch is suspended, or with code -4 (numerically equal to
     *                   {@code XAException.XAER_NOTA}) when the Xid is unknown to the resource manager
     */
    @Override
    public synchronized void xaPrepare(Xid xid) throws Exception {
        // The session transaction may have no Xid; check for that before comparing so such a
        // transaction cannot cause a NullPointerException here.
        if (this.tx != null && this.tx.getXid() != null && this.tx.getXid().equals(xid)) {
            String msg = "Cannot prepare, session is currently doing work in a transaction " + this.tx.getXid();
            throw new ActiveMQXAException(-6, msg);
        }
        Transaction theTx = this.resourceManager.getTransaction(xid);
        if (logger.isTraceEnabled()) {
            logger.trace((Object)("xaprepare into , xid=" + xid + ", tx= " + this.tx));
        }
        if (theTx == null) {
            String msg = "Cannot find xid in resource manager: " + xid;
            throw new ActiveMQXAException(-4, msg);
        }
        if (theTx.getState() == Transaction.State.SUSPENDED) {
            throw new ActiveMQXAException(-6, "Cannot prepare transaction, it is suspended " + xid);
        }
        if (theTx.getState() == Transaction.State.PREPARED) {
            // Repeated prepare for the same branch is tolerated.
            ActiveMQServerLogger.LOGGER.info("ignoring prepare on xid as already called :" + xid);
        } else {
            theTx.prepare();
        }
    }

    /**
     * Collects the Xids the resource manager reports as prepared, heuristically committed and
     * heuristically rolled back, in that order.
     *
     * @return a new list owned by the caller
     */
    @Override
    public List<Xid> xaGetInDoubtXids() {
        List<Xid> inDoubt = new ArrayList<Xid>(this.resourceManager.getPreparedTransactions());
        inDoubt.addAll(this.resourceManager.getHeuristicCommittedTransactions());
        inDoubt.addAll(this.resourceManager.getHeuristicRolledbackTransactions());
        return inDoubt;
    }

    /**
     * Returns the timeout reported by the resource manager.
     *
     * @return the resource manager's timeout, in seconds
     */
    @Override
    public int xaGetTimeout() {
        final int seconds = resourceManager.getTimeoutSeconds();
        return seconds;
    }

    /**
     * Stores the timeout on the session and, when the session currently holds a transaction, applies
     * it to that transaction as well.
     *
     * @param timeout the new timeout; stored in {@code timeoutSeconds}
     */
    @Override
    public void xaSetTimeout(int timeout) {
        timeoutSeconds = timeout;
        if (tx == null) {
            return;
        }
        tx.setTimeout(timeout);
    }

    /**
     * Marks the session as started and starts every consumer registered on it.
     */
    @Override
    public void start() {
        setStarted(true);
    }

    /**
     * Marks the session as stopped and stops every consumer registered on it.
     */
    @Override
    public void stop() {
        setStarted(false);
    }

    /**
     * Waits on the session's operation context with a timeout argument of 10000 (presumably
     * milliseconds).
     * <p>
     * Never throws: a wait that reports failure is logged as an error, and any exception raised while
     * waiting is logged as a warning.
     */
    @Override
    public void waitContextCompletion() {
        try {
            boolean completed = context.waitCompletion(10000L);
            if (!completed) {
                ActiveMQServerLogger.LOGGER.errorCompletingContext(new Exception("warning"));
            }
        }
        catch (Exception failure) {
            ActiveMQServerLogger.LOGGER.warn(failure.getMessage(), failure);
        }
    }

    /**
     * Schedules the session to be closed once the operation context signals completion.
     * <p>
     * Does nothing when the session is already flagged as closed. The actual work is done by
     * {@code doClose(failed)}; an exception thrown by it is logged rather than propagated.
     *
     * @param failed forwarded unchanged to {@code doClose}
     */
    @Override
    public void close(final boolean failed) {
        if (!this.closed) {
            IOCallback closeWhenDone = new IOCallback(){

                public void done() {
                    try {
                        ServerSessionImpl.this.doClose(failed);
                    }
                    catch (Exception e) {
                        ActiveMQServerLogger.LOGGER.errorClosingSession(e);
                    }
                }

                public void onError(int errorCode, String errorMessage) {
                    // Nothing is done when the context reports an error.
                }
            };
            this.context.executeOnCompletion(closeWhenDone);
        }
    }

    /**
     * Closes the consumer registered under the given id; logs through
     * {@code cannotFindConsumer} when there is none.
     *
     * @param consumerID id of the consumer to close
     * @throws Exception if closing the consumer fails
     */
    @Override
    public void closeConsumer(long consumerID) throws Exception {
        ServerConsumer target = consumers.get(consumerID);
        if (target == null) {
            ActiveMQServerLogger.LOGGER.cannotFindConsumer(consumerID);
            return;
        }
        target.close(false);
    }

    /**
     * Hands flow-control credits to the consumer registered under the given id. An unknown id is only
     * logged at debug level.
     *
     * @param consumerID id of the consumer receiving the credits
     * @param credits    number of credits, forwarded unchanged
     * @throws Exception if the consumer fails to accept the credits
     */
    @Override
    public void receiveConsumerCredits(long consumerID, int credits) throws Exception {
        ServerConsumer target = consumers.get(consumerID);
        if (target != null) {
            target.receiveCredits(credits);
        } else {
            logger.debug((Object)("There is no consumer with id " + consumerID));
        }
    }

    /**
     * Returns the session's current transaction, creating one through {@code newTransaction()} first
     * when the session holds none.
     *
     * @return the current transaction
     */
    @Override
    public Transaction getCurrentTransaction() {
        if (tx != null) {
            return tx;
        }
        tx = newTransaction();
        return tx;
    }

    /**
     * Begins a large-message send: asks the storage manager for a new id and a large message created
     * from {@code message}, and makes it the session's current large message.
     * <p>
     * If a large message is already in progress it is replaced, and its id is reported through
     * {@code replacingIncompleteLargeMessage}.
     *
     * @param message the message the large message is created from
     * @throws Exception if the storage manager fails to create the large message
     */
    @Override
    public void sendLarge(MessageInternal message) throws Exception {
        LargeServerMessage incoming = storageManager.createLargeMessage(storageManager.generateID(), message);
        if (logger.isTraceEnabled()) {
            logger.trace((Object)("sendLarge::" + incoming));
        }
        LargeServerMessage pending = currentLargeMessage;
        if (pending != null) {
            ActiveMQServerLogger.LOGGER.replacingIncompleteLargeMessage(pending.getMessageID());
        }
        currentLargeMessage = incoming;
    }

    /**
     * Sends a message on behalf of this session.
     * <p>
     * A message that is not large gets a newly generated id, which is also encoded into its buffer.
     * The first address seen becomes the session's default address; a message without an address is
     * given that default. Messages for the management address go to the management handler, all
     * others are routed normally.
     *
     * @param message the message to send
     * @param direct  forwarded unchanged to the management handler or to {@code doSend}
     * @throws Exception the bundle's {@code noAddress} exception when the message still has no
     *                   address after defaulting, or whatever the handlers throw
     */
    @Override
    public void send(ServerMessage message, boolean direct) throws Exception {
        if (!message.isLargeMessage()) {
            message.setMessageID(storageManager.generateID());
            message.encodeMessageIDToBuffer();
        }
        SimpleString address = message.getAddress();
        if (address != null) {
            // Remember the first explicit address as the session default.
            if (defaultAddress == null) {
                defaultAddress = address;
            }
        } else if (message.isDurable()) {
            message.setAddress(defaultAddress);
        } else {
            message.setAddressTransient(defaultAddress);
        }
        if (logger.isTraceEnabled()) {
            logger.trace((Object)("send(message=" + message + ", direct=" + direct + ") being called"));
        }
        if (message.getAddress() == null) {
            throw ActiveMQMessageBundle.BUNDLE.noAddress();
        }
        boolean management = message.getAddress().equals((Object)managementAddress);
        if (management) {
            handleManagementMessage(message, direct);
        } else {
            doSend(message, direct);
        }
    }

    /**
     * Appends a chunk of body bytes to the large message in progress and, on the last chunk, sends
     * the message.
     * <p>
     * On the last chunk the message's resources are released, a non-negative
     * {@code messageBodySize} is recorded in the {@code Message.HDR_LARGE_BODY_SIZE} property, the
     * message is sent with {@code direct == false}, and the session forgets it.
     *
     * @param packetSize      not used by this method
     * @param messageBodySize total body size; only recorded when it is not negative
     * @param body            the bytes to append
     * @param continues       {@code true} when more chunks will follow
     * @throws Exception the bundle's {@code largeMessageNotInitialised} exception when no large
     *                   message is in progress, or whatever appending or sending throws
     */
    @Override
    public void sendContinuations(int packetSize, long messageBodySize, byte[] body, boolean continues) throws Exception {
        if (this.currentLargeMessage == null) {
            throw ActiveMQMessageBundle.BUNDLE.largeMessageNotInitialised();
        }
        this.currentLargeMessage.addBytes(body);
        if (continues) {
            // More chunks are expected; nothing else to do yet.
            return;
        }
        this.currentLargeMessage.releaseResources();
        if (messageBodySize >= 0L) {
            this.currentLargeMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, messageBodySize);
        }
        this.doSend(this.currentLargeMessage, false);
        this.currentLargeMessage = null;
    }

    /**
     * Handles a producer's request for credits on an address.
     * <p>
     * The address's paging store is handed a task that sends the credits message; when
     * {@code checkMemory} returns {@code false}, the credits-fail message is sent instead.
     *
     * @param address the address the producer wants credits for
     * @param credits the number of credits requested
     * @throws Exception if the paging store cannot be obtained or the check fails
     */
    @Override
    public void requestProducerCredits(final SimpleString address, final int credits) throws Exception {
        PagingStore store = this.server.getPagingManager().getPageStore(address);
        Runnable grantCredits = new Runnable(){

            @Override
            public void run() {
                ServerSessionImpl.this.callback.sendProducerCreditsMessage(credits, address);
            }
        };
        boolean accepted = store.checkMemory(grantCredits);
        if (!accepted) {
            this.callback.sendProducerCreditsFailMessage(credits, address);
        }
    }

    /**
     * Propagates the transferring flag to every consumer of this session. The loop runs over a copy
     * of the consumer collection, not over the live map.
     *
     * @param transferring value passed to each consumer
     */
    @Override
    public void setTransferring(boolean transferring) {
        Iterator<ServerConsumer> snapshot = new HashSet<ServerConsumer>(consumers.values()).iterator();
        while (snapshot.hasNext()) {
            snapshot.next().setTransferring(transferring);
        }
    }

    /**
     * Stores a metadata entry on the session, creating the backing map on first use. An existing
     * entry with the same key is overwritten.
     *
     * @param key  metadata key
     * @param data metadata value
     */
    @Override
    public void addMetaData(String key, String data) {
        if (metaData == null) {
            // Created lazily: the map does not exist until the first entry is added.
            metaData = new HashMap<String, String>();
        }
        metaData.put(key, data);
    }

    /**
     * Stores a metadata entry unless the server's session lookup returns a different session for the
     * same key/value pair.
     *
     * @param key  metadata key
     * @param data metadata value
     * @return {@code true} when the entry was stored, {@code false} when another session was found
     */
    @Override
    public boolean addUniqueMetaData(String key, String data) {
        ServerSession holder = server.lookupSession(key, data);
        boolean heldByAnotherSession = holder != null && holder != this;
        if (heldByAnotherSession) {
            return false;
        }
        addMetaData(key, data);
        return true;
    }

    /**
     * Looks up a metadata value.
     * <p>
     * Side effect: asking for the key {@code "jms-client-id"} also installs the JMS hooks on this
     * session, whether or not a value is stored for it.
     *
     * @param key metadata key
     * @return the stored value, or {@code null} when no metadata exists or the key is absent
     */
    @Override
    public String getMetaData(String key) {
        String value = metaData == null ? null : metaData.get(key);
        if (key.equals("jms-client-id")) {
            installJMSHooks();
        }
        return value;
    }

    /**
     * Returns the addresses this session has recorded target information for, taken from a snapshot
     * of the map.
     *
     * @return the addresses as strings, in the iteration order of the snapshot's key set
     */
    @Override
    public String[] getTargetAddresses() {
        Set<SimpleString> recorded = cloneTargetAddresses().keySet();
        String[] names = new String[recorded.size()];
        int next = 0;
        for (SimpleString address : recorded) {
            names[next++] = address.toString();
        }
        return names;
    }

    /**
     * Returns the user ID recorded for the most recent message sent to the given address.
     *
     * @param address the target address, as a string
     * @return the recorded ID as a string, or {@code null} when nothing has been recorded for the
     *         address or the recorded ID is itself {@code null}
     */
    @Override
    public String getLastSentMessageID(String address) {
        Pair<UUID, AtomicLong> value = this.targetAddressInfos.get(SimpleString.toSimpleString((String)address));
        if (value == null) {
            return null;
        }
        // doSend stores msg.getUserID() without checking it, so the recorded ID can be null; read it
        // once and guard it instead of calling toString() on it directly.
        Object lastUserId = value.getA();
        return lastUserId == null ? null : lastUserId.toString();
    }

    /**
     * @return the value of the session's {@code creationTime} field
     */
    @Override
    public long getCreationTime() {
        return creationTime;
    }

    /**
     * @return the storage manager this session uses
     */
    public StorageManager getStorageManager() {
        return storageManager;
    }

    /**
     * Appends one JSON object per recorded target address to {@code array}.
     * <p>
     * Each object carries the keys {@code connectionID}, {@code sessionID}, {@code destination},
     * {@code lastUUIDSent} and {@code msgSent}. The data comes from a snapshot of the target address
     * map.
     *
     * @param array the JSON array to append to
     * @throws Exception if building a JSON object fails
     */
    @Override
    public void describeProducersInfo(JSONArray array) throws Exception {
        for (Map.Entry<SimpleString, Pair<UUID, AtomicLong>> producer : this.cloneTargetAddresses().entrySet()) {
            JSONObject info = new JSONObject();
            info.put("connectionID", (Object)this.getConnectionID().toString());
            info.put("sessionID", (Object)this.getName());
            info.put("destination", (Object)producer.getKey().toString());
            info.put("lastUUIDSent", producer.getValue().getA());
            info.put("msgSent", ((AtomicLong)producer.getValue().getB()).longValue());
            array.put((Object)info);
        }
    }

    /**
     * Renders the session as {@code ServerSessionImpl(key=value,...)} using its metadata entries. A
     * {@code null} or empty value is shown as {@code *N/A*}; with no metadata the parentheses are
     * empty.
     */
    public String toString() {
        StringBuilder details = new StringBuilder();
        if (this.metaData != null) {
            for (Map.Entry<String, String> entry : this.metaData.entrySet()) {
                if (details.length() > 0) {
                    details.append(",");
                }
                String value = entry.getValue();
                boolean missing = value == null || value.isEmpty();
                details.append(entry.getKey()).append("=").append(missing ? "*N/A*" : value);
            }
        }
        return "ServerSessionImpl(" + details + ")";
    }

    /**
     * Reacts to a failed client connection by closing this session with {@code failed == true},
     * logging before and after.
     * <p>
     * Never throws: any {@link Throwable} raised on the way is replaced by an
     * {@code errorClosingConnection} log entry for this session.
     *
     * @param me         not used by this method
     * @param failedOver not used by this method
     */
    public void connectionFailed(ActiveMQException me, boolean failedOver) {
        try {
            ActiveMQServerLogger.LOGGER.clientConnectionFailed(name);
            close(true);
            ActiveMQServerLogger.LOGGER.clientConnectionFailedClearingSession(name);
        }
        catch (Throwable problem) {
            ActiveMQServerLogger.LOGGER.errorClosingConnection(this);
        }
    }

    /**
     * Delegates to {@link #connectionFailed(ActiveMQException, boolean)}.
     *
     * @param me                    forwarded unchanged
     * @param failedOver            forwarded unchanged
     * @param scaleDownTargetNodeID ignored by this implementation
     */
    public void connectionFailed(ActiveMQException me, boolean failedOver, String scaleDownTargetNodeID) {
        connectionFailed(me, failedOver);
    }

    /**
     * Forgets the large message in progress, if any, by resetting the field to {@code null}.
     */
    public void clearLargeMessage() {
        currentLargeMessage = null;
    }

    /**
     * Makes the server's JMS queue creator the queue creator of this session.
     */
    private void installJMSHooks() {
        queueCreator = server.getJMSQueueCreator();
    }

    /**
     * Copies the target address map into a new {@link HashMap}. The copy is shallow: the
     * {@code Pair} values are shared with the original map.
     *
     * @return the snapshot
     */
    private Map<SimpleString, Pair<UUID, AtomicLong>> cloneTargetAddresses() {
        Map<SimpleString, Pair<UUID, AtomicLong>> snapshot = new HashMap<SimpleString, Pair<UUID, AtomicLong>>(targetAddressInfos);
        return snapshot;
    }

    /**
     * Applies the started flag to every consumer and then records it on the session. The loop runs
     * over a copy of the consumer collection, not over the live map.
     *
     * @param s the new started state
     */
    private void setStarted(boolean s) {
        Iterator<ServerConsumer> snapshot = new HashSet<ServerConsumer>(consumers.values()).iterator();
        while (snapshot.hasNext()) {
            snapshot.next().setStarted(s);
        }
        started = s;
    }

    /**
     * Handles a message addressed to the management address.
     * <p>
     * The sender must pass the {@code MANAGE} security check for the address. The management service
     * then produces a reply, which is sent back only when the request carries a reply-to property.
     *
     * @param message the management request
     * @param direct  forwarded unchanged to {@code doSend} for the reply
     * @throws Exception the security failure, rethrown after the current transaction (if any) has
     *                   been marked rollback-only, or whatever handling or sending the reply throws
     */
    private void handleManagementMessage(ServerMessage message, boolean direct) throws Exception {
        try {
            this.securityStore.check(message.getAddress(), CheckType.MANAGE, this);
        }
        catch (ActiveMQException e) {
            // Same guard as doSend: the session may have no transaction, and an NPE here would
            // replace the security exception the caller needs to see.
            if (!this.autoCommitSends && this.tx != null) {
                this.tx.markAsRollbackOnly(e);
            }
            throw e;
        }
        ServerMessage reply = this.managementService.handleMessage(message);
        SimpleString replyTo = message.getSimpleStringProperty(ClientMessageImpl.REPLYTO_HEADER_NAME);
        if (replyTo != null) {
            reply.setAddress(replyTo);
            this.doSend(reply, direct);
        }
    }

    /**
     * Rolls back the given transaction and cancels the references held by this session's consumers.
     * <p>
     * If the session was started, each consumer is stopped before its references are collected. If
     * {@code theTx} is already in the ROLLEDBACK state, the cancellation and rollback are carried out
     * on a new transaction instead.
     *
     * @param clientFailed         forwarded to the consumers and to {@code cancelAndRollback}
     * @param lastMessageAsDelived forwarded to the consumers when collecting references
     * @param theTx                the transaction being rolled back
     * @throws Exception if collecting references, cancelling or rolling back fails
     */
    private void doRollback(boolean clientFailed, boolean lastMessageAsDelived, Transaction theTx) throws Exception {
        final boolean wasStarted = this.started;
        List<MessageReference> toCancel = new ArrayList<MessageReference>();
        for (ServerConsumer consumer : this.consumers.values()) {
            if (wasStarted) {
                consumer.setStarted(false);
            }
            toCancel.addAll(consumer.cancelRefs(clientFailed, lastMessageAsDelived, theTx));
        }
        // A transaction that is already rolled back is not reused for the cancellation.
        Transaction target = theTx.getState() == Transaction.State.ROLLEDBACK ? this.newTransaction() : theTx;
        this.cancelAndRollback(clientFailed, target, wasStarted, toCancel);
    }

    /**
     * Cancels the given references on their queues within {@code theTx} and then rolls the
     * transaction back.
     * <p>
     * When the session was started and the client has not failed, an operation is added to the
     * transaction that starts all of the session's consumers again after the rollback.
     *
     * @param clientFailed whether the rollback was triggered by a client failure
     * @param theTx        the transaction used for the cancellations and then rolled back
     * @param wasStarted   whether the session was started when the rollback began
     * @param toCancel     the references to cancel
     * @throws Exception if cancelling a reference or rolling back fails
     */
    private void cancelAndRollback(boolean clientFailed, Transaction theTx, boolean wasStarted, List<MessageReference> toCancel) throws Exception {
        for (MessageReference reference : toCancel) {
            reference.getQueue().cancel(theTx, reference);
        }
        boolean restartConsumers = wasStarted && !clientFailed;
        if (restartConsumers) {
            TransactionOperationAbstract restartAfterRollback = new TransactionOperationAbstract(){

                @Override
                public void afterRollback(Transaction tx) {
                    for (ServerConsumer consumer : ServerSessionImpl.this.consumers.values()) {
                        consumer.setStarted(true);
                    }
                }
            };
            theTx.addOperation(restartAfterRollback);
        }
        theTx.rollback();
    }

    /**
     * Routes a message through the post office after a {@code SEND} security check, then updates the
     * per-address producer statistics (last user ID and message count).
     * <p>
     * A failed security check marks the current transaction rollback-only when sends are not
     * auto-committed and a transaction exists, and is then rethrown. The routing context is cleared
     * whether or not routing succeeds.
     * <p>
     * Decompiler note (CFR): "Removed try catching itself - possible behaviour change" was reported
     * for this method.
     *
     * @param msg    the message to route
     * @param direct forwarded unchanged to the post office
     * @throws Exception the security failure, or whatever routing throws
     */
    protected void doSend(ServerMessage msg, boolean direct) throws Exception {
        try {
            this.securityStore.check(msg.getAddress(), CheckType.SEND, this);
        }
        catch (ActiveMQException denied) {
            if (!this.autoCommitSends && this.tx != null) {
                this.tx.markAsRollbackOnly(denied);
            }
            throw denied;
        }
        boolean transactional = this.tx != null && !this.autoCommitSends;
        if (transactional) {
            this.routingContext.setTransaction(this.tx);
        }
        try {
            this.postOffice.route(msg, this.queueCreator, this.routingContext, direct);
            SimpleString target = msg.getAddress();
            Pair<UUID, AtomicLong> stats = this.targetAddressInfos.get(target);
            if (stats != null) {
                stats.setA(msg.getUserID());
                stats.getB().incrementAndGet();
            } else {
                // First message for this address: start counting at one.
                this.targetAddressInfos.put(target, new Pair<UUID, AtomicLong>(msg.getUserID(), new AtomicLong(1L)));
            }
        }
        finally {
            this.routingContext.clear();
        }
    }

    /**
     * Returns the message references that the current transaction's {@code RefsOperation} lists for
     * the given consumer.
     *
     * @param consumerId id of the consumer
     * @return the references, or an empty list when the session has no transaction or the
     *         transaction has no {@code RefsOperation} stored under property index 6
     */
    @Override
    public List<MessageReference> getInTXMessagesForConsumer(long consumerId) {
        if (this.tx == null) {
            return Collections.emptyList();
        }
        // NOTE(review): 6 is presumably the transaction property index of the refs operation —
        // confirm against the project's property-index constants.
        RefsOperation refs = (RefsOperation)this.tx.getProperty(6);
        if (refs != null) {
            return refs.getListOnConsumer(consumerId);
        }
        return Collections.emptyList();
    }

    /**
     * {@link TransactionFactory} that creates a plain {@link TransactionImpl} for every request.
     */
    private static class DefaultTransactionFactory
    implements TransactionFactory {
        private DefaultTransactionFactory() {
        }

        /**
         * @param xid            passed to the {@link TransactionImpl} constructor
         * @param storageManager passed to the {@link TransactionImpl} constructor
         * @param timeoutSeconds passed to the {@link TransactionImpl} constructor
         * @return a new {@link TransactionImpl}
         */
        @Override
        public Transaction newTransaction(Xid xid, StorageManager storageManager, int timeoutSeconds) {
            Transaction created = new TransactionImpl(xid, storageManager, timeoutSeconds);
            return created;
        }
    }

    /**
     * Connection listener that destroys one temporary queue when the connection it is attached to is
     * closed or fails.
     */
    public static class TempQueueCleanerUpper
    implements CloseListener,
    FailureListener {
        private final SimpleString bindingName;
        private final ActiveMQServer server;

        /**
         * @param server      the server that is asked to destroy the queue
         * @param bindingName name of the temporary queue to destroy
         */
        public TempQueueCleanerUpper(ActiveMQServer server, SimpleString bindingName) {
            this.server = server;
            this.bindingName = bindingName;
        }

        /** Destroys the queue on a normal connection close. */
        public void connectionClosed() {
            this.run();
        }

        /** Destroys the queue on a connection failure; both arguments are ignored. */
        public void connectionFailed(ActiveMQException exception, boolean failedOver) {
            this.run();
        }

        /** Delegates to the two-argument overload; {@code scaleDownTargetNodeID} is ignored. */
        public void connectionFailed(ActiveMQException me, boolean failedOver, String scaleDownTargetNodeID) {
            this.connectionFailed(me, failedOver);
        }

        /**
         * Asks the server to destroy the queue. Never throws: an {@link ActiveMQException} from the
         * server is logged at debug level, any other exception through
         * {@code errorRemovingTempQueue}.
         */
        private void run() {
            try {
                if (logger.isDebugEnabled()) {
                    logger.debug((Object)("deleting temporary queue " + this.bindingName));
                }
                try {
                    this.server.destroyQueue(this.bindingName, null, false);
                }
                catch (ActiveMQException expected) {
                    logger.debug((Object)expected.getMessage(), (Throwable)expected);
                }
            }
            catch (Exception unexpected) {
                ActiveMQServerLogger.LOGGER.errorRemovingTempQueue(unexpected, this.bindingName);
            }
        }

        public String toString() {
            return "Temporary Cleaner for queue " + this.bindingName;
        }
    }
}

