/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.core.eventbus.impl;

import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.eventbus.impl.BodyReadStream;
import io.vertx.core.eventbus.impl.EventBusImpl;
import io.vertx.core.eventbus.impl.HandlerRegistration;
import io.vertx.core.impl.Arguments;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.internal.PromiseInternal;
import io.vertx.core.internal.logging.Logger;
import io.vertx.core.internal.logging.LoggerFactory;
import io.vertx.core.streams.ReadStream;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Queue;

/**
 * Event-bus {@link MessageConsumer} that hands incoming messages to a user handler
 * according to the outstanding demand, and buffers them (up to
 * {@link #getMaxBufferedMessages()}) while the demand is zero.
 *
 * <p>All mutable state is guarded by this instance's monitor.
 *
 * @param <T> the type of the message body
 */
public class MessageConsumerImpl<T> extends HandlerRegistration<T> implements MessageConsumer<T> {
    private static final Logger log = LoggerFactory.getLogger(MessageConsumerImpl.class);
    private static final int DEFAULT_MAX_BUFFERED_MESSAGES = 1000;

    private final boolean localOnly;
    private Handler<Message<T>> handler;
    private Handler<Void> endHandler;
    private Handler<Message<T>> discardHandler;
    private int maxBufferedMessages = DEFAULT_MAX_BUFFERED_MESSAGES;
    // Messages received while demand is zero, in arrival order.
    private Queue<Message<T>> pending = new ArrayDeque<>(8);
    // Number of messages that may still be delivered; Long.MAX_VALUE is never decremented.
    private long demand = Long.MAX_VALUE;
    // Backs completion(); replaced with a fresh promise when a registered consumer is unregistered.
    private Promise<Void> result;
    private boolean registered;

    MessageConsumerImpl(ContextInternal context, EventBusImpl eventBus, String address, boolean localOnly) {
        super(context, eventBus, address, false);
        this.localOnly = localOnly;
        this.result = context.promise();
    }

    /**
     * Sets the buffer limit and discards the oldest buffered messages that no longer fit.
     *
     * <p>The discard handler (if any) and {@code discard} are invoked for each dropped
     * message after the monitor has been released.
     *
     * @param maxBufferedMessages the new limit, must not be negative
     * @return this consumer
     */
    @Override
    public MessageConsumer<T> setMaxBufferedMessages(int maxBufferedMessages) {
        Arguments.require(maxBufferedMessages >= 0, "Max buffered messages cannot be negative");
        ArrayList<Message<T>> discarded;
        Handler<Message<T>> discardHandler;
        synchronized (this) {
            this.maxBufferedMessages = maxBufferedMessages;
            int overflow = this.pending.size() - maxBufferedMessages;
            if (overflow <= 0) {
                return this;
            }
            discardHandler = this.discardHandler;
            discarded = new ArrayList<>(overflow);
            // overflow > 0 with a non-negative limit means the queue holds at least
            // `overflow` elements, so every poll() below yields a message.
            for (int i = 0; i < overflow; i++) {
                discarded.add(this.pending.poll());
            }
        }
        for (Message<T> message : discarded) {
            if (discardHandler != null) {
                discardHandler.handle(message);
            }
            this.discard(message);
        }
        return this;
    }

    /**
     * @return the current limit on buffered messages
     */
    @Override
    public synchronized int getMaxBufferedMessages() {
        return this.maxBufferedMessages;
    }

    /**
     * @return the future of the promise that {@link #handler(Handler)} resolves with the
     *         registration outcome
     */
    @Override
    public synchronized Future<Void> completion() {
        return this.result.future();
    }

    /**
     * Clears the message handler, invokes the end handler if one is set, discards every
     * buffered message (emitting each to the discard handler when present) and delegates
     * to the superclass to unregister.
     *
     * @return the future returned by the superclass {@code unregister()}
     */
    @Override
    public synchronized Future<Void> unregister() {
        this.handler = null;
        if (this.endHandler != null) {
            this.endHandler.handle(null);
        }
        if (!this.pending.isEmpty()) {
            Queue<Message<T>> discarded = this.pending;
            Handler<Message<T>> onDiscard = this.discardHandler;
            this.pending = new ArrayDeque<>(8);
            for (Message<T> message : discarded) {
                this.discard(message);
                if (onDiscard != null) {
                    this.context.emit(message, onDiscard);
                }
            }
        }
        this.discardHandler = null;
        Future<Void> fut = super.unregister();
        if (this.registered) {
            this.registered = false;
            // Capture the current promise: the field is replaced below, before the callback runs.
            Promise<Void> res = this.result;
            fut.onComplete(ar -> res.tryFail("Consumer unregistered before registration completed"));
            this.result = this.context.promise();
        }
        return fut;
    }

    /**
     * Accepts a message: delivers it when there is demand, buffers it when there is none
     * and room remains, otherwise discards it.
     *
     * @param message the received message
     * @return {@code false} when no handler is set, {@code true} otherwise
     */
    @Override
    protected boolean doReceive(Message<T> message) {
        Handler<Message<T>> theHandler;
        synchronized (this) {
            if (this.handler == null) {
                return false;
            }
            if (this.demand == 0L) {
                if (this.pending.size() < this.maxBufferedMessages) {
                    this.pending.add(message);
                } else {
                    // NOTE(review): discard handler runs while this monitor is held.
                    this.discard(message);
                    if (this.discardHandler != null) {
                        this.discardHandler.handle(message);
                    } else {
                        log.warn("Discarding message as more than " + this.maxBufferedMessages + " buffered in paused consumer. address: " + this.address);
                    }
                }
                return true;
            }
            if (!this.pending.isEmpty()) {
                // Preserve arrival order: queue the new message and deliver the oldest one.
                this.pending.add(message);
                message = this.pending.poll();
            }
            this.consumeDemand();
            theHandler = this.handler;
        }
        // Deliver outside the synchronized block.
        this.deliver(theHandler, message);
        return true;
    }

