/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.mutiny.operators.multi;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.helpers.ParameterValidation;
import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.mutiny.operators.AbstractMulti;
import io.smallrye.mutiny.operators.multi.AbstractMultiOperator;
import io.smallrye.mutiny.operators.multi.processors.DirectProcessor;
import io.smallrye.mutiny.subscription.MultiSubscriber;
import io.smallrye.mutiny.subscription.SerializedSubscriber;
import io.smallrye.mutiny.subscription.SwitchableSubscriptionSubscriber;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public final class MultiRetryWhenOp<T>
extends AbstractMultiOperator<T, T> {
    final Function<? super Multi<Throwable>, ? extends Publisher<?>> whenStreamFactory;

    public MultiRetryWhenOp(Multi<? extends T> upstream, Function<? super Multi<Throwable>, ? extends Publisher<?>> whenStreamFactory) {
        super(upstream);
        this.whenStreamFactory = ParameterValidation.nonNull(whenStreamFactory, "whenStreamFactory");
    }

    static <T> void subscribe(MultiSubscriber<? super T> downstream, Function<? super Multi<Throwable>, ? extends Publisher<?>> whenStreamFactory, Multi<? extends T> upstream) {
        Publisher<?> publisher;
        RetryWhenOtherSubscriber other = new RetryWhenOtherSubscriber();
        SerializedSubscriber<Throwable> signaller = new SerializedSubscriber<Throwable>((Subscriber<Throwable>)other.processor);
        signaller.onSubscribe(Subscriptions.empty());
        SerializedSubscriber<? super T> serialized = new SerializedSubscriber<T>(downstream);
        RetryWhenMainSubscriber<T> main = new RetryWhenMainSubscriber<T>(upstream, serialized, signaller);
        other.main = main;
        serialized.onSubscribe(main);
        try {
            publisher = whenStreamFactory.apply(other);
            if (publisher == null) {
                throw new NullPointerException("The stream factory returned `null`");
            }
        }
        catch (Throwable e) {
            downstream.onError(e);
            return;
        }
        publisher.subscribe((Subscriber)other);
        if (!main.isCancelled()) {
            upstream.subscribe(main);
        }
    }

    @Override
    public void subscribe(MultiSubscriber<? super T> downstream) {
        MultiRetryWhenOp.subscribe(downstream, this.whenStreamFactory, this.upstream);
    }

    static final class RetryWhenOtherSubscriber
    extends AbstractMulti<Throwable>
    implements Multi<Throwable>,
    Subscriber<Object> {
        RetryWhenMainSubscriber<?> main;
        final DirectProcessor<Throwable> processor = new DirectProcessor();

        RetryWhenOtherSubscriber() {
        }

        public void onSubscribe(Subscription s) {
            this.main.setWhen(s);
        }

        public void onNext(Object t) {
            this.main.resubscribe();
        }

        public void onError(Throwable t) {
            this.main.whenFailure(t);
        }

        public void onComplete() {
            this.main.whenComplete();
        }

        @Override
        public void subscribe(Subscriber<? super Throwable> actual) {
            this.processor.subscribe(actual);
        }
    }

    static final class RetryWhenMainSubscriber<T>
    extends SwitchableSubscriptionSubscriber<T> {
        private final Publisher<? extends T> upstream;
        private final AtomicInteger wip = new AtomicInteger();
        private final Subscriber<Throwable> signaller;
        private final Subscriptions.DeferredSubscription arbiter = new Subscriptions.DeferredSubscription();
        long produced;

        RetryWhenMainSubscriber(Publisher<? extends T> upstream, MultiSubscriber<? super T> downstream, Subscriber<Throwable> signaller) {
            super(downstream);
            this.upstream = upstream;
            this.signaller = signaller;
        }

        @Override
        public void cancel() {
            if (!this.isCancelled()) {
                this.arbiter.cancel();
                super.cancel();
            }
        }

        public void setWhen(Subscription w) {
            this.arbiter.set(w);
        }

        @Override
        public void onItem(T t) {
            this.downstream.onItem(t);
            ++this.produced;
        }

        @Override
        public void onFailure(Throwable t) {
            long p = this.produced;
            if (p != 0L) {
                this.produced = 0L;
                this.emitted(p);
            }
            this.arbiter.request(1L);
            this.signaller.onNext((Object)t);
        }

        @Override
        public void onCompletion() {
            this.arbiter.cancel();
            this.downstream.onComplete();
        }

        void resubscribe() {
            if (this.wip.getAndIncrement() == 0) {
                do {
                    if (this.isCancelled()) {
                        return;
                    }
                    this.upstream.subscribe((Subscriber)this);
                } while (this.wip.decrementAndGet() != 0);
            }
        }

        void whenFailure(Throwable failure) {
            super.cancel();
            this.downstream.onFailure(failure);
        }

        void whenComplete() {
            super.cancel();
            this.downstream.onComplete();
        }
    }
}

