/*
 * Decompiled with CFR 0.152.
 */
package com.mongodb.reactivestreams.client;

import com.mongodb.MongoException;
import com.mongodb.assertions.Assertions;
import com.mongodb.diagnostics.logging.Logger;
import com.mongodb.diagnostics.logging.Loggers;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

abstract class SubscriptionSupport<TResult>
implements Subscription {
    private static final Logger LOGGER = Loggers.getLogger((String)"reactivestreams");
    private static final int DEFAULT_BATCHSIZE = 1024;
    private final int batchSize;
    private final Subscriber<? super TResult> subscriber;
    private volatile boolean started = false;
    private boolean completed = false;
    private boolean cancelled = false;
    private long demand = 0L;
    private final ConcurrentLinkedQueue<Signal> inboundSignals = new ConcurrentLinkedQueue();
    private final ConcurrentLinkedQueue<TResult> resultsQueue = new ConcurrentLinkedQueue();
    private final AtomicBoolean on = new AtomicBoolean(false);

    public SubscriptionSupport(Subscriber<? super TResult> subscriber) {
        this(subscriber, 1024);
    }

    SubscriptionSupport(Subscriber<? super TResult> subscriber, int batchSize) {
        Assertions.isTrueArgument((String)"batchSize must be greater than zero!", (batchSize > 0 ? 1 : 0) != 0);
        if (subscriber == null) {
            throw new NullPointerException("Subscriber cannot be null");
        }
        this.subscriber = subscriber;
        this.batchSize = batchSize;
        this.log("constructor");
    }

    private void handleRequest(long n) {
        if (n < 1L) {
            this.terminateDueTo(new IllegalArgumentException(this.subscriber + " violated the Reactive Streams rule 3.9 by " + "requesting a non-positive number of elements."));
        } else if (this.demand + n < 1L) {
            this.demand = Long.MAX_VALUE;
            this.handleSend();
        } else {
            this.demand += n;
            this.handleSend();
        }
    }

    protected void handleCancel() {
        this.cancelled = true;
    }

    private void handleSend() {
        try {
            if (this.resultsQueue.peek() != null) {
                boolean finished;
                do {
                    this.subscriber.onNext(this.resultsQueue.poll());
                    boolean bl = finished = this.resultsQueue.peek() == null;
                } while (!finished && !this.cancelled);
            }
            if (this.completed) {
                this.handleCancel();
                this.subscriber.onComplete();
            } else if (this.started && !this.cancelled && this.demand > 0L) {
                long requestAmount = this.demand > (long)this.batchSize ? (long)this.batchSize : this.demand;
                this.demand -= requestAmount;
                this.doRequest(requestAmount);
            }
        }
        catch (MongoException t) {
            this.handleCancel();
            LOGGER.error(this.subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onNext" + " or onComplete.", (Throwable)t);
        }
    }

    private void terminateDueTo(Throwable t) {
        this.cancelled = true;
        this.log("terminated: " + t);
        try {
            this.subscriber.onError(t);
        }
        catch (Throwable t2) {
            LOGGER.error(this.subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception" + " from onError.", t2);
        }
    }

    private void signal(Signal signal) {
        if (this.inboundSignals.offer(signal) && this.started) {
            this.next();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    final void next() {
        if (this.on.compareAndSet(false, true)) {
            try {
                while (!this.inboundSignals.isEmpty()) {
                    Signal s = this.inboundSignals.poll();
                    if (this.cancelled) continue;
                    if (s instanceof Request) {
                        this.handleRequest(((Request)s).n);
                        continue;
                    }
                    if (s == Send.Instance) {
                        this.handleSend();
                        continue;
                    }
                    if (s != Cancel.Instance) continue;
                    this.handleCancel();
                }
            }
            finally {
                this.on.set(false);
            }
            if (!this.cancelled && !this.inboundSignals.isEmpty()) {
                this.next();
            }
        }
    }

    public void onNext(TResult element) {
        if (element == null) {
            throw new NullPointerException("onNext called with a null value");
        }
        if (!this.cancelled) {
            this.log("onNext - queued");
            this.resultsQueue.add(element);
            this.signal(Send.Instance);
        } else {
            this.log("onNext - canceled");
        }
    }

    public void onError(Throwable t) {
        if (t == null) {
            throw new NullPointerException("onError called with a null value");
        }
        this.log("onError");
        this.terminateDueTo(t);
    }

    public void onComplete() {
        this.log("onComplete");
        this.completed = true;
        this.signal(Send.Instance);
    }

    public void request(long n) {
        this.signal(new Request(n));
    }

    public void cancel() {
        this.signal(Cancel.Instance);
    }

    void start() {
        try {
            this.log("Subscribing to subscriber");
            this.subscriber.onSubscribe((Subscription)this);
        }
        catch (Throwable t) {
            this.terminateDueTo(new IllegalStateException(this.subscriber + " violated the Reactive Streams rule 2.13 by " + "throwing an exception from onSubscribe.", t));
        }
        this.started = true;
        this.next();
    }

    protected abstract void doRequest(long var1);

    protected String getName() {
        return this.getClass().getSimpleName();
    }

    protected void log(String msg) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(this.getName() + ": " + msg);
        }
    }

    static final class Request
    implements Signal {
        private final long n;

        Request(long n) {
            this.n = n;
        }
    }

    static enum Send implements Signal
    {
        Instance;

    }

    static enum Cancel implements Signal
    {
        Instance;

    }

    static interface Signal {
    }
}

