/*
 * Decompiled with CFR 0.152.
 */
package com.tc.object;

import com.tc.async.api.Stage;
import com.tc.async.api.StageManager;
import com.tc.bytes.TCByteBufferFactory;
import com.tc.entity.NetworkVoltronEntityMessage;
import com.tc.entity.ResendVoltronEntityMessage;
import com.tc.entity.VoltronEntityMessage;
import com.tc.entity.VoltronEntityMultiResponse;
import com.tc.entity.VoltronEntityResponse;
import com.tc.exception.EntityBusyException;
import com.tc.exception.EntityReferencedException;
import com.tc.exception.WrappedEntityException;
import com.tc.logging.ClientIDLogger;
import com.tc.net.ClientID;
import com.tc.net.NodeID;
import com.tc.net.ServerID;
import com.tc.net.protocol.tcm.ClientMessageChannel;
import com.tc.net.protocol.tcm.MessageChannel;
import com.tc.net.protocol.tcm.TCMessageType;
import com.tc.net.protocol.tcm.UnknownNameException;
import com.tc.object.ClientConfigurationContext;
import com.tc.object.ClientEntityManager;
import com.tc.object.ClientEntityStateManager;
import com.tc.object.ClientInstanceID;
import com.tc.object.EntityClientEndpointImpl;
import com.tc.object.EntityDescriptor;
import com.tc.object.EntityID;
import com.tc.object.FetchID;
import com.tc.object.InFlightMessage;
import com.tc.object.InFlightMonitor;
import com.tc.object.TransactionSource;
import com.tc.object.msg.ClientEntityReferenceContext;
import com.tc.object.msg.ClientHandshakeMessage;
import com.tc.object.session.SessionID;
import com.tc.object.tx.TransactionID;
import com.tc.text.MapListPrettyPrint;
import com.tc.text.PrettyPrintable;
import com.tc.util.Assert;
import com.tc.util.Throwables;
import com.tc.util.Util;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.terracotta.entity.EntityClientEndpoint;
import org.terracotta.entity.EntityMessage;
import org.terracotta.entity.EntityResponse;
import org.terracotta.entity.MessageCodec;
import org.terracotta.entity.MessageCodecException;
import org.terracotta.exception.ConnectionClosedException;
import org.terracotta.exception.EntityException;
import org.terracotta.exception.EntityNotFoundException;
import org.terracotta.exception.EntityServerUncaughtException;

