/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.core.eventbus.impl;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.eventbus.MessageProducer;
import io.vertx.core.eventbus.impl.EventBusImpl;
import io.vertx.core.eventbus.impl.MessageImpl;
import io.vertx.core.eventbus.impl.OutboundDeliveryContext;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.PromiseInternal;
import io.vertx.core.impl.VertxInternal;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.UUID;

public class MessageProducerImpl<T>
implements MessageProducer<T> {
    public static final String CREDIT_ADDRESS_HEADER_NAME = "__vertx.credit";
    private final Vertx vertx;
    private final EventBusImpl bus;
    private final boolean send;
    private final String address;
    private final Queue<OutboundDeliveryContext<T>> pending = new ArrayDeque<OutboundDeliveryContext<T>>();
    private final MessageConsumer<Integer> creditConsumer;
    private DeliveryOptions options;
    private int maxSize = 1000;
    private int credits = 1000;
    private Handler<Void> drainHandler;

    public MessageProducerImpl(Vertx vertx, String address, boolean send, DeliveryOptions options) {
        this.vertx = vertx;
        this.bus = (EventBusImpl)vertx.eventBus();
        this.address = address;
        this.send = send;
        this.options = options;
        if (send) {
            String creditAddress = UUID.randomUUID().toString() + "-credit";
            this.creditConsumer = this.bus.consumer(creditAddress, msg -> this.doReceiveCredit((Integer)msg.body()));
            options.addHeader(CREDIT_ADDRESS_HEADER_NAME, creditAddress);
        } else {
            this.creditConsumer = null;
        }
    }

    @Override
    public synchronized MessageProducer<T> deliveryOptions(DeliveryOptions options) {
        if (this.creditConsumer != null) {
            options = new DeliveryOptions(options);
            options.addHeader(CREDIT_ADDRESS_HEADER_NAME, this.options.getHeaders().get(CREDIT_ADDRESS_HEADER_NAME));
        }
        this.options = options;
        return this;
    }

    @Override
    public MessageProducer<T> exceptionHandler(Handler<Throwable> handler) {
        return this;
    }

    @Override
    public synchronized MessageProducer<T> setWriteQueueMaxSize(int s) {
        int delta = s - this.maxSize;
        this.maxSize = s;
        this.credits += delta;
        return this;
    }

    @Override
    public synchronized Future<Void> write(T data) {
        PromiseInternal<Void> promise = ((VertxInternal)this.vertx).getOrCreateContext().promise();
        this.write(data, (Promise<Void>)promise);
        return promise.future();
    }

    @Override
    public void write(T data, Handler<AsyncResult<Void>> handler) {
        PromiseInternal promise = null;
        if (handler != null) {
            promise = ((VertxInternal)this.vertx).getOrCreateContext().promise();
            promise.future().setHandler(handler);
        }
        this.write(data, promise);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void write(T data, Promise<Void> handler) {
        MessageImpl msg = this.bus.createMessage(this.send, this.address, this.options.getHeaders(), data, this.options.getCodecName());
        OutboundDeliveryContext sendCtx = this.bus.newSendContext(msg, this.options, null, handler);
        if (this.send) {
            MessageProducerImpl messageProducerImpl = this;
            synchronized (messageProducerImpl) {
                if (this.credits > 0) {
                    --this.credits;
                } else {
                    this.pending.add(sendCtx);
                    return;
                }
            }
        }
        this.bus.sendOrPubInternal(msg, this.options, null, handler);
    }

    @Override
    public synchronized boolean writeQueueFull() {
        return this.credits == 0;
    }

    @Override
    public synchronized MessageProducer<T> drainHandler(Handler<Void> handler) {
        this.drainHandler = handler;
        if (handler != null) {
            this.checkDrained();
        }
        return this;
    }

    private void checkDrained() {
        Handler<Void> handler = this.drainHandler;
        if (handler != null && this.credits >= this.maxSize / 2) {
            this.drainHandler = null;
            this.vertx.runOnContext(v -> handler.handle(null));
        }
    }

    @Override
    public String address() {
        return this.address;
    }

    @Override
    public Future<Void> end() {
        return this.close();
    }

    @Override
    public void end(Handler<AsyncResult<Void>> handler) {
        this.close(null);
    }

    @Override
    public Future<Void> close() {
        if (this.creditConsumer != null) {
            return this.creditConsumer.unregister();
        }
        return ((ContextInternal)this.vertx.getOrCreateContext()).succeededFuture();
    }

    @Override
    public void close(Handler<AsyncResult<Void>> handler) {
        Future<Void> fut = this.close();
        if (handler != null) {
            fut.setHandler(handler);
        }
    }

    protected void finalize() throws Throwable {
        this.close();
        super.finalize();
    }

    private synchronized void doReceiveCredit(int credit) {
        OutboundDeliveryContext<T> sendContext;
        this.credits += credit;
        while (this.credits > 0 && (sendContext = this.pending.poll()) != null) {
            --this.credits;
            this.bus.sendOrPubInternal(sendContext);
        }
        this.checkDrained();
    }
}

