/*
 * Decompiled with CFR 0.152.
 */
package reactor.rx.action;

import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.core.dispatch.SynchronousDispatcher;
import reactor.core.dispatch.TailRecurseDispatcher;
import reactor.core.processor.CancelException;
import reactor.core.queue.CompletableLinkedQueue;
import reactor.core.queue.CompletableQueue;
import reactor.core.support.Exceptions;
import reactor.core.support.NonBlocking;
import reactor.core.support.Recyclable;
import reactor.core.support.SpecificationExceptions;
import reactor.fn.Consumer;
import reactor.fn.Supplier;
import reactor.fn.tuple.Tuple;
import reactor.fn.tuple.Tuple2;
import reactor.rx.Stream;
import reactor.rx.StreamUtils;
import reactor.rx.action.CompositeAction;
import reactor.rx.action.Control;
import reactor.rx.action.combination.FanInAction;
import reactor.rx.broadcast.Broadcaster;
import reactor.rx.subscription.DropSubscription;
import reactor.rx.subscription.FanOutSubscription;
import reactor.rx.subscription.PushSubscription;
import reactor.rx.subscription.ReactiveSubscription;

public abstract class Action<I, O>
extends Stream<O>
implements Processor<I, O>,
Consumer<I>,
Recyclable,
Control {
    /** Reserved-slot count; same value (4) as the literal that {@link #evaluateCapacity(long)} subtracts from bounded sizes. */
    public static final int RESERVED_SLOTS = 4;
    /** Sentinel capacity (-1); the unbuffered subscription built by createSubscription passes -1 to requestUpstream. */
    public static final int NO_CAPACITY = -1;
    /** Tracking subscription to the upstream publisher; set in onSubscribe, cleared by cancel() and recycle(). */
    protected PushSubscription<I> upstreamSubscription;
    /** Subscription serving the downstream side; addSubscription swaps it for a FanOutSubscription on a second subscriber. */
    protected PushSubscription<O> downstreamSubscription;
    /** Capacity pushed to the upstream subscription via maxCapacity; Long.MAX_VALUE when the no-arg constructor is used. */
    protected long capacity;

    /**
     * Validates a demand amount before it is forwarded.
     *
     * @param n the number of elements being requested
     * @throws RuntimeException the exception produced by
     *         {@code SpecificationExceptions.spec_3_09_exception(n)} when {@code n} is zero or negative
     */
    public static void checkRequest(long n) {
        if (n > 0L) {
            return;
        }
        throw SpecificationExceptions.spec_3_09_exception(n);
    }

    /**
     * Derives a usable capacity from a raw size by holding back 4 slots
     * (the same value as {@code RESERVED_SLOTS}), never going below 4.
     *
     * @param n the raw size; {@code Long.MAX_VALUE} is passed through untouched
     * @return {@code Long.MAX_VALUE} when {@code n} is {@code Long.MAX_VALUE},
     *         otherwise the larger of {@code 4} and {@code n - 4}
     */
    public static long evaluateCapacity(long n) {
        if (n == Long.MAX_VALUE) {
            return Long.MAX_VALUE;
        }
        long usable = n - 4L;
        return usable < 4L ? 4L : usable;
    }

    /**
     * Creates an action whose capacity is {@code Long.MAX_VALUE}.
     */
    public Action() {
        this(Long.MAX_VALUE);
    }

    /**
     * Creates an action with the given capacity.
     *
     * @param batchSize value stored as-is in {@link #capacity}; no validation is performed here
     */
    public Action(long batchSize) {
        this.capacity = batchSize;
    }

    /**
     * Subscribes a downstream {@link Subscriber} to this action.
     *
     * <p>A subscription is created through {@link #createSubscription(Subscriber, boolean)}
     * and linked with {@link #subscribeWithSubscription(Subscriber, PushSubscription)}.
     * When the subscriber implements {@link NonBlocking}, it is asked through
     * {@code isReactivePull} whether a reactive-pull subscription should be used, and
     * in that case its own capacity becomes the subscription's max capacity.
     *
     * <p>Any non-fatal failure raised while wiring is delivered to
     * {@code subscriber.onError}.
     *
     * @param subscriber the downstream subscriber; must not be {@code null}
     * @throws NullPointerException if {@code subscriber} is {@code null}
     *         (Reactive Streams rule 1.9)
     */
    @Override
    public void subscribe(Subscriber<? super O> subscriber) {
        // Fail fast with a meaningful message instead of relying on an NPE that
        // would otherwise surface from inside the catch handler below.
        if (subscriber == null) {
            throw new NullPointerException("Spec 1.9: Subscriber cannot be null");
        }
        try {
            NonBlocking asyncSubscriber = subscriber instanceof NonBlocking ? (NonBlocking) subscriber : null;
            // Subscribers that are not NonBlocking always get the reactive-pull flavour.
            boolean isReactiveCapacity = asyncSubscriber == null
                    || asyncSubscriber.isReactivePull(this.getDispatcher(), this.capacity);
            PushSubscription<O> subscription = this.createSubscription(subscriber, isReactiveCapacity);
            if (subscription == null) {
                // createSubscription may decline to produce a subscription; nothing to link.
                return;
            }
            if (asyncSubscriber != null && isReactiveCapacity) {
                subscription.maxCapacity(asyncSubscriber.getCapacity());
            }
            this.subscribeWithSubscription(subscriber, subscription);
        }
        catch (Throwable throwable) {
            Exceptions.throwIfFatal(throwable);
            subscriber.onError(throwable);
        }
    }

    /**
     * Receives the upstream {@link Subscription}.
     *
     * <p>If this action already holds an upstream subscription the incoming one is
     * cancelled and ignored. Otherwise it is wrapped by
     * {@link #createTrackingSubscription(Subscription)}, given this action's capacity,
     * and the {@link #doOnSubscribe(Subscription)} hook followed by {@link #doStart()} is run.
     * Non-fatal failures from those two calls are routed to {@link #doError(Throwable)}.
     *
     * @param subscription the upstream subscription
     * @throws NullPointerException if {@code subscription} is {@code null}
     */
    @Override
    public void onSubscribe(Subscription subscription) {
        if (subscription == null) {
            throw new NullPointerException("Spec 2.13: Subscription cannot be null");
        }
        if (this.upstreamSubscription != null) {
            // Already subscribed upstream: reject the newcomer.
            subscription.cancel();
            return;
        }

        this.upstreamSubscription = this.createTrackingSubscription(subscription);
        this.upstreamSubscription.maxCapacity(this.getCapacity());

        try {
            doOnSubscribe(subscription);
            doStart();
        }
        catch (Throwable failure) {
            Exceptions.throwIfFatal(failure);
            doError(failure);
        }
    }

    /**
     * Starts the downstream subscription, if there is one.
     */
    protected final void doStart() {
        PushSubscription<O> downstream = this.downstreamSubscription;
        if (downstream == null) {
            return;
        }
        downstream.start();
    }

    /**
     * {@link Consumer} entry point; simply forwards the value to {@link #onNext(Object)}.
     *
     * @param i the value to signal
     */
    @Override
    public final void accept(I i) {
        this.onNext(i);
    }

    /**
     * Handles a data signal by delegating to {@link #doNext(Object)}.
     *
     * <p>A {@link CancelException} raised by {@code doNext} is rethrown unchanged;
     * any other throwable is decorated with the offending value and handed to
     * {@link #doError(Throwable)}.
     *
     * @param ev the signalled value
     * @throws NullPointerException if {@code ev} is {@code null}
     * @throws CancelException if this action has neither an upstream nor a downstream subscription
     */
    @Override
    public void onNext(I ev) {
        if (ev == null) {
            throw new NullPointerException("Spec 2.13: Signal cannot be null");
        }
        boolean detached = this.upstreamSubscription == null && this.downstreamSubscription == null;
        if (detached) {
            throw CancelException.get();
        }
        try {
            doNext(ev);
        }
        catch (CancelException cancellation) {
            throw cancellation;
        }
        catch (Throwable failure) {
            doError(Exceptions.addValueAsLastCause(failure, ev));
        }
    }

    /**
     * Handles the completion signal: runs {@link #doComplete()} and then
     * {@link #doShutdown()}; anything thrown by either is passed to
     * {@link #doError(Throwable)}.
     */
    @Override
    public void onComplete() {
        try {
            doComplete();
            doShutdown();
        }
        catch (Throwable failure) {
            doError(failure);
        }
    }

    /**
     * Handles an error signal: resets the upstream pending-request count to zero
     * (when an upstream subscription exists), then runs {@link #doError(Throwable)}
     * followed by {@link #doShutdown()}.
     *
     * @param cause the error being signalled
     * @throws NullPointerException if {@code cause} is {@code null}
     */
    @Override
    public void onError(Throwable cause) {
        if (cause == null) {
            throw new NullPointerException("Spec 2.13: Signal cannot be null");
        }
        PushSubscription<I> upstream = this.upstreamSubscription;
        if (upstream != null) {
            upstream.updatePendingRequests(0L);
        }
        doError(cause);
        doShutdown();
    }

    /**
     * Sets this action's capacity and propagates it to the upstream subscription
     * when one exists.
     *
     * <p>For the {@link SynchronousDispatcher} singleton and for dispatchers whose
     * class is exactly {@link TailRecurseDispatcher}, {@code elements} is used as-is.
     * For any other dispatcher the value is capped at
     * {@code evaluateCapacity(dispatcher.backlogSize())}.
     *
     * @param elements the requested capacity
     * @return this action
     */
    public Action<I, O> capacity(long elements) {
        Dispatcher dispatcher = this.getDispatcher();
        boolean skipDispatcherLimit = dispatcher == SynchronousDispatcher.INSTANCE
                || dispatcher.getClass() == TailRecurseDispatcher.class;

        if (skipDispatcherLimit) {
            this.capacity = elements;
        } else {
            long dispatcherCapacity = Action.evaluateCapacity(dispatcher.backlogSize());
            this.capacity = Math.min(elements, dispatcherCapacity);
        }

        PushSubscription<I> upstream = this.upstreamSubscription;
        if (upstream != null) {
            upstream.maxCapacity(this.capacity);
        }
        return this;
    }

    /**
     * Pushes a value to the downstream subscription.
     *
     * <p>A {@link CancelException} coming back from downstream is rethrown;
     * any other throwable is decorated with the value and sent to
     * {@link #doError(Throwable)}.
     *
     * @param ev the value to emit
     * @throws CancelException if there is no downstream subscription
     */
    protected void broadcastNext(O ev) {
        PushSubscription<O> target = this.downstreamSubscription;
        if (target == null) {
            throw CancelException.get();
        }
        try {
            target.onNext(ev);
        }
        catch (CancelException cancellation) {
            throw cancellation;
        }
        catch (Throwable failure) {
            doError(Exceptions.addValueAsLastCause(failure, ev));
        }
    }

    /**
     * Pushes an error to the downstream subscription. Without a downstream
     * subscription the error is routed to the {@link Environment} if one is alive,
     * and otherwise silently dropped.
     *
     * @param throwable the error to emit
     */
    protected void broadcastError(Throwable throwable) {
        PushSubscription<O> downstream = this.downstreamSubscription;
        if (downstream != null) {
            downstream.onError(throwable);
            return;
        }
        if (Environment.alive()) {
            Environment.get().routeError(throwable);
        }
    }

    /**
     * Pushes the completion signal to the downstream subscription, if any.
     * A throwable raised by downstream is handed to {@link #doError(Throwable)}.
     */
    protected void broadcastComplete() {
        PushSubscription<O> downstream = this.downstreamSubscription;
        if (downstream != null) {
            try {
                downstream.onComplete();
            }
            catch (Throwable failure) {
                doError(failure);
            }
        }
    }

    /**
     * @return {@code true} when an upstream subscription exists and it does not
     *         report itself as complete
     */
    @Override
    public boolean isPublishing() {
        PushSubscription<I> upstream = this.upstreamSubscription;
        if (upstream == null) {
            return false;
        }
        return !upstream.isComplete();
    }

    /**
     * Detaches from upstream: clears the upstream subscription field first and
     * then cancels the subscription that was held. Does nothing when there is
     * no upstream subscription.
     */
    @Override
    public void cancel() {
        PushSubscription<I> upstream = this.upstreamSubscription;
        if (upstream == null) {
            return;
        }
        this.upstreamSubscription = null;
        upstream.cancel();
    }

    /**
     * Requests {@code Long.MAX_VALUE} elements from upstream, but only when
     * this action has no downstream subscription.
     */
    @Override
    public void requestAll() {
        if (this.downstreamSubscription != null) {
            return;
        }
        requestMore(Long.MAX_VALUE);
    }

    /**
     * Builds a stream visitor starting from the oldest upstream {@code Action}
     * found by {@link #findOldestUpstream(Class)}.
     *
     * @return the result of {@code StreamUtils.browse} on that action
     */
    @Override
    public StreamUtils.StreamVisitor debug() {
        return StreamUtils.browse(this.findOldestUpstream(Action.class));
    }

    /**
     * Consumes {@code controlStream} and, for every element it emits, invokes
     * {@code controller} with a tuple of this action and that element.
     *
     * @param controlStream the stream whose elements trigger the controller
     * @param controller    callback receiving {@code (this action, element)}
     * @param <E>           element type of the control stream
     * @return this action
     */
    public final <E> Action<I, O> control(Stream<E> controlStream, final Consumer<Tuple2<Action<I, O>, ? super E>> controller) {
        Consumer<E> relay = new Consumer<E>() {

            @Override
            public void accept(E signal) {
                controller.accept(Tuple.of(Action.this, signal));
            }
        };
        controlStream.consume(relay);
        return this;
    }

    /**
     * Lifts a new {@link Broadcaster}-based action behind this one, wired with a
     * subscription chosen from {@code queueSupplier}.
     *
     * <p>The new action is created with this action's environment, dispatcher and
     * capacity. When {@code queueSupplier} is {@code null} it is linked through a
     * {@link DropSubscription} (presumably dropping on overflow — confirm against
     * DropSubscription); otherwise it is linked through
     * {@link #createSubscription(Subscriber, CompletableQueue)} using the supplied queue.
     *
     * @param queueSupplier supplier of the buffering queue, or {@code null} to use a DropSubscription
     * @return the lifted stream
     */
    @Override
    public final Stream<O> onOverflowBuffer(final Supplier<? extends CompletableQueue<O>> queueSupplier) {
        return this.lift(new Supplier<Action<O, O>>(){

            @Override
            public Action<O, O> get() {
                Action newStream = Broadcaster.create(Action.this.getEnvironment(), Action.this.getDispatcher()).capacity(Action.this.capacity);
                if (queueSupplier == null) {
                    Action.this.subscribeWithSubscription(newStream, new DropSubscription<O>(Action.this, newStream){

                        @Override
                        public void request(long elements) {
                            super.request(elements);
                            // this.capacity here is the subscription's own field, not Action.this.capacity
                            Action.this.requestUpstream(this.capacity, this.isComplete(), elements);
                        }
                    });
                } else {
                    // Queue-backed path: reuse the regular queue-aware subscription factory.
                    Action.this.subscribeWithSubscription(newStream, Action.this.createSubscription(newStream, (CompletableQueue)queueSupplier.get()));
                }
                return newStream;
            }
        });
    }

    /**
     * Wraps the chain ending at this action into a single {@link CompositeAction}.
     *
     * <p>The oldest upstream {@code Action} is located with
     * {@link #findOldestUpstream(Class)}, its upstream subscription reference is
     * cleared, and it is paired with this action as the two ends of the composite.
     *
     * @param <E> input type of the resulting composite
     * @return a new composite built from the oldest upstream action and this action
     */
    @Override
    public final <E> CompositeAction<E, O> combine() {
        Action subscriber = this.findOldestUpstream(Action.class);
        // Detach the head of the chain from whatever it was subscribed to.
        subscriber.upstreamSubscription = null;
        return new CompositeAction(subscriber, this);
    }

    /**
     * @return a consumer that ignores its argument and calls {@link #broadcastComplete()}
     */
    public final Consumer<?> toBroadcastCompleteConsumer() {
        return new Consumer<Object>(){

            @Override
            public void accept(Object o) {
                Action.this.broadcastComplete();
            }
        };
    }

    /**
     * @return a consumer that forwards each accepted value to {@link #broadcastNext(Object)}
     */
    public final Consumer<O> toBroadcastNextConsumer() {
        return new Consumer<O>(){

            @Override
            public void accept(O o) {
                Action.this.broadcastNext(o);
            }
        };
    }

    /**
     * @return a consumer that forwards each accepted throwable to {@link #broadcastError(Throwable)}
     */
    public final Consumer<Throwable> toBroadcastErrorConsumer() {
        return new Consumer<Throwable>(){

            @Override
            public void accept(Throwable o) {
                Action.this.broadcastError(o);
            }
        };
    }

    /**
     * Walks the chain of upstream publishers for as long as each one is an
     * {@code Action}, and returns the furthest one reached.
     *
     * <p>Whenever the walk lands on a {@link FanInAction} that exposes a non-null
     * {@code dynamicMergeAction()}, the walk continues from that merge action
     * instead. Once the walk stops, if the publisher above the last action is an
     * instance of {@code clazz}, that publisher is returned; otherwise the last
     * action itself is returned (cast unchecked to {@code P}).
     *
     * @param clazz the publisher type accepted as the final hop
     * @param <P>   the publisher type
     * @return the oldest upstream publisher found
     */
    public <P extends Publisher<?>> P findOldestUpstream(Class<P> clazz) {
        Action cursor = this;
        while (this.inspectPublisher(cursor, Action.class)) {
            cursor = (Action) cursor.upstreamSubscription.getPublisher();
            if (cursor instanceof FanInAction) {
                if (((FanInAction) cursor).dynamicMergeAction() != null) {
                    cursor = ((FanInAction) cursor).dynamicMergeAction();
                }
            }
        }

        if (!this.inspectPublisher(cursor, clazz)) {
            return (P) cursor;
        }
        return (P) cursor.upstreamSubscription.getPublisher();
    }

    /**
     * @return the current value of {@link #capacity}
     */
    @Override
    public final long getCapacity() {
        return this.capacity;
    }

    /**
     * @return the upstream subscription, or {@code null} when none is attached
     */
    public PushSubscription<I> getSubscription() {
        return this.upstreamSubscription;
    }

    /**
     * @return the downstream subscription, or {@code null} when none is attached
     */
    @Override
    public final PushSubscription<O> downstreamSubscription() {
        return this.downstreamSubscription;
    }

    /**
     * Removes a downstream subscription and cancels upstream when no downstream
     * subscription remains.
     *
     * @param subscription the downstream subscription to remove
     * @return {@code true} only when the removal left this action without any
     *         downstream subscription (and upstream was therefore cancelled);
     *         {@code false} when there is no downstream subscription, when
     *         {@code subscription} is not found, or when other fan-out members remain
     */
    @Override
    public boolean cancelSubscription(PushSubscription<O> subscription) {
        PushSubscription<O> current = this.downstreamSubscription;
        if (current == null) {
            return false;
        }

        if (subscription == current) {
            // Sole downstream subscription: drop it and detach from upstream.
            this.downstreamSubscription = null;
            cancel();
            return true;
        }

        if (!(current instanceof FanOutSubscription)) {
            return false;
        }

        FanOutSubscription<O> fanOut = (FanOutSubscription<O>) current;
        boolean lastMemberRemoved = fanOut.remove(subscription) && fanOut.isEmpty();
        if (lastMemberRemoved) {
            cancel();
        }
        return lastMemberRemoved;
    }

    /**
     * Creates the downstream subscription for a subscriber, backing it with a
     * fresh {@link CompletableLinkedQueue} when {@code reactivePull} is set and
     * with no queue otherwise.
     *
     * @param subscriber   the downstream subscriber
     * @param reactivePull whether a queue-backed subscription is wanted
     * @return whatever {@link #createSubscription(Subscriber, CompletableQueue)} produces
     */
    protected PushSubscription<O> createSubscription(Subscriber<? super O> subscriber, boolean reactivePull) {
        CompletableQueue<O> buffer = null;
        if (reactivePull) {
            buffer = new CompletableLinkedQueue<O>();
        }
        return this.createSubscription(subscriber, buffer);
    }

    /**
     * Creates the downstream subscription for a subscriber.
     *
     * <p>Without a queue a plain {@link PushSubscription} is returned whose
     * requests are forwarded upstream with a capacity of {@code NO_CAPACITY} (-1).
     * With a queue a {@link ReactiveSubscription} is returned whose requests are
     * forwarded upstream with this action's capacity and the queue's completion state.
     *
     * @param subscriber the downstream subscriber
     * @param queue      the buffer to use, or {@code null} for an unbuffered subscription
     * @return the new subscription
     */
    protected PushSubscription<O> createSubscription(Subscriber<? super O> subscriber, CompletableQueue<O> queue) {
        if (queue == null) {
            return new PushSubscription<O>(this, subscriber) {

                @Override
                protected void onRequest(long elements) {
                    Action.this.requestUpstream(Action.NO_CAPACITY, this.isComplete(), elements);
                }
            };
        }

        return new ReactiveSubscription<O>(this, subscriber, queue) {

            @Override
            protected void onRequest(long elements) {
                Action.this.requestUpstream(Action.this.capacity, this.buffer.isComplete(), elements);
            }
        };
    }

    /**
     * Propagates downstream demand.
     *
     * <p>When an upstream subscription exists and the caller is not terminated,
     * the demand goes upstream through {@link #requestMore(long)}. Otherwise it
     * is recorded on the downstream subscription (if any) via
     * {@code updatePendingRequests}.
     *
     * @param capacity   not used by this implementation
     * @param terminated whether the requesting subscription is already complete
     * @param elements   the number of elements requested
     */
    protected void requestUpstream(long capacity, boolean terminated, long elements) {
        if (this.upstreamSubscription != null && !terminated) {
            requestMore(elements);
            return;
        }

        PushSubscription<O> downstream = this.downstreamSubscription;
        if (downstream != null) {
            downstream.updatePendingRequests(elements);
        }
    }

    /**
     * Adapts an arbitrary upstream {@link Subscription} to a {@link PushSubscription}.
     *
     * @param subscription the upstream subscription
     * @return {@code subscription} itself when its class is a {@code PushSubscription}
     *         (sub)type, otherwise the result of {@code PushSubscription.wrap(subscription, this)}
     */
    protected PushSubscription<I> createTrackingSubscription(Subscription subscription) {
        boolean alreadyPush = PushSubscription.class.isAssignableFrom(subscription.getClass());
        if (alreadyPush) {
            return (PushSubscription) subscription;
        }
        return PushSubscription.wrap(subscription, this);
    }

    /**
     * Hook invoked from {@link #onSubscribe(Subscription)} after the upstream
     * subscription has been stored; the default implementation does nothing.
     *
     * @param subscription the raw subscription received from upstream
     */
    protected void doOnSubscribe(Subscription subscription) {
    }

    /**
     * Completion hook invoked from {@link #onComplete()}; the default
     * implementation forwards completion downstream via {@link #broadcastComplete()}.
     */
    protected void doComplete() {
        this.broadcastComplete();
    }

    /**
     * Processes one non-null value; invoked from {@link #onNext(Object)}.
     * Throwables raised here, other than {@code CancelException}, are routed to
     * {@link #doError(Throwable)} by the caller.
     *
     * @param var1 the value signalled by upstream
     */
    protected abstract void doNext(I var1);

    /**
     * Delivers an error downstream, falling back to the {@link Environment}
     * error route.
     *
     * <p>If a downstream subscription exists and accepts the error, nothing else
     * happens. If the downstream {@code onError} itself throws, that secondary
     * failure is routed to the environment and the original error is routed as well.
     * With no downstream subscription the original error goes straight to the
     * environment. Every environment access is guarded by {@link Environment#alive()},
     * so an error is dropped rather than replaced by a new exception when no
     * environment is available.
     *
     * @param ev the error to deliver
     */
    protected void doError(Throwable ev) {
        PushSubscription<O> downstream = this.downstreamSubscription;
        if (downstream != null) {
            try {
                downstream.onError(ev);
                return;
            }
            catch (Throwable t) {
                // Same guard as every other Environment.get() call in this class:
                // without it a missing environment could raise from this handler
                // and hide both t and ev.
                if (Environment.alive()) {
                    Environment.get().routeError(t);
                }
            }
        }
        if (Environment.alive()) {
            Environment.get().routeError(ev);
        }
    }

    /**
     * Requests {@code n} more elements from upstream, if an upstream
     * subscription is attached. The amount is validated with
     * {@link #checkRequest(long)} first, even when nothing is attached.
     *
     * @param n the number of elements to request
     */
    @Override
    public void requestMore(long n) {
        Action.checkRequest(n);
        PushSubscription<I> upstream = this.upstreamSubscription;
        if (upstream == null) {
            return;
        }
        upstream.request(n);
    }

    /**
     * Registers a ready-made subscription for a subscriber.
     *
     * <p>On successful registration the subscription is marked as deferred-start
     * and started immediately only if this action already has an upstream
     * subscription. If registration is refused the subscriber receives an
     * {@link IllegalStateException}; any other non-fatal exception is also
     * delivered through {@code subscriber.onError}.
     *
     * @param subscriber   the downstream subscriber
     * @param subscription the subscription to register for it
     */
    protected void subscribeWithSubscription(Subscriber<? super O> subscriber, PushSubscription<O> subscription) {
        try {
            boolean linked = addSubscription(subscription);
            if (linked) {
                subscription.markAsDeferredStart();
                if (this.upstreamSubscription != null) {
                    subscription.start();
                }
            } else {
                subscriber.onError(new IllegalStateException("The subscription cannot be linked to this Stream"));
            }
        }
        catch (Exception failure) {
            Exceptions.throwIfFatal(failure);
            subscriber.onError(failure);
        }
    }

    /**
     * Attaches a downstream subscription to this action.
     *
     * <p>The first subscription is stored directly. A second, different one
     * turns the field into a {@link FanOutSubscription} holding both; further
     * ones are added to that fan-out. A subscription that is already registered
     * (equal to the current one, or contained in the fan-out) is signalled the
     * spec 2.12 error and rejected.
     *
     * @param subscription the subscription to attach
     * @return {@code true} if the subscription is now attached, {@code false} if it was rejected
     */
    protected boolean addSubscription(PushSubscription<O> subscription) {
        PushSubscription<O> existing = this.downstreamSubscription;

        if (existing == null) {
            this.downstreamSubscription = subscription;
            return true;
        }

        if (existing.equals(subscription)) {
            subscription.onError(SpecificationExceptions.spec_2_12_exception());
            return false;
        }

        if (!(existing instanceof FanOutSubscription)) {
            // Second distinct subscriber: promote to a fan-out of both.
            this.downstreamSubscription = new FanOutSubscription<O>(this, existing, subscription);
            return true;
        }

        FanOutSubscription<O> fanOut = (FanOutSubscription<O>) existing;
        if (fanOut.contains(subscription)) {
            subscription.onError(SpecificationExceptions.spec_2_12_exception());
            return false;
        }
        return fanOut.add(subscription);
    }

    /**
     * Hook invoked after {@code doComplete()} in {@link #onComplete()} and after
     * {@code doError()} in {@link #onError(Throwable)}; the default implementation
     * does nothing.
     */
    protected void doShutdown() {
    }

    /**
     * Tells whether {@code that} has an upstream subscription whose publisher
     * is a non-null instance of {@code actionClass}.
     *
     * @param that        the action whose upstream is examined
     * @param actionClass the type the upstream publisher must be an instance of
     * @return {@code true} if such a publisher exists
     */
    private boolean inspectPublisher(Action<?, ?> that, Class<?> actionClass) {
        PushSubscription<?> upstream = that.upstreamSubscription;
        if (upstream == null) {
            return false;
        }
        Object publisher = upstream.getPublisher();
        // isInstance is false for null, covering the explicit null check.
        return actionClass.isInstance(publisher);
    }

    /**
     * Clears both subscription references. Neither subscription is cancelled
     * or otherwise notified here.
     */
    @Override
    public void recycle() {
        this.downstreamSubscription = null;
        this.upstreamSubscription = null;
    }

    /**
     * Renders this action as
     * <code>{{dispatcher=D[:slots], max-capacity=N}upstream}</code>.
     *
     * <p>The inner dispatcher section is printed when the capacity is bounded or
     * when there is no upstream subscription; the remaining-slots suffix is
     * omitted for {@link SynchronousDispatcher} instances; an unbounded capacity
     * prints as {@code infinite}. The upstream subscription, when present, is
     * appended after that section.
     */
    @Override
    public String toString() {
        PushSubscription<I> upstream = this.upstreamSubscription;
        StringBuilder text = new StringBuilder("{");

        if (this.capacity != Long.MAX_VALUE || upstream == null) {
            Dispatcher dispatcher = this.getDispatcher();
            text.append("{dispatcher=").append(dispatcher);
            if (!(dispatcher instanceof SynchronousDispatcher)) {
                text.append(":").append(dispatcher.remainingSlots());
            }
            text.append(", max-capacity=");
            if (this.capacity == Long.MAX_VALUE) {
                text.append("infinite");
            } else {
                text.append(this.capacity);
            }
            text.append("}");
        }

        if (upstream != null) {
            text.append(upstream);
        }
        return text.append('}').toString();
    }
}

