/*
 * Decompiled with CFR 0.152.
 */
package reactor.rx.action.error;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.core.dispatch.SynchronousDispatcher;
import reactor.core.dispatch.TailRecurseDispatcher;
import reactor.core.support.NonBlocking;
import reactor.fn.Consumer;
import reactor.fn.Function;
import reactor.rx.Stream;
import reactor.rx.action.Action;
import reactor.rx.broadcast.Broadcaster;
import reactor.rx.subscription.PushSubscription;

/**
 * Pass-through action whose retries are driven by a user-supplied control stream.
 *
 * <p>Values received from upstream are forwarded unchanged with {@code broadcastNext}.
 * When upstream signals an error, the action calls {@code cancel()} and publishes the
 * error to an internal {@link Broadcaster}. The function given to the constructor is
 * applied to that broadcaster; every value emitted by the publisher it returns
 * triggers {@link #doRetry()}.
 *
 * @param <T> type of the values flowing through this action
 */
public class RetryWhenAction<T> extends Action<T, T> {

    /** Receives every upstream error; the retry-control publisher is derived from it. */
    private final Broadcaster<Throwable> retryStream;

    /** Source (re)subscribed by {@link #doRetry()}; when {@code null}, a retry does nothing. */
    private final Publisher<? extends T> rootPublisher;

    /** Dispatcher used to schedule retries. Not final: {@link #doRetry()} may replace it. */
    private Dispatcher dispatcher;

    /**
     * Creates the action and immediately subscribes to the retry-control publisher.
     *
     * @param dispatcher    dispatcher handed to the retry broadcaster; also used by this
     *                      action, except that {@link SynchronousDispatcher#INSTANCE} is
     *                      replaced by {@code Environment.tailRecurse()}
     * @param predicate     function turning the stream of errors into the publisher whose
     *                      emissions trigger retries
     * @param rootPublisher publisher to subscribe to again on retry; may be {@code null}
     */
    public RetryWhenAction(Dispatcher dispatcher,
                           Function<? super Stream<? extends Throwable>, ? extends Publisher<?>> predicate,
                           Publisher<? extends T> rootPublisher) {
        this.retryStream = Broadcaster.create(null, dispatcher);
        // NOTE(review): the synchronous dispatcher is swapped for a tail-recursive one,
        // presumably to keep synchronous retry loops from nesting - confirm.
        if (SynchronousDispatcher.INSTANCE == dispatcher) {
            this.dispatcher = Environment.tailRecurse();
        } else {
            this.dispatcher = dispatcher;
        }
        this.rootPublisher = rootPublisher;

        Publisher<?> afterRetryPublisher = predicate.apply(this.retryStream);
        afterRetryPublisher.subscribe(new RestartSubscriber());
    }

    /** Forwards the value downstream unchanged. */
    @Override
    protected void doNext(T ev) {
        this.broadcastNext(ev);
    }

    /** Completes the retry broadcaster, then delegates to the superclass completion handling. */
    @Override
    protected void doComplete() {
        this.retryStream.onComplete();
        super.doComplete();
    }

    /** @return the dispatcher currently used by this action */
    @Override
    public Dispatcher getDispatcher() {
        return this.dispatcher;
    }

    /**
     * Schedules a retry on the current dispatcher.
     *
     * <p>The scheduled task does nothing when no root publisher was supplied. Otherwise:
     * <ul>
     *   <li>with no upstream subscription, this action is subscribed to the root publisher
     *       and the demand to signal stays at {@code Long.MAX_VALUE};</li>
     *   <li>with an existing upstream subscription, its pending request count is read and,
     *       if the current dispatcher is a {@link TailRecurseDispatcher}, that dispatcher
     *       is shut down and replaced by a new {@code Environment.tailRecurse()}.</li>
     * </ul>
     * Finally, if an upstream subscription is available, demand is requested from it:
     * {@code Long.MAX_VALUE} as is, any other pending count plus one.
     */
    protected void doRetry() {
        this.dispatcher.dispatch(null, new Consumer<Void>() {

            @Override
            public void accept(Void ignored) {
                if (RetryWhenAction.this.rootPublisher == null) {
                    return;
                }

                long pendingRequests = Long.MAX_VALUE;
                PushSubscription<T> upstream = RetryWhenAction.this.upstreamSubscription;
                if (upstream == null) {
                    RetryWhenAction.this.rootPublisher.subscribe(RetryWhenAction.this);
                    // Re-read the field: subscribing may have installed a new subscription.
                    upstream = RetryWhenAction.this.upstreamSubscription;
                } else {
                    pendingRequests = upstream.pendingRequestSignals();
                    if (RetryWhenAction.this.dispatcher instanceof TailRecurseDispatcher) {
                        RetryWhenAction.this.dispatcher.shutdown();
                        RetryWhenAction.this.dispatcher = Environment.tailRecurse();
                    }
                }

                if (upstream != null) {
                    // Long.MAX_VALUE is passed through untouched so that adding one cannot overflow.
                    long demand = pendingRequests == Long.MAX_VALUE ? pendingRequests : pendingRequests + 1L;
                    upstream.request(demand);
                }
            }
        }, null);
    }

    /**
     * Calls {@code cancel()} and publishes the error to the retry broadcaster instead of
     * propagating it downstream.
     *
     * @param cause the error signalled by upstream
     */
    @Override
    public void onError(Throwable cause) {
        this.cancel();
        this.retryStream.onNext(cause);
    }

    /** @return the broadcaster that receives the errors seen by this action */
    public Broadcaster<Throwable> retryStream() {
        return this.retryStream;
    }

    /**
     * Subscriber attached to the retry-control publisher. It requests one signal at a
     * time and turns each received value into a call to {@link RetryWhenAction#doRetry()}.
     */
    private class RestartSubscriber implements Subscriber<Object>, NonBlocking {

        /** Subscription to the retry-control publisher; {@code null} until subscribed. */
        private Subscription controlSubscription;

        /** Delegates to the enclosing action. */
        @Override
        public boolean isReactivePull(Dispatcher dispatcher, long producerCapacity) {
            return RetryWhenAction.this.isReactivePull(dispatcher, producerCapacity);
        }

        /** @return the capacity of the enclosing action */
        @Override
        public long getCapacity() {
            return RetryWhenAction.this.capacity;
        }

        /** Stores the subscription and requests the first retry signal. */
        @Override
        public void onSubscribe(Subscription subscription) {
            this.controlSubscription = subscription;
            subscription.request(1L);
        }

        /** Triggers a retry, then asks for the next retry signal. */
        @Override
        public void onNext(Object signal) {
            RetryWhenAction.this.doRetry();
            if (this.controlSubscription != null) {
                this.controlSubscription.request(1L);
            }
        }

        /**
         * Cancels the control subscription and hands the error to the enclosing action.
         */
        @Override
        public void onError(Throwable t) {
            if (this.controlSubscription != null) {
                this.controlSubscription.cancel();
            }
            // NOTE(review): the enclosing onError publishes to retryStream rather than
            // downstream; confirm that an error of the control stream should be routed
            // there and not surfaced to subscribers of this action.
            RetryWhenAction.this.onError(t);
        }

        /** Forwards completion of the control stream to the enclosing action. */
        @Override
        public void onComplete() {
            RetryWhenAction.this.onComplete();
        }
    }
}