    /**
     * Dispatches {@code msg} to {@code handler} on {@code context}.
     *
     * @throws NullPointerException if {@code handler} is {@code null}
     */
    @Override
    protected void dispatch(Message<T> msg, ContextInternal context, Handler<Message<T>> handler) {
        if (handler == null) {
            throw new NullPointerException();
        }
        context.dispatch(msg, handler);
    }

    /** Dispatches one message on a duplicate of the context, then schedules the next buffered one. */
    private void deliver(Handler<Message<T>> theHandler, Message<T> message) {
        this.dispatch(theHandler, message, this.context.duplicate());
        this.checkNextTick();
    }

    /** Uses up one unit of demand unless demand is unbounded; the caller must hold the monitor. */
    private void consumeDemand() {
        if (this.demand != Long.MAX_VALUE) {
            --this.demand;
        }
    }

    /**
     * If messages are buffered and there is demand, schedules a task on the context's
     * event loop that delivers the next buffered message.
     */
    private synchronized void checkNextTick() {
        if (this.pending.isEmpty() || this.demand <= 0L) {
            return;
        }
        this.context.nettyEventLoop().execute(() -> {
            Handler<Message<T>> theHandler;
            Message<T> message;
            synchronized (this) {
                // State may have changed since the task was scheduled; re-check under the lock.
                if (this.demand == 0L || (message = this.pending.poll()) == null) {
                    return;
                }
                this.consumeDemand();
                theHandler = this.handler;
            }
            this.deliver(theHandler, message);
        });
    }

    /**
     * Sets the handler that receives messages dropped by this consumer.
     *
     * @param handler the discard handler, may be {@code null}
     */
    public synchronized void discardHandler(Handler<Message<T>> handler) {
        this.discardHandler = handler;
    }

    /**
     * Sets the message handler. The first non-null handler triggers registration, whose
     * outcome is propagated to the promise behind {@link #completion()}. A {@code null}
     * handler unregisters the consumer.
     *
     * @param h the message handler, or {@code null} to unregister
     * @return this consumer
     */
    @Override
    public synchronized MessageConsumer<T> handler(Handler<Message<T>> h) {
        if (h == null) {
            this.unregister();
            return this;
        }
        this.handler = h;
        if (!this.registered) {
            this.registered = true;
            // Capture the promise current at registration time; unregister() may swap the field.
            Promise<Void> p = this.result;
            PromiseInternal<Void> registration = this.context.promise();
            this.register(true, this.localOnly, registration);
            registration.future().onComplete(ar -> {
                if (ar.succeeded()) {
                    p.tryComplete();
                } else {
                    p.tryFail(ar.cause());
                }
            });
        }
        return this;
    }

    /**
     * @return a new {@link BodyReadStream} wrapping this consumer
     */
    @Override
    public ReadStream<T> bodyStream() {
        return new BodyReadStream<>(this);
    }

    /**
     * Sets the demand to zero so that subsequently received messages are buffered.
     *
     * @return this consumer
     */
    @Override
    public synchronized MessageConsumer<T> pause() {
        this.demand = 0L;
        return this;
    }

    /**
     * Equivalent to {@code fetch(Long.MAX_VALUE)}.
     *
     * @return this consumer
     */
    @Override
    public MessageConsumer<T> resume() {
        return this.fetch(Long.MAX_VALUE);
    }

    /**
     * Adds {@code amount} to the demand, saturating at {@code Long.MAX_VALUE}, and
     * schedules delivery of buffered messages when the resulting demand is positive.
     *
     * @param amount the additional demand, must not be negative
     * @return this consumer
     * @throws IllegalArgumentException if {@code amount} is negative
     */
    @Override
    public synchronized MessageConsumer<T> fetch(long amount) {
        if (amount < 0L) {
            throw new IllegalArgumentException();
        }
        this.demand += amount;
        if (this.demand < 0L) {
            // The addition overflowed: treat the demand as unbounded.
            this.demand = Long.MAX_VALUE;
        }
        if (this.demand > 0L) {
            this.checkNextTick();
        }
        return this;
    }

    /**
     * Sets the end handler. A non-null handler is wrapped so that it is run via
     * {@code runOnContext} on the context obtained from {@code getOrCreateContext()} at the
     * time of this call.
     *
     * @param endHandler the end handler, or {@code null} to clear it
     * @return this consumer
     */
    @Override
    public synchronized MessageConsumer<T> endHandler(Handler<Void> endHandler) {
        if (endHandler == null) {
            this.endHandler = null;
            return this;
        }
        ContextInternal endCtx = this.context.owner().getOrCreateContext();
        this.endHandler = v1 -> endCtx.runOnContext(v2 -> endHandler.handle(null));
        return this;
    }

    /**
     * Does nothing: the supplied handler is not stored.
     *
     * @param handler ignored
     * @return this consumer
     */
    @Override
    public synchronized MessageConsumer<T> exceptionHandler(Handler<Throwable> handler) {
        return this;
    }

    /**
     * @return the current message handler, or {@code null} when none is set
     */
    public synchronized Handler<Message<T>> getHandler() {
        return this.handler;
    }
}

