/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.core.protocol.openwire.amq;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.transaction.xa.Xid;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.paging.impl.PagingStoreImpl;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireUtil;
import org.apache.activemq.artemis.core.protocol.openwire.SendingResult;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducer;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQServerSession;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQServerSessionFactory;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.ServerSessionFactory;
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.ProducerAck;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.wireformat.WireFormat;

public class AMQSession
implements SessionCallback {
    private AMQServerSession coreSession;
    private ConnectionInfo connInfo;
    private SessionInfo sessInfo;
    private ActiveMQServer server;
    private OpenWireConnection connection;
    private Map<Long, AMQConsumer> consumers = new ConcurrentHashMap<Long, AMQConsumer>();
    private Map<Long, AMQProducer> producers = new HashMap<Long, AMQProducer>();
    private AtomicBoolean started = new AtomicBoolean(false);
    private TransactionId txId = null;
    private boolean isTx;
    private final ScheduledExecutorService scheduledPool;
    private OpenWireProtocolManager manager;

    public AMQSession(ConnectionInfo connInfo, SessionInfo sessInfo, ActiveMQServer server, OpenWireConnection connection, ScheduledExecutorService scheduledPool, OpenWireProtocolManager manager) {
        this.connInfo = connInfo;
        this.sessInfo = sessInfo;
        this.server = server;
        this.connection = connection;
        this.scheduledPool = scheduledPool;
        this.manager = manager;
    }

    public void initialize() {
        String name = this.sessInfo.getSessionId().toString();
        String username = this.connInfo.getUserName();
        String password = this.connInfo.getPassword();
        int minLargeMessageSize = Integer.MAX_VALUE;
        try {
            this.coreSession = (AMQServerSession)this.server.createSession(name, username, password, minLargeMessageSize, (RemotingConnection)this.connection, true, false, false, false, null, (SessionCallback)this, (ServerSessionFactory)new AMQServerSessionFactory(), true);
            long sessionId = this.sessInfo.getSessionId().getValue();
            if (sessionId == -1L) {
                this.connection.setAdvisorySession(this);
            }
        }
        catch (Exception e) {
            ActiveMQServerLogger.LOGGER.error((Object)"error init session", (Throwable)e);
        }
    }

    public void createConsumer(ConsumerInfo info, AMQSession amqSession) throws Exception {
        ActiveMQDestination dest = info.getDestination();
        ActiveMQDestination[] dests = null;
        dests = dest.isComposite() ? dest.getCompositeDestinations() : new ActiveMQDestination[]{dest};
        HashMap<ActiveMQDestination, AMQConsumer> consumerMap = new HashMap<ActiveMQDestination, AMQConsumer>();
        for (ActiveMQDestination d : dests) {
            if (d.isQueue()) {
                SimpleString queueName = OpenWireUtil.toCoreAddress(d);
                this.getCoreServer().getJMSQueueCreator().create(queueName);
            }
            AMQConsumer consumer = new AMQConsumer(this, d, info, this.scheduledPool);
            consumer.init();
            consumerMap.put(d, consumer);
            this.consumers.put(consumer.getNativeId(), consumer);
        }
        this.connection.addConsumerBrokerExchange(info.getConsumerId(), amqSession, consumerMap);
        this.coreSession.start();
        this.started.set(true);
    }

    public boolean isWritable(ReadyListener callback) {
        return this.connection.isWritable(callback);
    }

    public void sendProducerCreditsMessage(int credits, SimpleString address) {
    }

    public void sendProducerCreditsFailMessage(int credits, SimpleString address) {
    }

    public int sendMessage(ServerMessage message, ServerConsumer consumerID, int deliveryCount) {
        AMQConsumer consumer = this.consumers.get(consumerID.getID());
        return consumer.handleDeliver(message, deliveryCount);
    }

    public int sendLargeMessage(ServerMessage message, ServerConsumer consumerID, long bodySize, int deliveryCount) {
        return 0;
    }

    public int sendLargeMessageContinuation(ServerConsumer consumerID, byte[] body, boolean continues, boolean requiresResponse) {
        return 0;
    }

    public void closed() {
    }

    public boolean hasCredits(ServerConsumer consumerID) {
        AMQConsumer amqConsumer = this.consumers.get(consumerID.getID());
        return amqConsumer.hasCredits();
    }

    public void disconnect(ServerConsumer consumerId, String queueName) {
    }

    public AMQServerSession getCoreSession() {
        return this.coreSession;
    }

    public ActiveMQServer getCoreServer() {
        return this.server;
    }

    public void removeConsumer(long consumerId) throws Exception {
        boolean failed = this.txId == null && !this.isTx;
        this.coreSession.amqCloseConsumer(consumerId, failed);
        this.consumers.remove(consumerId);
    }

    public void createProducer(ProducerInfo info) throws Exception {
        AMQProducer producer = new AMQProducer(this, info);
        producer.init();
        this.producers.put(info.getProducerId().getValue(), producer);
    }

    public void removeProducer(ProducerInfo info) {
        this.removeProducer(info.getProducerId());
    }

    public void removeProducer(ProducerId id) {
        this.producers.remove(id.getValue());
    }

    public SendingResult send(AMQProducerBrokerExchange producerExchange, Message messageSend, boolean sendProducerAck) throws Exception {
        SendingResult result = new SendingResult();
        TransactionId tid = messageSend.getTransactionId();
        if (tid != null) {
            this.resetSessionTx(tid);
        }
        messageSend.setBrokerInTime(System.currentTimeMillis());
        ActiveMQDestination destination = messageSend.getDestination();
        ActiveMQDestination[] actualDestinations = null;
        actualDestinations = destination.isComposite() ? destination.getCompositeDestinations() : new ActiveMQDestination[]{destination};
        for (ActiveMQDestination dest : actualDestinations) {
            ServerMessageImpl coreMsg = new ServerMessageImpl(-1L, 1024);
            if (producerExchange.getConnectionContext().isFaultTolerant() && !messageSend.getProperties().containsKey(ServerMessage.HDR_DUPLICATE_DETECTION_ID)) {
                coreMsg.putStringProperty(ServerMessage.HDR_DUPLICATE_DETECTION_ID.toString(), messageSend.getMessageId().toString());
            }
            OpenWireMessageConverter.toCoreMessage(coreMsg, messageSend, this.connection.getMarshaller());
            SimpleString address = OpenWireUtil.toCoreAddress(dest);
            coreMsg.setAddress(address);
            PagingStoreImpl store = (PagingStoreImpl)this.server.getPagingManager().getPageStore(address);
            if (store.isFull()) {
                result.setBlockNextSend(true);
                result.setBlockPagingStore(store);
                result.setBlockingAddress(address);
                ScheduledExecutorService scheduler = this.server.getScheduledPool();
                SendRetryTask sendRetryTask = new SendRetryTask((ServerMessage)coreMsg, producerExchange, sendProducerAck, messageSend.getSize(), messageSend.getCommandId());
                scheduler.schedule(sendRetryTask, 10L, TimeUnit.MILLISECONDS);
                continue;
            }
            this.coreSession.send((ServerMessage)coreMsg, false);
        }
        return result;
    }

    public WireFormat getMarshaller() {
        return this.connection.getMarshaller();
    }

    public void acknowledge(MessageAck ack, AMQConsumer consumer) throws Exception {
        TransactionId tid = ack.getTransactionId();
        if (tid != null) {
            this.resetSessionTx(ack.getTransactionId());
        }
        consumer.acknowledge(ack);
        if (tid == null && ack.getAckType() == 2) {
            this.coreSession.commit();
        }
    }

    public void resetSessionTx(TransactionId xid) throws Exception {
        if (this.txId != null && !this.txId.equals(xid)) {
            throw new IllegalStateException("Session already associated with a tx");
        }
        this.isTx = true;
        if (this.txId == null) {
            this.txId = xid;
            if (xid.isXATransaction()) {
                XATransactionId xaXid = (XATransactionId)xid;
                this.coreSession.enableXA();
                XidImpl coreXid = new XidImpl(xaXid.getBranchQualifier(), xaXid.getFormatId(), xaXid.getGlobalTransactionId());
                this.coreSession.xaStart((Xid)coreXid);
            } else {
                this.coreSession.enableTx();
            }
            this.manager.registerTx(this.txId, this);
        }
    }

    private void checkTx(TransactionId inId) {
        if (this.txId == null) {
            throw new IllegalStateException("Session has no transaction associated with it");
        }
        if (!this.txId.equals(inId)) {
            throw new IllegalStateException("Session already associated with another tx");
        }
        this.isTx = true;
    }

    public void endTransaction(TransactionInfo info) throws Exception {
        this.checkTx(info.getTransactionId());
        if (this.txId.isXATransaction()) {
            XATransactionId xid = (XATransactionId)this.txId;
            XidImpl coreXid = new XidImpl(xid.getBranchQualifier(), xid.getFormatId(), xid.getGlobalTransactionId());
            this.coreSession.xaEnd((Xid)coreXid);
        }
    }

    public void commitOnePhase(TransactionInfo info) throws Exception {
        this.checkTx(info.getTransactionId());
        if (this.txId.isXATransaction()) {
            XATransactionId xid = (XATransactionId)this.txId;
            XidImpl coreXid = new XidImpl(xid.getBranchQualifier(), xid.getFormatId(), xid.getGlobalTransactionId());
            this.coreSession.xaCommit((Xid)coreXid, true);
        } else {
            for (AMQConsumer consumer : this.consumers.values()) {
                consumer.finishTx();
            }
            this.coreSession.commit();
        }
        this.txId = null;
    }

    public void prepareTransaction(XATransactionId xid) throws Exception {
        this.checkTx((TransactionId)xid);
        XidImpl coreXid = new XidImpl(xid.getBranchQualifier(), xid.getFormatId(), xid.getGlobalTransactionId());
        this.coreSession.xaPrepare((Xid)coreXid);
    }

    public void commitTwoPhase(XATransactionId xid) throws Exception {
        this.checkTx((TransactionId)xid);
        XidImpl coreXid = new XidImpl(xid.getBranchQualifier(), xid.getFormatId(), xid.getGlobalTransactionId());
        this.coreSession.xaCommit((Xid)coreXid, false);
        this.txId = null;
    }

    public void rollback(TransactionInfo info) throws Exception {
        this.checkTx(info.getTransactionId());
        if (this.txId.isXATransaction()) {
            XATransactionId xid = (XATransactionId)this.txId;
            XidImpl coreXid = new XidImpl(xid.getBranchQualifier(), xid.getFormatId(), xid.getGlobalTransactionId());
            this.coreSession.xaRollback((Xid)coreXid);
        } else {
            Iterator<AMQConsumer> iter = this.consumers.values().iterator();
            HashSet<Long> acked = new HashSet<Long>();
            while (iter.hasNext()) {
                AMQConsumer consumer = iter.next();
                consumer.rollbackTx(acked);
            }
            this.coreSession.amqRollback(acked);
        }
        this.txId = null;
    }

    public void recover(List<TransactionId> recovered) {
        List xids = this.coreSession.xaGetInDoubtXids();
        for (Xid xid : xids) {
            XATransactionId amqXid = new XATransactionId(xid);
            recovered.add((TransactionId)amqXid);
        }
    }

    public void forget(TransactionId tid) throws Exception {
        this.checkTx(tid);
        XATransactionId xid = (XATransactionId)tid;
        XidImpl coreXid = new XidImpl(xid.getBranchQualifier(), xid.getFormatId(), xid.getGlobalTransactionId());
        this.coreSession.xaForget((Xid)coreXid);
        this.txId = null;
    }

    public ConnectionInfo getConnectionInfo() {
        return this.connInfo;
    }

    public void setInternal(boolean internal) {
        this.coreSession.setInternal(internal);
    }

    public boolean isInternal() {
        return this.coreSession.isInternal();
    }

    public void deliverMessage(MessageDispatch dispatch) {
        this.connection.deliverMessage(dispatch);
    }

    public void close() throws Exception {
        this.coreSession.close(false);
    }

    public AMQConsumer getConsumer(Long coreConsumerId) {
        return this.consumers.get(coreConsumerId);
    }

    public void blockingWaitForSpace(AMQProducerBrokerExchange producerExchange, SendingResult result) throws IOException {
        long start;
        long nextWarn = start = System.currentTimeMillis();
        producerExchange.blockingOnFlowControl(true);
        AMQConnectionContext context = producerExchange.getConnectionContext();
        PagingStoreImpl store = result.getBlockPagingStore();
        long blockedProducerWarningInterval = 30000L;
        ProducerId producerId = producerExchange.getProducerState().getInfo().getProducerId();
        while (store.isFull()) {
            if (context.getStopping().get()) {
                throw new IOException("Connection closed, send aborted.");
            }
            long now = System.currentTimeMillis();
            if (now < nextWarn) continue;
            ActiveMQServerLogger.LOGGER.memoryLimitReached(producerId.toString(), result.getBlockingAddress().toString(), (now - start) / 1000L);
            nextWarn = now + blockedProducerWarningInterval;
        }
        producerExchange.blockingOnFlowControl(false);
    }

    private class SendRetryTask
    implements Runnable {
        private ServerMessage coreMsg;
        private AMQProducerBrokerExchange producerExchange;
        private boolean sendProducerAck;
        private int msgSize;
        private int commandId;

        public SendRetryTask(ServerMessage coreMsg, AMQProducerBrokerExchange producerExchange, boolean sendProducerAck, int msgSize, int commandId) {
            this.coreMsg = coreMsg;
            this.producerExchange = producerExchange;
            this.sendProducerAck = sendProducerAck;
            this.msgSize = msgSize;
            this.commandId = commandId;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            AMQSession aMQSession = AMQSession.this;
            synchronized (aMQSession) {
                try {
                    SimpleString address = this.coreMsg.getAddress();
                    PagingStoreImpl store = (PagingStoreImpl)AMQSession.this.server.getPagingManager().getPageStore(address);
                    if (store.isFull()) {
                        AMQSession.this.server.getScheduledPool().schedule(this, 10L, TimeUnit.MILLISECONDS);
                    } else {
                        AMQSession.this.coreSession.send(this.coreMsg, false);
                        if (this.sendProducerAck) {
                            ProducerInfo producerInfo = this.producerExchange.getProducerState().getInfo();
                            ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), this.msgSize);
                            AMQSession.this.connection.dispatchAsync((Command)ack);
                        } else {
                            Response response = new Response();
                            response.setCorrelationId(this.commandId);
                            AMQSession.this.connection.dispatchAsync((Command)response);
                        }
                    }
                }
                catch (Exception e) {
                    ExceptionResponse response = new ExceptionResponse((Throwable)e);
                    response.setCorrelationId(this.commandId);
                    AMQSession.this.connection.dispatchAsync((Command)response);
                }
            }
        }
    }
}

