/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.core.eventbus.impl;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageCodec;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.eventbus.MessageProducer;
import io.vertx.core.eventbus.ReplyException;
import io.vertx.core.eventbus.ReplyFailure;
import io.vertx.core.eventbus.impl.BodyReadStream;
import io.vertx.core.eventbus.impl.MessageImpl;
import io.vertx.core.eventbus.impl.MessageProducerImpl;
import io.vertx.core.eventbus.impl.codecs.BooleanMessageCodec;
import io.vertx.core.eventbus.impl.codecs.BufferMessageCodec;
import io.vertx.core.eventbus.impl.codecs.ByteArrayMessageCodec;
import io.vertx.core.eventbus.impl.codecs.ByteMessageCodec;
import io.vertx.core.eventbus.impl.codecs.CharMessageCodec;
import io.vertx.core.eventbus.impl.codecs.DoubleMessageCodec;
import io.vertx.core.eventbus.impl.codecs.FloatMessageCodec;
import io.vertx.core.eventbus.impl.codecs.IntMessageCodec;
import io.vertx.core.eventbus.impl.codecs.JsonArrayMessageCodec;
import io.vertx.core.eventbus.impl.codecs.JsonObjectMessageCodec;
import io.vertx.core.eventbus.impl.codecs.LongMessageCodec;
import io.vertx.core.eventbus.impl.codecs.NullMessageCodec;
import io.vertx.core.eventbus.impl.codecs.PingMessageCodec;
import io.vertx.core.eventbus.impl.codecs.ReplyExceptionMessageCodec;
import io.vertx.core.eventbus.impl.codecs.ShortMessageCodec;
import io.vertx.core.eventbus.impl.codecs.StringMessageCodec;
import io.vertx.core.impl.Arguments;
import io.vertx.core.impl.Closeable;
import io.vertx.core.impl.ContextImpl;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.impl.LoggerFactory;
import io.vertx.core.metrics.spi.EventBusMetrics;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetClientOptions;
import io.vertx.core.net.NetServer;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.impl.ServerID;
import io.vertx.core.parsetools.RecordParser;
import io.vertx.core.spi.cluster.AsyncMultiMap;
import io.vertx.core.spi.cluster.ChoosableIterable;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.core.streams.ReadStream;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

