/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.client.protocol;

import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.mina.filter.codec.ProtocolCodecException;
import org.apache.qpid.AMQConnectionClosedException;
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQTimeoutException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverHandler;
import org.apache.qpid.client.failover.FailoverState;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.protocol.BlockingMethodFrameListener;
import org.apache.qpid.client.protocol.HeartbeatConfig;
import org.apache.qpid.client.protocol.HeartbeatDiagnostics;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateWaiter;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ConnectionCloseBody;
import org.apache.qpid.framing.ConnectionCloseOkBody;
import org.apache.qpid.framing.HeartbeatBody;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.ProtocolInitiation;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.pool.Job;
import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.NetworkDriver;
import org.apache.qpid.transport.network.io.IoTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class AMQProtocolHandler
implements ProtocolEngine {
    /** General-purpose logger for this handler. */
    private static final Logger _logger = LoggerFactory.getLogger(AMQProtocolHandler.class);
    /** Logger named "qpid.protocol"; used for the SEND/RECV frame traces when {@link #PROTOCOL_DEBUG} is on. */
    private static final Logger _protocolLogger = LoggerFactory.getLogger((String)"qpid.protocol");
    /** True when the system property {@code amqj.protocol.logging.level} is set to any value. */
    private static final boolean PROTOCOL_DEBUG = System.getProperty("amqj.protocol.logging.level") != null;
    /**
     * Timeout, in milliseconds, used when awaiting the failover latch. Read from the system
     * property {@code amqj.MaximumStateWait}, defaulting to 30000.
     */
    private static final long MAXIMUM_STATE_WAIT_TIME = Long.parseLong(System.getProperty("amqj.MaximumStateWait", "30000"));
    /** Connection this handler was created for. */
    private AMQConnection _connection;
    /** Current protocol session; replaced by {@code createIoTransportSession}, hence volatile. */
    private volatile AMQProtocolSession _protocolSession;
    /** State manager that receives method events and errors before the frame listeners do. */
    private AMQStateManager _stateManager;
    /**
     * Listeners that are offered received method frames and propagated errors. Several methods
     * also use this set's monitor as a lock around listener registration and
     * {@link #_lastFailoverException} updates.
     */
    private final CopyOnWriteArraySet<AMQMethodListener> _frameListeners = new CopyOnWriteArraySet();
    /** Runnable executed on the "Failover" thread started by {@code startFailoverThread}. */
    private FailoverHandler _failoverHandler;
    /** Failover progress marker; starts as NOT_STARTED. Not volatile and not lock-protected. */
    private FailoverState _failoverState = FailoverState.NOT_STARTED;
    /** Latch awaited by {@code blockUntilNotFailingOver}; may be null. Guarded by {@link #_failoverLatchChange} on write. */
    private CountDownLatch _failoverLatch;
    /**
     * Set by {@code notifyFailoverStarting} and cleared by {@code failoverInProgress}. While
     * non-null, {@code writeCommandFrameAndWaitForReply} throws it instead of sending.
     */
    private FailoverException _lastFailoverException;
    /**
     * Default timeout passed to the blocking write methods. Read from the system property
     * {@code amqj.default_syncwrite_timeout}, defaulting to 30000 (presumably milliseconds -
     * TODO confirm against BlockingMethodFrameListener). Note: instance field despite the
     * constant-style name.
     */
    private final long DEFAULT_SYNC_TIMEOUT = Long.getLong("amqj.default_syncwrite_timeout", 30000L);
    /** Monitor used when reading/awaiting or replacing {@link #_failoverLatch}. */
    private Object _failoverLatchChange = new Object();
    /** Codec factory whose decoder turns received bytes into data blocks. */
    private AMQCodecFactory _codecFactory;
    /** Job on which decoded inbound data blocks are processed. */
    private Job _readJob;
    /** Job on which outbound buffers are handed to the network driver. */
    private Job _writeJob;
    /** Shared executor service; acquired in the constructor and released in {@code closeConnection}. */
    private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance();
    /** Network driver used to send, flush and close; supplied via {@code setNetworkDriver}. */
    private NetworkDriver _networkDriver;
    /** Protocol version taken from a ProtocolInitiation received from the peer, if any. */
    private ProtocolVersion _suggestedProtocolVersion;
    /** Running total of bytes handed to {@code writeFrame}. */
    private long _writtenBytes;
    /** Running total of bytes passed to {@code received}. */
    private long _readBytes;
    /** Count of AMQFrames processed; static, so shared by all handlers, and incremented without synchronization. */
    private static int _messageReceivedCount;
    /** Count of frames written; static, so shared by all handlers, and incremented without synchronization. */
    private static int _messagesOut;

    /**
     * Creates a handler for the given connection.
     *
     * <p>Builds the initial protocol session, state manager and codec factory, installs a thread
     * factory on the shared executor service that delegates to {@code Threading.getThreadFactory()},
     * creates the read and write jobs, acquires the executor service and prepares the failover
     * handler.
     *
     * @param con the connection this handler serves
     */
    public AMQProtocolHandler(AMQConnection con) {
        _connection = con;

        final AMQProtocolSession initialSession = new AMQProtocolSession(this, con);
        _protocolSession = initialSession;
        _stateManager = new AMQStateManager(initialSession);
        _codecFactory = new AMQCodecFactory(false, (AMQVersionAwareProtocolSession) initialSession);

        // Pool threads are created through the configured Threading factory; any checked
        // failure is surfaced as an unchecked exception.
        final ThreadFactory poolThreads = new ThreadFactory() {
            public Thread newThread(Runnable task) {
                try {
                    return Threading.getThreadFactory().createThread(task);
                } catch (Exception failure) {
                    throw new RuntimeException("Failed to create thread", failure);
                }
            }
        };
        _poolReference.setThreadFactory(poolThreads);

        _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true);
        _writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false);
        _poolReference.acquireExecutorService();

        _failoverHandler = new FailoverHandler(this);
    }

    /**
     * Replaces the protocol session with a fresh one, points the state manager at it, connects
     * through {@code IoTransport.connect_0_9} and then initialises the new session.
     *
     * @param brokerDetail supplies the host, the port and the boolean "ssl" property for the connection
     */
    public void createIoTransportSession(BrokerDetails brokerDetail) {
        _protocolSession = new AMQProtocolSession(this, _connection);
        _stateManager.setProtocolSession(_protocolSession);

        final AMQVersionAwareProtocolSession session = (AMQVersionAwareProtocolSession) getProtocolSession();
        final String host = brokerDetail.getHost();
        final int port = brokerDetail.getPort();
        final boolean useSsl = brokerDetail.getBooleanProperty("ssl");
        IoTransport.connect_0_9(session, host, port, useSsl);

        _protocolSession.init();
    }

    /**
     * Reacts to the underlying session having closed.
     *
     * <p>If the connection is already closed this only logs. Otherwise, when failover is not
     * already in progress and the connection allows it, failover is started (only from the
     * NOT_STARTED state). When failover is not permitted and none is in progress, an
     * {@code AMQDisconnectedException} wrapping the state manager's last exception is reported to
     * the connection.
     */
    public void closed() {
        if (_connection.isClosed()) {
            _logger.debug("Session closed called by client");
        } else {
            _logger.debug("Session closed called with failover state currently " + _failoverState);

            // failoverAllowed() is only consulted when failover is not already running.
            if (_failoverState == FailoverState.IN_PROGRESS || !_connection.failoverAllowed()) {
                _logger.debug("Failover not allowed by policy.");
                if (_logger.isDebugEnabled()) {
                    _logger.debug(_connection.getFailoverPolicy().toString());
                }

                if (_failoverState == FailoverState.IN_PROGRESS) {
                    _logger.debug("sessionClose() failover in progress");
                } else {
                    _logger.debug("sessionClose() not allowed to failover");
                    _connection.exceptionReceived(
                            new AMQDisconnectedException(
                                    "Server closed connection and reconnection not permitted.",
                                    _stateManager.getLastException()));
                }
            } else {
                _logger.debug("FAILOVER STARTING");
                if (_failoverState == FailoverState.NOT_STARTED) {
                    _failoverState = FailoverState.IN_PROGRESS;
                    startFailoverThread();
                } else {
                    _logger.debug("Not starting failover as state currently " + _failoverState);
                }
            }
        }

        _logger.debug("Protocol Session [" + this + "] closed");
    }

    /**
     * Starts a non-daemon thread named "Failover" that runs the failover handler. Does nothing
     * when the connection is already closed.
     *
     * @throws RuntimeException if the thread factory fails to create the thread
     */
    private void startFailoverThread() {
        if (_connection.isClosed()) {
            return;
        }

        final Thread worker;
        try {
            worker = Threading.getThreadFactory().createThread(_failoverHandler);
        } catch (Exception failure) {
            throw new RuntimeException("Failed to create thread", failure);
        }

        worker.setName("Failover");
        worker.setDaemon(false);
        worker.start();
    }

    /**
     * Called when nothing has been read for the configured idle period: records the heartbeat
     * timeout, logs a warning and closes the network driver.
     */
    public void readerIdle() {
        final String idleMessage = "Protocol Session [" + this + "] idle: reader";
        _logger.debug(idleMessage);

        HeartbeatDiagnostics.timeout();
        _logger.warn("Timed out while waiting for heartbeat from peer.");

        _networkDriver.close();
    }

    /**
     * Called when nothing has been written for the configured idle period: writes a heartbeat
     * frame and records that it was sent.
     */
    public void writerIdle() {
        // The message previously said "reader" (copied from readerIdle), which made the two
        // idle events indistinguishable in the debug log.
        _logger.debug("Protocol Session [" + this + "] idle: writer");
        writeFrame(HeartbeatBody.FRAME);
        HeartbeatDiagnostics.sent();
    }

    /**
     * Handles an exception raised while servicing the connection.
     *
     * <p>With failover NOT_STARTED: a connection-closed or I/O exception closes the network
     * driver and goes through {@link #closed()}; any other exception is reported to the
     * connection, and a {@code ProtocolCodecException} is additionally propagated to all waiters
     * wrapped in an {@code AMQException}. With failover FAILED: the exception is wrapped,
     * propagated to all waiters and reported to the connection. In any other failover state the
     * exception is ignored.
     *
     * @param cause the exception that was caught
     */
    public void exception(Throwable cause) {
        if (_failoverState == FailoverState.NOT_STARTED) {
            final boolean connectionLost =
                    cause instanceof AMQConnectionClosedException || cause instanceof IOException;
            if (connectionLost) {
                _logger.info("Exception caught therefore going to attempt failover: " + cause, cause);
                _networkDriver.close();
                closed();
                return;
            }

            if (cause instanceof ProtocolCodecException) {
                _logger.info("Protocol Exception caught NOT going to attempt failover as cause isn't AMQConnectionClosedException: " + cause, cause);
                final AMQException wrapped = new AMQException("Protocol handler error: " + cause, cause);
                propagateExceptionToAllWaiters(wrapped);
            }
            _connection.exceptionReceived(cause);
            return;
        }

        if (_failoverState == FailoverState.FAILED) {
            _logger.error("Exception caught by protocol handler: " + cause, cause);
            final AMQException wrapped = new AMQException("Protocol handler error: " + cause, cause);
            propagateExceptionToAllWaiters(wrapped);
            _connection.exceptionReceived(cause);
        }
    }

    /**
     * Reports the exception first to the state manager and then to every registered frame
     * listener.
     *
     * @param e the exception to propagate
     */
    public void propagateExceptionToAllWaiters(Exception e) {
        final AMQStateManager manager = getStateManager();
        manager.error(e);
        propagateExceptionToFrameListeners(e);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Calls {@code error(e)} on every registered frame listener while holding the listener
     * set's monitor.
     *
     * @param e the exception to hand to each listener
     */
    public void propagateExceptionToFrameListeners(Exception e) {
        synchronized (_frameListeners) {
            // Iterating an empty set is a no-op, so no separate emptiness check is needed.
            for (AMQMethodListener listener : _frameListeners) {
                listener.error(e);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Records a new {@code FailoverException} as the last failover exception (under the listener
     * set's monitor) and then propagates it to every frame listener.
     */
    public void notifyFailoverStarting() {
        final FailoverException starting = new FailoverException("Failing over about to start");
        synchronized (_frameListeners) {
            _lastFailoverException = starting;
        }
        propagateExceptionToFrameListeners(_lastFailoverException);
    }

    /**
     * Clears the recorded failover exception, so that blocking writes are no longer rejected
     * with it.
     */
    public void failoverInProgress() {
        _lastFailoverException = null;
    }

    /**
     * Decodes the received bytes into data blocks and schedules their processing on the read job.
     *
     * <p>For each {@code AMQFrame} the body is dispatched to the protocol session and the
     * connection is told the running byte total. A {@code ProtocolInitiation} from the peer is
     * recorded as the suggested protocol version and the state is moved to CONNECTION_CLOSED.
     * Other block types are ignored. Any exception, whether from decoding or from processing a
     * block, is propagated to the frame listeners and then passed to
     * {@link #exception(Throwable)}; a failing block does not stop the remaining blocks from
     * being processed.
     *
     * @param msg buffer holding the bytes just read; its remaining bytes are added to the read total
     */
    public void received(ByteBuffer msg) {
        try {
            _readBytes += msg.remaining();
            // Typed list: the loop below iterates AMQDataBlock elements, which a raw ArrayList
            // cannot provide without a cast.
            final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
            Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Runnable() {

                public void run() {
                    for (AMQDataBlock message : dataBlocks) {
                        try {
                            if (PROTOCOL_DEBUG) {
                                // Log the handler, not this Runnable, so RECV lines match the
                                // identity printed by the SEND trace in writeFrame.
                                _protocolLogger.info(String.format("RECV: [%s] %s", AMQProtocolHandler.this, message));
                            }
                            if (message instanceof AMQFrame) {
                                final boolean debug = _logger.isDebugEnabled();
                                final long msgNumber = ++_messageReceivedCount;
                                if (debug && msgNumber % 1000L == 0L) {
                                    _logger.debug("Received " + _messageReceivedCount + " protocol messages");
                                }
                                final AMQFrame frame = (AMQFrame) message;
                                final AMQBody bodyFrame = frame.getBodyFrame();
                                HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
                                bodyFrame.handle(frame.getChannel(), (AMQVersionAwareProtocolSession) AMQProtocolHandler.this._protocolSession);
                                AMQProtocolHandler.this._connection.bytesReceived(AMQProtocolHandler.this._readBytes);
                            } else if (message instanceof ProtocolInitiation) {
                                // The peer answered our protocol header with its own: remember
                                // the version it offered and mark the connection closed.
                                final ProtocolInitiation protocolInit = (ProtocolInitiation) message;
                                AMQProtocolHandler.this._suggestedProtocolVersion = protocolInit.checkVersion();
                                _logger.info("Broker suggested using protocol version:" + AMQProtocolHandler.this._suggestedProtocolVersion);
                                AMQProtocolHandler.this._stateManager.changeState(AMQState.CONNECTION_CLOSED);
                            }
                        } catch (Exception e) {
                            _logger.error("Exception processing frame", e);
                            AMQProtocolHandler.this.propagateExceptionToFrameListeners(e);
                            AMQProtocolHandler.this.exception(e);
                        }
                    }
                }
            });
        } catch (Exception e) {
            propagateExceptionToFrameListeners(e);
            exception(e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Offers a received method body to the state manager and then to every frame listener.
     *
     * <p>If nobody reports interest an {@code AMQException} is raised internally; that exception,
     * like any {@code AMQException} thrown by a listener, is propagated to the frame listeners
     * and passed to {@link #exception(Throwable)} rather than being thrown to the caller.
     *
     * @param channelId channel the method arrived on
     * @param bodyFrame the method body; cast to {@code AMQMethodBody}
     * @throws AMQException declared for callers; AMQExceptions raised inside the dispatch are handled here
     */
    public void methodBodyReceived(int channelId, AMQBody bodyFrame) throws AMQException {
        if (_logger.isDebugEnabled()) {
            _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + bodyFrame);
        }

        final AMQMethodEvent evt = new AMQMethodEvent(channelId, (AMQMethodBody) bodyFrame);
        try {
            boolean handled = getStateManager().methodReceived(evt);

            synchronized (_frameListeners) {
                // Every listener is offered the event, even after one has claimed it.
                for (AMQMethodListener listener : _frameListeners) {
                    if (listener.methodReceived(evt)) {
                        handled = true;
                    }
                }
            }

            if (!handled) {
                throw new AMQException(null, "AMQMethodEvent " + evt + " was not processed by any listener.  Listeners:" + _frameListeners, null);
            }
        } catch (AMQException failure) {
            propagateExceptionToFrameListeners(failure);
            exception(failure);
        }
    }

    /**
     * Asks the state manager for a waiter on the given states.
     *
     * @param states the states to wait for
     * @return the waiter created by the state manager
     * @throws AMQException if the state manager fails to create the waiter
     */
    public StateWaiter createWaiter(Set<AMQState> states) throws AMQException {
        final AMQStateManager manager = getStateManager();
        return manager.createWaiter(states);
    }

    /**
     * Writes a frame without requesting a flush; shorthand for {@code writeFrame(frame, false)}.
     *
     * @param frame the data block to send
     */
    public void writeFrame(AMQDataBlock frame) {
        final boolean flushAfterwards = false;
        writeFrame(frame, flushAfterwards);
    }

    /**
     * Encodes a frame and schedules it on the write job for sending through the network driver.
     *
     * <p>Also updates the written-byte total, bumps the shared sent-message counter, reports the
     * byte total to the connection and, when {@code wait} is true, flushes the network driver.
     *
     * @param frame the data block to send
     * @param wait  whether to call {@code flush()} on the network driver after scheduling the send
     */
    public void writeFrame(AMQDataBlock frame, boolean wait) {
        final ByteBuffer encoded = frame.toNioByteBuffer();
        _writtenBytes += encoded.remaining();

        final Runnable sendTask = new Runnable() {
            public void run() {
                _networkDriver.send(encoded);
            }
        };
        Job.fireAsynchEvent(_poolReference.getPool(), _writeJob, sendTask);

        if (PROTOCOL_DEBUG) {
            _protocolLogger.debug(String.format("SEND: [%s] %s", this, frame));
        }

        // The counter value before the increment decides whether this write gets logged.
        final long countBeforeThis = _messagesOut++;
        final boolean debugEnabled = _logger.isDebugEnabled();
        if (debugEnabled && countBeforeThis % 1000L == 0L) {
            _logger.debug("Sent " + _messagesOut + " protocol messages");
        }

        _connection.bytesSent(_writtenBytes);

        if (wait) {
            _networkDriver.flush();
        }
    }

    /**
     * Writes a frame and blocks for the reply using the default sync timeout.
     *
     * @param frame    the frame to write
     * @param listener the listener that recognises the reply
     * @return the event the listener reports
     * @throws AMQException      if the write or the wait fails
     * @throws FailoverException if failover has been announced
     */
    public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, BlockingMethodFrameListener listener) throws AMQException, FailoverException {
        final long timeout = DEFAULT_SYNC_TIMEOUT;
        return writeCommandFrameAndWaitForReply(frame, listener, timeout);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Registers {@code listener}, writes {@code frame} and blocks until the listener returns
     * from {@code blockForFrame(timeout)}.
     *
     * <p>Before the listener is registered, and while holding the listener set's monitor, two
     * conditions abort the call: a recorded failover exception is rethrown, and if the connection
     * state is CONNECTION_CLOSED or CONNECTION_CLOSING with a last exception recorded, that
     * exception is rethrown (cloned for the current thread when it is an {@code AMQException},
     * otherwise wrapped with {@code INTERNAL_ERROR}). The listener is always removed again,
     * whichever way the method exits.
     *
     * @param frame    the frame to write
     * @param listener the listener that recognises the reply
     * @param timeout  passed unchanged to {@code listener.blockForFrame}
     * @return the event returned by the listener
     * @throws AMQException      if the connection is closed/closing with a recorded error, or the wait fails
     * @throws FailoverException if failover has been announced and not yet cleared
     */
    public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, BlockingMethodFrameListener listener, long timeout) throws AMQException, FailoverException {
        try {
            synchronized (_frameListeners) {
                if (_lastFailoverException != null) {
                    throw _lastFailoverException;
                }

                final AMQState currentState = _stateManager.getCurrentState();
                if (currentState == AMQState.CONNECTION_CLOSED || currentState == AMQState.CONNECTION_CLOSING) {
                    final Exception lastError = _stateManager.getLastException();
                    if (lastError instanceof AMQException) {
                        throw ((AMQException) lastError).cloneForCurrentThread();
                    }
                    if (lastError != null) {
                        throw new AMQException(AMQConstant.INTERNAL_ERROR, lastError.getMessage(), lastError);
                    }
                }

                // Register before writing so a fast reply cannot be missed.
                _frameListeners.add(listener);
            }

            writeFrame(frame);
            return listener.blockForFrame(timeout);
        } finally {
            // Nobody else removes the listener, so do it on every exit path. Removing a
            // listener that was never added is harmless.
            _frameListeners.remove(listener);
        }
    }

    /**
     * Writes a frame and waits for a reply of the given class, using the default sync timeout.
     *
     * @param frame         the frame to write
     * @param responseClass the class of method body expected in reply
     * @return the event carrying the reply
     * @throws AMQException      if the write or the wait fails
     * @throws FailoverException if failover has been announced
     */
    public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass) throws AMQException, FailoverException {
        final long timeout = DEFAULT_SYNC_TIMEOUT;
        return syncWrite(frame, responseClass, timeout);
    }

    /**
     * Writes a frame and waits for a reply of the given class on the frame's channel.
     *
     * @param frame         the frame to write
     * @param responseClass the class of method body expected in reply
     * @param timeout       passed through to the blocking wait
     * @return the event carrying the reply
     * @throws AMQException      if the write or the wait fails
     * @throws FailoverException if failover has been announced
     */
    public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass, long timeout) throws AMQException, FailoverException {
        final SpecificMethodFrameListener replyListener =
                new SpecificMethodFrameListener(frame.getChannel(), responseClass);
        return writeCommandFrameAndWaitForReply(frame, replyListener, timeout);
    }

    /**
     * Delegates closing of the given session to the current protocol session.
     *
     * @param session the session to close
     * @throws AMQException if the protocol session fails to close it
     */
    public void closeSession(AMQSession session) throws AMQException {
        _protocolSession.closeSession(session);
    }

    /**
     * Closes the connection to the peer and releases this handler's hold on the shared executor
     * service.
     *
     * <p>Unless the state is already CONNECTION_CLOSED, a Connection.Close frame is written and
     * the matching Close-Ok is awaited; on success the network driver is closed and
     * {@link #closed()} is invoked. A timeout still invokes {@link #closed()}; a failover
     * interruption is logged and ignored. The executor service is released in a {@code finally}
     * block so that any other {@code AMQException} from the close handshake cannot leak the pool
     * reference acquired in the constructor.
     *
     * @param timeout how long to wait for the Close-Ok reply
     * @throws AMQException if the close handshake fails for a reason other than a timeout
     */
    public void closeConnection(long timeout) throws AMQException {
        try {
            final ConnectionCloseBody body = _protocolSession.getMethodRegistry().createConnectionCloseBody(
                    AMQConstant.REPLY_SUCCESS.getCode(),
                    new AMQShortString("JMS client is closing the connection."),
                    0,
                    0);
            final AMQFrame frame = body.generateFrame(0);

            // No point in a synchronous write when the connection is already closed.
            if (!getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED)) {
                try {
                    syncWrite(frame, ConnectionCloseOkBody.class, timeout);
                    _networkDriver.close();
                    closed();
                } catch (AMQTimeoutException e) {
                    closed();
                } catch (FailoverException e) {
                    _logger.debug("FailoverException interrupted connection close, ignoring as connection   close anyway.");
                }
            }
        } finally {
            _poolReference.releaseExecutorService();
        }
    }

    /**
     * @return the running total of bytes passed to {@code received}
     */
    public long getReadBytes() {
        return _readBytes;
    }

    /**
     * @return the running total of bytes encoded by {@code writeFrame}
     */
    public long getWrittenBytes() {
        return _writtenBytes;
    }

    /**
     * Points the failover handler at the given host and port and starts the failover thread.
     *
     * @param host host to hand to the failover handler
     * @param port port to hand to the failover handler
     */
    public void failover(String host, int port) {
        final FailoverHandler handler = _failoverHandler;
        handler.setHost(host);
        handler.setPort(port);
        startFailoverThread();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Blocks while a failover latch is set, for at most {@code MAXIMUM_STATE_WAIT_TIME}
     * milliseconds. Returns immediately when no latch is set.
     *
     * <p>The wait happens while holding the latch-change monitor. If the wait times out the
     * method still returns normally, but a warning is logged so that the timeout is not
     * silently ignored.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void blockUntilNotFailingOver() throws InterruptedException {
        synchronized (_failoverLatchChange) {
            if (_failoverLatch == null) {
                return;
            }
            final boolean released = _failoverLatch.await(MAXIMUM_STATE_WAIT_TIME, TimeUnit.MILLISECONDS);
            if (!released) {
                _logger.warn("Timed out after " + MAXIMUM_STATE_WAIT_TIME + "ms waiting for failover to complete");
            }
        }
    }

    /**
     * @return a queue name generated by the current protocol session
     */
    public AMQShortString generateQueueName() {
        return _protocolSession.generateQueueName();
    }

    /**
     * @return the current failover latch, or {@code null} if none is set; read without taking
     *         the latch-change monitor
     */
    public CountDownLatch getFailoverLatch() {
        return _failoverLatch;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Replaces the failover latch while holding the latch-change monitor.
     *
     * @param failoverLatch the new latch; may be {@code null} to clear it
     */
    public void setFailoverLatch(CountDownLatch failoverLatch) {
        synchronized (_failoverLatchChange) {
            _failoverLatch = failoverLatch;
        }
    }

    /**
     * @return the connection this handler was created for
     */
    public AMQConnection getConnection() {
        return _connection;
    }

    /**
     * @return the state manager currently in use
     */
    public AMQStateManager getStateManager() {
        return _stateManager;
    }

    /**
     * Installs a new state manager and attaches the current protocol session to it.
     *
     * @param stateManager the state manager to use from now on
     */
    public void setStateManager(AMQStateManager stateManager) {
        _stateManager = stateManager;
        // Bind via the field, as before, so the newly installed manager gets the session.
        _stateManager.setProtocolSession(_protocolSession);
    }

    /**
     * @return the current protocol session
     */
    public AMQProtocolSession getProtocolSession() {
        return _protocolSession;
    }

    /**
     * @return the current failover state
     */
    FailoverState getFailoverState() {
        return _failoverState;
    }

    /**
     * Overwrites the failover state.
     *
     * @param failoverState the new failover state
     */
    public void setFailoverState(FailoverState failoverState) {
        _failoverState = failoverState;
    }

    /**
     * @return the protocol major version reported by the current protocol session
     */
    public byte getProtocolMajorVersion() {
        return _protocolSession.getProtocolMajorVersion();
    }

    /**
     * @return the protocol minor version reported by the current protocol session
     */
    public byte getProtocolMinorVersion() {
        return _protocolSession.getProtocolMinorVersion();
    }

    /**
     * @return the method registry of the current protocol session
     */
    public MethodRegistry getMethodRegistry() {
        return _protocolSession.getMethodRegistry();
    }

    /**
     * @return the protocol version reported by the current protocol session
     */
    public ProtocolVersion getProtocolVersion() {
        return _protocolSession.getProtocolVersion();
    }

    /**
     * @return the remote address reported by the network driver
     */
    public SocketAddress getRemoteAddress() {
        return _networkDriver.getRemoteAddress();
    }

    /**
     * @return the local address reported by the network driver
     */
    public SocketAddress getLocalAddress() {
        return _networkDriver.getLocalAddress();
    }

    /**
     * Sets the network driver used for sending, flushing and closing.
     *
     * @param driver the network driver to use
     */
    public void setNetworkDriver(NetworkDriver driver) {
        _networkDriver = driver;
    }

    /**
     * Configures heartbeat idle limits on the network driver. A non-positive delay leaves
     * everything untouched.
     *
     * <p>The write-idle limit is set to {@code delay}; the read-idle limit is set to the timeout
     * that {@code HeartbeatConfig.CONFIG} derives from it, and both values are passed to
     * {@code HeartbeatDiagnostics.init}.
     *
     * @param delay the heartbeat delay (units as expected by the network driver - TODO confirm)
     */
    void initHeartbeats(int delay) {
        if (delay <= 0) {
            return;
        }
        getNetworkDriver().setMaxWriteIdle(delay);
        getNetworkDriver().setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay));
        HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay));
    }

    /**
     * @return the network driver currently in use, or {@code null} if none has been set
     */
    public NetworkDriver getNetworkDriver() {
        return _networkDriver;
    }

    /**
     * @return the protocol version taken from a received ProtocolInitiation, or {@code null} if
     *         none has been received
     */
    public ProtocolVersion getSuggestedProtocolVersion() {
        return _suggestedProtocolVersion;
    }
}

