/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.core.eventbus.impl;

import io.vertx.core.AsyncResult;
import io.vertx.core.Closeable;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.DeliveryContext;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageCodec;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.eventbus.MessageProducer;
import io.vertx.core.eventbus.ReplyException;
import io.vertx.core.eventbus.ReplyFailure;
import io.vertx.core.eventbus.impl.CodecManager;
import io.vertx.core.eventbus.impl.HandlerHolder;
import io.vertx.core.eventbus.impl.HandlerRegistration;
import io.vertx.core.eventbus.impl.MessageImpl;
import io.vertx.core.eventbus.impl.MessageProducerImpl;
import io.vertx.core.eventbus.impl.MessageTagExtractor;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.impl.utils.ConcurrentCyclicSequence;
import io.vertx.core.spi.metrics.EventBusMetrics;
import io.vertx.core.spi.metrics.MetricsProvider;
import io.vertx.core.spi.metrics.VertxMetrics;
import io.vertx.core.spi.tracing.TagExtractor;
import io.vertx.core.spi.tracing.VertxTracer;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

public class EventBusImpl
implements EventBus,
MetricsProvider {
    // Class-wide logger; used below to report failures raised by outbound interceptors.
    private static final Logger log = LoggerFactory.getLogger(EventBusImpl.class);
    // Interceptors walked by OutboundDeliveryContext.next() before a message is sent or published.
    private final List<Handler<DeliveryContext>> sendInterceptors = new CopyOnWriteArrayList<Handler<DeliveryContext>>();
    // Interceptors handed out through receiveInterceptors(); this class does not iterate them itself.
    private final List<Handler<DeliveryContext>> receiveInterceptors = new CopyOnWriteArrayList<Handler<DeliveryContext>>();
    // Counter behind generateReplyAddress(); every generated reply address takes the next value.
    private final AtomicLong replySequence = new AtomicLong(0L);
    // Owning Vert.x instance (contexts, timers, runOnContext).
    protected final VertxInternal vertx;
    // Event bus metrics, or null when vertx.metricsSPI() returned null in the constructor.
    protected final EventBusMetrics metrics;
    // Address -> handlers registered on that address; an entry is dropped once its sequence becomes empty.
    protected final ConcurrentMap<String, ConcurrentCyclicSequence<HandlerHolder>> handlerMap = new ConcurrentHashMap<String, ConcurrentCyclicSequence<HandlerHolder>>();
    // Codec lookup/registration used by createMessage() and the (un)registerCodec methods.
    protected final CodecManager codecManager = new CodecManager();
    // Set to true by start(); checkStarted() throws while it is still false.
    protected volatile boolean started;
    // Context used for outbound deliveries when the calling thread has no current context.
    private final ContextInternal sendNoContext;

    public EventBusImpl(VertxInternal vertx) {
        VertxMetrics metrics = vertx.metricsSPI();
        this.vertx = vertx;
        this.metrics = metrics != null ? metrics.createEventBusMetrics() : null;
        this.sendNoContext = vertx.getOrCreateContext();
    }

    /**
     * Appends an interceptor that is invoked for every outbound delivery.
     *
     * @param interceptor the interceptor to add
     * @return this event bus, for chaining
     */
    @Override
    public <T> EventBus addOutboundInterceptor(Handler<DeliveryContext<T>> interceptor) {
        // The list is declared with the raw DeliveryContext element type, so the
        // parameterized handler must be erased explicitly before it can be stored.
        @SuppressWarnings({"unchecked", "rawtypes"})
        Handler<DeliveryContext> erased = (Handler) interceptor;
        this.sendInterceptors.add(erased);
        return this;
    }

    /**
     * Appends an interceptor to the inbound interceptor list.
     *
     * @param interceptor the interceptor to add
     * @return this event bus, for chaining
     */
    @Override
    public <T> EventBus addInboundInterceptor(Handler<DeliveryContext<T>> interceptor) {
        // The list is declared with the raw DeliveryContext element type, so the
        // parameterized handler must be erased explicitly before it can be stored.
        @SuppressWarnings({"unchecked", "rawtypes"})
        Handler<DeliveryContext> erased = (Handler) interceptor;
        this.receiveInterceptors.add(erased);
        return this;
    }

    /**
     * Removes a previously added outbound interceptor; a handler that was never
     * added is ignored.
     *
     * @param interceptor the interceptor to remove
     * @return this event bus, for chaining
     */
    @Override
    public <T> EventBus removeOutboundInterceptor(Handler<DeliveryContext<T>> interceptor) {
        sendInterceptors.remove(interceptor);
        return this;
    }

    /**
     * @return an iterator over the currently registered inbound interceptors
     */
    Iterator<Handler<DeliveryContext>> receiveInterceptors() {
        return receiveInterceptors.iterator();
    }

    /**
     * Removes a previously added inbound interceptor; a handler that was never
     * added is ignored.
     *
     * @param interceptor the interceptor to remove
     * @return this event bus, for chaining
     */
    @Override
    public <T> EventBus removeInboundInterceptor(Handler<DeliveryContext<T>> interceptor) {
        receiveInterceptors.remove(interceptor);
        return this;
    }

    /**
     * Marks the bus as started and immediately reports success.
     *
     * @param completionHandler notified synchronously with a succeeded result
     * @throws IllegalStateException if the bus has already been started
     */
    @Override
    public synchronized void start(Handler<AsyncResult<Void>> completionHandler) {
        if (started) {
            throw new IllegalStateException("Already started");
        }
        started = true;
        Future<Void> outcome = Future.succeededFuture();
        completionHandler.handle(outcome);
    }

    /**
     * Sends a message with default delivery options and no reply handler.
     *
     * @param address the destination address
     * @param message the message body
     * @return this event bus, for chaining
     */
    @Override
    public EventBus send(String address, Object message) {
        DeliveryOptions defaults = new DeliveryOptions();
        return request(address, message, defaults, null);
    }

    /**
     * Sends a message with default delivery options and registers a reply handler.
     *
     * @param address      the destination address
     * @param message      the message body
     * @param replyHandler receives the reply or the failure
     * @return this event bus, for chaining
     */
    @Override
    public <T> EventBus request(String address, Object message, Handler<AsyncResult<Message<T>>> replyHandler) {
        DeliveryOptions defaults = new DeliveryOptions();
        return request(address, message, defaults, replyHandler);
    }

    /**
     * Sends a message with the given delivery options and no reply handler.
     *
     * @param address the destination address
     * @param message the message body
     * @param options delivery options (headers and codec name are read from it)
     * @return this event bus, for chaining
     */
    @Override
    public EventBus send(String address, Object message, DeliveryOptions options) {
        return request(address, message, options, null);
    }

    /**
     * Builds a point-to-point message from the arguments and hands it to
     * {@code sendOrPubInternal}.
     *
     * @param address      the destination address
     * @param message      the message body
     * @param options      delivery options (headers and codec name are read from it)
     * @param replyHandler receives the reply or the failure; may be {@code null}
     * @return this event bus, for chaining
     */
    @Override
    public <T> EventBus request(String address, Object message, DeliveryOptions options, Handler<AsyncResult<Message<T>>> replyHandler) {
        MessageImpl outgoing = createMessage(true, true, address, options.getHeaders(), message, options.getCodecName(), null);
        sendOrPubInternal(outgoing, options, replyHandler);
        return this;
    }

    /**
     * Creates a producer that sends to the given address with default delivery options.
     *
     * @param address the destination address; must not be {@code null}
     * @return a new message producer
     * @throws NullPointerException if {@code address} is {@code null}
     */
    @Override
    public <T> MessageProducer<T> sender(String address) {
        Objects.requireNonNull(address, "address");
        DeliveryOptions defaults = new DeliveryOptions();
        return new MessageProducerImpl<>(vertx, address, true, defaults);
    }

    /**
     * Creates a producer that sends to the given address with the given options.
     *
     * @param address the destination address; must not be {@code null}
     * @param options the delivery options; must not be {@code null}
     * @return a new message producer
     * @throws NullPointerException if either argument is {@code null}
     */
    @Override
    public <T> MessageProducer<T> sender(String address, DeliveryOptions options) {
        Objects.requireNonNull(address, "address");
        Objects.requireNonNull(options, "options");
        return new MessageProducerImpl<>(vertx, address, true, options);
    }

    /**
     * Creates a producer that publishes to the given address with default delivery options.
     *
     * @param address the destination address; must not be {@code null}
     * @return a new message producer
     * @throws NullPointerException if {@code address} is {@code null}
     */
    @Override
    public <T> MessageProducer<T> publisher(String address) {
        Objects.requireNonNull(address, "address");
        DeliveryOptions defaults = new DeliveryOptions();
        return new MessageProducerImpl<>(vertx, address, false, defaults);
    }

    /**
     * Creates a producer that publishes to the given address with the given options.
     *
     * @param address the destination address; must not be {@code null}
     * @param options the delivery options; must not be {@code null}
     * @return a new message producer
     * @throws NullPointerException if either argument is {@code null}
     */
    @Override
    public <T> MessageProducer<T> publisher(String address, DeliveryOptions options) {
        Objects.requireNonNull(address, "address");
        Objects.requireNonNull(options, "options");
        return new MessageProducerImpl<>(vertx, address, false, options);
    }

    /**
     * Publishes a message with default delivery options.
     *
     * @param address the destination address
     * @param message the message body
     * @return this event bus, for chaining
     */
    @Override
    public EventBus publish(String address, Object message) {
        DeliveryOptions defaults = new DeliveryOptions();
        return publish(address, message, defaults);
    }

    /**
     * Builds a publish (non-send) message from the arguments and hands it to
     * {@code sendOrPubInternal} without a reply handler.
     *
     * @param address the destination address
     * @param message the message body
     * @param options delivery options (headers and codec name are read from it)
     * @return this event bus, for chaining
     */
    @Override
    public EventBus publish(String address, Object message, DeliveryOptions options) {
        MessageImpl outgoing = createMessage(false, true, address, options.getHeaders(), message, options.getCodecName(), null);
        sendOrPubInternal(outgoing, options, null);
        return this;
    }

    /**
     * Creates a consumer for the given address without attaching a handler yet.
     *
     * @param address the address to consume from; must not be {@code null}
     * @return a new, handler-less consumer
     * @throws IllegalStateException if the bus is not started
     * @throws NullPointerException  if {@code address} is {@code null}
     */
    @Override
    public <T> MessageConsumer<T> consumer(String address) {
        checkStarted();
        Objects.requireNonNull(address, "address");
        return new HandlerRegistration<>(vertx, metrics, this, address, null, false, false);
    }

    /**
     * Creates a consumer for the given address and attaches the handler to it.
     *
     * @param address the address to consume from
     * @param handler the message handler; must not be {@code null}
     * @return the consumer with the handler set
     * @throws NullPointerException if {@code handler} is {@code null}
     */
    @Override
    public <T> MessageConsumer<T> consumer(String address, Handler<Message<T>> handler) {
        Objects.requireNonNull(handler, "handler");
        MessageConsumer<T> registration = consumer(address);
        registration.handler(handler);
        return registration;
    }

    /**
     * Creates a consumer for the given address without attaching a handler yet.
     * It differs from {@code consumer(String)} only in the flag passed to the
     * registration (presumably "local only" — confirm against
     * {@code HandlerRegistration}).
     *
     * @param address the address to consume from; must not be {@code null}
     * @return a new, handler-less consumer
     * @throws IllegalStateException if the bus is not started
     * @throws NullPointerException  if {@code address} is {@code null}
     */
    @Override
    public <T> MessageConsumer<T> localConsumer(String address) {
        checkStarted();
        Objects.requireNonNull(address, "address");
        return new HandlerRegistration<>(vertx, metrics, this, address, null, true, false);
    }

    /**
     * Creates a consumer via {@code localConsumer(String)} and attaches the handler to it.
     *
     * @param address the address to consume from
     * @param handler the message handler; must not be {@code null}
     * @return the consumer with the handler set
     * @throws NullPointerException if {@code handler} is {@code null}
     */
    @Override
    public <T> MessageConsumer<T> localConsumer(String address, Handler<Message<T>> handler) {
        Objects.requireNonNull(handler, "handler");
        MessageConsumer<T> registration = localConsumer(address);
        registration.handler(handler);
        return registration;
    }

    /**
     * Registers a message codec with the codec manager.
     *
     * @param codec the codec to register
     * @return this event bus, for chaining
     */
    @Override
    public EventBus registerCodec(MessageCodec codec) {
        codecManager.registerCodec(codec);
        return this;
    }

    /**
     * Unregisters the codec with the given name from the codec manager.
     *
     * @param name the codec name
     * @return this event bus, for chaining
     */
    @Override
    public EventBus unregisterCodec(String name) {
        codecManager.unregisterCodec(name);
        return this;
    }

    /**
     * Registers a default codec for the given class with the codec manager.
     *
     * @param clazz the class the codec applies to
     * @param codec the codec to register
     * @return this event bus, for chaining
     */
    @Override
    public <T> EventBus registerDefaultCodec(Class<T> clazz, MessageCodec<T, ?> codec) {
        codecManager.registerDefaultCodec(clazz, codec);
        return this;
    }

    /**
     * Unregisters the default codec for the given class from the codec manager.
     *
     * @param clazz the class whose default codec is removed
     * @return this event bus, for chaining
     */
    @Override
    public EventBus unregisterDefaultCodec(Class clazz) {
        codecManager.unregisterDefaultCodec(clazz);
        return this;
    }

    /**
     * Unregisters every handler, closes the metrics (when enabled) and then
     * notifies the caller asynchronously on a Vert.x context.
     *
     * @param completionHandler notified with a succeeded result; may be {@code null}
     * @throws IllegalStateException if the bus is not started
     */
    @Override
    public void close(Handler<AsyncResult<Void>> completionHandler) {
        checkStarted();
        unregisterAll();
        if (metrics != null) {
            metrics.close();
        }
        if (completionHandler == null) {
            return;
        }
        vertx.runOnContext(ignored -> completionHandler.handle(Future.succeededFuture()));
    }

    /**
     * @return {@code true} when a metrics object was created for this bus
     */
    @Override
    public boolean isMetricsEnabled() {
        return metrics != null;
    }

    /**
     * @return the metrics object of this bus, or {@code null} when metrics are disabled
     */
    @Override
    public EventBusMetrics<?> getMetrics() {
        return metrics;
    }

    /**
     * Creates a message for the given address, resolving the codec from the
     * body and the optional codec name.
     *
     * @param send         {@code true} for point-to-point, {@code false} for publish
     * @param src          forwarded unchanged to the message constructor
     * @param address      the destination address; must not be {@code null}
     * @param headers      the message headers
     * @param body         the message body
     * @param codecName    name of the codec to use, passed to the codec lookup
     * @param writeHandler notified once the message has been written; may be {@code null}
     * @return the new message (its reply address is left {@code null})
     * @throws NullPointerException if {@code address} is {@code null}
     */
    public MessageImpl createMessage(boolean send, boolean src, String address, MultiMap headers, Object body, String codecName, Handler<AsyncResult<Void>> writeHandler) {
        Objects.requireNonNull(address, "no null address accepted");
        MessageCodec codec = codecManager.lookupCodec(body, codecName);
        return new MessageImpl(address, null, headers, body, codec, send, src, this, writeHandler);
    }

    /**
     * Registers a handler locally and then runs the completion overload of
     * {@code addRegistration}, which reports the outcome back to the registration.
     *
     * @param address      the address to register on
     * @param registration the registration; its handler must already be set
     * @param replyHandler whether the registration is a reply handler
     * @param localOnly    whether the registration is local only
     * @return the holder created for the registration
     * @throws NullPointerException if the registration has no handler
     */
    protected <T> HandlerHolder<T> addRegistration(String address, HandlerRegistration<T> registration, boolean replyHandler, boolean localOnly) {
        Objects.requireNonNull(registration.getHandler(), "handler");
        LocalRegistrationResult<T> local = addLocalRegistration(address, registration, replyHandler, localOnly);
        Handler<AsyncResult<Void>> onRegistered = registration::setResult;
        addRegistration(local.newAddress, address, replyHandler, localOnly, onRegistered);
        return local.holder;
    }

    /**
     * Completion step of a registration. This implementation ignores every
     * argument except {@code completionHandler} and reports success immediately.
     *
     * @param newAddress        whether the address had no handlers before
     * @param address           the registered address
     * @param replyHandler      whether the registration is a reply handler
     * @param localOnly         whether the registration is local only
     * @param completionHandler notified synchronously with a succeeded result
     */
    protected <T> void addRegistration(boolean newAddress, String address, boolean replyHandler, boolean localOnly, Handler<AsyncResult<Void>> completionHandler) {
        Future<Void> outcome = Future.succeededFuture();
        completionHandler.handle(outcome);
    }

    /**
     * Adds the registration to the local handler map.
     *
     * <p>The handler is bound to the current context, or to one obtained from
     * {@code vertx.getOrCreateContext()} when the caller has none. A close hook
     * is installed only in the former case.
     *
     * @param address      the address to register on; must not be {@code null}
     * @param registration the registration being added
     * @param replyHandler whether the registration is a reply handler
     * @param localOnly    whether the registration is local only
     * @return the created holder together with a flag telling whether the
     *         address had no handlers before this call
     * @throws NullPointerException if {@code address} is {@code null}
     */
    private <T> LocalRegistrationResult<T> addLocalRegistration(String address, HandlerRegistration<T> registration, boolean replyHandler, boolean localOnly) {
        Objects.requireNonNull(address, "address");
        Context context = Vertx.currentContext();
        boolean hasContext = context != null;
        if (!hasContext) {
            context = this.vertx.getOrCreateContext();
        }
        registration.setHandlerContext(context);
        HandlerHolder<T> holder = new HandlerHolder<T>(registration, replyHandler, localOnly, context);
        // The map stores raw HandlerHolder sequences, so the single-element
        // candidate must use that element type to be accepted by merge().
        ConcurrentCyclicSequence<HandlerHolder> handlers = new ConcurrentCyclicSequence<HandlerHolder>().add(holder);
        ConcurrentCyclicSequence<HandlerHolder> actualHandlers = this.handlerMap.merge(address, handlers, (old, prev) -> old.add(prev.first()));
        if (hasContext) {
            HandlerEntry<T> entry = new HandlerEntry<T>(address, registration);
            context.addCloseHook(entry);
        }
        // merge() returns the very instance we passed only when no mapping existed.
        boolean newAddress = handlers == actualHandlers;
        return new LocalRegistrationResult<T>(holder, newAddress);
    }

    /**
     * Removes a handler locally and then runs the address overload of
     * {@code removeRegistration}. The holder is passed on only when it was the
     * last one for its address; otherwise {@code null} is passed.
     *
     * @param holder            the holder to remove
     * @param completionHandler forwarded to the address overload
     */
    protected <T> void removeRegistration(HandlerHolder<T> holder, Handler<AsyncResult<Void>> completionHandler) {
        boolean wasLast = removeLocalRegistration(holder);
        HandlerHolder<T> lastHolder = wasLast ? holder : null;
        removeRegistration(lastHolder, holder.getHandler().address(), completionHandler);
    }

    /**
     * Completion step of an unregistration. This implementation ignores the
     * holder and the address and only notifies the completion handler
     * asynchronously.
     *
     * @param handlerHolder     the last holder of the address, or {@code null}
     * @param address           the address the handler was registered on
     * @param completionHandler notified with a succeeded result; may be {@code null}
     */
    protected <T> void removeRegistration(HandlerHolder<T> handlerHolder, String address, Handler<AsyncResult<Void>> completionHandler) {
        callCompletionHandlerAsync(completionHandler);
    }

    /**
     * Removes the holder from the local handler map and, the first time the
     * holder is marked removed, also drops its close hook from the context.
     *
     * @param holder the holder to remove
     * @return {@code true} when no handler is left for the holder's address
     */
    private <T> boolean removeLocalRegistration(HandlerHolder<T> holder) {
        String address = holder.getHandler().address();
        ConcurrentCyclicSequence<HandlerHolder> remaining = handlerMap.compute(address, (key, current) -> {
            if (current == null) {
                return null;
            }
            ConcurrentCyclicSequence<HandlerHolder> next = current.remove(holder);
            // Returning null drops the mapping once the sequence is empty.
            return next.size() == 0 ? null : next;
        });
        if (holder.setRemoved()) {
            holder.getContext().removeCloseHook(new HandlerEntry<T>(address, holder.getHandler()));
        }
        return remaining == null;
    }

    /**
     * Sends a reply to a previously received message, running the outbound
     * interceptors first.
     *
     * <p>The delivery uses the caller's current context. When there is none,
     * the shared {@code sendNoContext} is used, exactly as
     * {@code sendOrPubInternal} does, instead of creating a new context for
     * every reply.
     *
     * @param replyMessage   the reply to send; its address must be set
     * @param replierMessage the message being replied to
     * @param options        delivery options for the reply
     * @param replyHandler   handler for a reply to the reply; may be {@code null}
     * @throws IllegalStateException if the reply message has no address
     */
    protected <T> void sendReply(MessageImpl replyMessage, MessageImpl replierMessage, DeliveryOptions options, Handler<AsyncResult<Message<T>>> replyHandler) {
        if (replyMessage.address() == null) {
            throw new IllegalStateException("address not specified");
        }
        // getContext() returns null off-context; getOrCreateContext() never does,
        // which would make the fallback below unreachable.
        ContextInternal ctx = this.vertx.getContext();
        if (ctx == null) {
            ctx = this.sendNoContext;
        }
        ReplyHandler<T> handler = this.createReplyHandler(replyMessage, replierMessage.src, options, replyHandler);
        new OutboundDeliveryContext<>(ctx, replyMessage, options, handler, replierMessage).next();
    }

    /**
     * Final step of sending a reply, reached after all outbound interceptors
     * ran. This implementation ignores {@code replierMessage} and delegates to
     * {@code sendOrPub}.
     *
     * @param sendContext    the outbound delivery context of the reply
     * @param replierMessage the message being replied to
     */
    protected <T> void sendReply(OutboundDeliveryContext<T> sendContext, MessageImpl replierMessage) {
        sendOrPub(sendContext);
    }

    /**
     * Final step of a send or publish, reached after all outbound interceptors
     * ran. This implementation always delivers locally.
     *
     * @param sendContext the outbound delivery context
     */
    protected <T> void sendOrPub(OutboundDeliveryContext<T> sendContext) {
        sendLocally(sendContext);
    }

    /**
     * Records an outgoing message in the metrics (when enabled) and, when the
     * context has a tracer and the message's {@code src} flag is set, starts a
     * trace for it.
     *
     * @param sendContext the outbound delivery context
     * @param local       forwarded to the metrics
     * @param remote      forwarded to the metrics
     * @return the value returned by the tracer's {@code sendRequest}, or
     *         {@code null} when no trace was started
     */
    protected final Object messageSent(OutboundDeliveryContext<?> sendContext, boolean local, boolean remote) {
        MessageImpl outgoing = sendContext.message;
        if (metrics != null) {
            // The second argument is the "publish" flag, hence the negation of send.
            metrics.messageSent(outgoing.address(), !outgoing.send, local, remote);
        }
        VertxTracer tracer = sendContext.ctx.tracer();
        if (tracer == null || !outgoing.src) {
            return null;
        }
        // Lets the tracer write its propagation headers into the message.
        BiConsumer<String, String> headerWriter = (name, value) -> outgoing.headers().set(name, value);
        String operation = outgoing.send ? "send" : "publish";
        return tracer.sendRequest(sendContext.ctx, outgoing, operation, headerWriter, MessageTagExtractor.INSTANCE);
    }

    /**
     * Notifies the handler with a succeeded result on a Vert.x context; does
     * nothing when the handler is {@code null}.
     *
     * @param completionHandler the handler to notify; may be {@code null}
     */
    protected void callCompletionHandlerAsync(Handler<AsyncResult<Void>> completionHandler) {
        if (completionHandler == null) {
            return;
        }
        vertx.runOnContext(ignored -> completionHandler.handle(Future.succeededFuture()));
    }

    /**
     * Delivers the message to local handlers and finishes or hands over the
     * trace started by {@code messageSent}.
     *
     * <p>When delivery fails, a present reply handler is failed; otherwise the
     * failure is reported to the tracer. When delivery succeeds and tracing
     * applies, the trace is ended right away if no reply is expected, or stored
     * on the reply handler.
     *
     * @param sendContext the outbound delivery context
     */
    private <T> void sendLocally(OutboundDeliveryContext<T> sendContext) {
        Object trace = messageSent(sendContext, true, false);
        ReplyException failure = deliverMessageLocally(sendContext.message);
        VertxTracer tracer = sendContext.ctx.tracer();
        ReplyHandler<T> reply = sendContext.replyHandler;
        boolean traced = tracer != null && sendContext.message.src;

        if (failure != null) {
            if (reply != null) {
                reply.trace = trace;
                reply.fail(failure);
            } else if (traced) {
                tracer.receiveResponse(sendContext.ctx, null, trace, failure, TagExtractor.empty());
            }
            return;
        }

        if (!traced) {
            return;
        }
        if (reply == null) {
            tracer.receiveResponse(sendContext.ctx, null, trace, null, TagExtractor.empty());
        } else {
            reply.trace = trace;
        }
    }

    /**
     * Tells whether a message counts as local for the metrics. This
     * implementation ignores the message and always answers {@code true}.
     *
     * @param msg the message being delivered
     * @return always {@code true}
     */
    protected boolean isMessageLocal(MessageImpl msg) {
        return true;
    }

    /**
     * Delivers a message to the handlers registered locally on its address.
     *
     * <p>A send goes to the single handler returned by the sequence's
     * {@code next()}; a publish goes to every registered handler. The metrics
     * (when enabled) are told how many handlers received the message, and the
     * message's write handler (when set) is told the outcome.
     *
     * @param msg the message to deliver
     * @return {@code null} when the address has a handler sequence, otherwise a
     *         {@code NO_HANDLERS} failure
     */
    protected ReplyException deliverMessageLocally(MessageImpl msg) {
        ConcurrentCyclicSequence<HandlerHolder> handlers = this.handlerMap.get(msg.address());
        if (handlers == null) {
            if (this.metrics != null) {
                this.metrics.messageReceived(msg.address(), !msg.isSend(), this.isMessageLocal(msg), 0);
            }
            ReplyException failure = new ReplyException(ReplyFailure.NO_HANDLERS, "No handlers for address " + msg.address);
            this.notifyWriteHandler(msg, failure);
            return failure;
        }
        if (msg.isSend()) {
            // Point-to-point: pick exactly one handler from the sequence.
            HandlerHolder holder = handlers.next();
            if (this.metrics != null) {
                this.metrics.messageReceived(msg.address(), !msg.isSend(), this.isMessageLocal(msg), holder != null ? 1 : 0);
            }
            if (holder != null) {
                this.deliverToHandler(msg, holder);
                this.notifyWriteHandler(msg, null);
            }
        } else {
            // Publish: every handler gets the message.
            if (this.metrics != null) {
                this.metrics.messageReceived(msg.address(), !msg.isSend(), this.isMessageLocal(msg), handlers.size());
            }
            for (HandlerHolder holder : handlers) {
                this.deliverToHandler(msg, holder);
            }
            this.notifyWriteHandler(msg, null);
        }
        return null;
    }

    /**
     * Reports the write outcome to the message's write handler, if it has one:
     * success when {@code failure} is {@code null}, otherwise that failure.
     */
    private void notifyWriteHandler(MessageImpl msg, ReplyException failure) {
        Handler<AsyncResult<Void>> writeHandler = msg.writeHandler;
        if (writeHandler == null) {
            return;
        }
        if (failure == null) {
            writeHandler.handle(Future.succeededFuture());
        } else {
            writeHandler.handle(Future.failedFuture(failure));
        }
    }

    /**
     * Guards operations that need a started bus.
     *
     * @throws IllegalStateException if {@code start} has not been called yet
     */
    protected void checkStarted() {
        if (started) {
            return;
        }
        throw new IllegalStateException("Event Bus is not started");
    }

    /**
     * Produces a fresh reply address by appending the next value of the reply
     * sequence to a fixed prefix.
     *
     * @return the generated reply address
     */
    protected String generateReplyAddress() {
        long sequence = replySequence.incrementAndGet();
        return "__vertx.reply." + Long.toString(sequence);
    }

    /**
     * Prepares reply handling for an outgoing message.
     *
     * <p>When a reply handler is supplied, a reply address is generated and set
     * on the message, a registration is created for that address, and a
     * {@code ReplyHandler} (which starts the timeout timer) is wired to both the
     * registration and the caller's handler.
     *
     * @param message      the outgoing message; receives the reply address
     * @param src          forwarded unchanged to the registration
     * @param options      supplies the send timeout
     * @param replyHandler the caller's reply handler; may be {@code null}
     * @return the reply handler, or {@code null} when no reply is expected
     */
    private <T> ReplyHandler<T> createReplyHandler(MessageImpl message, boolean src, DeliveryOptions options, Handler<AsyncResult<Message<T>>> replyHandler) {
        if (replyHandler == null) {
            return null;
        }
        long timeout = options.getSendTimeout();
        String replyAddress = generateReplyAddress();
        message.setReplyAddress(replyAddress);
        HandlerRegistration<T> registration = new HandlerRegistration<>(vertx, metrics, this, replyAddress, message.address, true, src);
        ReplyHandler<T> reply = new ReplyHandler<>(registration, timeout);
        reply.result.future().setHandler(replyHandler);
        registration.handler(reply);
        return reply;
    }

    /**
     * Entry point for sending or publishing an already built message: sets up
     * reply handling, picks the delivery context and starts the outbound
     * interceptor chain.
     *
     * @param message      the message to deliver
     * @param options      delivery options
     * @param replyHandler the caller's reply handler; may be {@code null}
     * @throws IllegalStateException if the bus is not started
     */
    public <T> void sendOrPubInternal(MessageImpl message, DeliveryOptions options, Handler<AsyncResult<Message<T>>> replyHandler) {
        checkStarted();
        ReplyHandler<T> reply = createReplyHandler(message, true, options, replyHandler);
        ContextInternal current = vertx.getContext();
        // Off-context callers all share the one context created in the constructor.
        ContextInternal ctx = current != null ? current : sendNoContext;
        new OutboundDeliveryContext<>(ctx, message, options, reply).next();
    }

    /**
     * Unregisters every handler currently present in the handler map.
     */
    private void unregisterAll() {
        // The sequence must carry its element type: iterating a raw sequence
        // yields Object and cannot be bound to a HandlerHolder loop variable.
        for (ConcurrentCyclicSequence<HandlerHolder> handlers : this.handlerMap.values()) {
            for (HandlerHolder holder : handlers) {
                holder.getHandler().unregister();
            }
        }
    }

    /**
     * Schedules delivery of a copy of the message to one handler on that
     * handler's context.
     *
     * <p>The handler is invoked only if the holder has not been removed by the
     * time the task runs. Reply handlers are unregistered afterwards whether
     * or not the invocation happened or threw.
     *
     * @param msg    the message to deliver
     * @param holder the target handler holder
     */
    private <T> void deliverToHandler(MessageImpl msg, HandlerHolder<T> holder) {
        MessageImpl delivered = msg.copyBeforeReceive(holder.getHandler().src);
        if (metrics != null) {
            metrics.scheduleMessage(holder.getHandler().getMetric(), msg.isLocal());
        }
        holder.getContext().runOnContext(ignored -> {
            try {
                if (holder.isRemoved()) {
                    return;
                }
                holder.getHandler().handle(delivered);
            } finally {
                if (holder.isReplyHandler()) {
                    holder.getHandler().unregister();
                }
            }
        });
    }

    /**
     * Last-resort cleanup: closes the bus when the instance is finalized.
     *
     * <p>{@code close} in this class starts with {@code checkStarted()}, which
     * throws when the bus was never started; the {@code finally} block makes
     * sure the superclass finalizer still runs in that case.
     *
     * @throws Throwable whatever {@code close} or the superclass finalizer throws
     */
    @Override
    protected void finalize() throws Throwable {
        try {
            this.close(ar -> {});
        } finally {
            super.finalize();
        }
    }

    /**
     * Close hook that ties a handler registration to an address. Closing the
     * entry unregisters the handler. Two entries are equal when both their
     * addresses and their handlers are equal.
     */
    public class HandlerEntry<T>
    implements Closeable {
        final String address;
        final HandlerRegistration<T> handler;

        /**
         * @param address the address the handler is registered on
         * @param handler the registration to unregister on close
         */
        public HandlerEntry(String address, HandlerRegistration<T> handler) {
            this.address = address;
            this.handler = handler;
        }

        /** Compares address and handler of two entries of the same class. */
        public boolean equals(Object o) {
            if (o == null) {
                return false;
            }
            if (o == this) {
                return true;
            }
            if (o.getClass() != getClass()) {
                return false;
            }
            HandlerEntry other = (HandlerEntry)o;
            return address.equals(other.address) && handler.equals(other.handler);
        }

        /** Combines the hash codes of address and handler; a null field counts as 0. */
        public int hashCode() {
            int addressHash = address == null ? 0 : address.hashCode();
            int handlerHash = handler == null ? 0 : handler.hashCode();
            return 31 * addressHash + handlerHash;
        }

        /**
         * Unregisters the handler.
         *
         * @param completionHandler passed through to the unregistration
         */
        @Override
        public void close(Handler<AsyncResult<Void>> completionHandler) {
            handler.unregister(completionHandler);
        }
    }

    protected class OutboundDeliveryContext<T>
    implements DeliveryContext<T> {
        public final ContextInternal ctx;
        public final MessageImpl message;
        public final DeliveryOptions options;
        public final Iterator<Handler<DeliveryContext>> iter;
        public final ReplyHandler<T> replyHandler;
        private final MessageImpl replierMessage;

        private OutboundDeliveryContext(ContextInternal ctx, MessageImpl message, DeliveryOptions options, ReplyHandler<T> replyHandler) {
            this(ctx, message, options, replyHandler, null);
        }

        private OutboundDeliveryContext(ContextInternal ctx, MessageImpl message, DeliveryOptions options, ReplyHandler<T> replyHandler, MessageImpl replierMessage) {
            this.ctx = ctx;
            this.message = message;
            this.options = options;
            this.iter = EventBusImpl.this.sendInterceptors.iterator();
            this.replierMessage = replierMessage;
            this.replyHandler = replyHandler;
        }

        @Override
        public Message<T> message() {
            return this.message;
        }

        @Override
        public void next() {
            block7: {
                if (this.iter.hasNext()) {
                    Handler<DeliveryContext> handler = this.iter.next();
                    try {
                        if (handler != null) {
                            handler.handle(this);
                            break block7;
                        }
                        this.next();
                    }
                    catch (Throwable t) {
                        log.error("Failure in interceptor", t);
                    }
                } else if (this.replierMessage == null) {
                    EventBusImpl.this.sendOrPub(this);
                } else {
                    EventBusImpl.this.sendReply(this, this.replierMessage);
                }
            }
        }

        @Override
        public boolean send() {
            return this.message.isSend();
        }

        @Override
        public Object body() {
            return this.message.sentBody;
        }
    }

    /**
     * Handles the reply to a sent message: completes {@code result} with the
     * reply, or fails it on timeout or when the reply carries a
     * {@code ReplyException}.
     */
    public class ReplyHandler<T>
    implements Handler<Message<T>> {
        final Promise<Message<T>> result = Promise.promise();
        final HandlerRegistration<T> registration;
        final long timeoutID;
        public Object trace;

        /**
         * Starts the timeout timer for the expected reply.
         *
         * @param registration the registration listening on the reply address
         * @param timeout      the timer delay passed to {@code setTimer}
         */
        ReplyHandler(HandlerRegistration<T> registration, long timeout) {
            this.registration = registration;
            this.timeoutID = EventBusImpl.this.vertx.setTimer(timeout, id -> this.fail(new ReplyException(ReplyFailure.TIMEOUT, "Timed out after waiting " + timeout + "(ms) for a reply. address: " + registration.address + ", repliedAddress: " + registration.repliedAddress)));
        }

        /** Reports the outcome to the tracer when tracing applies to this registration. */
        private void trace(Object reply, Throwable failure) {
            ContextInternal ctx = this.registration.handlerContext();
            VertxTracer tracer = ctx.tracer();
            if (tracer != null && this.registration.src) {
                tracer.receiveResponse(ctx, reply, this.trace, failure, TagExtractor.empty());
            }
        }

        /**
         * Fails the pending reply: unregisters the reply registration, records
         * the failure in metrics and trace, and fails {@code result}.
         *
         * @param failure the reason no reply will be delivered
         */
        void fail(ReplyException failure) {
            // Stop the timeout timer so that a failure reported before the
            // timeout (e.g. no handlers) is not followed by a second, spurious
            // TIMEOUT failure in metrics and trace. Cancelling a timer that
            // already fired is a no-op.
            EventBusImpl.this.vertx.cancelTimer(this.timeoutID);
            this.registration.unregister();
            if (EventBusImpl.this.metrics != null) {
                EventBusImpl.this.metrics.replyFailure(this.registration.repliedAddress, failure.failureType());
            }
            this.trace(null, failure);
            this.result.tryFail(failure);
        }

        /**
         * Receives the reply: cancels the timeout timer, then either fails
         * (when the body is a {@code ReplyException}) or completes the result.
         *
         * @param reply the reply message
         */
        @Override
        public void handle(Message<T> reply) {
            EventBusImpl.this.vertx.cancelTimer(this.timeoutID);
            if (reply.body() instanceof ReplyException) {
                this.fail((ReplyException)reply.body());
            } else {
                this.trace(reply, null);
                this.result.complete(reply);
            }
        }
    }

    /**
     * Outcome of a local registration: the holder that was created and whether
     * its address had no handlers before.
     */
    private static class LocalRegistrationResult<T> {
        final HandlerHolder<T> holder;
        final boolean newAddress;

        /**
         * @param registeredHolder the holder created for the registration
         * @param firstForAddress  {@code true} when the address was new
         */
        LocalRegistrationResult(HandlerHolder<T> registeredHolder, boolean firstForAddress) {
            holder = registeredHolder;
            newAddress = firstForAddress;
        }
    }
}

