/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.core.eventbus.impl;

import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.DeliveryContext;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.eventbus.impl.BodyReadStream;
import io.vertx.core.eventbus.impl.EventBusImpl;
import io.vertx.core.eventbus.impl.HandlerHolder;
import io.vertx.core.eventbus.impl.MessageImpl;
import io.vertx.core.eventbus.impl.MessageTagExtractor;
import io.vertx.core.eventbus.impl.clustered.ClusteredMessage;
import io.vertx.core.impl.Arguments;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.spi.metrics.EventBusMetrics;
import io.vertx.core.spi.tracing.TagExtractor;
import io.vertx.core.spi.tracing.VertxTracer;
import io.vertx.core.streams.ReadStream;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Objects;
import java.util.Queue;

public class HandlerRegistration<T>
implements MessageConsumer<T>,
Handler<Message<T>> {
    private static final Logger log = LoggerFactory.getLogger(HandlerRegistration.class);
    public static final int DEFAULT_MAX_BUFFERED_MESSAGES = 1000;
    private final Vertx vertx;
    private final EventBusMetrics metrics;
    private final EventBusImpl eventBus;
    final String address;
    final String repliedAddress;
    private final boolean localOnly;
    protected final boolean src;
    private HandlerHolder<T> registered;
    private Handler<Message<T>> handler;
    private ContextInternal handlerContext;
    private AsyncResult<Void> result;
    private Handler<AsyncResult<Void>> completionHandler;
    private Handler<Void> endHandler;
    private Handler<Message<T>> discardHandler;
    private int maxBufferedMessages = 1000;
    private final Queue<Message<T>> pending = new ArrayDeque<Message<T>>(8);
    private long demand = Long.MAX_VALUE;
    private Object metric;

    public HandlerRegistration(Vertx vertx, EventBusMetrics metrics, EventBusImpl eventBus, String address, String repliedAddress, boolean localOnly, boolean src) {
        this.vertx = vertx;
        this.metrics = metrics;
        this.eventBus = eventBus;
        this.address = address;
        this.repliedAddress = repliedAddress;
        this.localOnly = localOnly;
        this.src = src;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public MessageConsumer<T> setMaxBufferedMessages(int maxBufferedMessages) {
        ArrayList<Message<T>> discarded;
        Handler<Message<T>> discardHandler;
        Arguments.require(maxBufferedMessages >= 0, "Max buffered messages cannot be negative");
        HandlerRegistration handlerRegistration = this;
        synchronized (handlerRegistration) {
            this.maxBufferedMessages = maxBufferedMessages;
            int n = this.pending.size() - maxBufferedMessages;
            if (n <= 0) {
                return this;
            }
            discardHandler = this.discardHandler;
            if (discardHandler == null) {
                while (this.pending.size() > maxBufferedMessages) {
                    this.pending.poll();
                }
                return this;
            }
            discarded = new ArrayList<Message<T>>(n);
            while (this.pending.size() > maxBufferedMessages) {
                discarded.add(this.pending.poll());
            }
        }
        for (Message message : discarded) {
            discardHandler.handle(message);
        }
        return this;
    }

    @Override
    public synchronized int getMaxBufferedMessages() {
        return this.maxBufferedMessages;
    }

    @Override
    public String address() {
        return this.address;
    }

    @Override
    public synchronized void completionHandler(Handler<AsyncResult<Void>> completionHandler) {
        Objects.requireNonNull(completionHandler);
        if (this.result != null) {
            AsyncResult<Void> value = this.result;
            this.vertx.runOnContext(v -> completionHandler.handle(value));
        } else {
            this.completionHandler = completionHandler;
        }
    }

    @Override
    public void unregister() {
        this.doUnregister(null);
    }

    @Override
    public void unregister(Handler<AsyncResult<Void>> completionHandler) {
        this.doUnregister(completionHandler);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void doUnregister(Handler<AsyncResult<Void>> doneHandler) {
        Handler<Message<T>> discardHandler;
        ArrayDeque<Message<T>> discarded;
        HandlerRegistration handlerRegistration = this;
        synchronized (handlerRegistration) {
            if (this.endHandler != null) {
                Handler<Void> theEndHandler = this.endHandler;
                Handler<AsyncResult<Void>> handler = doneHandler;
                doneHandler = ar -> {
                    theEndHandler.handle(null);
                    if (handler != null) {
                        handler.handle((AsyncResult<Void>)ar);
                    }
                };
            }
            HandlerHolder<T> holder = this.registered;
            if (this.pending.size() > 0) {
                discarded = new ArrayDeque<Message<T>>(this.pending);
                this.pending.clear();
            } else {
                discarded = null;
            }
            discardHandler = this.discardHandler;
            if (holder != null) {
                this.handler = null;
                this.registered = null;
                this.eventBus.removeRegistration(holder, doneHandler);
            } else {
                this.callHandlerAsync(Future.succeededFuture(), doneHandler);
            }
            if (this.result == null) {
                this.result = Future.failedFuture("Consumer unregistered before registration completed");
                this.callHandlerAsync(this.result, this.completionHandler);
            } else {
                EventBusMetrics metrics = this.eventBus.metrics;
                if (metrics != null) {
                    metrics.handlerUnregistered(this.metric);
                }
            }
        }
        if (discardHandler != null && discarded != null) {
            Message msg;
            while ((msg = (Message)discarded.poll()) != null) {
                discardHandler.handle(msg);
            }
        }
    }

    private void callHandlerAsync(AsyncResult<Void> result, Handler<AsyncResult<Void>> completionHandler) {
        if (completionHandler != null) {
            this.vertx.runOnContext(v -> completionHandler.handle(result));
        }
    }

    synchronized void setHandlerContext(Context context) {
        this.handlerContext = (ContextInternal)context;
    }

    public synchronized void setResult(AsyncResult<Void> result) {
        if (this.result != null) {
            return;
        }
        this.result = result;
        if (result.failed()) {
            log.error("Failed to propagate registration for handler " + this.handler + " and address " + this.address);
        } else {
            if (this.metrics != null) {
                this.metric = this.metrics.handlerRegistered(this.address, this.repliedAddress);
            }
            this.callHandlerAsync(result, this.completionHandler);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void handle(Message<T> message) {
        ContextInternal ctx;
        Handler<Message<T>> theHandler;
        HandlerRegistration handlerRegistration = this;
        synchronized (handlerRegistration) {
            if (this.registered == null) {
                return;
            }
            if (this.demand == 0L) {
                if (this.pending.size() < this.maxBufferedMessages) {
                    this.pending.add(message);
                } else if (this.discardHandler != null) {
                    this.discardHandler.handle(message);
                } else {
                    log.warn("Discarding message as more than " + this.maxBufferedMessages + " buffered in paused consumer. address: " + this.address);
                }
                return;
            }
            if (this.pending.size() > 0) {
                this.pending.add(message);
                message = this.pending.poll();
            }
            if (this.demand != Long.MAX_VALUE) {
                --this.demand;
            }
            theHandler = this.handler;
            ctx = this.handlerContext;
        }
        this.deliver(theHandler, message, ctx);
    }

    private void deliver(Handler<Message<T>> theHandler, Message<T> message, ContextInternal context) {
        String creditsAddress = message.headers().get("__vertx.credit");
        if (creditsAddress != null) {
            this.eventBus.send(creditsAddress, 1);
        }
        InboundDeliveryContext deliveryCtx = new InboundDeliveryContext((MessageImpl)message, theHandler, context);
        deliveryCtx.context.dispatch(v -> deliveryCtx.next());
        this.checkNextTick();
    }

    ContextInternal handlerContext() {
        return this.handlerContext;
    }

    private synchronized void checkNextTick() {
        if (!this.pending.isEmpty() && this.demand > 0L) {
            this.handlerContext.runOnContext(v -> {
                ContextInternal ctx;
                Handler<Message<T>> theHandler;
                Message<T> message;
                HandlerRegistration handlerRegistration = this;
                synchronized (handlerRegistration) {
                    if (this.demand == 0L || (message = this.pending.poll()) == null) {
                        return;
                    }
                    if (this.demand != Long.MAX_VALUE) {
                        --this.demand;
                    }
                    theHandler = this.handler;
                    ctx = this.handlerContext;
                }
                this.deliver(theHandler, message, ctx);
            });
        }
    }

    public synchronized void discardHandler(Handler<Message<T>> handler) {
        this.discardHandler = handler;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public synchronized MessageConsumer<T> handler(Handler<Message<T>> h) {
        if (h != null) {
            HandlerRegistration handlerRegistration = this;
            synchronized (handlerRegistration) {
                this.handler = h;
                if (this.registered == null) {
                    this.registered = this.eventBus.addRegistration(this.address, this, this.repliedAddress != null, this.localOnly);
                }
            }
            return this;
        }
        this.unregister();
        return this;
    }

    @Override
    public ReadStream<T> bodyStream() {
        return new BodyReadStream(this);
    }

    @Override
    public synchronized boolean isRegistered() {
        return this.registered != null;
    }

    @Override
    public synchronized MessageConsumer<T> pause() {
        this.demand = 0L;
        return this;
    }

    @Override
    public MessageConsumer<T> resume() {
        return this.fetch(Long.MAX_VALUE);
    }

    @Override
    public synchronized MessageConsumer<T> fetch(long amount) {
        if (amount < 0L) {
            throw new IllegalArgumentException();
        }
        this.demand += amount;
        if (this.demand < 0L) {
            this.demand = Long.MAX_VALUE;
        }
        if (this.demand > 0L) {
            this.checkNextTick();
        }
        return this;
    }

    @Override
    public synchronized MessageConsumer<T> endHandler(Handler<Void> endHandler) {
        if (endHandler != null) {
            Context endCtx = this.vertx.getOrCreateContext();
            this.endHandler = v1 -> endCtx.runOnContext(v2 -> endHandler.handle(null));
        } else {
            this.endHandler = null;
        }
        return this;
    }

    @Override
    public synchronized MessageConsumer<T> exceptionHandler(Handler<Throwable> handler) {
        return this;
    }

    public Handler<Message<T>> getHandler() {
        return this.handler;
    }

    public Object getMetric() {
        return this.metric;
    }

    protected class InboundDeliveryContext
    implements DeliveryContext<T> {
        private final MessageImpl<?, T> message;
        private final Iterator<Handler<DeliveryContext>> iter;
        private final Handler<Message<T>> handler;
        private final ContextInternal context;

        private InboundDeliveryContext(MessageImpl<?, T> message, Handler<Message<T>> handler, ContextInternal context) {
            this.message = message;
            this.handler = handler;
            this.iter = HandlerRegistration.this.eventBus.receiveInterceptors();
            this.context = message.src ? context : context.duplicate();
        }

        @Override
        public Message<T> message() {
            return this.message;
        }

        @Override
        public void next() {
            block13: {
                if (this.iter.hasNext()) {
                    try {
                        Handler<DeliveryContext> handler = this.iter.next();
                        if (handler != null) {
                            handler.handle(this);
                            break block13;
                        }
                        this.next();
                    }
                    catch (Throwable t) {
                        log.error("Failure in interceptor", t);
                    }
                } else {
                    ClusteredMessage cmsg;
                    boolean local = true;
                    if (this.message instanceof ClusteredMessage && (cmsg = (ClusteredMessage)this.message).isFromWire()) {
                        local = false;
                    }
                    try {
                        VertxTracer tracer;
                        if (HandlerRegistration.this.metrics != null) {
                            HandlerRegistration.this.metrics.beginHandleMessage(HandlerRegistration.this.metric, local);
                        }
                        if ((tracer = HandlerRegistration.this.handlerContext.tracer()) != null && !HandlerRegistration.this.src) {
                            Object trace = tracer.receiveRequest(this.context, this.message, this.message.isSend() ? "send" : "publish", this.message.headers, MessageTagExtractor.INSTANCE);
                            this.handler.handle(this.message);
                            tracer.sendResponse(this.context, null, trace, null, TagExtractor.empty());
                        } else {
                            this.handler.handle(this.message);
                        }
                        if (HandlerRegistration.this.metrics != null) {
                            HandlerRegistration.this.metrics.endHandleMessage(HandlerRegistration.this.metric, null);
                        }
                    }
                    catch (Exception e) {
                        log.error("Failed to handleMessage. address: " + this.message.address(), e);
                        if (HandlerRegistration.this.metrics != null) {
                            HandlerRegistration.this.metrics.endHandleMessage(HandlerRegistration.this.metric, e);
                        }
                        this.context.reportException(e);
                    }
                }
            }
        }

        @Override
        public boolean send() {
            return this.message.isSend();
        }

        @Override
        public Object body() {
            return this.message.receivedBody;
        }
    }
}