public class ClientEntityManagerImpl
implements ClientEntityManager {
    private final Logger logger;
    private final ClientMessageChannel channel;
    private final ConcurrentMap<TransactionID, InFlightMessage> inFlightMessages;
    private final MessagePendingCount requestTickets = new MessagePendingCount();
    private final TransactionSource transactionSource;
    private final ClientEntityStateManager stateManager;
    private final ConcurrentMap<ClientInstanceID, EntityClientEndpointImpl<?, ?>> objectStoreMap;
    private final StageManager stages;
    private final ExecutorService endpointCloser = Executors.newWorkStealingPool();
    private boolean wasBusy = false;
    private final LongAdder msgCount = new LongAdder();
    private final LongAdder inflights = new LongAdder();
    private final LongAdder addWindow = new LongAdder();

    public ClientEntityManagerImpl(ClientMessageChannel channel, StageManager mgr) {
        this.logger = new ClientIDLogger(channel::getClientID, LoggerFactory.getLogger(ClientEntityManager.class));
        this.channel = channel;
        this.inFlightMessages = new ConcurrentHashMap<TransactionID, InFlightMessage>();
        this.transactionSource = new TransactionSource();
        this.stateManager = new ClientEntityStateManager();
        this.objectStoreMap = new ConcurrentHashMap(10240, 0.75f, 128);
        this.stages = mgr;
    }

    public boolean checkBusy() {
        try {
            boolean bl = this.wasBusy;
            return bl;
        }
        finally {
            this.wasBusy = false;
        }
    }

    private synchronized boolean enqueueMessage(InFlightMessage msg) throws RejectedExecutionException {
        if (!this.stateManager.isRunning()) {
            return false;
        }
        if (!this.requestTickets.messagePendingSlotAvailable()) {
            throw new RejectedExecutionException("Output queue is full");
        }
        this.inFlightMessages.put(msg.getTransactionID(), msg);
        this.requestTickets.messagePending();
        return true;
    }

    private synchronized boolean enqueueMessage(InFlightMessage msg, long timeout, TimeUnit unit, boolean waitUntilRunning) throws TimeoutException {
        long end;
        boolean enqueued = true;
        boolean interrupted = false;
        long l = end = timeout > 0L ? System.nanoTime() + unit.toNanos(timeout) : 0L;
        while (waitUntilRunning && !this.stateManager.isRunning() || !this.requestTickets.messagePendingSlotAvailable()) {
            try {
                if (!this.stateManager.isShutdown()) {
                    long timing;
                    long l2 = timing = end > 0L ? end - System.nanoTime() : 0L;
                    if (timing < 0L) {
                        throw new TimeoutException();
                    }
                    this.wait(timing / TimeUnit.MILLISECONDS.toNanos(1L), (int)(timing % TimeUnit.MILLISECONDS.toNanos(1L)));
                    continue;
                }
                enqueued = false;
                break;
            }
            catch (InterruptedException ie) {
                interrupted = true;
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        if (enqueued) {
            this.inFlightMessages.put(msg.getTransactionID(), msg);
            this.requestTickets.messagePending();
        }
        return enqueued;
    }

    @Override
    public EntityClientEndpoint fetchEntity(EntityID entity, long version, ClientInstanceID instance, MessageCodec<? extends EntityMessage, ? extends EntityResponse> codec, Runnable closeHook) throws EntityException {
        return this.internalLookup(entity, version, instance, codec, closeHook);
    }

    @Override
    public void handleMessage(TransactionID tid, byte[] message) {
        InFlightMessage msg = (InFlightMessage)this.inFlightMessages.get(tid);
        if (msg != null) {
            msg.handleMessage(message);
        } else {
            this.logger.info("transaction " + tid + " not found. Ignoring message.");
        }
    }

    @Override
    public void handleStatistics(TransactionID tid, long[] message) {
        InFlightMessage msg = (InFlightMessage)this.inFlightMessages.get(tid);
        if (msg != null) {
            msg.addServerStatistics(message);
        } else {
            this.addWindow.add(message[0]);
            if (message[0] > TimeUnit.MILLISECONDS.toNanos(500L)) {
                this.logger.debug("add window " + message[0]);
            }
        }
    }

    @Override
    public void handleMessage(ClientInstanceID clientInstance, byte[] message) {
        EntityClientEndpoint endpoint = (EntityClientEndpoint)this.objectStoreMap.get(clientInstance);
        if (endpoint != null) {
            this.deliverInboundMessage(endpoint, message);
        } else {
            this.logger.info("Instance " + clientInstance + " not found. Ignoring message.");
        }
    }

    private void deliverInboundMessage(EntityClientEndpoint endpoint, byte[] msg) {
        EntityClientEndpointImpl endpointImpl = (EntityClientEndpointImpl)endpoint;
        try {
            endpointImpl.handleMessage(msg);
        }
        catch (MessageCodecException e) {
            Assert.fail(e.getLocalizedMessage());
        }
    }

    @Override
    public byte[] createEntity(EntityID entityID, long version, byte[] config) throws EntityException {
        boolean requiresReplication = true;
        return this.sendMessageWhileBusy(entityID, () -> this.createMessageWithoutClientInstance(entityID, version, requiresReplication, config, VoltronEntityMessage.Type.CREATE_ENTITY, this.lifecycleAcks()), this.lifecycleAcks(), "ClientEntityManagerImpl.createEntity");
    }

    @Override
    public byte[] reconfigureEntity(EntityID entityID, long version, byte[] config) throws EntityException {
        boolean requiresReplication = true;
        return this.sendMessageWhileBusy(entityID, () -> this.createMessageWithoutClientInstance(entityID, version, requiresReplication, config, VoltronEntityMessage.Type.RECONFIGURE_ENTITY, this.lifecycleAcks()), this.lifecycleAcks(), "ClientEntityManagerImpl.reconfigureEntity");
    }

    private Set<VoltronEntityMessage.Acks> lifecycleAcks() {
        return Collections.singleton(VoltronEntityMessage.Acks.RETIRED);
    }

    @Override
    public boolean destroyEntity(EntityID entityID, long version) throws EntityException {
        boolean requiresReplication = true;
        byte[] emtpyExtendedData = new byte[]{};
        try {
            this.sendMessageWhileBusy(entityID, () -> this.createMessageWithoutClientInstance(entityID, version, requiresReplication, emtpyExtendedData, VoltronEntityMessage.Type.DESTROY_ENTITY, this.lifecycleAcks()), this.lifecycleAcks(), "ClientEntityManagerImpl.destroyEntity");
        }
        catch (EntityReferencedException r) {
            return false;
        }
        return true;
    }

    @Override
    public InFlightMessage invokeActionWithTimeout(EntityID eid, EntityDescriptor entityDescriptor, Set<VoltronEntityMessage.Acks> acks, InFlightMonitor monitor, boolean requiresReplication, boolean shouldBlockGetOnRetire, long invokeTimeout, TimeUnit units, byte[] payload) throws InterruptedException, TimeoutException {
        long start = System.nanoTime();
        InFlightMessage inFlightMessage = this.queueInFlightMessage(eid, () -> this.createMessageWithDescriptor(eid, entityDescriptor, requiresReplication, payload, VoltronEntityMessage.Type.INVOKE_ACTION, this.makeServerAcks(shouldBlockGetOnRetire, acks)), acks, monitor, invokeTimeout, units, shouldBlockGetOnRetire);
        long timeLeft = units.toNanos(invokeTimeout) - (System.nanoTime() - start);
        if (invokeTimeout == 0L) {
            inFlightMessage.waitForAcks();
        } else if (timeLeft > 0L) {
            inFlightMessage.waitForAcks(timeLeft, TimeUnit.NANOSECONDS);
        } else {
            throw new TimeoutException(invokeTimeout + " " + (Object)((Object)units));
        }
        return inFlightMessage;
    }

    private Set<VoltronEntityMessage.Acks> makeServerAcks(boolean blockOnRetire, Set<VoltronEntityMessage.Acks> requestedAcks) {
        Set<VoltronEntityMessage.Acks> serverAcks = requestedAcks;
        if (blockOnRetire) {
            serverAcks = EnumSet.copyOf(requestedAcks);
            serverAcks.add(VoltronEntityMessage.Acks.RETIRED);
        }
        return serverAcks;
    }

    @Override
    public InFlightMessage invokeAction(EntityID eid, EntityDescriptor entityDescriptor, Set<VoltronEntityMessage.Acks> requestedAcks, InFlightMonitor monitor, boolean requiresReplication, boolean shouldBlockGetOnRetire, byte[] payload) {
        try {
            InFlightMessage inFlightMessage = this.queueInFlightMessage(eid, () -> this.createMessageWithDescriptor(eid, entityDescriptor, requiresReplication, payload, VoltronEntityMessage.Type.INVOKE_ACTION, this.makeServerAcks(shouldBlockGetOnRetire, requestedAcks)), requestedAcks, monitor, 0L, TimeUnit.MILLISECONDS, shouldBlockGetOnRetire);
            inFlightMessage.waitForAcks();
            return inFlightMessage;
        }
        catch (TimeoutException to) {
            throw new RuntimeException(to);
        }
    }

    @Override
    public void asyncInvokeAction(EntityID eid, EntityDescriptor entityDescriptor, Set<VoltronEntityMessage.Acks> requestedAcks, InFlightMonitor monitor, boolean requiresReplication, byte[] payload, long timeout, TimeUnit unit) throws RejectedExecutionException {
        if (unit == null) {
            this.asyncQueueInFlightMessage(eid, () -> this.createMessageWithDescriptor(eid, entityDescriptor, requiresReplication, payload, VoltronEntityMessage.Type.INVOKE_ACTION, requestedAcks), requestedAcks, monitor);
        } else {
            try {
                this.queueInFlightMessage(eid, () -> this.createMessageWithDescriptor(eid, entityDescriptor, requiresReplication, payload, VoltronEntityMessage.Type.INVOKE_ACTION, requestedAcks), requestedAcks, monitor, timeout, unit, false, true);
            }
            catch (TimeoutException te) {
                throw new RejectedExecutionException(te);
            }
        }
    }

    private void asyncQueueInFlightMessage(EntityID eid, Supplier<NetworkVoltronEntityMessage> message, Set<VoltronEntityMessage.Acks> requestedAcks, InFlightMonitor monitor) throws RejectedExecutionException {
        boolean queued;
        InFlightMessage inFlight = new InFlightMessage(eid, message, requestedAcks, monitor, false, true);
        try {
            this.msgCount.increment();
            this.inflights.add(ClientConfigurationContext.MAX_PENDING_REQUESTS - this.requestTickets.messagesPending);
            queued = this.enqueueMessage(inFlight);
        }
        catch (Throwable t) {
            this.transactionSource.retire(inFlight.getTransactionID());
            throw t;
        }
        if (queued) {
            inFlight.sent();
            if (!inFlight.send()) {
                this.logger.debug("message not sent.  Make sure resend happens : {}", (Object)inFlight);
            }
        } else {
            this.transactionSource.retire(inFlight.getTransactionID());
            this.throwClosedExceptionOnMessage(inFlight, "Connection closed before sending message");
        }
    }

    @Override
    public Map<String, ?> getStateMap() {
        LinkedHashMap<String, Object> map = new LinkedHashMap<String, Object>();
        for (EntityClientEndpointImpl s : this.objectStoreMap.values()) {
            map.put(s.getEntityID().toString(), s.getStatistics().getStateMap());
        }
        map.put("messagesOut", this.msgCount.sum());
        if (this.msgCount.sum() > 0L) {
            map.put("averagePending", this.inflights.sum() / this.msgCount.sum());
            map.put("averageServerWindow", this.addWindow.sum() / this.msgCount.sum());
        }
        Object stats = this.channel.getAttachment("ChannelStats");
        LinkedHashMap<String, Object> sub = new LinkedHashMap<String, Object>();
        sub.put("connection", this.channel.getConnectionID());
        sub.put("local", this.channel.getLocalAddress());
        sub.put("remote", this.channel.getRemoteAddress());
        sub.put("product", (Object)this.channel.getProductID());
        sub.put("client", this.channel.getClientID());
        map.put("channel", sub);
        if (stats instanceof PrettyPrintable) {
            sub.put("stats", ((PrettyPrintable)stats).getStateMap());
        }
        return map;
    }

    @Override
    public void received(TransactionID id) {
        InFlightMessage inFlight = (InFlightMessage)this.inFlightMessages.get(id);
        if (inFlight != null) {
            inFlight.received();
        }
    }

    @Override
    public void complete(TransactionID id) {
        this.complete(id, null);
    }

    @Override
    public void complete(TransactionID id, byte[] value) {
        InFlightMessage inFlight = (InFlightMessage)this.inFlightMessages.get(id);
        if (inFlight != null) {
            inFlight.setResult(value, null);
        }
    }

    @Override
    public void failed(TransactionID id, Exception error) {
        InFlightMessage inFlight = (InFlightMessage)this.inFlightMessages.get(id);
        if (inFlight != null) {
            inFlight.setResult(null, error);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void retired(TransactionID id) {
        block6: {
            try {
                InFlightMessage inFlight = (InFlightMessage)this.inFlightMessages.remove(id);
                if (inFlight == null) break block6;
                inFlight.retired();
                ClientEntityManagerImpl clientEntityManagerImpl = this;
                synchronized (clientEntityManagerImpl) {
                    this.requestTickets.messageRetired();
                    this.notify();
                }
            }
            finally {
                this.transactionSource.retire(id);
            }
        }
    }

    @Override
    public synchronized void pause() {
        this.stateManager.pause();
    }

    @Override
    public synchronized void unpause() {
        this.stateManager.running();
        this.notifyAll();
    }

    @Override
    public synchronized void initializeHandshake(ClientHandshakeMessage handshakeMessage) {
        for (EntityClientEndpointImpl endpoint : this.objectStoreMap.values()) {
            EntityDescriptor descriptor = endpoint.getEntityDescriptor();
            EntityID entityID = endpoint.getEntityID();
            long entityVersion = endpoint.getVersion();
            byte[] extendedReconnectData = endpoint.getExtendedReconnectData();
            ClientEntityReferenceContext context = new ClientEntityReferenceContext(entityID, entityVersion, descriptor.getClientInstanceID(), extendedReconnectData);
            handshakeMessage.addReconnectReference(context);
        }
        Stage<VoltronEntityMultiResponse> responderMulti = this.stages.getStage("multi_request_ack_stage", VoltronEntityMultiResponse.class);
        if (!responderMulti.isEmpty()) {
            FlushResponse flush = new FlushResponse();
            responderMulti.getSink().addToSink(flush);
            flush.waitForAccess();
        }
        for (InFlightMessage inFlight : this.inFlightMessages.values()) {
            VoltronEntityMessage message = inFlight.getMessage();
            ResendVoltronEntityMessage packaged = new ResendVoltronEntityMessage(message.getSource(), message.getTransactionID(), message.getEntityDescriptor(), message.getVoltronType(), message.doesRequireReplication(), message.getExtendedData(), message.getOldestTransactionOnClient());
            handshakeMessage.addResendMessage(packaged);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void shutdown() {
        Iterator iterator = this;
        synchronized (iterator) {
            if (this.stateManager.isShutdown()) {
                return;
            }
            this.requestTickets.stop();
            this.stateManager.stop();
            this.notifyAll();
        }
        for (InFlightMessage msg : this.inFlightMessages.values()) {
            this.throwClosedExceptionOnMessage(msg, "Connection closed under in-flight message");
        }
        for (EntityClientEndpointImpl endpoint : this.objectStoreMap.values()) {
            try {
                endpoint.didCloseUnexpectedly();
            }
            catch (Throwable t) {
                this.logger.error("error in shutdown", t);
            }
        }
        this.endpointCloser.shutdownNow();
        if (this.logger.isDebugEnabled()) {
            MapListPrettyPrint print = new MapListPrettyPrint();
            this.prettyPrint(print);
            this.logger.debug(print.toString());
        }
        this.objectStoreMap.clear();
    }

    private void throwClosedExceptionOnMessage(InFlightMessage msg, String description) {
        msg.received();
        ConnectionClosedException closed = new ConnectionClosedException(msg.getEntityID().getClassName(), msg.getEntityID().getEntityName(), description, false, null);
        msg.setResult(null, closed);
        msg.retired();
    }

    private <M extends EntityMessage, R extends EntityResponse> EntityClientEndpoint<M, R> internalLookup(final EntityID entity, long version, ClientInstanceID instance, MessageCodec<M, R> codec, final Runnable closeHook) throws EntityException {
        Assert.assertNotNull("Can't lookup null entity descriptor", instance);
        final EntityDescriptor fetchDescriptor = EntityDescriptor.createDescriptorForFetch(entity, version, instance);
        EntityClientEndpointImpl<M, R> resolvedEndpoint = null;
        try {
            byte[] raw = this.internalRetrieve(fetchDescriptor);
            ByteBuffer br = ByteBuffer.wrap(raw);
            long fetchID = br.getLong();
            FetchID fetch = new FetchID(fetchID);
            byte[] config = new byte[br.remaining()];
            br.get(config);
            Assert.assertTrue(null != raw);
            Runnable compoundRunnable = new Runnable(){

                @Override
                public void run() {
                    try {
                        ClientEntityManagerImpl.this.internalRelease(entity, fetchDescriptor, closeHook);
                    }
                    catch (EntityException e) {
                        Util.printLogAndRethrowError(e, ClientEntityManagerImpl.this.logger);
                    }
                }
            };
            resolvedEndpoint = new EntityClientEndpointImpl<M, R>(entity, version, EntityDescriptor.createDescriptorForInvoke(fetch, instance), this, config, codec, compoundRunnable, this.endpointCloser);
            if (null != this.objectStoreMap.get(instance)) {
                throw Assert.failure("Attempt to add an object that already exists: Object of class " + resolvedEndpoint.getClass() + " [Identity Hashcode : 0x" + Integer.toHexString(System.identityHashCode(resolvedEndpoint)) + "] ");
            }
            this.objectStoreMap.put(instance, resolvedEndpoint);
        }
        catch (EntityNotFoundException notfound) {
            throw notfound;
        }
        catch (EntityException e) {
            this.internalRelease(entity, fetchDescriptor, null);
            throw e;
        }
        catch (Throwable t) {
            this.internalRelease(entity, fetchDescriptor, null);
            throw Throwables.propagate(t);
        }
        return resolvedEndpoint;
    }

    private void internalRelease(EntityID eid, EntityDescriptor entityDescriptor, Runnable closeHook) throws EntityException {
        EnumSet<VoltronEntityMessage.Acks> requestedAcks = EnumSet.of(VoltronEntityMessage.Acks.COMPLETED);
        boolean requiresReplication = true;
        byte[] payload = new byte[]{};
        this.sendMessageWhileBusy(eid, () -> this.createMessageWithDescriptor(eid, entityDescriptor, requiresReplication, payload, VoltronEntityMessage.Type.RELEASE_ENTITY, requestedAcks), requestedAcks, "ClientEntityManagerImpl.internalRelease");
        EntityClientEndpointImpl ref = (EntityClientEndpointImpl)this.objectStoreMap.remove(entityDescriptor.getClientInstanceID());
        if (ref != null && this.logger.isDebugEnabled()) {
            MapListPrettyPrint print = new MapListPrettyPrint();
            ref.getStatistics().prettyPrint(print);
            this.logger.debug("Releasing " + ref.getEntityID() + "=" + print.toString());
        }
        if (closeHook != null) {
            closeHook.run();
        }
    }

    /*
     * Loose catch block
     */
    private byte[] sendMessageWhileBusy(EntityID eid, Supplier<NetworkVoltronEntityMessage> msg, Set<VoltronEntityMessage.Acks> requestedAcks, String traceComponentName) throws EntityException {
        while (true) {
            try {
                InFlightMessage inflight = this.queueInFlightMessage(eid, msg, requestedAcks, null, 0L, TimeUnit.MILLISECONDS, false);
                inflight.waitForAcks();
                return inflight.get();
            }
            catch (EntityBusyException busy) {
                this.wasBusy = true;
                this.logger.info("Cluster is busy. Requested operation will be retried in 2 seconds");
                try {
                    TimeUnit.SECONDS.sleep(2L);
                }
                catch (InterruptedException in) {
                    throw new WrappedEntityException(new EntityServerUncaughtException(eid.getClassName(), eid.getEntityName(), "", in));
                }
            }
        }
        catch (InterruptedException | TimeoutException ie) {
            throw new WrappedEntityException(new EntityServerUncaughtException(eid.getClassName(), eid.getEntityName(), "", ie));
        }
    }

    private byte[] internalRetrieve(EntityDescriptor entityDescriptor) throws EntityException {
        EnumSet<VoltronEntityMessage.Acks> requestedAcks = EnumSet.of(VoltronEntityMessage.Acks.COMPLETED);
        boolean requiresReplication = true;
        byte[] payload = new byte[]{};
        return this.sendMessageWhileBusy(entityDescriptor.getEntityID(), () -> this.createMessageWithDescriptor(entityDescriptor.getEntityID(), entityDescriptor, requiresReplication, payload, VoltronEntityMessage.Type.FETCH_ENTITY, requestedAcks), requestedAcks, "ClientEntityManagerImpl.internalRetrieve");
    }

    private InFlightMessage queueInFlightMessage(EntityID eid, Supplier<NetworkVoltronEntityMessage> message, Set<VoltronEntityMessage.Acks> requestedAcks, InFlightMonitor monitor, long timeout, TimeUnit units, boolean shouldBlockGetOnRetire) throws TimeoutException {
        return this.queueInFlightMessage(eid, message, requestedAcks, monitor, timeout, units, shouldBlockGetOnRetire, false);
    }

    private InFlightMessage queueInFlightMessage(EntityID eid, Supplier<NetworkVoltronEntityMessage> message, Set<VoltronEntityMessage.Acks> requestedAcks, InFlightMonitor monitor, long timeout, TimeUnit units, boolean shouldBlockGetOnRetire, boolean asyncMode) throws TimeoutException {
        boolean queued;
        InFlightMessage inFlight = new InFlightMessage(eid, message, requestedAcks, monitor, shouldBlockGetOnRetire, asyncMode);
        try {
            this.msgCount.increment();
            this.inflights.add(ClientConfigurationContext.MAX_PENDING_REQUESTS - this.requestTickets.messagesPending);
            queued = this.enqueueMessage(inFlight, timeout, units, inFlight.getMessage().getVoltronType() != VoltronEntityMessage.Type.INVOKE_ACTION);
        }
        catch (Throwable t) {
            this.transactionSource.retire(inFlight.getTransactionID());
            throw t;
        }
        if (queued) {
            inFlight.sent();
            if (inFlight.send()) {
                if (inFlight.getMessage().getVoltronType() != VoltronEntityMessage.Type.INVOKE_ACTION) {
                    inFlight.waitForAcks();
                }
            } else {
                this.logger.debug("message not sent.  Make sure resend happens " + inFlight);
            }
        } else {
            this.transactionSource.retire(inFlight.getTransactionID());
            this.throwClosedExceptionOnMessage(inFlight, "Connection closed before sending message");
        }
        return inFlight;
    }

    private NetworkVoltronEntityMessage createMessageWithoutClientInstance(EntityID entityID, long version, boolean requiresReplication, byte[] config, VoltronEntityMessage.Type type, Set<VoltronEntityMessage.Acks> acks) {
        EntityDescriptor entityDescriptor = EntityDescriptor.createDescriptorForLifecycle(entityID, version);
        return this.createMessageWithDescriptor(entityID, entityDescriptor, requiresReplication, config, type, acks);
    }

    private NetworkVoltronEntityMessage createMessageWithDescriptor(EntityID entityID, EntityDescriptor entityDescriptor, boolean requiresReplication, byte[] config, VoltronEntityMessage.Type type, Set<VoltronEntityMessage.Acks> acks) {
        TransactionID oldestTransactionInFlight;
        NetworkVoltronEntityMessage message = (NetworkVoltronEntityMessage)this.channel.createMessage(TCMessageType.VOLTRON_ENTITY_MESSAGE);
        ClientID clientID = this.channel.getClientID();
        TransactionID transactionID = this.transactionSource.create();
        TransactionID oldestTransactionPending = this.transactionSource.oldest();
        assert ((oldestTransactionInFlight = this.oldestTransactionIn(this.inFlightMessages.keySet())) == null || oldestTransactionPending.compareTo(oldestTransactionInFlight) <= 0);
        message.setContents(clientID, transactionID, entityID, entityDescriptor, type, requiresReplication, TCByteBufferFactory.wrap(config), oldestTransactionPending, acks);
        return message;
    }

    private TransactionID oldestTransactionIn(Collection<TransactionID> transactionIds) {
        Iterator<TransactionID> it = transactionIds.iterator();
        if (it.hasNext()) {
            TransactionID oldest = it.next();
            for (TransactionID txnId : transactionIds) {
                if (oldest.compareTo(txnId) <= 0) continue;
                oldest = txnId;
            }
            return oldest;
        }
        return null;
    }

    private static class MessagePendingCount {
        private int messagesPending = ClientConfigurationContext.MAX_PENDING_REQUESTS;

        private MessagePendingCount() {
        }

        private int messagePending() {
            Assert.assertTrue(this.messagesPending > 0);
            return --this.messagesPending;
        }

        private int messageRetired() {
            Assert.assertTrue(this.messagesPending < ClientConfigurationContext.MAX_PENDING_REQUESTS);
            return ++this.messagesPending;
        }

        private boolean messagePendingSlotAvailable() {
            return this.messagesPending > 0;
        }

        private void stop() {
            this.messagesPending = 0;
        }
    }

    private static class FlushResponse
    implements VoltronEntityResponse,
    VoltronEntityMultiResponse {
        private boolean accessed = false;

        private FlushResponse() {
        }

        @Override
        public synchronized TransactionID getTransactionID() {
            this.notifyAll();
            this.accessed = true;
            return TransactionID.NULL_ID;
        }

        @Override
        public VoltronEntityMessage.Acks getAckType() {
            return VoltronEntityMessage.Acks.RECEIVED;
        }

        @Override
        public TCMessageType getMessageType() {
            return TCMessageType.VOLTRON_ENTITY_RECEIVED_RESPONSE;
        }

        @Override
        public void hydrate() throws IOException, UnknownNameException {
        }

        @Override
        public void dehydrate() {
        }

        @Override
        public synchronized int replay(VoltronEntityMultiResponse.ReplayReceiver receiver) {
            this.notifyAll();
            this.accessed = true;
            return 0;
        }

        @Override
        public boolean send() {
            return true;
        }

        @Override
        public MessageChannel getChannel() {
            return null;
        }

        @Override
        public NodeID getSourceNodeID() {
            return ServerID.NULL_ID;
        }

        @Override
        public NodeID getDestinationNodeID() {
            return ClientID.NULL_ID;
        }

        @Override
        public SessionID getLocalSessionID() {
            return SessionID.NULL_ID;
        }

        @Override
        public int getTotalLength() {
            return 0;
        }

        @Override
        public boolean addReceived(TransactionID tid) {
            throw new UnsupportedOperationException("Not supported yet.");
        }

        @Override
        public boolean addRetired(TransactionID tid) {
            throw new UnsupportedOperationException("Not supported yet.");
        }

        @Override
        public boolean addResult(TransactionID tid, byte[] result) {
            throw new UnsupportedOperationException("Not supported yet.");
        }

        @Override
        public boolean addResultAndRetire(TransactionID tid, byte[] result) {
            throw new UnsupportedOperationException("Not supported yet.");
        }

        @Override
        public boolean addServerMessage(TransactionID cid, byte[] message) {
            throw new UnsupportedOperationException("Not supported yet.");
        }

        @Override
        public boolean addServerMessage(ClientInstanceID cid, byte[] message) {
            throw new UnsupportedOperationException("Not supported yet.");
        }

        @Override
        public boolean addStats(TransactionID cid, long[] timings) {
            throw new UnsupportedOperationException("Not supported yet.");
        }

        @Override
        public void stopAdding() {
            throw new UnsupportedOperationException("Not supported yet.");
        }

        @Override
        public boolean startAdding() {
            throw new UnsupportedOperationException("Not supported yet.");
        }

        public synchronized void waitForAccess() {
            boolean interrupted = false;
            while (!this.accessed) {
                try {
                    this.wait();
                }
                catch (InterruptedException ie) {
                    interrupted = true;
                }
            }
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

