/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.client;

import java.io.DataOutput;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.qpid.AMQConnectionClosedException;
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQTimeoutException;
import org.apache.qpid.QpidException;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.FailoverHandler;
import org.apache.qpid.client.HeartbeatListener;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverState;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.protocol.BlockingMethodFrameListener;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateWaiter;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.codec.ClientDecoder;
import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQProtocolHeaderException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ClientMethodProcessor;
import org.apache.qpid.framing.ConnectionCloseBody;
import org.apache.qpid.framing.ConnectionCloseOkBody;
import org.apache.qpid.framing.HeartbeatBody;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.ProtocolInitiation;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.transport.ExceptionHandlingByteBufferReceiver;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.TransportActivity;
import org.apache.qpid.util.BytesDataOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AMQProtocolHandler
implements ExceptionHandlingByteBufferReceiver,
TransportActivity {
    private static final Logger _logger = LoggerFactory.getLogger(AMQProtocolHandler.class);
    private static final long MAXIMUM_STATE_WAIT_TIME = Long.parseLong(System.getProperty("amqj.MaximumStateWait", "30000"));
    private static final String AMQJ_DEFAULT_SYNCWRITE_TIMEOUT = "amqj.default_syncwrite_timeout";
    private final AMQConnection _connection;
    private final AMQProtocolSession _protocolSession;
    private AMQStateManager _stateManager;
    private final CopyOnWriteArraySet<AMQMethodListener> _frameListeners = new CopyOnWriteArraySet();
    private final FailoverHandler _failoverHandler;
    private FailoverState _failoverState = FailoverState.NOT_STARTED;
    private CountDownLatch _failoverLatch;
    private FailoverException _lastFailoverException;
    private final long DEFAULT_SYNC_TIMEOUT = Long.getLong("qpid.sync_op_timeout", Long.getLong("amqj.default_syncwrite_timeout", 60000L));
    private Object _failoverLatchChange = new Object();
    private ClientDecoder _decoder;
    private ProtocolVersion _suggestedProtocolVersion;
    private long _writtenBytes;
    private long _readBytes;
    private int _messageReceivedCount;
    private int _messagesOut;
    private NetworkConnection _network;
    private ByteBufferSender _sender;
    private long _lastReadTime = System.currentTimeMillis();
    private long _lastWriteTime = System.currentTimeMillis();
    private HeartbeatListener _heartbeatListener = HeartbeatListener.DEFAULT;
    private Throwable _initialConnectionException;
    private static final int REUSABLE_BYTE_BUFFER_CAPACITY = 66560;
    private final byte[] _reusableBytes = new byte[66560];
    private final ByteBuffer _reusableByteBuffer = ByteBuffer.wrap(this._reusableBytes);
    private final BytesDataOutput _reusableDataOutput = new BytesDataOutput(this._reusableBytes);
    private int _queueId = 1;
    private final Object _queueIdLock = new Object();

    public AMQProtocolHandler(AMQConnection con) {
        this._connection = con;
        this._protocolSession = new AMQProtocolSession(this, this._connection);
        this._stateManager = new AMQStateManager(this._protocolSession);
        this._failoverHandler = new FailoverHandler(this);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void closed() {
        if (this._connection.isClosed()) {
            _logger.debug("Session closed called by client");
        } else {
            boolean failoverNotAllowed = false;
            boolean failedWithoutConnecting = false;
            Throwable initialConnectionException = null;
            AMQProtocolHandler aMQProtocolHandler = this;
            synchronized (aMQProtocolHandler) {
                if (_logger.isDebugEnabled()) {
                    _logger.debug("Session closed called with failover state " + this._failoverState);
                }
                if (this._failoverState == FailoverState.NOT_STARTED) {
                    try {
                        this._sender.close();
                    }
                    catch (Exception e) {
                        _logger.warn("Exception occurred on closing the sender", (Throwable)e);
                    }
                    if (this._connection.failoverAllowed()) {
                        this._failoverState = FailoverState.IN_PROGRESS;
                        _logger.debug("FAILOVER STARTING");
                        this.startFailoverThread();
                    } else if (this._connection.isConnected()) {
                        failoverNotAllowed = true;
                        if (_logger.isDebugEnabled()) {
                            _logger.debug("Failover not allowed by policy:" + this._connection.getFailoverPolicy());
                        }
                    } else {
                        failedWithoutConnecting = true;
                        initialConnectionException = this._initialConnectionException;
                        _logger.debug("We are in process of establishing the initial connection");
                    }
                    this._initialConnectionException = null;
                } else {
                    _logger.debug("Not starting the failover thread as state currently " + this._failoverState);
                }
            }
            if (failoverNotAllowed) {
                this._connection.closed((Throwable)new AMQDisconnectedException("Server closed connection and reconnection not permitted.", (Throwable)this._stateManager.getLastException()));
            } else if (failedWithoutConnecting) {
                if (initialConnectionException == null) {
                    initialConnectionException = this._stateManager.getLastException();
                }
                String message = initialConnectionException == null ? "" : initialConnectionException.getMessage();
                this._connection.exceptionReceived(new QpidException("Connection could not be established: " + message, initialConnectionException));
            }
        }
        if (_logger.isDebugEnabled()) {
            _logger.debug("Protocol Session [" + this + "] closed");
        }
    }

    private void startFailoverThread() {
        if (!this._connection.isClosed()) {
            Thread failoverThread;
            try {
                failoverThread = Threading.getThreadFactory().createThread(new Runnable(){

                    @Override
                    public void run() {
                        if (Thread.currentThread().isDaemon()) {
                            throw new IllegalStateException("FailoverHandler must run on a non-daemon thread.");
                        }
                        AMQProtocolHandler.this.setFailoverLatch(new CountDownLatch(1));
                        AMQProtocolHandler.this.notifyFailoverStarting();
                        AMQProtocolHandler.this.getConnection().doWithAllLocks(AMQProtocolHandler.this._failoverHandler);
                        AMQProtocolHandler.this.getFailoverLatch().countDown();
                    }
                });
            }
            catch (Exception e) {
                throw new RuntimeException("Failed to create thread", e);
            }
            failoverThread.setName("Failover");
            failoverThread.setDaemon(false);
            failoverThread.start();
        }
    }

    public void readerIdle() {
        _logger.debug("Protocol Session [" + this + "] idle: reader");
        _logger.warn("Timed out while waiting for heartbeat from peer.");
        this._network.close();
    }

    public void writerIdle() {
        _logger.debug("Protocol Session [" + this + "] idle: writer");
        this.writeFrame((AMQDataBlock)HeartbeatBody.FRAME);
        this._heartbeatListener.heartbeatSent();
    }

    public void exception(Throwable cause) {
        FailoverState state;
        boolean causeIsAConnectionProblem;
        boolean bl = causeIsAConnectionProblem = cause instanceof AMQConnectionClosedException || cause instanceof IOException || cause instanceof TransportException;
        if (causeIsAConnectionProblem) {
            try {
                this._network.close();
            }
            catch (Exception exception) {
                // empty catch block
            }
        }
        if ((state = this.getFailoverState()) == FailoverState.NOT_STARTED) {
            if (causeIsAConnectionProblem) {
                _logger.info("Connection exception caught therefore going to attempt failover: " + cause, cause);
                this._initialConnectionException = cause;
            } else {
                this._connection.exceptionReceived(cause);
            }
        } else if (state == FailoverState.FAILED) {
            _logger.error("Exception caught by protocol handler: " + cause, cause);
            AMQDisconnectedException amqe = new AMQDisconnectedException("Failover could not re-establish connectivity: " + cause, cause);
            this.propagateExceptionToAllWaiters((Exception)amqe);
            this._connection.closed((Throwable)amqe);
        } else {
            _logger.warn("Exception caught by protocol handler: " + cause, cause);
        }
    }

    public void propagateExceptionToAllWaiters(Exception e) {
        this.getStateManager().error(e);
        this.propagateExceptionToFrameListeners(e);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void propagateExceptionToFrameListeners(Exception e) {
        CopyOnWriteArraySet<AMQMethodListener> copyOnWriteArraySet = this._frameListeners;
        synchronized (copyOnWriteArraySet) {
            if (!this._frameListeners.isEmpty()) {
                for (AMQMethodListener ml : this._frameListeners) {
                    ml.error(e);
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void notifyFailoverStarting() {
        CopyOnWriteArraySet<AMQMethodListener> copyOnWriteArraySet = this._frameListeners;
        synchronized (copyOnWriteArraySet) {
            this._lastFailoverException = new FailoverException("Failing over about to start");
        }
        this.propagateExceptionToFrameListeners(this._lastFailoverException);
    }

    public void failoverInProgress() {
        this._lastFailoverException = null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void received(ByteBuffer msg) {
        this._readBytes += (long)msg.remaining();
        this._lastReadTime = System.currentTimeMillis();
        List dataBlocks = this._protocolSession.getMethodProcessor().getProcessedMethods();
        try {
            this._decoder.decodeBuffer(msg);
            int size = dataBlocks.size();
            for (int i = 0; i < size; ++i) {
                AMQDataBlock message = (AMQDataBlock)dataBlocks.get(i);
                _logger.debug("RECV: {}", (Object)message);
                if (message instanceof AMQFrame) {
                    long msgNumber;
                    if ((msgNumber = (long)(++this._messageReceivedCount)) % 1000L == 0L && _logger.isDebugEnabled()) {
                        _logger.debug("Received {} protocol messages", (Object)this._messageReceivedCount);
                    }
                    AMQFrame frame = (AMQFrame)message;
                    AMQBody bodyFrame = frame.getBodyFrame();
                    bodyFrame.handle(frame.getChannel(), (AMQVersionAwareProtocolSession)this._protocolSession);
                    this._connection.bytesReceived(this._readBytes);
                    continue;
                }
                if (!(message instanceof ProtocolInitiation)) continue;
                try {
                    ProtocolInitiation protocolInit = (ProtocolInitiation)message;
                    this._suggestedProtocolVersion = protocolInit.checkVersion();
                    _logger.debug("Broker suggested using protocol version: {} ", (Object)this._suggestedProtocolVersion);
                    this._stateManager.changeState(AMQState.CONNECTION_CLOSED);
                    continue;
                }
                catch (AMQProtocolHeaderException e) {
                    this._stateManager.error((Exception)((Object)e));
                    throw e;
                }
            }
        }
        catch (Exception e) {
            _logger.error("Exception processing frame", (Throwable)e);
            this.propagateExceptionToFrameListeners(e);
            this.exception(e);
        }
        finally {
            dataBlocks.clear();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void methodBodyReceived(int channelId, AMQBody bodyFrame) throws QpidException {
        AMQMethodEvent evt = new AMQMethodEvent(channelId, (AMQMethodBody)bodyFrame);
        try {
            boolean wasAnyoneInterested = this.getStateManager().methodReceived(evt);
            CopyOnWriteArraySet<AMQMethodListener> copyOnWriteArraySet = this._frameListeners;
            synchronized (copyOnWriteArraySet) {
                if (!this._frameListeners.isEmpty()) {
                    for (AMQMethodListener listener : this._frameListeners) {
                        wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
                    }
                }
            }
            if (!wasAnyoneInterested) {
                throw new QpidException("AMQMethodEvent " + evt + " was not processed by any listener.  Listeners:" + this._frameListeners, null);
            }
        }
        catch (QpidException e) {
            this.propagateExceptionToFrameListeners((Exception)((Object)e));
            this.exception(e);
        }
    }

    public StateWaiter createWaiter(Set<AMQState> states) throws QpidException {
        return this.getStateManager().createWaiter(states);
    }

    public void writeFrame(AMQDataBlock frame) {
        this.writeFrame(frame, true);
    }

    public synchronized void writeFrame(AMQDataBlock frame, boolean flush) {
        ByteBuffer buf = this.asByteBuffer(frame);
        this._lastWriteTime = System.currentTimeMillis();
        this._writtenBytes += (long)buf.remaining();
        this._sender.send(QpidByteBuffer.wrap((ByteBuffer)buf));
        if (flush) {
            this._sender.flush();
        }
        _logger.debug("SEND: {}", (Object)frame);
        long sentMessages = this._messagesOut++;
        boolean debug = _logger.isDebugEnabled();
        if (debug && sentMessages % 1000L == 0L) {
            _logger.debug("Sent {} protocol messages", (Object)this._messagesOut);
        }
        this._connection.bytesSent(this._writtenBytes);
    }

    private ByteBuffer asByteBuffer(AMQDataBlock block) {
        ByteBuffer buf;
        int size = (int)block.getSize();
        byte[] data = size > 66560 ? new byte[size] : this._reusableBytes;
        this._reusableDataOutput.setBuffer(data);
        try {
            block.writePayload((DataOutput)this._reusableDataOutput);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        if (size < 66560) {
            buf = this._reusableByteBuffer;
            buf.position(0);
        } else {
            buf = ByteBuffer.wrap(data);
        }
        buf.limit(this._reusableDataOutput.length());
        return buf;
    }

    public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQDataBlock frame, BlockingMethodFrameListener listener) throws QpidException, FailoverException {
        return this.writeCommandFrameAndWaitForReply(frame, listener, this.DEFAULT_SYNC_TIMEOUT);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQDataBlock frame, BlockingMethodFrameListener listener, long timeout) throws QpidException, FailoverException {
        try {
            CopyOnWriteArraySet<AMQMethodListener> copyOnWriteArraySet = this._frameListeners;
            synchronized (copyOnWriteArraySet) {
                Exception e;
                if (this._lastFailoverException != null) {
                    throw this._lastFailoverException;
                }
                if ((this._stateManager.getCurrentState() == AMQState.CONNECTION_CLOSED || this._stateManager.getCurrentState() == AMQState.CONNECTION_CLOSING) && (e = this._stateManager.getLastException()) != null) {
                    if (e instanceof QpidException) {
                        QpidException amqe = (QpidException)((Object)e);
                        throw amqe.cloneForCurrentThread();
                    }
                    throw new AMQException(AMQConstant.INTERNAL_ERROR, e.getMessage(), (Throwable)e);
                }
                this._frameListeners.add(listener);
            }
            this.writeFrame(frame);
            long actualTimeout = timeout == -1L ? this.DEFAULT_SYNC_TIMEOUT : timeout;
            AMQMethodEvent aMQMethodEvent = listener.blockForFrame(actualTimeout);
            return aMQMethodEvent;
        }
        finally {
            this._frameListeners.remove(listener);
        }
    }

    public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass) throws QpidException, FailoverException {
        return this.syncWrite(frame, responseClass, this.DEFAULT_SYNC_TIMEOUT);
    }

    public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass, long timeout) throws QpidException, FailoverException {
        return this.writeCommandFrameAndWaitForReply((AMQDataBlock)frame, new SpecificMethodFrameListener(frame.getChannel(), responseClass), timeout);
    }

    public void closeSession(AMQSession session) throws QpidException {
        this._protocolSession.closeSession(session);
    }

    public void closeConnection(long timeout) throws QpidException {
        if (this.getStateManager().getCurrentState().equals((Object)AMQState.CONNECTION_OPEN)) {
            try {
                ConnectionCloseBody body = this._protocolSession.getMethodRegistry().createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), new AMQShortString("JMS client is closing the connection."), 0, 0);
                AMQFrame frame = body.generateFrame(0);
                this.syncWrite(frame, ConnectionCloseOkBody.class, timeout);
                this._network.close();
                this.closed();
            }
            catch (AMQTimeoutException e) {
                this.closed();
            }
            catch (FailoverException e) {
                _logger.debug("FailoverException interrupted connection close, ignoring as connection closed anyway.");
            }
        }
    }

    public long getReadBytes() {
        return this._readBytes;
    }

    public long getWrittenBytes() {
        return this._writtenBytes;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void blockUntilNotFailingOver() throws InterruptedException {
        Object object = this._failoverLatchChange;
        synchronized (object) {
            if (this._failoverLatch == null || !this._failoverLatch.await(MAXIMUM_STATE_WAIT_TIME, TimeUnit.MILLISECONDS)) {
                // empty if block
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public String generateQueueName() {
        int id;
        Object object = this._queueIdLock;
        synchronized (object) {
            id = this._queueId++;
        }
        String localAddress = this.getLocalAddress().toString().replaceAll("[./:;]", "_");
        String queueName = "tmp_" + localAddress + "_" + id;
        return queueName.replaceAll("_+", "_");
    }

    public CountDownLatch getFailoverLatch() {
        return this._failoverLatch;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void setFailoverLatch(CountDownLatch failoverLatch) {
        Object object = this._failoverLatchChange;
        synchronized (object) {
            this._failoverLatch = failoverLatch;
        }
    }

    public AMQConnection getConnection() {
        return this._connection;
    }

    public AMQStateManager getStateManager() {
        return this._stateManager;
    }

    public void setStateManager(AMQStateManager stateManager) {
        this._stateManager = stateManager;
        this._stateManager.setProtocolSession(this._protocolSession);
    }

    public AMQProtocolSession getProtocolSession() {
        return this._protocolSession;
    }

    synchronized FailoverState getFailoverState() {
        return this._failoverState;
    }

    public synchronized void setFailoverState(FailoverState failoverState) {
        this._failoverState = failoverState;
    }

    public MethodRegistry getMethodRegistry() {
        return this._protocolSession.getMethodRegistry();
    }

    public ProtocolVersion getProtocolVersion() {
        return this._protocolSession.getProtocolVersion();
    }

    public SocketAddress getRemoteAddress() {
        return this._network.getRemoteAddress();
    }

    public SocketAddress getLocalAddress() {
        return this._network.getLocalAddress();
    }

    public void setNetworkConnection(NetworkConnection network) {
        this.setNetworkConnection(network, network.getSender());
    }

    public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender) {
        this._network = network;
        this._sender = sender;
        this._protocolSession.setSender(sender);
    }

    public long getLastReadTime() {
        return this._lastReadTime;
    }

    public long getLastWriteTime() {
        return this._lastWriteTime;
    }

    protected ByteBufferSender getSender() {
        return this._sender;
    }

    public NetworkConnection getNetworkConnection() {
        return this._network;
    }

    public ProtocolVersion getSuggestedProtocolVersion() {
        return this._suggestedProtocolVersion;
    }

    public void setHeartbeatListener(HeartbeatListener listener) {
        this._heartbeatListener = listener == null ? HeartbeatListener.DEFAULT : listener;
    }

    public void heartbeatBodyReceived() {
        this._heartbeatListener.heartbeatReceived();
    }

    public void setMaxFrameSize(long frameMax) {
        this._decoder.setMaxFrameSize(frameMax == 0L || frameMax > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int)frameMax);
    }

    public void init(ConnectionSettings settings) {
        this._decoder = new ClientDecoder((ClientMethodProcessor)this._protocolSession.getMethodProcessor());
        this._protocolSession.init(settings);
    }

    public long getDefaultTimeout() {
        return this.DEFAULT_SYNC_TIMEOUT;
    }
}