public class EventBusImpl
implements EventBus {
    // Logger shared by every event bus instance.
    private static final Logger log = LoggerFactory.getLogger(EventBusImpl.class);
    // Built-in codec singletons. systemCodecs() collects all of them into the array handed to
    // MessageImpl.readFromWire(); createMessage() picks among the body-type codecs.
    private static final MessageCodec<String, String> PING_MESSAGE_CODEC = new PingMessageCodec();
    private static final MessageCodec<String, String> NULL_MESSAGE_CODEC = new NullMessageCodec();
    private static final MessageCodec<String, String> STRING_MESSAGE_CODEC = new StringMessageCodec();
    private static final MessageCodec<Buffer, Buffer> BUFFER_MESSAGE_CODEC = new BufferMessageCodec();
    private static final MessageCodec<JsonObject, JsonObject> JSON_OBJECT_MESSAGE_CODEC = new JsonObjectMessageCodec();
    private static final MessageCodec<JsonArray, JsonArray> JSON_ARRAY_MESSAGE_CODEC = new JsonArrayMessageCodec();
    private static final MessageCodec<byte[], byte[]> BYTE_ARRAY_MESSAGE_CODEC = new ByteArrayMessageCodec();
    private static final MessageCodec<Integer, Integer> INT_MESSAGE_CODEC = new IntMessageCodec();
    private static final MessageCodec<Long, Long> LONG_MESSAGE_CODEC = new LongMessageCodec();
    private static final MessageCodec<Float, Float> FLOAT_MESSAGE_CODEC = new FloatMessageCodec();
    private static final MessageCodec<Double, Double> DOUBLE_MESSAGE_CODEC = new DoubleMessageCodec();
    private static final MessageCodec<Boolean, Boolean> BOOLEAN_MESSAGE_CODEC = new BooleanMessageCodec();
    private static final MessageCodec<Short, Short> SHORT_MESSAGE_CODEC = new ShortMessageCodec();
    private static final MessageCodec<Character, Character> CHAR_MESSAGE_CODEC = new CharMessageCodec();
    private static final MessageCodec<Byte, Byte> BYTE_MESSAGE_CODEC = new ByteMessageCodec();
    private static final MessageCodec<ReplyException, ReplyException> REPLY_EXCEPTION_MESSAGE_CODEC = new ReplyExceptionMessageCodec();
    // Single-byte payload written back on the socket when a ping message is received.
    private static final Buffer PONG = Buffer.buffer(new byte[]{1});
    // NOTE(review): not referenced in the visible part of this file - presumably used by the
    // connection/ping logic further down; confirm.
    private static final String PING_ADDRESS = "__vertx_ping";
    private final VertxInternal vertx;
    // Ping timing settings; both are -1 on the non-clustered constructor. Their use is outside this chunk.
    private final long pingInterval;
    private final long pingReplyInterval;
    // Outgoing connections to other nodes, keyed by the remote server id (see sendRemote()).
    private final ConcurrentMap<ServerID, ConnectionHolder> connections = new ConcurrentHashMap<ServerID, ConnectionHolder>();
    // Locally registered handlers, keyed by address.
    private final ConcurrentMap<String, Handlers> handlerMap = new ConcurrentHashMap<String, Handlers>();
    // User-registered codecs, keyed by codec name.
    private final ConcurrentMap<String, MessageCodec> userCodecMap = new ConcurrentHashMap<String, MessageCodec>();
    // User-registered default codecs, keyed by body class; each is also present in userCodecMap.
    private final ConcurrentMap<Class, MessageCodec> defaultCodecMap = new ConcurrentHashMap<Class, MessageCodec>();
    // Null on the non-clustered constructor.
    private final ClusterManager clusterMgr;
    // Source of reply addresses when there is no cluster manager (see generateReplyAddress()).
    private final AtomicLong replySequence = new AtomicLong(0L);
    private final EventBusMetrics metrics;
    // Address -> server id subscriptions; null on the non-clustered constructor.
    private final AsyncMultiMap<String, ServerID> subs;
    // Built-in codecs indexed by their systemCodecID().
    private final MessageCodec[] systemCodecs;
    // Identity of this node; compared against destinations to decide between local and remote delivery.
    private final ServerID serverID;
    // Null on the non-clustered constructor.
    private final NetServer server;
    // When false, incoming pings are not answered (see simulateUnresponsive()).
    private volatile boolean sendPong = true;

    /**
     * Creates a non-clustered event bus: no cluster manager, no subscription map,
     * no net server, and a placeholder server id of {@code localhost:-1}.
     *
     * @param vertx the owning Vert.x instance, also used to create the metrics object
     */
    public EventBusImpl(VertxInternal vertx) {
        this.vertx = vertx;
        // Ping settings are set to -1 on this path.
        pingInterval = -1L;
        pingReplyInterval = -1L;
        serverID = new ServerID(-1, "localhost");
        // Everything cluster-related stays null.
        server = null;
        subs = null;
        clusterMgr = null;
        metrics = vertx.metricsSPI().createMetrics(this);
        this.systemCodecs = systemCodecs();
    }

    /**
     * Creates a clustered event bus.
     *
     * <p>Statement order is significant: {@code this} is handed to
     * {@code createMetrics} and to {@link #setServerHandler} while construction
     * is still in progress.
     *
     * @param vertx             the owning Vert.x instance
     * @param pingInterval      stored as-is; its use is outside this chunk
     * @param pingReplyInterval stored as-is; its use is outside this chunk
     * @param clusterManager    cluster manager that {@link #close} leaves on shutdown
     * @param subs              cluster-wide address to server id subscription map
     * @param serverID          identity of this node
     * @param server            wrapper whose net server receives messages from other nodes
     */
    public EventBusImpl(VertxInternal vertx, long pingInterval, long pingReplyInterval, ClusterManager clusterManager, AsyncMultiMap<String, ServerID> subs, ServerID serverID, EventBusNetServer server) {
        this.vertx = vertx;
        this.clusterMgr = clusterManager;
        this.metrics = vertx.metricsSPI().createMetrics(this);
        this.pingInterval = pingInterval;
        this.pingReplyInterval = pingReplyInterval;
        this.subs = subs;
        this.systemCodecs = this.systemCodecs();
        this.serverID = serverID;
        this.server = server.netServer;
        // Install the socket handler last, once every field it reads has been assigned.
        this.setServerHandler(server);
    }

    /**
     * Sends {@code message} to {@code address} with default delivery options
     * and no reply handler.
     */
    @Override
    public EventBus send(String address, Object message) {
        DeliveryOptions defaultOptions = new DeliveryOptions();
        return send(address, message, defaultOptions, null);
    }

    /**
     * Sends {@code message} to {@code address} with default delivery options,
     * delivering the reply (or failure) to {@code replyHandler}.
     */
    @Override
    public <T> EventBus send(String address, Object message, Handler<AsyncResult<Message<T>>> replyHandler) {
        DeliveryOptions defaultOptions = new DeliveryOptions();
        return send(address, message, defaultOptions, replyHandler);
    }

    /**
     * Sends {@code message} to {@code address} with the given delivery options
     * and no reply handler.
     */
    @Override
    public <T> EventBus send(String address, Object message, DeliveryOptions options) {
        // Delegates to the four-argument overload with a null reply handler.
        return send(address, message, options, null);
    }

    /**
     * Builds a send-type message from the arguments and dispatches it.
     *
     * @param address      destination address
     * @param message      message body
     * @param options      supplies headers, codec name and (for replies) the send timeout
     * @param replyHandler receives the reply or failure; may be {@code null}
     * @return this event bus
     */
    @Override
    public <T> EventBus send(String address, Object message, DeliveryOptions options, Handler<AsyncResult<Message<T>>> replyHandler) {
        MessageImpl outgoing = createMessage(true, address, options.getHeaders(), message, options.getCodecName());
        sendOrPub(null, outgoing, options, replyHandler);
        return this;
    }

    /**
     * Creates a producer bound to {@code address} in send mode, using default delivery options.
     *
     * @throws NullPointerException if {@code address} is null
     */
    @Override
    public <T> MessageProducer<T> sender(String address) {
        Objects.requireNonNull(address, "address");
        DeliveryOptions defaultOptions = new DeliveryOptions();
        return new MessageProducerImpl(this, address, true, defaultOptions);
    }

    /**
     * Creates a producer bound to {@code address} in send mode with the given delivery options.
     *
     * @throws NullPointerException if {@code address} or {@code options} is null
     */
    @Override
    public <T> MessageProducer<T> sender(String address, DeliveryOptions options) {
        // Validate in the same order as the parameters: address first, then options.
        Objects.requireNonNull(address, "address");
        Objects.requireNonNull(options, "options");
        MessageProducer<T> producer = new MessageProducerImpl(this, address, true, options);
        return producer;
    }

    /**
     * Creates a producer bound to {@code address} in publish mode, using default delivery options.
     *
     * @throws NullPointerException if {@code address} is null
     */
    @Override
    public <T> MessageProducer<T> publisher(String address) {
        Objects.requireNonNull(address, "address");
        DeliveryOptions defaultOptions = new DeliveryOptions();
        return new MessageProducerImpl(this, address, false, defaultOptions);
    }

    /**
     * Creates a producer bound to {@code address} in publish mode with the given delivery options.
     *
     * @throws NullPointerException if {@code address} or {@code options} is null
     */
    @Override
    public <T> MessageProducer<T> publisher(String address, DeliveryOptions options) {
        // Validate in the same order as the parameters: address first, then options.
        Objects.requireNonNull(address, "address");
        Objects.requireNonNull(options, "options");
        MessageProducer<T> producer = new MessageProducerImpl(this, address, false, options);
        return producer;
    }

    /**
     * Publishes {@code message} to {@code address} with default delivery options.
     */
    @Override
    public EventBus publish(String address, Object message) {
        DeliveryOptions defaultOptions = new DeliveryOptions();
        return publish(address, message, defaultOptions);
    }

    /**
     * Builds a publish-type message from the arguments and dispatches it; no reply handler is used.
     *
     * @param options supplies the headers and optional codec name
     * @return this event bus
     */
    @Override
    public EventBus publish(String address, Object message, DeliveryOptions options) {
        MessageImpl outgoing = createMessage(false, address, options.getHeaders(), message, options.getCodecName());
        sendOrPub(null, outgoing, options, null);
        return this;
    }

    /**
     * Creates a consumer for {@code address} that is neither a reply handler nor local-only
     * and has no timeout timer attached.
     *
     * @throws NullPointerException if {@code address} is null
     */
    @Override
    public <T> MessageConsumer<T> consumer(String address) {
        Objects.requireNonNull(address, "address");
        MessageConsumer<T> registration = new HandlerRegistration(address, false, false, -1L);
        return registration;
    }

    /**
     * Creates a consumer for {@code address} and attaches {@code handler} to it.
     *
     * @throws NullPointerException if {@code handler} or {@code address} is null
     */
    @Override
    public <T> MessageConsumer<T> consumer(String address, Handler<Message<T>> handler) {
        // The handler is checked first, then consumer(address) validates the address.
        Objects.requireNonNull(handler, "handler");
        MessageConsumer<T> registration = consumer(address);
        registration.handler((Handler)handler);
        return registration;
    }

    /**
     * Creates a local-only consumer for {@code address} with no timeout timer attached.
     *
     * @throws NullPointerException if {@code address} is null
     */
    @Override
    public <T> MessageConsumer<T> localConsumer(String address) {
        Objects.requireNonNull(address, "address");
        MessageConsumer<T> registration = new HandlerRegistration(address, false, true, -1L);
        return registration;
    }

    /**
     * Creates a local-only consumer for {@code address} and attaches {@code handler} to it.
     *
     * @throws NullPointerException if {@code handler} or {@code address} is null
     */
    @Override
    public <T> MessageConsumer<T> localConsumer(String address, Handler<Message<T>> handler) {
        // The handler is checked first, then localConsumer(address) validates the address.
        Objects.requireNonNull(handler, "handler");
        MessageConsumer<T> registration = localConsumer(address);
        registration.handler((Handler)handler);
        return registration;
    }

    /**
     * Registers a user codec under its own name.
     *
     * @param codec the codec to register; must not be a system codec
     * @return this event bus
     * @throws NullPointerException     if {@code codec} or its name is null
     * @throws IllegalArgumentException if {@code codec} reports a system codec id
     * @throws IllegalStateException    if a codec with the same name is already registered
     */
    @Override
    public EventBus registerCodec(MessageCodec codec) {
        Objects.requireNonNull(codec, "codec");
        String name = codec.name();
        Objects.requireNonNull(name, "codec.name()");
        checkSystemCodec(codec);
        // putIfAbsent makes the "already registered" check and the insert one atomic step,
        // so two concurrent registrations of the same name cannot both succeed.
        if (userCodecMap.putIfAbsent(name, codec) != null) {
            throw new IllegalStateException("Already a codec registered with name " + name);
        }
        return this;
    }

    /**
     * Removes the user codec registered under {@code name}, if any.
     *
     * @throws NullPointerException if {@code name} is null
     */
    @Override
    public EventBus unregisterCodec(String name) {
        String codecName = Objects.requireNonNull(name);
        userCodecMap.remove(codecName);
        return this;
    }

    /**
     * Registers {@code codec} as the default codec for bodies of exactly class {@code clazz},
     * and also under its name in the user codec map.
     *
     * @param clazz body class the codec applies to
     * @param codec the codec to register; must not be a system codec
     * @return this event bus
     * @throws NullPointerException     if {@code clazz}, {@code codec} or the codec name is null
     * @throws IllegalArgumentException if {@code codec} reports a system codec id
     * @throws IllegalStateException    if the class already has a default codec, or the codec
     *                                  name is already taken
     */
    @Override
    public <T> EventBus registerDefaultCodec(Class<T> clazz, MessageCodec<T, ?> codec) {
        Objects.requireNonNull(clazz);
        Objects.requireNonNull(codec, "codec");
        String name = codec.name();
        Objects.requireNonNull(name, "codec.name()");
        checkSystemCodec(codec);
        // Fail fast on an existing default codec, so the class conflict is reported before a name conflict.
        if (defaultCodecMap.containsKey(clazz)) {
            throw new IllegalStateException("Already a default codec registered for class " + clazz);
        }
        // Claim the name atomically, then the class; undo the name claim if the class was
        // taken concurrently so the two maps never end up half-updated.
        if (userCodecMap.putIfAbsent(name, codec) != null) {
            throw new IllegalStateException("Already a codec registered with name " + name);
        }
        if (defaultCodecMap.putIfAbsent(clazz, codec) != null) {
            userCodecMap.remove(name, codec);
            throw new IllegalStateException("Already a default codec registered for class " + clazz);
        }
        return this;
    }

    /**
     * Removes the default codec for {@code clazz}, if one is registered, together with its
     * entry in the user codec map.
     *
     * @throws NullPointerException if {@code clazz} is null
     */
    @Override
    public EventBus unregisterDefaultCodec(Class clazz) {
        Objects.requireNonNull(clazz);
        MessageCodec removed = (MessageCodec)defaultCodecMap.remove(clazz);
        if (removed == null) {
            return this;
        }
        userCodecMap.remove(removed.name());
        return this;
    }

    /**
     * Closes the net server (when there is one) and then hands over to
     * {@code closeClusterManager}. A failure to close the server is logged and does not
     * stop the remaining shutdown.
     *
     * @param completionHandler passed through to {@code closeClusterManager}
     */
    @Override
    public void close(Handler<AsyncResult<Void>> completionHandler) {
        if (server == null) {
            closeClusterManager(completionHandler);
            return;
        }
        server.close(closeResult -> {
            if (closeResult.failed()) {
                log.error("Failed to close server", closeResult.cause());
            }
            closeClusterManager(completionHandler);
        });
    }

    /**
     * Returns the base name reported by this bus's metrics object.
     */
    @Override
    public String metricBaseName() {
        String baseName = metrics.baseName();
        return baseName;
    }

    /**
     * Returns the Vert.x metrics whose key starts with this bus's base name, re-keyed with the
     * base name and the one character following it stripped off.
     */
    @Override
    public Map<String, JsonObject> metrics() {
        String prefix = metricBaseName();
        // Skip the prefix plus the single character that follows it.
        int keyStart = prefix.length() + 1;
        return vertx.metrics()
            .entrySet()
            .stream()
            .filter(entry -> ((String)entry.getKey()).startsWith(prefix))
            .collect(Collectors.toMap(entry -> ((String)entry.getKey()).substring(keyStart), Map.Entry::getValue));
    }

    /**
     * Dispatches a reply message. A message without an address cannot be routed, so the
     * reply handler is failed with a no-handlers error instead.
     *
     * @param dest         node to reply to
     * @param message      the reply message
     * @param options      delivery options for the reply
     * @param replyHandler handler for a reply to this reply
     */
    <T> void sendReply(ServerID dest, MessageImpl message, DeliveryOptions options, Handler<AsyncResult<Message<T>>> replyHandler) {
        if (message.address() != null) {
            sendOrPub(dest, message, options, replyHandler);
            return;
        }
        sendNoHandlersFailure(null, replyHandler);
    }

    /**
     * Stops this bus from answering incoming ping messages by clearing the {@code sendPong} flag.
     */
    public void simulateUnresponsive() {
        sendPong = false;
    }

    /**
     * Builds a message for {@code address} carrying {@code body}, encoded with the codec
     * chosen by {@link #selectCodec}.
     *
     * @param send      {@code true} for a send, {@code false} for a publish
     * @param address   destination address; must not be null
     * @param headers   message headers
     * @param body      message body; may be null
     * @param codecName name of a user-registered codec to force, or null to choose by body type
     * @return the new message, with this node's server id as sender
     * @throws NullPointerException     if {@code address} is null
     * @throws IllegalArgumentException if no codec can be found
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    MessageImpl createMessage(boolean send, String address, MultiMap headers, Object body, String codecName) {
        Objects.requireNonNull(address, "no null address accepted");
        MessageCodec codec = selectCodec(body, codecName);
        // The body is passed through untouched; the codec, not a cast, determines how it is handled.
        return new MessageImpl(serverID, address, null, headers, body, codec, send);
    }

    /**
     * Chooses the codec for a body: the named user codec when {@code codecName} is given,
     * otherwise the built-in codec matching the body's type, otherwise the default codec
     * registered for the body's exact class.
     */
    @SuppressWarnings("rawtypes")
    private MessageCodec selectCodec(Object body, String codecName) {
        if (codecName != null) {
            MessageCodec named = userCodecMap.get(codecName);
            if (named == null) {
                throw new IllegalArgumentException("No message codec for name: " + codecName);
            }
            return named;
        }
        if (body == null) {
            return NULL_MESSAGE_CODEC;
        }
        if (body instanceof String) {
            return STRING_MESSAGE_CODEC;
        }
        if (body instanceof Buffer) {
            return BUFFER_MESSAGE_CODEC;
        }
        if (body instanceof JsonObject) {
            return JSON_OBJECT_MESSAGE_CODEC;
        }
        if (body instanceof JsonArray) {
            return JSON_ARRAY_MESSAGE_CODEC;
        }
        if (body instanceof byte[]) {
            return BYTE_ARRAY_MESSAGE_CODEC;
        }
        if (body instanceof Integer) {
            return INT_MESSAGE_CODEC;
        }
        if (body instanceof Long) {
            return LONG_MESSAGE_CODEC;
        }
        if (body instanceof Float) {
            return FLOAT_MESSAGE_CODEC;
        }
        if (body instanceof Double) {
            return DOUBLE_MESSAGE_CODEC;
        }
        if (body instanceof Boolean) {
            return BOOLEAN_MESSAGE_CODEC;
        }
        if (body instanceof Short) {
            return SHORT_MESSAGE_CODEC;
        }
        if (body instanceof Character) {
            return CHAR_MESSAGE_CODEC;
        }
        if (body instanceof Byte) {
            return BYTE_MESSAGE_CODEC;
        }
        if (body instanceof ReplyException) {
            return REPLY_EXCEPTION_MESSAGE_CODEC;
        }
        MessageCodec byClass = defaultCodecMap.get(body.getClass());
        if (byClass == null) {
            throw new IllegalArgumentException("No message codec for type: " + body.getClass());
        }
        return byClass;
    }

    /**
     * Rejects codecs that report a system codec id; {@code -1} is the only id accepted here.
     *
     * @throws IllegalArgumentException if {@code codec.systemCodecID()} is not {@code -1}
     */
    private void checkSystemCodec(MessageCodec codec) {
        boolean isSystemCodec = codec.systemCodecID() != -1;
        if (isSystemCodec) {
            throw new IllegalArgumentException("Can't register a system codec");
        }
    }

    /**
     * Leaves the cluster (when there is a cluster manager) and then reports success to
     * {@code completionHandler} on a Vert.x context. A failure to leave is logged and
     * success is still reported.
     *
     * @param completionHandler notified on completion; may be null
     */
    private void closeClusterManager(Handler<AsyncResult<Void>> completionHandler) {
        if (clusterMgr == null) {
            callCompletionHandlerAsync(completionHandler);
            return;
        }
        clusterMgr.leave(leaveResult -> {
            if (leaveResult.failed()) {
                log.error("Failed to leave cluster", leaveResult.cause());
            }
            callCompletionHandlerAsync(completionHandler);
        });
    }

    /**
     * Installs the per-connection handler on {@code server}. Each incoming socket gets its own
     * record parser that alternates between reading a 4-byte length and reading a body of
     * that length, which is decoded into a message.
     *
     * @param server wrapper that forwards accepted sockets to the installed handler
     */
    private void setServerHandler(EventBusNetServer server) {
        Handler<NetSocket> sockHandler = socket -> {
            // Start by expecting the 4-byte length prefix.
            final RecordParser parser = RecordParser.newFixed(4, null);
            Handler<Buffer> handler = new Handler<Buffer>(){
                // -1 means "waiting for a length prefix"; otherwise the size of the body being awaited.
                int size = -1;

                @Override
                public void handle(Buffer buff) {
                    if (this.size == -1) {
                        // Length prefix received: switch the parser to deliver exactly that many bytes.
                        this.size = buff.getInt(0);
                        parser.fixedSizeMode(this.size);
                    } else {
                        // Body received: decode it, then go back to expecting a length prefix.
                        MessageImpl received = new MessageImpl();
                        received.readFromWire(socket, buff, EventBusImpl.this.userCodecMap, EventBusImpl.this.systemCodecs);
                        parser.fixedSizeMode(4);
                        this.size = -1;
                        if (received.codec() == PING_MESSAGE_CODEC) {
                            // Pings are answered on the same socket unless sendPong has been cleared.
                            if (EventBusImpl.this.sendPong) {
                                socket.write(PONG);
                            }
                        } else {
                            // Ordinary message: deliver locally, with no timer and no reply handlers.
                            EventBusImpl.this.receiveMessage(received, -1L, null, null);
                        }
                    }
                }
            };
            parser.setOutput(handler);
            socket.handler((Handler)parser);
        };
        server.setHandler(sockHandler);
    }

    /**
     * Routes a message to subscribed nodes. A send goes to the single node picked by
     * {@code subs.choose()}; a publish goes to every node in {@code subs}. Delivery is local
     * when the target is this node and remote otherwise.
     *
     * @param subs               nodes subscribed to the message's address
     * @param message            message to deliver
     * @param timeoutID          reply timer id, forwarded to local delivery
     * @param asyncResultHandler caller's reply handler; forwarded only on the send path
     * @param replyHandler       converted reply handler, forwarded to local delivery
     */
    private <T> void sendToSubs(ChoosableIterable<ServerID> subs, MessageImpl message, long timeoutID, Handler<AsyncResult<Message<T>>> asyncResultHandler, Handler<Message<T>> replyHandler) {
        if (!message.send()) {
            // Publish: fan out to every subscriber.
            for (ServerID target : subs) {
                if (target.equals(serverID)) {
                    receiveMessage(message, timeoutID, null, replyHandler);
                } else {
                    sendRemote(target, message);
                }
            }
            return;
        }
        // Send: exactly one subscriber receives the message.
        ServerID chosen = subs.choose();
        if (chosen.equals(serverID)) {
            receiveMessage(message, timeoutID, asyncResultHandler, replyHandler);
        } else {
            sendRemote(chosen, message);
        }
    }

    /**
     * Returns the built-in codecs arranged by {@link #codecs} so that each sits at the index
     * given by its {@code systemCodecID()}.
     */
    private MessageCodec[] systemCodecs() {
        MessageCodec[] builtIns = {
            NULL_MESSAGE_CODEC,
            PING_MESSAGE_CODEC,
            STRING_MESSAGE_CODEC,
            BUFFER_MESSAGE_CODEC,
            JSON_OBJECT_MESSAGE_CODEC,
            JSON_ARRAY_MESSAGE_CODEC,
            BYTE_ARRAY_MESSAGE_CODEC,
            INT_MESSAGE_CODEC,
            LONG_MESSAGE_CODEC,
            FLOAT_MESSAGE_CODEC,
            DOUBLE_MESSAGE_CODEC,
            BOOLEAN_MESSAGE_CODEC,
            SHORT_MESSAGE_CODEC,
            CHAR_MESSAGE_CODEC,
            BYTE_MESSAGE_CODEC,
            REPLY_EXCEPTION_MESSAGE_CODEC
        };
        return codecs(builtIns);
    }

    /**
     * Arranges the given codecs into an array indexed by each codec's {@code systemCodecID()}.
     *
     * @param codecs codecs whose ids are expected to lie in {@code [0, codecs.length)}
     * @return an array of the same length with each codec stored at its own id
     */
    private MessageCodec[] codecs(MessageCodec ... codecs) {
        MessageCodec[] arr = new MessageCodec[codecs.length];
        // Read each codec first, then use its id as the slot; the index expression must not
        // depend on a variable that is only assigned on the right-hand side.
        for (MessageCodec codec : codecs) {
            arr[codec.systemCodecID()] = codec;
        }
        return arr;
    }

    /**
     * Produces a fresh reply address: a random UUID when a cluster manager is present,
     * otherwise the next value of the local reply sequence.
     */
    private String generateReplyAddress() {
        return clusterMgr != null
            ? UUID.randomUUID().toString()
            : Long.toString(replySequence.incrementAndGet());
    }

    /**
     * Core dispatch routine for sends, publishes and replies.
     *
     * <p>When a reply handler is supplied, a reply address is generated, a timeout timer is
     * started, and a local reply-handler registration is created before the message is routed.
     * Routing then picks one of: an explicit reply destination, a cluster subscription lookup,
     * or plain local delivery.
     *
     * <p>Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
     *
     * @param replyDest    node to deliver to directly, or null to route by address
     * @param message      message to dispatch
     * @param options      supplies the reply timeout
     * @param replyHandler receives the reply or a failure; may be null
     */
    private <T> void sendOrPub(ServerID replyDest, MessageImpl message, DeliveryOptions options, Handler<AsyncResult<Message<T>>> replyHandler) {
        this.checkStarted();
        this.metrics.messageSent(message.address(), !message.send());
        // Captured here and re-installed in the finally block below.
        ContextImpl context = this.vertx.getOrCreateContext();
        Handler simpleReplyHandler = null;
        try {
            long timeoutID = -1L;
            if (replyHandler != null) {
                message.setReplyAddress(this.generateReplyAddress());
                // Holder so the timer callback can reach the registration that is created after the timer.
                AtomicReference<MessageConsumer> refReg = new AtomicReference<MessageConsumer>();
                timeoutID = this.vertx.setTimer(options.getSendTimeout(), timerID -> {
                    // No reply arrived in time: drop the registration and fail the caller with TIMEOUT.
                    log.warn("Message reply handler timed out as no reply was received - it will be removed");
                    ((MessageConsumer)refReg.get()).unregister();
                    this.metrics.replyFailure(message.address(), ReplyFailure.TIMEOUT);
                    replyHandler.handle(Future.failedFuture(new ReplyException(ReplyFailure.TIMEOUT, "Timed out waiting for reply")));
                });
                simpleReplyHandler = this.convertHandler(replyHandler);
                // Registered as a local-only reply handler, tied to the timer id.
                MessageConsumer registration = this.registerHandler(message.replyAddress(), simpleReplyHandler, true, true, timeoutID);
                refReg.set(registration);
            }
            if (replyDest != null) {
                // Explicit destination: remote unless it is this node.
                if (!replyDest.equals(this.serverID)) {
                    this.sendRemote(replyDest, message);
                } else {
                    this.receiveMessage(message, timeoutID, replyHandler, simpleReplyHandler);
                }
            } else if (this.subs != null) {
                // Subscription map present: look up which nodes subscribe to the address.
                long fTimeoutID = timeoutID;
                Handler fSimpleReplyHandler = simpleReplyHandler;
                this.subs.get(message.address(), asyncResult -> {
                    if (asyncResult.succeeded()) {
                        ChoosableIterable serverIDs = (ChoosableIterable)asyncResult.result();
                        if (serverIDs != null && !serverIDs.isEmpty()) {
                            this.sendToSubs(serverIDs, message, fTimeoutID, replyHandler, fSimpleReplyHandler);
                        } else {
                            // No subscribers in the map: fall back to local delivery.
                            this.receiveMessage(message, fTimeoutID, replyHandler, fSimpleReplyHandler);
                        }
                    } else {
                        // Lookup failure is only logged; the reply handler is not failed here.
                        log.error("Failed to send message", asyncResult.cause());
                    }
                });
            } else {
                // No subscription map: deliver locally.
                this.receiveMessage(message, timeoutID, replyHandler, simpleReplyHandler);
            }
        }
        finally {
            // NOTE(review): presumably restores the caller's context in case dispatch changed it - confirm.
            if (context != null) {
                ContextImpl.setContext(context);
            }
        }
    }

    /**
     * Adapts an async-result reply handler to a plain message handler. A reply whose body is
     * a {@link ReplyException} is recorded as a reply failure in metrics and delivered as a
     * failed result; any other reply is delivered as a success.
     */
    private <T> Handler<Message<T>> convertHandler(Handler<AsyncResult<Message<T>>> handler) {
        return reply -> {
            Object body = reply.body();
            if (!(body instanceof ReplyException)) {
                handler.handle(Future.succeededFuture(reply));
                return;
            }
            ReplyException failure = (ReplyException)body;
            metrics.replyFailure(reply.address(), failure.failureType());
            handler.handle(Future.failedFuture(failure));
        };
    }

    /**
     * Creates a registration for {@code address} with the given flags and timer id, attaches
     * {@code handler} to it and returns it.
     */
    private <T> MessageConsumer registerHandler(String address, Handler<Message<T>> handler, boolean replyHandler, boolean localOnly, long timeoutID) {
        HandlerRegistration consumer = new HandlerRegistration(address, replyHandler, localOnly, timeoutID);
        consumer.handler((Handler)handler);
        return consumer;
    }

    /**
     * Adds {@code registration} to the handlers for {@code address}.
     *
     * <p>The first handler for an address that is neither a reply handler nor local-only is
     * also added to the subscription map (when there is one), and the registration result is
     * then set from that operation. In every other case the result is set to success at once.
     * When called from within a context, a close hook is added to that context.
     *
     * @throws NullPointerException  if {@code address} or the registration's handler is null
     * @throws IllegalStateException if the bus is not started
     */
    private <T> void registerHandler(String address, HandlerRegistration<T> registration, boolean replyHandler, boolean localOnly, long timeoutID) {
        checkStarted();
        Objects.requireNonNull(address, "address");
        Objects.requireNonNull(registration.handler, "handler");

        // Use the caller's context when there is one, otherwise create an event loop context.
        ContextImpl current = vertx.getContext();
        boolean hasContext = current != null;
        ContextImpl context = hasContext
            ? current
            : vertx.createEventLoopContext(null, new JsonObject(), Thread.currentThread().getContextClassLoader());

        HandlerHolder holder = new HandlerHolder(registration, replyHandler, localOnly, context, timeoutID);
        Handlers handlers = (Handlers)handlerMap.get(address);
        boolean firstForAddress = handlers == null;
        if (firstForAddress) {
            Handlers fresh = new Handlers();
            Handlers raced = handlerMap.putIfAbsent(address, fresh);
            // Another thread may have installed a Handlers instance in the meantime; use theirs.
            handlers = raced != null ? raced : fresh;
        }
        if (firstForAddress && subs != null && !replyHandler && !localOnly) {
            subs.add(address, serverID, addResult -> registration.setResult(addResult));
        } else {
            registration.setResult(Future.succeededFuture());
        }
        handlers.list.add(holder);
        if (hasContext) {
            context.addCloseHook(new HandlerEntry(address, registration));
        }
    }

    /**
     * Removes the first holder registered at {@code address} whose handler is the same object
     * as {@code handler}.
     *
     * <p>If no such holder exists, or nothing is registered at the address, this method does
     * nothing and does not invoke {@code completionHandler}.
     *
     * <p>Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
     *
     * @param address           address the handler was registered at
     * @param handler           handler to remove; compared by identity
     * @param completionHandler notified once removal is done; may be null
     */
    private <T> void unregisterHandler(String address, Handler<Message<T>> handler, Handler<AsyncResult<Void>> completionHandler) {
        this.checkStarted();
        Handlers handlers = (Handlers)this.handlerMap.get(address);
        if (handlers != null) {
            Handlers handlers2 = handlers;
            // The scan and removal are done while holding the monitor of the Handlers instance.
            synchronized (handlers2) {
                int size = handlers.list.size();
                for (int i = 0; i < size; ++i) {
                    HandlerHolder holder = handlers.list.get(i);
                    // Identity comparison, not equals().
                    if (holder.handler != handler) continue;
                    // Cancel the reply timeout timer, if this holder has one.
                    if (holder.timeoutID != -1L) {
                        this.vertx.cancelTimer(holder.timeoutID);
                    }
                    handlers.list.remove(i);
                    // Flag the holder so deliveries already queued for it are skipped (see doReceive()).
                    holder.setRemoved();
                    if (handlers.list.isEmpty()) {
                        // Last handler for this address: drop the map entry and, for a non-local
                        // handler when a subscription map exists, drop the subscription as well.
                        this.handlerMap.remove(address);
                        if (this.subs != null && !holder.localOnly) {
                            this.removeSub(address, this.serverID, completionHandler);
                        } else {
                            this.callCompletionHandlerAsync(completionHandler);
                        }
                    } else {
                        this.callCompletionHandlerAsync(completionHandler);
                    }
                    holder.context.removeCloseHook(new HandlerEntry(address, handler));
                    // Only the first match is removed.
                    break;
                }
            }
        }
    }

    /**
     * Removes {@code handler} from {@code address} without a completion callback.
     */
    private <T> void unregisterHandler(String address, Handler<Message<T>> handler) {
        // Delegates to the three-argument overload with no completion handler.
        unregisterHandler(address, handler, null);
    }

    /**
     * Reports success to {@code completionHandler} via {@code runOnContext}, rather than
     * calling it inline. Does nothing when the handler is null.
     */
    private void callCompletionHandlerAsync(Handler<AsyncResult<Void>> completionHandler) {
        if (completionHandler == null) {
            return;
        }
        vertx.runOnContext(ignored -> completionHandler.handle(Future.succeededFuture()));
    }

    /**
     * Removes every subscription held for {@code theServerID} from the subscription map,
     * ignoring the outcome. Does nothing when there is no subscription map.
     */
    private void cleanSubsForServerID(ServerID theServerID) {
        if (subs == null) {
            return;
        }
        subs.removeAllForValue(theServerID, ignored -> {});
    }

    /**
     * Writes {@code message} to the connection for {@code theServerID}, creating the
     * connection holder first if none exists. Only the thread whose holder is actually
     * stored in the map calls {@code connect()} on it.
     */
    private void sendRemote(ServerID theServerID, MessageImpl message) {
        ConnectionHolder holder = (ConnectionHolder)connections.get(theServerID);
        if (holder == null) {
            ConnectionHolder created = new ConnectionHolder(theServerID);
            ConnectionHolder existing = connections.putIfAbsent(theServerID, created);
            if (existing == null) {
                // Our holder was stored, so this thread starts the connection.
                created.connect();
                holder = created;
            } else {
                // Another thread stored a holder first; use theirs.
                holder = existing;
            }
        }
        holder.writeMessage(message);
    }

    /**
     * Removes the subscription of {@code theServerID} to {@code subName} from the
     * subscription map and reports the outcome.
     *
     * @param subName           subscribed address
     * @param theServerID       node whose subscription is removed
     * @param completionHandler receives success or the removal failure; may be null, as
     *                          {@code unregisterHandler} forwards a null handler for plain
     *                          unregister calls
     */
    private void removeSub(String subName, ServerID theServerID, Handler<AsyncResult<Void>> completionHandler) {
        subs.remove(subName, theServerID, ar -> {
            if (ar.failed()) {
                // Keep the cause in the log so the failure can be diagnosed.
                log.error("Couldn't find sub to remove", ar.cause());
            }
            if (completionHandler == null) {
                return;
            }
            if (ar.succeeded()) {
                completionHandler.handle(Future.succeededFuture());
            } else {
                // Report the failure instead of leaving the caller waiting indefinitely.
                completionHandler.handle(Future.failedFuture(ar.cause()));
            }
        });
    }

    /**
     * Delivers {@code msg} to the local handlers registered at its address: one chosen
     * handler for a send, every handler for a publish.
     *
     * <p>When nothing is registered and a reply handler was supplied, the reply handler is
     * failed with a no-handlers error, the reply timer is cancelled and the temporary reply
     * registration is removed.
     *
     * @param msg                message to deliver
     * @param timeoutID          reply timer id, or -1 when there is none
     * @param replyHandler       caller's reply handler; may be null
     * @param simpleReplyHandler converted handler registered at the reply address; may be null
     */
    private <T> void receiveMessage(MessageImpl msg, long timeoutID, Handler<AsyncResult<Message<T>>> replyHandler, Handler<Message<T>> simpleReplyHandler) {
        msg.setBus(this);
        Handlers handlers = (Handlers)handlerMap.get(msg.address());
        if (handlers == null) {
            if (replyHandler == null) {
                return;
            }
            sendNoHandlersFailure(msg.address(), replyHandler);
            if (timeoutID != -1L) {
                vertx.cancelTimer(timeoutID);
            }
            if (simpleReplyHandler != null) {
                unregisterHandler(msg.replyAddress(), simpleReplyHandler);
            }
            return;
        }
        if (!msg.send()) {
            // Publish: every registered handler gets the message.
            for (HandlerHolder holder : handlers.list) {
                doReceive(msg, holder);
            }
            return;
        }
        // Send: a single handler is chosen; choose() may return null.
        HandlerHolder chosen = handlers.choose();
        if (chosen != null) {
            doReceive(msg, chosen);
        }
    }

    /**
     * Records a {@code NO_HANDLERS} reply failure for {@code address} and fails
     * {@code handler} with it, both done via {@code runOnContext}.
     *
     * @param address address the message could not be delivered to; may be null
     * @param handler reply handler to fail; may be null, in which case only the metric is
     *                recorded ({@code sendReply} forwards the caller's handler unchecked)
     */
    private <T> void sendNoHandlersFailure(final String address, final Handler<AsyncResult<Message<T>>> handler) {
        vertx.runOnContext(v -> {
            metrics.replyFailure(address, ReplyFailure.NO_HANDLERS);
            // Guard against a missing reply handler so the context task does not end in an NPE.
            if (handler != null) {
                handler.handle(Future.failedFuture(new ReplyException(ReplyFailure.NO_HANDLERS)));
            }
        });
    }

    /**
     * Delivers a copy of {@code msg} to {@code holder}'s handler on the holder's context.
     *
     * <p>The handler is skipped if the holder was marked removed before the task ran. A
     * reply handler is unregistered after the task runs, whether or not the handler threw.
     *
     * @param msg    message to deliver; copied before being handed to the handler
     * @param holder target handler together with its context and flags
     */
    private <T> void doReceive(MessageImpl msg, HandlerHolder<T> holder) {
        MessageImpl copied = msg.copyBeforeReceive();
        holder.context.runOnContext(v -> {
            try {
                // The holder may have been removed between dispatch and execution of this task.
                if (!holder.isRemoved()) {
                    metrics.messageReceived(msg.address());
                    holder.handler.handle(copied);
                }
            } finally {
                // Reply handlers are single-use.
                if (holder.replyHandler) {
                    unregisterHandler(msg.address(), holder.handler);
                }
            }
        });
    }

    /**
     * Verifies that this bus has a server id.
     *
     * @throws IllegalStateException if {@code serverID} is null
     */
    private void checkStarted() {
        if (serverID != null) {
            return;
        }
        throw new IllegalStateException("Event Bus is not started");
    }

    /**
     * Closes the bus when the instance is finalized.
     * {@code super.finalize()} runs even if {@code close} throws.
     */
    @Override
    protected void finalize() throws Throwable {
        try {
            close(ar -> {});
        } finally {
            super.finalize();
        }
    }

    /**
     * Wraps a {@link NetServer} so the connection handler can be replaced after the server
     * has been created. Each accepted socket is passed to the current handler while holding
     * this object's monitor, the same monitor {@link #setHandler} uses.
     */
    public static class EventBusNetServer {
        private final NetServer netServer;
        private Handler<NetSocket> handler;

        /**
         * Wraps {@code netServer} and installs a connect handler that forwards every
         * accepted socket to the handler currently set on this wrapper.
         */
        public EventBusNetServer(NetServer netServer) {
            this.netServer = netServer;
            netServer.connectHandler(conn -> dispatch((NetSocket)conn));
        }

        /** Replaces the handler that receives accepted sockets. */
        public synchronized void setHandler(Handler<NetSocket> handler) {
            this.handler = handler;
        }

        /** Forwards one accepted socket to the current handler under this object's monitor. */
        private synchronized void dispatch(NetSocket socket) {
            handler.handle(socket);
        }
    }

    public class HandlerRegistration<T>
    implements MessageConsumer<T>,
    Handler<Message<T>> {
        // Fixed at construction time.
        private final String address;
        private final boolean replyHandler;
        private final boolean localOnly;
        // Timer id associated with this registration; -1 is passed when there is none.
        private final long timeoutID;
        // Cleared by doUnregister() before the handler is removed from the bus.
        private boolean registered;
        // User handler; EventBusImpl.registerHandler() requires it to be non-null.
        private Handler<Message<T>> handler;
        // Outcome of registration once known; completionHandler(...) replays it to late callers.
        private AsyncResult<Void> result;
        // Stored by completionHandler(...) when the result is not yet known.
        private Handler<AsyncResult<Void>> completionHandler;
        // Invoked from the completion callback that doUnregister() builds.
        private Handler<Void> endHandler;
        // NOTE(review): the next two handlers are not used in the visible part of this class.
        private Handler<Throwable> exceptionHandler;
        private Handler<Message<T>> discardHandler;
        // Upper bound on pending.size(), enforced by setMaxBufferedMessages().
        private int maxBufferedMessages;
        private final Queue<Message<T>> pending = new ArrayDeque<Message<T>>(8);
        // NOTE(review): presumably set by pause()/resume(), which are outside this chunk.
        private boolean paused;

        /**
         * Creates a registration that is not yet attached to any handler.
         *
         * @param address      address to consume from
         * @param replyHandler whether this registration is a reply handler
         * @param localOnly    whether this registration is local-only
         * @param timeoutID    id of the associated timer, or -1 when there is none
         */
        public HandlerRegistration(String address, boolean replyHandler, boolean localOnly, long timeoutID) {
            this.timeoutID = timeoutID;
            this.localOnly = localOnly;
            this.replyHandler = replyHandler;
            this.address = address;
        }

        /**
         * Sets the cap on buffered messages, first dropping messages from the head of the
         * pending queue until it fits within the new cap.
         *
         * @param maxBufferedMessages new cap; must not be negative
         * @return this consumer
         */
        @Override
        public synchronized MessageConsumer<T> setMaxBufferedMessages(int maxBufferedMessages) {
            Arguments.require(maxBufferedMessages >= 0, "Max buffered messages cannot be negative");
            // Each poll() removes one message, so drop exactly the surplus.
            for (int surplus = pending.size() - maxBufferedMessages; surplus > 0; surplus--) {
                pending.poll();
            }
            this.maxBufferedMessages = maxBufferedMessages;
            return this;
        }

        @Override
        public synchronized int getMaxBufferedMessages() {
            return this.maxBufferedMessages;
        }

        @Override
        public String address() {
            return this.address;
        }

        @Override
        public synchronized void completionHandler(Handler<AsyncResult<Void>> completionHandler) {
            Objects.requireNonNull(completionHandler);
            if (this.result != null) {
                AsyncResult<Void> value = this.result;
                EventBusImpl.this.vertx.runOnContext(v -> completionHandler.handle(value));
            } else {
                this.completionHandler = completionHandler;
            }
        }

        @Override
        public synchronized void unregister() {
            this.doUnregister(null);
        }

        @Override
        public synchronized void unregister(Handler<AsyncResult<Void>> completionHandler) {
            Objects.requireNonNull(completionHandler);
            this.doUnregister(completionHandler);
        }

        private void doUnregister(Handler<AsyncResult<Void>> completionHandler) {
            if (this.endHandler != null) {
                Handler<Void> theEndHandler = this.endHandler;
                Handler<AsyncResult<Void>> handler = completionHandler;
                completionHandler = ar -> {
                    theEndHandler.handle(null);
                    if (handler != null) {
                        handler.handle((AsyncResult<Void>)ar);
                    }
                };
            }
            if (this.registered) {
                this.registered = false;
                EventBusImpl.this.unregisterHandler(this.address, this, completionHandler);
                EventBusImpl.this.metrics.handlerUnregistered(this.address);
            } else {
                EventBusImpl.this.callCompletionHandlerAsync(completionHandler);
            }
            this.registered = false;
        }

        private synchronized void setResult(AsyncResult<Void> result) {
            this.result = result;
            if (this.completionHandler != null) {
                if (result.succeeded()) {
                    EventBusImpl.this.metrics.handlerRegistered(this.address);
                }
                Handler<AsyncResult<Void>> callback = this.completionHandler;
                EventBusImpl.this.vertx.runOnContext(v -> callback.handle(result));
            } else if (result.failed()) {
                log.error("Failed to propagate registration for handler " + this.handler + " and address " + this.address);
            } else {
                EventBusImpl.this.metrics.handlerRegistered(this.address);
            }
        }

        @Override
        public synchronized void handle(Message<T> event) {
            if (this.paused) {
                if (this.pending.size() < this.maxBufferedMessages) {
                    this.pending.add(event);
                } else if (this.discardHandler != null) {
                    this.discardHandler.handle(event);
                }
            } else {
                this.checkNextTick();
                this.handler.handle(event);
            }
        }

        public synchronized void discardHandler(Handler<Message<T>> handler) {
            this.discardHandler = handler;
        }

        @Override
        public synchronized MessageConsumer<T> handler(Handler<Message<T>> handler) {
            this.handler = handler;
            if (this.handler != null && !this.registered) {
                this.registered = true;
                EventBusImpl.this.registerHandler(this.address, this, this.replyHandler, this.localOnly, this.timeoutID);
            } else if (this.handler == null && this.registered) {
                this.unregister();
            }
            return this;
        }

        @Override
        public ReadStream<T> bodyStream() {
            return new BodyReadStream(this);
        }

        @Override
        public synchronized boolean isRegistered() {
            return this.registered;
        }

        @Override
        public synchronized MessageConsumer<T> pause() {
            if (!this.paused) {
                this.paused = true;
            }
            return this;
        }

        @Override
        public synchronized MessageConsumer<T> resume() {
            if (this.paused) {
                this.paused = false;
                this.checkNextTick();
            }
            return this;
        }

        @Override
        public synchronized MessageConsumer<T> endHandler(Handler<Void> endHandler) {
            this.endHandler = endHandler;
            return this;
        }

        @Override
        public synchronized MessageConsumer<T> exceptionHandler(Handler<Throwable> handler) {
            this.exceptionHandler = handler;
            return this;
        }

        private void checkNextTick() {
            if (!this.pending.isEmpty()) {
                EventBusImpl.this.vertx.runOnContext(v -> {
                    Message<T> message;
                    if (!this.paused && (message = this.pending.poll()) != null) {
                        this.handle(message);
                    }
                });
            }
        }
    }

    /**
     * Pairs an address with a handler so that the registration can be undone through
     * {@link Closeable#close(Handler)}.
     *
     * <p>Two entries are equal when both their address and their handler are equal. Both
     * {@code equals} and {@code hashCode} tolerate a null address or handler.
     *
     * @param <T> the message body type
     */
    private class HandlerEntry<T>
    implements Closeable {
        final String address;
        final Handler<Message<T>> handler;

        private HandlerEntry(String address, Handler<Message<T>> handler) {
            this.address = address;
            this.handler = handler;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            HandlerEntry<?> entry = (HandlerEntry<?>)o;
            // Null-safe comparison, matching the null handling in hashCode.
            return Objects.equals(this.address, entry.address) && Objects.equals(this.handler, entry.handler);
        }

        @Override
        public int hashCode() {
            return 31 * Objects.hashCode(this.address) + Objects.hashCode(this.handler);
        }

        /**
         * Unregisters the handler from the address, then reports success to the caller without
         * waiting for the unregistration to complete (no completion handler is passed to it).
         *
         * @param completionHandler receives a succeeded future
         */
        @Override
        public void close(Handler<AsyncResult<Void>> completionHandler) {
            EventBusImpl.this.unregisterHandler(this.address, this.handler, null);
            completionHandler.handle(Future.succeededFuture());
        }
    }

    /**
     * The handlers registered for one address, with a rotating cursor used to pick one of them.
     */
    private static class Handlers {
        final List<HandlerHolder> list = new CopyOnWriteArrayList<HandlerHolder>();
        final AtomicInteger pos = new AtomicInteger(0);

        private Handlers() {
        }

        /**
         * Picks the holder at the current cursor position and advances the cursor, wrapping back
         * to 0 once the last index has been handed out.
         *
         * @return the chosen holder, or {@code null} when the list is empty
         */
        HandlerHolder choose() {
            while (true) {
                int size = this.list.size();
                if (size == 0) {
                    return null;
                }
                int p = this.pos.getAndIncrement();
                if (p >= size - 1) {
                    this.pos.set(0);
                }
                try {
                    return this.list.get(p);
                }
                catch (IndexOutOfBoundsException e) {
                    // The size check, cursor update and get are not one atomic step, so the index
                    // can be stale (list shrank, or concurrent callers advanced the cursor past
                    // the end). Reset the cursor and try again.
                    this.pos.set(0);
                }
            }
        }
    }

    /**
     * Outbound connection to one remote event bus node, identified by {@code theServerID}.
     *
     * <p>Messages written before the socket is connected are queued in {@code pending} and flushed
     * by {@link #connected(NetSocket)}. Once connected, pings are scheduled repeatedly; when no data
     * arrives on the socket before the reply timer fires, the holder is closed as failed.
     */
    private class ConnectionHolder {
        final NetClient client;
        final Queue<MessageImpl> pending = new ArrayDeque<MessageImpl>();
        final ServerID theServerID;
        volatile NetSocket socket;
        volatile boolean connected;
        // Timer waiting for the reply to the last ping; -1 when none has been set.
        long timeoutID = -1L;
        // Timer that sends the next ping; -1 when none has been set.
        long pingTimeoutID = -1L;

        private ConnectionHolder(ServerID serverID) {
            this.theServerID = serverID;
            this.client = EventBusImpl.this.vertx.createNetClient(new NetClientOptions().setConnectTimeout(60000));
        }

        /**
         * Cancels the ping timers, closes the client and removes this holder from the connection
         * map.
         *
         * @param failed when true and this holder was still the one mapped for the server, the
         *               subscriptions for that server are cleaned up as well
         */
        void close(boolean failed) {
            if (this.timeoutID != -1L) {
                EventBusImpl.this.vertx.cancelTimer(this.timeoutID);
            }
            if (this.pingTimeoutID != -1L) {
                EventBusImpl.this.vertx.cancelTimer(this.pingTimeoutID);
            }
            try {
                this.client.close();
            }
            catch (Exception ignored) {
                // Best effort: a failure to close the client must not prevent the cleanup below.
            }
            // Two-argument remove: only succeeds while this exact holder is still the mapped one.
            if (EventBusImpl.this.connections.remove(this.theServerID, this)) {
                log.debug("Cluster connection closed: " + this.theServerID + " holder " + this);
                if (failed) {
                    EventBusImpl.this.cleanSubsForServerID(this.theServerID);
                }
            }
        }

        /**
         * Schedules the next ping. When the ping timer fires, a reply timer is started and the ping
         * is written to the socket; if the reply timer fires, the connection is closed as failed.
         */
        void schedulePing() {
            this.pingTimeoutID = EventBusImpl.this.vertx.setTimer(EventBusImpl.this.pingInterval, id1 -> {
                this.timeoutID = EventBusImpl.this.vertx.setTimer(EventBusImpl.this.pingReplyInterval, id2 -> {
                    // Report the remote node that failed to answer, not this node's own ID.
                    log.warn("No pong from server " + this.theServerID + " - will consider it dead");
                    this.close(true);
                });
                // The ping carries this node's own server ID as its sender.
                MessageImpl<String, String> pingMessage = new MessageImpl<String, String>(EventBusImpl.this.serverID, EventBusImpl.PING_ADDRESS, null, null, null, new PingMessageCodec(), true);
                this.socket.write(pingMessage.encodeToWire());
            });
        }

        /**
         * Writes the message to the socket when connected, otherwise queues it for later.
         *
         * @param message the message to send
         */
        void writeMessage(MessageImpl message) {
            if (this.connected) {
                this.socket.write(message.encodeToWire());
            } else {
                // Re-check under the lock that connected(NetSocket) also holds, so a message is
                // never queued after the pending queue has been flushed.
                synchronized (this) {
                    if (this.connected) {
                        this.socket.write(message.encodeToWire());
                    } else {
                        this.pending.add(message);
                    }
                }
            }
        }

        /**
         * Adopts the connected socket: installs its handlers, starts the ping cycle and flushes
         * every queued message.
         *
         * @param socket the newly connected socket
         */
        synchronized void connected(NetSocket socket) {
            this.socket = socket;
            this.connected = true;
            socket.exceptionHandler(t -> this.close(true));
            socket.closeHandler(v -> this.close(false));
            // Any incoming data cancels the reply timer and schedules the next ping.
            socket.handler(data -> {
                EventBusImpl.this.vertx.cancelTimer(this.timeoutID);
                this.schedulePing();
            });
            this.schedulePing();
            for (MessageImpl message : this.pending) {
                socket.write(message.encodeToWire());
            }
            this.pending.clear();
        }

        /**
         * Starts connecting to the remote node. Success hands the socket to
         * {@link #connected(NetSocket)}; failure closes this holder as failed.
         */
        void connect() {
            this.client.connect(this.theServerID.port, this.theServerID.host, res -> {
                if (res.succeeded()) {
                    this.connected((NetSocket)res.result());
                } else {
                    this.close(true);
                }
            });
        }
    }

    /**
     * Bundles a registered handler with the context and flags it was registered with, plus a
     * {@code removed} marker. Equality and hash code are based on the handler alone.
     *
     * @param <T> the message body type
     */
    private static class HandlerHolder<T> {
        final ContextImpl context;
        final Handler<Message<T>> handler;
        final boolean replyHandler;
        final boolean localOnly;
        final long timeoutID;
        boolean removed;

        HandlerHolder(Handler<Message<T>> handler, boolean replyHandler, boolean localOnly, ContextImpl context, long timeoutID) {
            this.handler = handler;
            this.replyHandler = replyHandler;
            this.localOnly = localOnly;
            this.context = context;
            this.timeoutID = timeoutID;
        }

        // The removed flag is only accessed through these two methods, under this holder's monitor.
        synchronized boolean isRemoved() {
            return this.removed;
        }

        synchronized void setRemoved() {
            this.removed = true;
        }

        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (o == null || o.getClass() != this.getClass()) {
                return false;
            }
            HandlerHolder<?> other = (HandlerHolder<?>)o;
            if (this.handler == null) {
                return other.handler == null;
            }
            return this.handler.equals(other.handler);
        }

        @Override
        public int hashCode() {
            return Objects.hashCode(this.handler);
        }
    }
}

