/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.net;

import com.google.common.base.Function;
import com.google.common.collect.Lists;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOError;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ServerSocketChannel;
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.concurrent.TracingAwareExecutorService;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.EncryptionOptions;
import org.apache.cassandra.db.CounterMutation;
import org.apache.cassandra.db.IndexScanCommand;
import org.apache.cassandra.db.RangeSliceCommand;
import org.apache.cassandra.db.RangeSliceReply;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.ReadResponse;
import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.db.SnapshotCommand;
import org.apache.cassandra.db.TruncateResponse;
import org.apache.cassandra.db.Truncation;
import org.apache.cassandra.db.WriteResponse;
import org.apache.cassandra.dht.BootStrapper;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.gms.GossipDigestAck;
import org.apache.cassandra.gms.GossipDigestAck2;
import org.apache.cassandra.gms.GossipDigestSyn;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.locator.ILatencySubscriber;
import org.apache.cassandra.metrics.ConnectionMetrics;
import org.apache.cassandra.metrics.DroppedMessageMetrics;
import org.apache.cassandra.net.AsyncResult;
import org.apache.cassandra.net.CallbackInfo;
import org.apache.cassandra.net.IAsyncResult;
import org.apache.cassandra.net.IMessageCallback;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.IncomingTcpConnection;
import org.apache.cassandra.net.MessageDeliveryTask;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingServiceMBean;
import org.apache.cassandra.net.OutboundTcpConnection;
import org.apache.cassandra.net.OutboundTcpConnectionPool;
import org.apache.cassandra.net.sink.SinkManager;
import org.apache.cassandra.security.SSLFactory;
import org.apache.cassandra.service.AbstractWriteResponseHandler;
import org.apache.cassandra.service.AntiEntropyService;
import org.apache.cassandra.service.MigrationManager;
import org.apache.cassandra.service.PBSPredictor;
import org.apache.cassandra.service.ReadCallback;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.FileStreamTask;
import org.apache.cassandra.streaming.StreamHeader;
import org.apache.cassandra.streaming.StreamReply;
import org.apache.cassandra.streaming.StreamRequest;
import org.apache.cassandra.streaming.StreamingRepairTask;
import org.apache.cassandra.streaming.compress.CompressedFileStreamTask;
import org.apache.cassandra.tracing.TraceState;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.ExpiringMap;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.SimpleCondition;
import org.apache.cassandra.utils.StatusLogger;
import org.apache.cassandra.utils.UUIDSerializer;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class MessagingService
implements MessagingServiceMBean {
    public static final String MBEAN_NAME = "org.apache.cassandra.net:type=MessagingService";
    public static final int VERSION_10 = 3;
    public static final int VERSION_11 = 4;
    public static final int VERSION_117 = 5;
    public static final int VERSION_12 = 6;
    public static final int current_version = 6;
    static final int PROTOCOL_MAGIC = -900387334;
    public static final Verb[] VERBS = Verb.values();
    public static final EnumMap<Verb, Stage> verbStages = new EnumMap<Verb, Stage>(Verb.class){
        {
            this.put(Verb.MUTATION, Stage.MUTATION);
            this.put(Verb.BINARY, Stage.MUTATION);
            this.put(Verb.READ_REPAIR, Stage.MUTATION);
            this.put(Verb.TRUNCATE, Stage.MUTATION);
            this.put(Verb.READ, Stage.READ);
            this.put(Verb.REQUEST_RESPONSE, Stage.REQUEST_RESPONSE);
            this.put(Verb.STREAM_REPLY, Stage.MISC);
            this.put(Verb.STREAM_REQUEST, Stage.MISC);
            this.put(Verb.RANGE_SLICE, Stage.READ);
            this.put(Verb.BOOTSTRAP_TOKEN, Stage.MISC);
            this.put(Verb.TREE_REQUEST, Stage.ANTI_ENTROPY);
            this.put(Verb.TREE_RESPONSE, Stage.ANTI_ENTROPY);
            this.put(Verb.STREAMING_REPAIR_REQUEST, Stage.ANTI_ENTROPY);
            this.put(Verb.STREAMING_REPAIR_RESPONSE, Stage.ANTI_ENTROPY);
            this.put(Verb.GOSSIP_DIGEST_ACK, Stage.GOSSIP);
            this.put(Verb.GOSSIP_DIGEST_ACK2, Stage.GOSSIP);
            this.put(Verb.GOSSIP_DIGEST_SYN, Stage.GOSSIP);
            this.put(Verb.GOSSIP_SHUTDOWN, Stage.GOSSIP);
            this.put(Verb.DEFINITIONS_UPDATE, Stage.MIGRATION);
            this.put(Verb.SCHEMA_CHECK, Stage.MIGRATION);
            this.put(Verb.MIGRATION_REQUEST, Stage.MIGRATION);
            this.put(Verb.INDEX_SCAN, Stage.READ);
            this.put(Verb.REPLICATION_FINISHED, Stage.MISC);
            this.put(Verb.INTERNAL_RESPONSE, Stage.INTERNAL_RESPONSE);
            this.put(Verb.COUNTER_MUTATION, Stage.MUTATION);
            this.put(Verb.SNAPSHOT, Stage.MISC);
            this.put(Verb.UNUSED_1, Stage.INTERNAL_RESPONSE);
            this.put(Verb.UNUSED_2, Stage.INTERNAL_RESPONSE);
            this.put(Verb.UNUSED_3, Stage.INTERNAL_RESPONSE);
        }
    };
    public static final EnumMap<Verb, IVersionedSerializer<?>> verbSerializers = new EnumMap<Verb, IVersionedSerializer<?>>(Verb.class){
        {
            this.put(Verb.REQUEST_RESPONSE, CallbackDeterminedSerializer.instance);
            this.put(Verb.INTERNAL_RESPONSE, CallbackDeterminedSerializer.instance);
            this.put(Verb.MUTATION, RowMutation.serializer);
            this.put(Verb.READ_REPAIR, RowMutation.serializer);
            this.put(Verb.READ, ReadCommand.serializer);
            this.put(Verb.STREAM_REPLY, StreamReply.serializer);
            this.put(Verb.STREAM_REQUEST, StreamRequest.serializer);
            this.put(Verb.RANGE_SLICE, RangeSliceCommand.serializer);
            this.put(Verb.BOOTSTRAP_TOKEN, BootStrapper.StringSerializer.instance);
            this.put(Verb.TREE_REQUEST, AntiEntropyService.TreeRequest.serializer);
            this.put(Verb.TREE_RESPONSE, AntiEntropyService.Validator.serializer);
            this.put(Verb.STREAMING_REPAIR_REQUEST, StreamingRepairTask.serializer);
            this.put(Verb.STREAMING_REPAIR_RESPONSE, UUIDSerializer.serializer);
            this.put(Verb.GOSSIP_DIGEST_ACK, GossipDigestAck.serializer);
            this.put(Verb.GOSSIP_DIGEST_ACK2, GossipDigestAck2.serializer);
            this.put(Verb.GOSSIP_DIGEST_SYN, GossipDigestSyn.serializer);
            this.put(Verb.DEFINITIONS_UPDATE, MigrationManager.MigrationsSerializer.instance);
            this.put(Verb.TRUNCATE, Truncation.serializer);
            this.put(Verb.INDEX_SCAN, IndexScanCommand.serializer);
            this.put(Verb.REPLICATION_FINISHED, null);
            this.put(Verb.COUNTER_MUTATION, CounterMutation.serializer);
            this.put(Verb.SNAPSHOT, SnapshotCommand.serializer);
        }
    };
    public static final EnumMap<Verb, IVersionedSerializer<?>> callbackDeserializers = new EnumMap<Verb, IVersionedSerializer<?>>(Verb.class){
        {
            this.put(Verb.MUTATION, WriteResponse.serializer);
            this.put(Verb.READ_REPAIR, WriteResponse.serializer);
            this.put(Verb.COUNTER_MUTATION, WriteResponse.serializer);
            this.put(Verb.RANGE_SLICE, RangeSliceReply.serializer);
            this.put(Verb.READ, ReadResponse.serializer);
            this.put(Verb.TRUNCATE, TruncateResponse.serializer);
            this.put(Verb.SNAPSHOT, null);
            this.put(Verb.MIGRATION_REQUEST, MigrationManager.MigrationsSerializer.instance);
            this.put(Verb.SCHEMA_CHECK, UUIDSerializer.serializer);
            this.put(Verb.BOOTSTRAP_TOKEN, BootStrapper.StringSerializer.instance);
            this.put(Verb.REPLICATION_FINISHED, null);
        }
    };
    private final ExpiringMap<String, CallbackInfo> callbacks;
    private final Map<Verb, IVerbHandler> verbHandlers;
    private final ConcurrentMap<InetAddress, DebuggableThreadPoolExecutor> streamExecutors = new NonBlockingHashMap();
    private final NonBlockingHashMap<InetAddress, OutboundTcpConnectionPool> connectionManagers = new NonBlockingHashMap();
    private static final Logger logger = LoggerFactory.getLogger(MessagingService.class);
    private static final int LOG_DROPPED_INTERVAL_IN_MS = 5000;
    private final List<SocketThread> socketThreads = Lists.newArrayList();
    private final SimpleCondition listenGate;
    public static final EnumSet<Verb> DROPPABLE_VERBS = EnumSet.of(Verb.BINARY, new Verb[]{Verb._TRACE, Verb.MUTATION, Verb.READ_REPAIR, Verb.READ, Verb.RANGE_SLICE, Verb.REQUEST_RESPONSE});
    private final Map<Verb, DroppedMessageMetrics> droppedMessages = new EnumMap<Verb, DroppedMessageMetrics>(Verb.class);
    private final Map<Verb, Integer> lastDroppedInternal = new EnumMap<Verb, Integer>(Verb.class);
    private final List<ILatencySubscriber> subscribers = new ArrayList<ILatencySubscriber>();
    private final ConcurrentMap<InetAddress, Integer> versions = new NonBlockingHashMap();
    private static final AtomicInteger idGen = new AtomicInteger(0);

    public static MessagingService instance() {
        return MSHandle.instance;
    }

    private MessagingService() {
        for (Verb verb : DROPPABLE_VERBS) {
            this.droppedMessages.put(verb, new DroppedMessageMetrics(verb));
            this.lastDroppedInternal.put(verb, 0);
        }
        this.listenGate = new SimpleCondition();
        this.verbHandlers = new EnumMap<Verb, IVerbHandler>(Verb.class);
        Runnable logDropped = new Runnable(){

            @Override
            public void run() {
                MessagingService.this.logDroppedMessages();
            }
        };
        StorageService.scheduledTasks.scheduleWithFixedDelay(logDropped, 5000L, 5000L, TimeUnit.MILLISECONDS);
        Function<Pair<String, ExpiringMap.CacheableObject<CallbackInfo>>, Object> timeoutReporter = new Function<Pair<String, ExpiringMap.CacheableObject<CallbackInfo>>, Object>(){

            public Object apply(Pair<String, ExpiringMap.CacheableObject<CallbackInfo>> pair) {
                CallbackInfo expiredCallbackInfo = (CallbackInfo)((ExpiringMap.CacheableObject)pair.right).value;
                MessagingService.this.maybeAddLatency(expiredCallbackInfo.callback, expiredCallbackInfo.target, ((ExpiringMap.CacheableObject)pair.right).timeout);
                ConnectionMetrics.totalTimeouts.mark();
                MessagingService.this.getConnectionPool(expiredCallbackInfo.target).incrementTimeout();
                if (expiredCallbackInfo.shouldHint()) {
                    assert (expiredCallbackInfo.sentMessage != null);
                    RowMutation rm = (RowMutation)expiredCallbackInfo.sentMessage.payload;
                    return StorageProxy.submitHint(rm, expiredCallbackInfo.target, null, null);
                }
                return null;
            }
        };
        this.callbacks = new ExpiringMap<String, CallbackInfo>(DatabaseDescriptor.getMinRpcTimeout(), timeoutReporter);
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        try {
            mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public void maybeAddLatency(IMessageCallback cb, InetAddress address, long latency) {
        if (cb.isLatencyForSnitch()) {
            this.addLatency(address, latency);
        }
    }

    public void addLatency(InetAddress address, long latency) {
        for (ILatencySubscriber subscriber : this.subscribers) {
            subscriber.receiveTiming(address, latency);
        }
    }

    public void convict(InetAddress ep) {
        logger.debug("Resetting pool for " + ep);
        this.getConnectionPool(ep).reset();
    }

    public void listen(InetAddress localEp) throws ConfigurationException {
        this.callbacks.reset();
        for (ServerSocket ss : this.getServerSocket(localEp)) {
            SocketThread th = new SocketThread(ss, "ACCEPT-" + localEp);
            th.start();
            this.socketThreads.add(th);
        }
        this.listenGate.signalAll();
    }

    private List<ServerSocket> getServerSocket(InetAddress localEp) throws ConfigurationException {
        ArrayList<ServerSocket> ss = new ArrayList<ServerSocket>(2);
        if (DatabaseDescriptor.getServerEncryptionOptions().internode_encryption != EncryptionOptions.ServerEncryptionOptions.InternodeEncryption.none) {
            try {
                ss.add(SSLFactory.getServerSocket(DatabaseDescriptor.getServerEncryptionOptions(), localEp, DatabaseDescriptor.getSSLStoragePort()));
            }
            catch (IOException e) {
                throw new ConfigurationException("Unable to create ssl socket", e);
            }
            logger.info("Starting Encrypted Messaging Service on SSL port {}", (Object)DatabaseDescriptor.getSSLStoragePort());
        }
        ServerSocketChannel serverChannel = null;
        try {
            serverChannel = ServerSocketChannel.open();
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        ServerSocket socket = serverChannel.socket();
        try {
            socket.setReuseAddress(true);
        }
        catch (SocketException e) {
            throw new ConfigurationException("Insufficient permissions to setReuseAddress", e);
        }
        InetSocketAddress address = new InetSocketAddress(localEp, DatabaseDescriptor.getStoragePort());
        try {
            socket.bind(address);
        }
        catch (BindException e) {
            if (e.getMessage().contains("in use")) {
                throw new ConfigurationException(address + " is in use by another process.  Change listen_address:storage_port in cassandra.yaml to values that do not conflict with other services");
            }
            if (e.getMessage().contains("Cannot assign requested address")) {
                throw new ConfigurationException("Unable to bind to address " + address + ". Set listen_address in cassandra.yaml to an interface you can bind to, e.g., your private IP address on EC2");
            }
            throw new RuntimeException(e);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        logger.info("Starting Messaging Service on port {}", (Object)DatabaseDescriptor.getStoragePort());
        ss.add(socket);
        return ss;
    }

    public void waitUntilListening() {
        try {
            this.listenGate.await();
        }
        catch (InterruptedException ie) {
            logger.debug("await interrupted");
        }
    }

    public void destroyConnectionPool(InetAddress to) {
        OutboundTcpConnectionPool cp = (OutboundTcpConnectionPool)this.connectionManagers.get((Object)to);
        if (cp == null) {
            return;
        }
        cp.close();
        this.connectionManagers.remove((Object)to);
    }

    public OutboundTcpConnectionPool getConnectionPool(InetAddress to) {
        OutboundTcpConnectionPool cp = (OutboundTcpConnectionPool)this.connectionManagers.get((Object)to);
        if (cp == null) {
            this.connectionManagers.putIfAbsent((Object)to, (Object)new OutboundTcpConnectionPool(to));
            cp = (OutboundTcpConnectionPool)this.connectionManagers.get((Object)to);
        }
        return cp;
    }

    public OutboundTcpConnection getConnection(InetAddress to, MessageOut msg) {
        return this.getConnectionPool(to).getConnection(msg);
    }

    public void registerVerbHandlers(Verb verb, IVerbHandler verbHandler) {
        assert (!this.verbHandlers.containsKey((Object)verb));
        this.verbHandlers.put(verb, verbHandler);
    }

    public IVerbHandler getVerbHandler(Verb type) {
        return this.verbHandlers.get((Object)type);
    }

    public String addCallback(IMessageCallback cb, MessageOut message, InetAddress to, long timeout) {
        String messageId = MessagingService.nextId();
        CallbackInfo previous = DatabaseDescriptor.hintedHandoffEnabled() && message.verb == Verb.MUTATION ? this.callbacks.put(messageId, new CallbackInfo(to, cb, message, callbackDeserializers.get((Object)message.verb)), timeout) : this.callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get((Object)message.verb)), timeout);
        assert (previous == null);
        return messageId;
    }

    private static String nextId() {
        return Integer.toString(idGen.incrementAndGet());
    }

    public String sendRR(MessageOut message, InetAddress to, IMessageCallback cb) {
        return this.sendRR(message, to, cb, message.getTimeout());
    }

    public String sendRR(MessageOut message, InetAddress to, IMessageCallback cb, long timeout) {
        String id = this.addCallback(cb, message, to, timeout);
        if (cb instanceof AbstractWriteResponseHandler) {
            PBSPredictor.instance().startWriteOperation(id);
        } else if (cb instanceof ReadCallback) {
            PBSPredictor.instance().startReadOperation(id);
        }
        this.sendOneWay(message, id, to);
        return id;
    }

    public void sendOneWay(MessageOut message, InetAddress to) {
        this.sendOneWay(message, MessagingService.nextId(), to);
    }

    public void sendReply(MessageOut message, String id, InetAddress to) {
        this.sendOneWay(message, id, to);
    }

    public void sendOneWay(MessageOut message, String id, InetAddress to) {
        MessageOut processedMessage;
        if (logger.isTraceEnabled()) {
            logger.trace(FBUtilities.getBroadcastAddress() + " sending " + (Object)((Object)message.verb) + " to " + id + "@" + to);
        }
        if (to.equals(FBUtilities.getBroadcastAddress())) {
            logger.trace("Message-to-self {} going over MessagingService", (Object)message);
        }
        if ((processedMessage = SinkManager.processOutboundMessage(message, id, to)) == null) {
            return;
        }
        OutboundTcpConnection connection = this.getConnection(to, processedMessage);
        connection.enqueue(processedMessage, id);
    }

    public <T> IAsyncResult<T> sendRR(MessageOut message, InetAddress to) {
        AsyncResult iar = new AsyncResult();
        this.sendRR(message, to, iar);
        return iar;
    }

    public void stream(StreamHeader header, InetAddress to) {
        DebuggableThreadPoolExecutor old;
        DebuggableThreadPoolExecutor executor = (DebuggableThreadPoolExecutor)this.streamExecutors.get(to);
        if (executor == null && (old = this.streamExecutors.putIfAbsent(to, executor = DebuggableThreadPoolExecutor.createWithMaximumPoolSize("Streaming to " + to, 1, 1, TimeUnit.SECONDS))) != null) {
            executor.shutdown();
            executor = old;
        }
        executor.execute(header.file == null || header.file.compressionInfo == null ? new FileStreamTask(header, to) : new CompressedFileStreamTask(header, to));
    }

    public void register(ILatencySubscriber subcriber) {
        this.subscribers.add(subcriber);
    }

    public void clearCallbacksUnsafe() {
        this.callbacks.reset();
    }

    public void waitForStreaming() throws InterruptedException {
        for (DebuggableThreadPoolExecutor e : this.streamExecutors.values()) {
            e.shutdown();
        }
        for (DebuggableThreadPoolExecutor e : this.streamExecutors.values()) {
            if (e.awaitTermination(24L, TimeUnit.HOURS)) continue;
            logger.error("Stream took more than 24H to complete; skipping");
        }
    }

    public void shutdown() {
        logger.info("Waiting for messaging service to quiesce");
        assert (!StageManager.getStage(Stage.MUTATION).isShutdown());
        this.callbacks.shutdownBlocking();
        try {
            for (SocketThread th : this.socketThreads) {
                th.close();
            }
        }
        catch (IOException e) {
            throw new IOError(e);
        }
    }

    public void receive(MessageIn message, String id, long timestamp) {
        TraceState state = Tracing.instance().initializeFromMessage(message);
        if (state != null) {
            state.trace("Message received from {}", message.from);
        }
        if ((message = SinkManager.processInboundMessage(message, id)) == null) {
            return;
        }
        MessageDeliveryTask runnable = new MessageDeliveryTask(message, id, timestamp);
        TracingAwareExecutorService stage = StageManager.getStage(message.getMessageType());
        assert (stage != null) : "No stage for message type " + (Object)((Object)message.verb);
        if (message.verb == Verb.REQUEST_RESPONSE && PBSPredictor.instance().isLoggingEnabled()) {
            IMessageCallback cb = MessagingService.instance().getRegisteredCallback((String)id).callback;
            if (cb instanceof AbstractWriteResponseHandler) {
                PBSPredictor.instance().logWriteResponse(id, timestamp);
            } else if (cb instanceof ReadCallback) {
                PBSPredictor.instance().logReadResponse(id, timestamp);
            }
        }
        stage.execute(runnable, state);
    }

    public void setCallbackForTests(String messageId, CallbackInfo callback) {
        this.callbacks.put(messageId, callback);
    }

    public CallbackInfo getRegisteredCallback(String messageId) {
        return this.callbacks.get(messageId);
    }

    public CallbackInfo removeRegisteredCallback(String messageId) {
        return this.callbacks.remove(messageId);
    }

    public long getRegisteredCallbackAge(String messageId) {
        return this.callbacks.getAge(messageId);
    }

    public static void validateMagic(int magic) throws IOException {
        if (magic != -900387334) {
            throw new IOException("invalid protocol header");
        }
    }

    public static int getBits(int packed, int start, int count) {
        return packed >>> start + 1 - count & ~(-1 << count);
    }

    public ByteBuffer constructStreamHeader(StreamHeader streamHeader, boolean compress, int version) {
        byte[] bytes;
        Object buffer;
        int header = 0;
        if (compress) {
            header |= 4;
        }
        header |= 8;
        header |= version << 8;
        try {
            buffer = new DataOutputBuffer();
            StreamHeader.serializer.serialize(streamHeader, (DataOutput)buffer, version);
            bytes = ((DataOutputBuffer)buffer).getData();
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        assert (bytes.length > 0);
        buffer = ByteBuffer.allocate(12 + bytes.length);
        ((ByteBuffer)buffer).putInt(-900387334);
        ((ByteBuffer)buffer).putInt(header);
        ((ByteBuffer)buffer).putInt(bytes.length);
        ((ByteBuffer)buffer).put(bytes);
        ((ByteBuffer)buffer).flip();
        return buffer;
    }

    public int setVersion(InetAddress address, int version) {
        logger.debug("Setting version {} for {}", (Object)version, (Object)address);
        Integer v = this.versions.put(address, version);
        return v == null ? version : v;
    }

    public void resetVersion(InetAddress endpoint) {
        logger.debug("Reseting version for {}", (Object)endpoint);
        this.versions.remove(endpoint);
    }

    public Integer getVersion(InetAddress address) {
        Integer v = (Integer)this.versions.get(address);
        if (v == null) {
            logger.trace("Assuming current protocol version for {}", (Object)address);
            return 6;
        }
        return v;
    }

    @Override
    public int getVersion(String address) throws UnknownHostException {
        return this.getVersion(InetAddress.getByName(address));
    }

    public boolean knowsVersion(InetAddress endpoint) {
        return this.versions.get(endpoint) != null;
    }

    public void incrementDroppedMessages(Verb verb) {
        assert (DROPPABLE_VERBS.contains((Object)verb)) : "Verb " + (Object)((Object)verb) + " should not legally be dropped";
        this.droppedMessages.get((Object)((Object)verb)).dropped.mark();
    }

    private void logDroppedMessages() {
        boolean logTpstats = false;
        for (Map.Entry<Verb, DroppedMessageMetrics> entry : this.droppedMessages.entrySet()) {
            Verb verb;
            int dropped = (int)entry.getValue().dropped.count();
            int recent = dropped - this.lastDroppedInternal.get((Object)(verb = entry.getKey()));
            if (recent <= 0) continue;
            logTpstats = true;
            logger.info("{} {} messages dropped in last {}ms", new Object[]{recent, verb, 5000});
            this.lastDroppedInternal.put(verb, dropped);
        }
        if (logTpstats) {
            StatusLogger.log();
        }
    }

    @Override
    public Map<String, Integer> getCommandPendingTasks() {
        HashMap<String, Integer> pendingTasks = new HashMap<String, Integer>();
        for (Map.Entry entry : this.connectionManagers.entrySet()) {
            pendingTasks.put(((InetAddress)entry.getKey()).getHostAddress(), ((OutboundTcpConnectionPool)entry.getValue()).cmdCon.getPendingMessages());
        }
        return pendingTasks;
    }

    public int getCommandPendingTasks(InetAddress address) {
        OutboundTcpConnectionPool connection = (OutboundTcpConnectionPool)this.connectionManagers.get((Object)address);
        return connection == null ? 0 : connection.cmdCon.getPendingMessages();
    }

    @Override
    public Map<String, Long> getCommandCompletedTasks() {
        HashMap<String, Long> completedTasks = new HashMap<String, Long>();
        for (Map.Entry entry : this.connectionManagers.entrySet()) {
            completedTasks.put(((InetAddress)entry.getKey()).getHostAddress(), ((OutboundTcpConnectionPool)entry.getValue()).cmdCon.getCompletedMesssages());
        }
        return completedTasks;
    }

    @Override
    public Map<String, Long> getCommandDroppedTasks() {
        HashMap<String, Long> droppedTasks = new HashMap<String, Long>();
        for (Map.Entry entry : this.connectionManagers.entrySet()) {
            droppedTasks.put(((InetAddress)entry.getKey()).getHostAddress(), ((OutboundTcpConnectionPool)entry.getValue()).cmdCon.getDroppedMessages());
        }
        return droppedTasks;
    }

    @Override
    public Map<String, Integer> getResponsePendingTasks() {
        HashMap<String, Integer> pendingTasks = new HashMap<String, Integer>();
        for (Map.Entry entry : this.connectionManagers.entrySet()) {
            pendingTasks.put(((InetAddress)entry.getKey()).getHostAddress(), ((OutboundTcpConnectionPool)entry.getValue()).ackCon.getPendingMessages());
        }
        return pendingTasks;
    }

    @Override
    public Map<String, Long> getResponseCompletedTasks() {
        HashMap<String, Long> completedTasks = new HashMap<String, Long>();
        for (Map.Entry entry : this.connectionManagers.entrySet()) {
            completedTasks.put(((InetAddress)entry.getKey()).getHostAddress(), ((OutboundTcpConnectionPool)entry.getValue()).ackCon.getCompletedMesssages());
        }
        return completedTasks;
    }

    @Override
    public Map<String, Integer> getDroppedMessages() {
        HashMap<String, Integer> map = new HashMap<String, Integer>();
        for (Map.Entry<Verb, DroppedMessageMetrics> entry : this.droppedMessages.entrySet()) {
            map.put(entry.getKey().toString(), (int)entry.getValue().dropped.count());
        }
        return map;
    }

    @Override
    public Map<String, Integer> getRecentlyDroppedMessages() {
        HashMap<String, Integer> map = new HashMap<String, Integer>();
        for (Map.Entry<Verb, DroppedMessageMetrics> entry : this.droppedMessages.entrySet()) {
            map.put(entry.getKey().toString(), entry.getValue().getRecentlyDropped());
        }
        return map;
    }

    @Override
    public long getTotalTimeouts() {
        return ConnectionMetrics.totalTimeouts.count();
    }

    @Override
    public long getRecentTotalTimouts() {
        return ConnectionMetrics.getRecentTotalTimeout();
    }

    @Override
    public Map<String, Long> getTimeoutsPerHost() {
        HashMap<String, Long> result = new HashMap<String, Long>();
        for (Map.Entry entry : this.connectionManagers.entrySet()) {
            String ip = ((InetAddress)entry.getKey()).getHostAddress();
            long recent = ((OutboundTcpConnectionPool)entry.getValue()).getTimeouts();
            result.put(ip, recent);
        }
        return result;
    }

    @Override
    public Map<String, Long> getRecentTimeoutsPerHost() {
        HashMap<String, Long> result = new HashMap<String, Long>();
        for (Map.Entry entry : this.connectionManagers.entrySet()) {
            String ip = ((InetAddress)entry.getKey()).getHostAddress();
            long recent = ((OutboundTcpConnectionPool)entry.getValue()).getRecentTimeouts();
            result.put(ip, recent);
        }
        return result;
    }

    private static class SocketThread
    extends Thread {
        private final ServerSocket server;

        SocketThread(ServerSocket server, String name) {
            super(name);
            this.server = server;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    Socket socket;
                    if (this.authenticate(socket = this.server.accept())) {
                        new IncomingTcpConnection(socket).start();
                        continue;
                    }
                    socket.close();
                }
            }
            catch (AsynchronousCloseException e) {
                logger.info("MessagingService shutting down server thread.");
            }
            catch (ClosedChannelException e) {
                logger.debug("MessagingService server thread already closed.");
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        void close() throws IOException {
            this.server.close();
        }

        private boolean authenticate(Socket socket) {
            return DatabaseDescriptor.getInternodeAuthenticator().authenticate(socket.getInetAddress(), socket.getPort());
        }
    }

    private static class MSHandle {
        public static final MessagingService instance = new MessagingService();

        private MSHandle() {
        }
    }

    static class CallbackDeterminedSerializer
    implements IVersionedSerializer<Object> {
        public static final CallbackDeterminedSerializer instance = new CallbackDeterminedSerializer();

        CallbackDeterminedSerializer() {
        }

        @Override
        public Object deserialize(DataInput in, int version) throws IOException {
            throw new UnsupportedOperationException();
        }

        @Override
        public void serialize(Object o, DataOutput out, int version) throws IOException {
            throw new UnsupportedOperationException();
        }

        @Override
        public long serializedSize(Object o, int version) {
            throw new UnsupportedOperationException();
        }
    }

    public static enum Verb {
        MUTATION,
        BINARY,
        READ_REPAIR,
        READ,
        REQUEST_RESPONSE,
        STREAM_INITIATE,
        STREAM_INITIATE_DONE,
        STREAM_REPLY,
        STREAM_REQUEST,
        RANGE_SLICE,
        BOOTSTRAP_TOKEN,
        TREE_REQUEST,
        TREE_RESPONSE,
        JOIN,
        GOSSIP_DIGEST_SYN,
        GOSSIP_DIGEST_ACK,
        GOSSIP_DIGEST_ACK2,
        DEFINITIONS_ANNOUNCE,
        DEFINITIONS_UPDATE,
        TRUNCATE,
        SCHEMA_CHECK,
        INDEX_SCAN,
        REPLICATION_FINISHED,
        INTERNAL_RESPONSE,
        COUNTER_MUTATION,
        STREAMING_REPAIR_REQUEST,
        STREAMING_REPAIR_RESPONSE,
        SNAPSHOT,
        MIGRATION_REQUEST,
        GOSSIP_SHUTDOWN,
        _TRACE,
        UNUSED_1,
        UNUSED_2,
        UNUSED_3;

    }
}

