/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.client;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.InetAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.nio.channels.UnresolvedAddressException;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.ConnectionConsumer;
import javax.jms.ConnectionMetaData;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageEOFException;
import javax.jms.Queue;
import javax.jms.QueueSession;
import javax.jms.ServerSessionPool;
import javax.jms.StreamMessage;
import javax.jms.Topic;
import javax.jms.TopicSession;
import javax.naming.NamingException;
import javax.naming.Reference;
import javax.naming.Referenceable;
import javax.naming.StringRefAddr;
import org.apache.qpid.AMQConnectionFailureException;
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQProtocolException;
import org.apache.qpid.AMQUnresolvedAddressException;
import org.apache.qpid.QpidException;
import org.apache.qpid.client.AMQConnectionDelegate;
import org.apache.qpid.client.AMQConnectionDelegate_0_10;
import org.apache.qpid.client.AMQConnectionDelegate_0_9;
import org.apache.qpid.client.AMQConnectionDelegate_0_91;
import org.apache.qpid.client.AMQConnectionDelegate_8_0;
import org.apache.qpid.client.AMQConnectionURL;
import org.apache.qpid.client.AMQProtocolHandler;
import org.apache.qpid.client.AMQQueueSessionAdaptor;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopicSessionAdaptor;
import org.apache.qpid.client.BrokerDetails;
import org.apache.qpid.client.ChannelToSessionMap;
import org.apache.qpid.client.Closeable;
import org.apache.qpid.client.CommonConnection;
import org.apache.qpid.client.HeartbeatListener;
import org.apache.qpid.client.JmsNotImplementedException;
import org.apache.qpid.client.QpidConnectionMetaData;
import org.apache.qpid.client.failover.ConnectionRedirectException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.security.CallbackHandlerRegistry;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.util.JMSExceptionHelper;
import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.configuration.CommonProperties;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.jms.ConnectionListener;
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.jms.FailoverPolicy;
import org.apache.qpid.jms.Session;
import org.apache.qpid.jndi.ObjectFactory;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.url.URLSyntaxException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AMQConnection
extends Closeable
implements CommonConnection,
Referenceable {
    public static final String JNDI_ADDRESS_CONNECTION_URL = "connectionURL";
    private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class);
    private static final AtomicLong CONN_NUMBER_GENERATOR;
    private static final long DEFAULT_CLOSE_TIMEOUT;
    private final long _connectionNumber = CONN_NUMBER_GENERATOR.incrementAndGet();
    private final Object _failoverMutex = new Object();
    private final Object _sessionCreationLock = new Object();
    private long _maximumChannelCount;
    private long _maximumFrameSize;
    private AMQProtocolHandler _protocolHandler;
    private final ChannelToSessionMap _sessions = new ChannelToSessionMap();
    private String _clientName;
    private String _username;
    private String _password;
    private String _virtualHost;
    private volatile ExceptionListener _exceptionListener;
    private ConnectionListener _connectionListener;
    private final ConnectionURL _connectionURL;
    private volatile boolean _started;
    private FailoverPolicy _failoverPolicy;
    private boolean _connected;
    private boolean _connectionAttempted;
    private QpidConnectionMetaData _connectionMetaData;
    private String _defaultTopicExchangeName = "amq.topic";
    private String _defaultQueueExchangeName = "amq.direct";
    private String _temporaryTopicExchangeName = "amq.topic";
    private String _temporaryQueueExchangeName = "amq.direct";
    private final ScheduledExecutorService _taskPool = Executors.newSingleThreadScheduledExecutor(new ThreadFactory(){

        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r, "Connection_" + AMQConnection.this._connectionNumber + "_task");
            if (!thread.isDaemon()) {
                thread.setDaemon(true);
            }
            return thread;
        }
    });
    private AMQConnectionDelegate _delegate;
    private int _maxPrefetch;
    private boolean _syncPersistence;
    private boolean _syncAck;
    private String _syncPublish = "";
    private boolean _populateUserId = true;
    private boolean _useLegacyMapMessageFormat;
    private boolean _useLegacyStreamMessageFormat;
    private final boolean _validateQueueOnSend;
    private volatile long _lastFailoverTime = 0L;
    private boolean _compressMessages;
    private int _messageCompressionThresholdSize;
    private final Map<String, String> _virtualHostProperties = new HashMap<String, String>();
    private volatile boolean _virtualHostPropertiesPopulated;
    private ConnectionSettings _connectionSettings;
    private final ConcurrentMap<String, KeyStore> _brokerTrustStores = new ConcurrentHashMap<String, KeyStore>();
    private Session _brokerTrustStoreSession;

    public AMQConnection(String broker, String username, String password, String clientName, String virtualHost) throws QpidException, URLSyntaxException {
        this(new AMQConnectionURL("amqp://" + username + ":" + password + "@" + (clientName == null ? "" : clientName) + "/" + virtualHost + "?brokerlist='" + BrokerDetails.checkTransport(broker) + "'"));
    }

    public AMQConnection(String host, int port, String username, String password, String clientName, String virtualHost) throws QpidException, URLSyntaxException {
        this(new AMQConnectionURL("amqp://" + username + ":" + password + "@" + (clientName == null ? "" : clientName) + virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'"));
    }

    public AMQConnection(String connection) throws QpidException, URLSyntaxException {
        this(new AMQConnectionURL(connection));
    }

    public AMQConnection(ConnectionURL connectionURL) throws QpidException {
        if (connectionURL == null) {
            throw new IllegalArgumentException("Connection must be specified");
        }
        if (_logger.isDebugEnabled()) {
            _logger.debug("Connection(" + this._connectionNumber + "):" + connectionURL);
        }
        this._maxPrefetch = connectionURL.getOption("maxprefetch") != null ? Integer.parseInt(connectionURL.getOption("maxprefetch")) : Integer.parseInt(System.getProperties().getProperty("max_prefetch", "500"));
        if (connectionURL.getOption("sync_persistence") != null) {
            this._syncPersistence = Boolean.parseBoolean(connectionURL.getOption("sync_persistence"));
            _logger.warn("sync_persistence is a deprecated property, please use sync_publish={persistent|all} instead");
        } else {
            this._syncPersistence = Boolean.getBoolean("sync_persistence");
            if (this._syncPersistence) {
                _logger.warn("sync_persistence is a deprecated property, please use sync_publish={persistent|all} instead");
            }
        }
        this._syncAck = connectionURL.getOption("sync_ack") != null ? Boolean.parseBoolean(connectionURL.getOption("sync_ack")) : Boolean.getBoolean("sync_ack");
        this._syncPublish = connectionURL.getOption("sync_publish") != null ? connectionURL.getOption("sync_publish") : System.getProperty("sync_publish", this._syncPublish);
        if (connectionURL.getOption("populateJMSXUserID") != null) {
            this._populateUserId = Boolean.parseBoolean(connectionURL.getOption("populateJMSXUserID"));
        }
        this._useLegacyMapMessageFormat = connectionURL.getOption("use_legacy_map_msg_format") != null ? Boolean.parseBoolean(connectionURL.getOption("use_legacy_map_msg_format")) : Boolean.getBoolean("qpid.use_legacy_map_message");
        this._useLegacyStreamMessageFormat = connectionURL.getOption("use_legacy_stream_msg_format") != null ? Boolean.parseBoolean(connectionURL.getOption("use_legacy_stream_msg_format")) : (System.getProperty("qpid.use_legacy_stream_message") == null ? true : Boolean.getBoolean("qpid.use_legacy_stream_message"));
        this._validateQueueOnSend = connectionURL.getOption("verifyQueueOnSend") != null ? Boolean.parseBoolean(connectionURL.getOption("verifyQueueOnSend")) : Boolean.parseBoolean(System.getProperty("qpid.verify_queue_on_send", "false"));
        this._compressMessages = connectionURL.getOption("compressMessages") != null ? Boolean.parseBoolean(connectionURL.getOption("compressMessages")) : Boolean.parseBoolean(System.getProperty("qpid.connection_compress_messages", String.valueOf(false)));
        this._messageCompressionThresholdSize = connectionURL.getOption("messageCompressionThresholdSize") != null ? Integer.valueOf(connectionURL.getOption("messageCompressionThresholdSize")).intValue() : Integer.getInteger("qpid.message_compression_threshold_size", 102400).intValue();
        if (this._messageCompressionThresholdSize <= 0) {
            this._messageCompressionThresholdSize = Integer.MAX_VALUE;
        }
        String amqpVersion = System.getProperty("qpid.amqp.version", "0-10");
        if (_logger.isDebugEnabled()) {
            _logger.debug("AMQP version " + amqpVersion);
        }
        this._failoverPolicy = new FailoverPolicy(connectionURL, this);
        this._delegate = "0-8".equals(amqpVersion) ? new AMQConnectionDelegate_8_0(this) : ("0-9".equals(amqpVersion) ? new AMQConnectionDelegate_0_9(this) : ("0-91".equals(amqpVersion) || "0-9-1".equals(amqpVersion) ? new AMQConnectionDelegate_0_91(this) : new AMQConnectionDelegate_0_10(this)));
        this._connectionURL = connectionURL;
        this._clientName = connectionURL.getClientName();
        this._username = connectionURL.getUsername();
        this._password = connectionURL.getPassword();
        this.setVirtualHost(connectionURL.getVirtualHost());
        if (connectionURL.getDefaultQueueExchangeName() != null) {
            this._defaultQueueExchangeName = connectionURL.getDefaultQueueExchangeName();
        }
        if (connectionURL.getDefaultTopicExchangeName() != null) {
            this._defaultTopicExchangeName = connectionURL.getDefaultTopicExchangeName();
        }
        if (connectionURL.getTemporaryQueueExchangeName() != null) {
            this._temporaryQueueExchangeName = connectionURL.getTemporaryQueueExchangeName();
        }
        if (connectionURL.getTemporaryTopicExchangeName() != null) {
            this._temporaryTopicExchangeName = connectionURL.getTemporaryTopicExchangeName();
        }
        this._protocolHandler = new AMQProtocolHandler(this);
        if (_logger.isDebugEnabled()) {
            _logger.debug("Connecting with ProtocolHandler Version:" + this._protocolHandler.getProtocolVersion());
        }
        this.setConnected(false);
        if (this._clientName != null) {
            this.makeConnection();
        }
        this._connectionMetaData = new QpidConnectionMetaData();
    }

    private void makeConnection() throws QpidException {
        this._connectionAttempted = true;
        if (this._clientName == null) {
            try {
                InetAddress addr = InetAddress.getLocalHost();
                this._clientName = addr.getHostName() + System.currentTimeMillis();
            }
            catch (UnknownHostException e) {
                this._clientName = "UnknownHost" + UUID.randomUUID();
            }
        }
        BrokerDetails brokerDetails = this._failoverPolicy.getCurrentBrokerDetails();
        boolean retryAllowed = true;
        Exception connectionException = null;
        while (!this.isConnected() && retryAllowed && brokerDetails != null) {
            ProtocolVersion pe = null;
            try {
                pe = this.makeBrokerConnection(brokerDetails);
            }
            catch (Exception e) {
                if (_logger.isInfoEnabled()) {
                    _logger.info("Unable to connect to broker at " + this._failoverPolicy.getCurrentBrokerDetails(), (Throwable)e);
                }
                connectionException = e;
            }
            if (pe != null) {
                this.initDelegate(pe);
                continue;
            }
            if (this.isConnected()) continue;
            if (connectionException instanceof ConnectionRedirectException) {
                ConnectionRedirectException redirect = (ConnectionRedirectException)((Object)connectionException);
                retryAllowed = true;
                brokerDetails = new BrokerDetails(brokerDetails);
                brokerDetails.setHost(redirect.getHost());
                brokerDetails.setPort(redirect.getPort());
                this._protocolHandler.setStateManager(new AMQStateManager(this._protocolHandler.getProtocolSession()));
                continue;
            }
            retryAllowed = this._failoverPolicy.failoverAllowed();
            brokerDetails = this._failoverPolicy.getNextBrokerDetails();
            this._protocolHandler.setStateManager(new AMQStateManager(this._protocolHandler.getProtocolSession()));
        }
        this.verifyClientID();
        if (_logger.isDebugEnabled()) {
            _logger.debug("Are we connected:" + this.isConnected());
        }
        if (!this.isConnected()) {
            if (_logger.isDebugEnabled()) {
                _logger.debug("Last attempted ProtocolHandler Version:" + this._protocolHandler.getProtocolVersion());
            }
            String message = null;
            if (connectionException != null) {
                message = connectionException.getCause() != null ? connectionException.getCause().getMessage() : connectionException.getMessage();
            }
            if (message == null) {
                message = "Unable to Connect";
            } else if ("".equals(message)) {
                message = "Unable to Connect:" + connectionException.getClass();
            }
            for (Throwable th = connectionException; th != null; th = th.getCause()) {
                if (!(th instanceof UnresolvedAddressException) && !(th instanceof UnknownHostException)) continue;
                throw new AMQUnresolvedAddressException(message, this._failoverPolicy.getCurrentBrokerDetails().toString(), (Throwable)connectionException);
            }
            throw new AMQConnectionFailureException(message, (Throwable)connectionException);
        }
        if (_logger.isDebugEnabled()) {
            _logger.debug("Connected with ProtocolHandler Version:" + this._protocolHandler.getProtocolVersion());
        }
        this._sessions.setMaxChannelID(this._delegate.getMaxChannelID());
        this._sessions.setMinChannelID(this._delegate.getMinChannelID());
    }

    private void initDelegate(ProtocolVersion pe) throws AMQProtocolException {
        try {
            String delegateClassName = String.format("org.apache.qpid.client.AMQConnectionDelegate_%s_%s", pe.getMajorVersion(), pe.getMinorVersion());
            if (_logger.isDebugEnabled()) {
                _logger.debug("Looking up delegate '" + delegateClassName + "' Based on PE:" + pe);
            }
            Class<?> c = Class.forName(delegateClassName);
            Class[] partypes = new Class[]{AMQConnection.class};
            this._delegate = (AMQConnectionDelegate)c.getConstructor(partypes).newInstance(this);
            if (!ProtocolVersion.v0_10.equals((Object)this._delegate.getProtocolVersion())) {
                this._protocolHandler.getProtocolSession().setProtocolVersion(this._delegate.getProtocolVersion());
            }
            this._protocolHandler.getStateManager().clearLastException();
            this._protocolHandler.getStateManager().changeState(AMQState.CONNECTION_NOT_STARTED);
        }
        catch (ClassNotFoundException e) {
            throw new AMQProtocolException(AMQConstant.UNSUPPORTED_CLIENT_PROTOCOL_ERROR, String.format("Protocol: %s.%s is required by the broker but is not currently supported by this client library implementation", pe.getMajorVersion(), pe.getMinorVersion()), (Throwable)e);
        }
        catch (NoSuchMethodException e) {
            throw new RuntimeException("unable to locate constructor for delegate", e);
        }
        catch (InstantiationException e) {
            throw new RuntimeException("error instantiating delegate", e);
        }
        catch (IllegalAccessException e) {
            throw new RuntimeException("error accessing delegate", e);
        }
        catch (InvocationTargetException e) {
            throw new RuntimeException("error invoking delegate", e);
        }
    }

    private void setVirtualHost(String virtualHost) {
        if (virtualHost != null && virtualHost.startsWith("/")) {
            virtualHost = virtualHost.substring(1);
        }
        this._virtualHost = virtualHost;
    }

    public boolean attemptReconnection(String host, int port, boolean useFailoverConfigOnFailure) {
        BrokerDetails bd = new BrokerDetails(this._failoverPolicy.getCurrentBrokerDetails());
        bd.setHost(host);
        bd.setPort(port);
        this._failoverPolicy.setBroker(bd);
        try {
            this.makeBrokerConnection(bd);
            return true;
        }
        catch (Exception e) {
            if (_logger.isInfoEnabled()) {
                _logger.info("Unable to connect to broker at " + bd);
            }
            return useFailoverConfigOnFailure && this.attemptReconnection();
        }
    }

    public boolean attemptReconnection() {
        BrokerDetails broker;
        while (!this.isClosed() && !this.isClosing() && this._failoverPolicy.failoverAllowed() && (broker = this._failoverPolicy.getNextBrokerDetails()) != null) {
            if (!this.attemptConnection(broker)) continue;
            return true;
        }
        return false;
    }

    private boolean attemptConnection(BrokerDetails broker) {
        try {
            this.makeBrokerConnection(broker);
            return true;
        }
        catch (Exception e) {
            if (!(e instanceof QpidException)) {
                if (_logger.isInfoEnabled()) {
                    _logger.info("Unable to connect to broker at " + this._failoverPolicy.getCurrentBrokerDetails(), (Throwable)e);
                }
            } else if (_logger.isInfoEnabled()) {
                _logger.info(e.getMessage() + ":Unable to connect to broker at " + this._failoverPolicy.getCurrentBrokerDetails());
            }
            return false;
        }
    }

    public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, QpidException {
        return this._delegate.makeBrokerConnection(brokerDetail);
    }

    public <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T, E> operation) throws E {
        return this._delegate.executeRetrySupport(operation);
    }

    public BrokerDetails getActiveBrokerDetails() {
        return this._failoverPolicy.getCurrentBrokerDetails();
    }

    public boolean failoverAllowed() {
        if (!this.isConnected()) {
            return false;
        }
        return this._failoverPolicy.failoverAllowed();
    }

    public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
        return this.createSession(transacted, acknowledgeMode, this._maxPrefetch);
    }

    @Override
    public Session createSession(boolean transacted, int acknowledgeMode, int prefetch) throws JMSException {
        return this.createSession(transacted, acknowledgeMode, prefetch, prefetch);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Session createSession(boolean transacted, int acknowledgeMode, int prefetchHigh, int prefetchLow) throws JMSException {
        Object object = this._sessionCreationLock;
        synchronized (object) {
            this.checkNotClosed();
            if (!this._connectionAttempted) {
                try {
                    this.makeConnection();
                }
                catch (QpidException e) {
                    throw JMSExceptionHelper.chainJMSException(new JMSException("Unable to establish connection"), e);
                }
            }
            if (this._delegate.isVirtualHostPropertiesSupported() && !this._virtualHostPropertiesPopulated) {
                this.retrieveVirtualHostPropertiesIfNecessary();
            }
            return this._delegate.createSession(transacted, acknowledgeMode, prefetchHigh, prefetchLow);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void retrieveVirtualHostPropertiesIfNecessary() throws JMSException {
        Map<String, String> map = this._virtualHostProperties;
        synchronized (map) {
            if (!this._virtualHostPropertiesPopulated) {
                Session session = this._delegate.createSession(false, 257, 3, 3);
                MessageConsumer consumer = session.createConsumer((Destination)session.createQueue("ADDR: $virtualhostProperties; {assert: never, create: never, node:{ type: queue }}"));
                try {
                    ((AMQSession)session).start();
                }
                catch (QpidException e) {
                    throw JMSExceptionHelper.chainJMSException(new JMSException("Failed to retrieve virtual host properties"), e);
                }
                Message propertiesMessage = consumer.receive(this.getProtocolHandler().getDefaultTimeout());
                if (propertiesMessage != null) {
                    for (String property : Collections.list(propertiesMessage.getPropertyNames())) {
                        this._virtualHostProperties.put(property, propertiesMessage.getStringProperty(property));
                    }
                }
                session.close();
                this._virtualHostPropertiesPopulated = true;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public KeyStore getBrokerSuppliedTrustStore(String name) throws JMSException {
        ConcurrentMap<String, KeyStore> concurrentMap = this._brokerTrustStores;
        synchronized (concurrentMap) {
            if (!this._brokerTrustStores.containsKey(name)) {
                MessageConsumer consumer;
                Message message;
                if (this._brokerTrustStoreSession == null) {
                    this._brokerTrustStoreSession = this._delegate.createSession(false, 1, 1, 1);
                    try {
                        ((AMQSession)this._brokerTrustStoreSession).start();
                    }
                    catch (QpidException e) {
                        throw JMSExceptionHelper.chainJMSException(new JMSException("Failed to retrieve virtual host properties"), e);
                    }
                }
                if ((message = (consumer = this._brokerTrustStoreSession.createConsumer((Destination)this._brokerTrustStoreSession.createQueue("ADDR: " + name + "; {assert: never, create: never, node:{ type: queue }}"))).receive(2000L)) != null) {
                    StreamMessage streamMessage = (StreamMessage)message;
                    ArrayList<X509Certificate> certs = new ArrayList<X509Certificate>();
                    try {
                        try {
                            byte[] bytes;
                            CertificateFactory certFactory = CertificateFactory.getInstance("X.509");
                            while ((bytes = (byte[])streamMessage.readObject()) != null) {
                                certs.add((X509Certificate)certFactory.generateCertificate(new ByteArrayInputStream(bytes)));
                            }
                        }
                        catch (MessageEOFException e) {
                            // empty catch block
                        }
                        KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
                        char[] encryptionTrustStorePassword = this.getConnectionSettings().getEncryptionTrustStorePassword() == null ? null : this.getConnectionSettings().getEncryptionTrustStorePassword().toCharArray();
                        keyStore.load(null, encryptionTrustStorePassword);
                        int i = 1;
                        for (X509Certificate cert : certs) {
                            keyStore.setCertificateEntry(String.valueOf(i++), cert);
                        }
                        this._brokerTrustStores.put(name, keyStore);
                    }
                    catch (IOException | GeneralSecurityException | JMSException e) {
                        _logger.error(e.getMessage(), e);
                    }
                }
            }
            return (KeyStore)this._brokerTrustStores.get(name);
        }
    }

    public void setFailoverPolicy(FailoverPolicy policy) {
        this._failoverPolicy = policy;
    }

    public FailoverPolicy getFailoverPolicy() {
        return this._failoverPolicy;
    }

    public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException {
        return new AMQQueueSessionAdaptor(this.createSession(transacted, acknowledgeMode));
    }

    public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException {
        return new AMQTopicSessionAdaptor(this.createSession(transacted, acknowledgeMode));
    }

    public boolean channelLimitReached() {
        return (long)this._sessions.size() >= this._maximumChannelCount;
    }

    public String getClientID() throws JMSException {
        this.checkNotClosed();
        return this._clientName;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void setClientID(String clientID) throws JMSException {
        this.checkNotClosed();
        Object object = this._sessionCreationLock;
        synchronized (object) {
            if (this._connectionAttempted) {
                if (!Boolean.getBoolean("ignore_setclientID")) {
                    throw new IllegalStateException("Client name cannot be changed after being set");
                }
                _logger.info("Operation setClientID is ignored using ID: " + this.getClientID());
            } else {
                this._clientName = clientID;
            }
        }
    }

    public ConnectionMetaData getMetaData() throws JMSException {
        this.checkNotClosed();
        return this._connectionMetaData;
    }

    protected final ExceptionListener getExceptionListenerNoCheck() {
        return this._exceptionListener;
    }

    public ExceptionListener getExceptionListener() throws JMSException {
        this.checkNotClosed();
        return this.getExceptionListenerNoCheck();
    }

    public void setExceptionListener(ExceptionListener listener) throws JMSException {
        this.checkNotClosed();
        this._exceptionListener = listener;
    }

    public void start() throws JMSException {
        this.checkNotClosed();
        if (!this._started) {
            this._started = true;
            for (AMQSession s : this._sessions.values()) {
                try {
                    s.start();
                }
                catch (QpidException e) {
                    throw JMSExceptionHelper.chainJMSException(new JMSException("Connection.start failed"), e);
                }
            }
        }
    }

    public void stop() throws JMSException {
        this.checkNotClosed();
        if (this._started) {
            Iterator<AMQSession> i = this._sessions.values().iterator();
            while (i.hasNext()) {
                try {
                    i.next().stop();
                }
                catch (QpidException e) {
                    throw JMSExceptionHelper.chainJMSException(new JMSException("Connection.stop failed."), e);
                }
            }
            this._started = false;
        }
    }

    @Override
    public void close() throws JMSException {
        this.close(DEFAULT_CLOSE_TIMEOUT);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void close(long timeout) throws JMSException {
        boolean closed;
        Object object = this._sessionCreationLock;
        synchronized (object) {
            closed = this.setClosed();
        }
        if (!closed) {
            ArrayList<AMQSession> sessions = new ArrayList<AMQSession>(this._sessions.values());
            this.setClosing(true);
            try {
                this.doClose(sessions, timeout);
            }
            finally {
                this.setClosing(false);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void doClose(List<AMQSession> sessions, long timeout) throws JMSException {
        if (!sessions.isEmpty()) {
            AMQSession session = sessions.remove(0);
            session.lockMessageDelivery();
            try {
                this.doClose(sessions, timeout);
            }
            finally {
                session.unlockMessageDelivery();
            }
        }
        Object object = this.getFailoverMutex();
        synchronized (object) {
            try {
                try {
                    this.closeAllSessions(null, timeout);
                }
                finally {
                    this.shutdownTaskPool();
                }
            }
            catch (JMSException e) {
                _logger.error("Error closing connection", (Throwable)e);
                throw JMSExceptionHelper.chainJMSException(new JMSException("Error closing connection: " + (Object)((Object)e)), e);
            }
            finally {
                try {
                    this._delegate.closeConnection(timeout);
                }
                catch (Exception e) {
                    _logger.warn("Error closing underlying protocol connection", (Throwable)e);
                }
            }
        }
    }

    private void shutdownTaskPool() {
        this._taskPool.shutdown();
    }

    private void markAllSessionsClosed() {
        LinkedList<AMQSession> sessionCopy = new LinkedList<AMQSession>(this._sessions.values());
        for (AMQSession session : sessionCopy) {
            session.markClosed();
        }
        this._sessions.clear();
    }

    private void closeAllSessions(Throwable cause, long timeout) throws JMSException {
        LinkedList<AMQSession> sessionCopy = new LinkedList<AMQSession>(this._sessions.values());
        Iterator it = sessionCopy.iterator();
        JMSException sessionException = null;
        while (it.hasNext()) {
            AMQSession session = (AMQSession)it.next();
            if (cause != null) {
                session.closed(cause);
                continue;
            }
            try {
                session.close(timeout);
            }
            catch (JMSException e) {
                _logger.error("Error closing session: " + (Object)((Object)e));
                sessionException = e;
            }
        }
        this._sessions.clear();
        if (sessionException != null) {
            throw sessionException;
        }
    }

    public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
        this.checkNotClosed();
        throw new JmsNotImplementedException();
    }

    public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
        this.checkNotClosed();
        throw new JmsNotImplementedException();
    }

    public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
        this.checkNotClosed();
        throw new JmsNotImplementedException();
    }

    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
        this.checkNotClosed();
        throw new JmsNotImplementedException();
    }

    @Override
    public long getMaximumChannelCount() throws JMSException {
        this.checkNotClosed();
        return this._maximumChannelCount;
    }

    @Override
    public void setConnectionListener(ConnectionListener listener) {
        this._connectionListener = listener;
    }

    @Override
    public ConnectionListener getConnectionListener() {
        return this._connectionListener;
    }

    public void setMaximumChannelCount(long maximumChannelCount) {
        this._maximumChannelCount = maximumChannelCount;
    }

    public void setMaximumFrameSize(long frameMax) {
        this._maximumFrameSize = frameMax;
    }

    public long getMaximumFrameSize() {
        return this._maximumFrameSize;
    }

    public ChannelToSessionMap getSessions() {
        return this._sessions;
    }

    public String getUsername() {
        return this._username;
    }

    public void setUsername(String id) {
        this._username = id;
    }

    public String getPassword() {
        return this._password;
    }

    public String getVirtualHost() {
        return this._virtualHost;
    }

    public final AMQProtocolHandler getProtocolHandler() {
        return this._protocolHandler;
    }

    public final boolean started() {
        return this._started;
    }

    public final boolean isConnected() {
        return this._connected;
    }

    protected final void setConnected(boolean connected) {
        this._connected = connected;
    }

    public void bytesSent(long writtenBytes) {
        if (this._connectionListener != null) {
            this._connectionListener.bytesSent(writtenBytes);
        }
    }

    public void bytesReceived(long receivedBytes) {
        if (this._connectionListener != null) {
            this._connectionListener.bytesReceived(receivedBytes);
        }
    }

    public boolean firePreFailover(boolean redirect) {
        this._lastFailoverTime = System.currentTimeMillis();
        boolean proceed = true;
        if (this._connectionListener != null) {
            proceed = this._connectionListener.preFailover(redirect);
        }
        return proceed;
    }

    public boolean firePreResubscribe() throws JMSException {
        if (this._connectionListener != null) {
            boolean resubscribe = this._connectionListener.preResubscribe();
            if (!resubscribe) {
                this.markAllSessionsClosed();
            }
            return resubscribe;
        }
        return true;
    }

    public void fireFailoverComplete() {
        if (this._connectionListener != null) {
            this._connectionListener.failoverComplete();
        }
    }

    public final Object getFailoverMutex() {
        return this._failoverMutex;
    }

    public void resubscribeSessions() throws JMSException, QpidException, FailoverException {
        this._delegate.resubscribeSessions();
    }

    public void blockUntilNotFailingOver() throws InterruptedException {
        this._protocolHandler.blockUntilNotFailingOver();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void exceptionReceived(Throwable cause) {
        if (_logger.isDebugEnabled()) {
            _logger.debug("exceptionReceived done by:" + Thread.currentThread().getName(), cause);
        }
        JMSException je = this.convertToJMSException(cause);
        try {
            if (this.hardError(cause)) {
                this.closeSessions(cause);
            }
        }
        finally {
            this.deliverJMSExceptionToExceptionListenerOrLog(je, cause);
        }
    }

    private JMSException convertToJMSException(Throwable cause) {
        JMSException je;
        if (cause instanceof JMSException) {
            je = (JMSException)cause;
        } else {
            AMQConstant code = null;
            if (cause instanceof AMQException) {
                code = ((AMQException)cause).getErrorCode();
            }
            if (code != null) {
                je = JMSExceptionHelper.chainJMSException(new JMSException("Exception thrown against " + this.toString() + ": " + cause, Integer.toString(code.getCode())), cause);
            } else {
                Exception last;
                if (cause instanceof AMQDisconnectedException && (last = this._protocolHandler.getStateManager().getLastException()) != null) {
                    _logger.info("StateManager had an exception for us to use a cause of our Disconnected Exception");
                    cause = last;
                }
                je = JMSExceptionHelper.chainJMSException(new JMSException("Exception thrown against " + this.toString() + ": " + cause), cause);
            }
        }
        return je;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void closed(Throwable cause) {
        _logger.debug("Closing closed connection {} ", (Object)this.toString());
        JMSException je = this.convertToJMSException(cause);
        try {
            boolean performClose;
            this._protocolHandler.getProtocolSession().notifyError((Exception)((Object)je));
            boolean bl = performClose = !this.setClosed();
            if (performClose) {
                this.closeSessions(cause);
            }
        }
        finally {
            this.deliverJMSExceptionToExceptionListenerOrLog(je, cause);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void closeSessions(Throwable cause) {
        Object object = this.getFailoverMutex();
        synchronized (object) {
            try {
                this.closeAllSessions(cause, -1L);
            }
            catch (JMSException e) {
                _logger.error("Error closing all sessions: " + (Object)((Object)e), (Throwable)e);
            }
        }
    }

    private void deliverJMSExceptionToExceptionListenerOrLog(final JMSException je, Throwable cause) {
        final ExceptionListener exceptionListener = this.getExceptionListenerNoCheck();
        if (exceptionListener != null) {
            this.performConnectionTask(new Runnable(){

                @Override
                public void run() {
                    try {
                        exceptionListener.onException(je);
                    }
                    catch (RuntimeException e) {
                        _logger.error("Exception occurred in ExceptionListener", (Throwable)e);
                    }
                }
            });
        } else {
            _logger.error("Throwable Received but no listener set: " + cause);
        }
    }

    private boolean hardError(Throwable cause) {
        if (cause instanceof AMQException) {
            return ((AMQException)cause).isHardError();
        }
        return true;
    }

    void registerSession(int channelId, AMQSession session) {
        this._sessions.put(channelId, session);
    }

    public void deregisterSession(int channelId) {
        this._sessions.remove(channelId);
    }

    public String toString() {
        StringBuffer buf = new StringBuffer("AMQConnection:\n");
        if (this._failoverPolicy.getCurrentBrokerDetails() == null) {
            buf.append("No active broker connection");
        } else {
            BrokerDetails bd = this._failoverPolicy.getCurrentBrokerDetails();
            buf.append("Host: ").append(String.valueOf(bd.getHost()));
            buf.append("\nPort: ").append(String.valueOf(bd.getPort()));
        }
        buf.append("\nVirtual Host: ").append(String.valueOf(this._virtualHost));
        buf.append("\nClient ID: ").append(String.valueOf(this._clientName));
        buf.append("\nActive session count: ").append(this._sessions == null ? 0 : this._sessions.size());
        return buf.toString();
    }

    public ConnectionURL getConnectionURL() {
        return this._connectionURL;
    }

    public String toURL() {
        return this._connectionURL.toString();
    }

    @Override
    public Reference getReference() throws NamingException {
        return new Reference(AMQConnection.class.getName(), new StringRefAddr(JNDI_ADDRESS_CONNECTION_URL, this.toURL()), ObjectFactory.class.getName(), null);
    }

    public String getDefaultTopicExchangeName() {
        return this._defaultTopicExchangeName;
    }

    public void setDefaultTopicExchangeName(String defaultTopicExchangeName) {
        this._defaultTopicExchangeName = defaultTopicExchangeName;
    }

    public String getDefaultQueueExchangeName() {
        return this._defaultQueueExchangeName;
    }

    public void setDefaultQueueExchangeName(String defaultQueueExchangeName) {
        this._defaultQueueExchangeName = defaultQueueExchangeName;
    }

    public String getTemporaryTopicExchangeName() {
        return this._temporaryTopicExchangeName;
    }

    public String getTemporaryQueueExchangeName() {
        return this._temporaryQueueExchangeName;
    }

    public void setTemporaryTopicExchangeName(String temporaryTopicExchangeName) {
        this._temporaryTopicExchangeName = temporaryTopicExchangeName;
    }

    public void setTemporaryQueueExchangeName(String temporaryQueueExchangeName) {
        this._temporaryQueueExchangeName = temporaryQueueExchangeName;
    }

    public void performConnectionTask(Runnable task) {
        block2: {
            try {
                this._taskPool.execute(task);
            }
            catch (RejectedExecutionException e) {
                if (this.isClosed() || this.isClosing()) break block2;
                throw e;
            }
        }
    }

    ScheduledFuture<?> scheduleTask(Runnable task, long initialDelay, long period, TimeUnit timeUnit) {
        return this._taskPool.scheduleAtFixedRate(task, initialDelay, period, timeUnit);
    }

    public AMQSession getSession(int channelId) {
        return this._sessions.get(channelId);
    }

    public ProtocolVersion getProtocolVersion() {
        return this._delegate.getProtocolVersion();
    }

    public String getBrokerUUID() {
        if (this.getProtocolVersion().equals((Object)ProtocolVersion.v0_10)) {
            return ((AMQConnectionDelegate_0_10)this._delegate).getUUID();
        }
        return null;
    }

    boolean isSupportedServerFeature(String featureName) {
        return this._delegate.isSupportedServerFeature(featureName);
    }

    public boolean isFailingOver() {
        return this._protocolHandler.getFailoverLatch() != null;
    }

    public long getMaxPrefetch() {
        return this._maxPrefetch;
    }

    public boolean getSyncPersistence() {
        return this._syncPersistence;
    }

    public boolean getSyncAck() {
        return this._syncAck;
    }

    public String getSyncPublish() {
        return this._syncPublish;
    }

    public boolean isPopulateUserId() {
        return this._populateUserId;
    }

    public boolean isMessageCompressionDesired() {
        return this._compressMessages;
    }

    public int getNextChannelID() {
        return this._sessions.getNextChannelId();
    }

    public boolean isUseLegacyMapMessageFormat() {
        return this._useLegacyMapMessageFormat;
    }

    public boolean isUseLegacyStreamMessageFormat() {
        return this._useLegacyStreamMessageFormat;
    }

    private void verifyClientID() throws QpidException {
        if (Boolean.getBoolean("qpid.verify_client_id")) {
            try {
                if (!this._delegate.verifyClientID()) {
                    throw new AMQException(AMQConstant.ALREADY_EXISTS, "ClientID must be unique");
                }
            }
            catch (JMSException e) {
                throw new QpidException(e.getMessage(), (Throwable)e);
            }
        }
    }

    public long getLastFailoverTime() {
        return this._lastFailoverTime;
    }

    protected AMQConnectionDelegate getDelegate() {
        return this._delegate;
    }

    public Long getConnectionNumber() {
        return this._connectionNumber;
    }

    protected void logConnected(SocketAddress localAddress, SocketAddress remoteAddress) {
        if (_logger.isInfoEnabled()) {
            _logger.info("Connection " + this._connectionNumber + " now connected from " + localAddress + " to " + remoteAddress);
        }
    }

    void setHeartbeatListener(HeartbeatListener listener) {
        this._delegate.setHeartbeatListener(listener);
    }

    public boolean validateQueueOnSend() {
        return this._validateQueueOnSend;
    }

    public int getMessageCompressionThresholdSize() {
        return this._messageCompressionThresholdSize;
    }

    void doWithAllLocks(Runnable r) {
        this.doWithAllLocks(r, this._sessions.values());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void doWithAllLocks(Runnable r, List<AMQSession> sessions) {
        if (!sessions.isEmpty()) {
            AMQSession session = sessions.remove(0);
            Object dispatcherLock = session.getDispatcherLock();
            if (dispatcherLock == null) {
                dispatcherLock = new Object();
            }
            Object object = dispatcherLock;
            synchronized (object) {
                session.lockMessageDelivery();
                try {
                    this.doWithAllLocks(r, sessions);
                }
                finally {
                    session.unlockMessageDelivery();
                }
            }
        }
        Object object = this.getFailoverMutex();
        synchronized (object) {
            r.run();
        }
    }

    public String getTemporaryQueuePrefix() {
        if (this._delegate.isVirtualHostPropertiesSupported()) {
            String prefix = this.getVirtualHostProperty("virtualHost.temporaryQueuePrefix");
            return prefix == null ? "" : prefix;
        }
        return "";
    }

    String getVirtualHostProperty(String propertyName) {
        return this._virtualHostProperties.get(propertyName);
    }

    public void setConnectionSettings(ConnectionSettings connectionSettings) {
        this._connectionSettings = connectionSettings;
    }

    public ConnectionSettings getConnectionSettings() {
        return this._connectionSettings;
    }

    static {
        ClientProperties.ensureIsLoaded();
        CONN_NUMBER_GENERATOR = new AtomicLong();
        DEFAULT_CLOSE_TIMEOUT = Long.getLong("qpid.close_timeout", 2000L);
        if (_logger.isDebugEnabled()) {
            _logger.debug("Qpid version : " + CommonProperties.getVersionString());
        }
        CallbackHandlerRegistry registry = CallbackHandlerRegistry.getInstance();
        if (_logger.isDebugEnabled()) {
            _logger.debug("Loaded mechanisms " + registry.getMechanisms());
        }
    }
}

