/*
 * Decompiled with CFR 0.152.
 */
package reactor.rx.action.control;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.core.dispatch.SynchronousDispatcher;
import reactor.core.dispatch.TailRecurseDispatcher;
import reactor.core.support.NonBlocking;
import reactor.fn.Consumer;
import reactor.fn.Function;
import reactor.rx.Stream;
import reactor.rx.action.Action;
import reactor.rx.broadcast.Broadcaster;

public class RepeatWhenAction<T>
extends Action<T, T> {
    private final Broadcaster<Long> retryStream;
    private final Publisher<? extends T> rootPublisher;
    private Dispatcher dispatcher;
    private long pendingRequests = 0L;

    public RepeatWhenAction(Dispatcher dispatcher, Function<? super Stream<? extends Long>, ? extends Publisher<?>> predicate, Publisher<? extends T> rootPublisher) {
        this.retryStream = Broadcaster.create(null, dispatcher);
        this.dispatcher = SynchronousDispatcher.INSTANCE == dispatcher ? Environment.tailRecurse() : dispatcher;
        this.rootPublisher = rootPublisher;
        Publisher<?> afterRetryPublisher = predicate.apply(this.retryStream);
        afterRetryPublisher.subscribe(new RestartSubscriber());
    }

    @Override
    protected void doNext(T ev) {
        this.broadcastNext(ev);
    }

    @Override
    protected void doComplete() {
        this.retryStream.onComplete();
        super.doComplete();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void requestMore(long n) {
        RepeatWhenAction repeatWhenAction = this;
        synchronized (repeatWhenAction) {
            if ((this.pendingRequests += n) < 0L) {
                this.pendingRequests = Long.MAX_VALUE;
            }
        }
        super.requestMore(n);
    }

    @Override
    protected void doOnSubscribe(Subscription subscription) {
        long pendingRequests = this.pendingRequests;
        if (pendingRequests > 0L) {
            subscription.request(pendingRequests);
        }
    }

    protected void doRetry() {
        this.dispatcher.dispatch(null, new Consumer<Void>(){

            @Override
            public void accept(Void aVoid) {
                if (RepeatWhenAction.this.rootPublisher != null) {
                    if (TailRecurseDispatcher.class.isAssignableFrom(RepeatWhenAction.this.dispatcher.getClass())) {
                        RepeatWhenAction.this.dispatcher.shutdown();
                        RepeatWhenAction.this.dispatcher = Environment.tailRecurse();
                    }
                    RepeatWhenAction.this.rootPublisher.subscribe(RepeatWhenAction.this);
                }
            }
        }, null);
    }

    @Override
    public void onComplete() {
        try {
            this.cancel();
            this.retryStream.onNext(System.currentTimeMillis());
        }
        catch (Exception e) {
            this.doError(e);
        }
    }

    @Override
    public final Dispatcher getDispatcher() {
        return this.dispatcher;
    }

    private class RestartSubscriber
    implements Subscriber<Object>,
    NonBlocking {
        Subscription s;

        private RestartSubscriber() {
        }

        @Override
        public boolean isReactivePull(Dispatcher dispatcher, long producerCapacity) {
            return RepeatWhenAction.this.isReactivePull(dispatcher, producerCapacity);
        }

        @Override
        public long getCapacity() {
            return RepeatWhenAction.this.capacity;
        }

        @Override
        public void onSubscribe(Subscription s) {
            this.s = s;
            s.request(1L);
        }

        @Override
        public void onNext(Object o) {
            RepeatWhenAction.this.doRetry();
            this.s.request(1L);
        }

        @Override
        public void onError(Throwable t) {
            this.s.cancel();
            RepeatWhenAction.this.doError(t);
        }

        @Override
        public void onComplete() {
            this.s.cancel();
            RepeatWhenAction.this.doComplete();
        }
    }
}

