/*
 * Decompiled with CFR 0.152.
 */
package rx.util.async.operators;

import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import rx.Observable;
import rx.Subscription;
import rx.exceptions.Exceptions;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.util.async.operators.Functionals;
import rx.util.async.operators.LatchedObserver;

public final class OperatorForEachFuture {
    private OperatorForEachFuture() {
        throw new IllegalStateException("No instances!");
    }

    public static <T> FutureTask<Void> forEachFuture(Observable<? extends T> source, Action1<? super T> onNext) {
        return OperatorForEachFuture.forEachFuture(source, onNext, Functionals.emptyThrowable(), Functionals.empty());
    }

    public static <T> FutureTask<Void> forEachFuture(Observable<? extends T> source, Action1<? super T> onNext, Action1<? super Throwable> onError) {
        return OperatorForEachFuture.forEachFuture(source, onNext, onError, Functionals.empty());
    }

    public static <T> FutureTask<Void> forEachFuture(Observable<? extends T> source, Action1<? super T> onNext, Action1<? super Throwable> onError, Action0 onCompleted) {
        LatchedObserver<? super T> lo = LatchedObserver.create(onNext, onError, onCompleted);
        Subscription s = source.subscribe(lo);
        FutureTaskCancel<Void> task = new FutureTaskCancel<Void>(s, new RunAwait<T>(lo));
        return task;
    }

    private static final class RunAwait<T>
    implements Callable<Void> {
        final LatchedObserver<T> observer;

        public RunAwait(LatchedObserver<T> observer) {
            this.observer = observer;
        }

        @Override
        public Void call() throws Exception {
            this.observer.await();
            Throwable t = this.observer.getThrowable();
            if (t != null) {
                throw Exceptions.propagate((Throwable)t);
            }
            return null;
        }
    }

    private static final class FutureTaskCancel<T>
    extends FutureTask<T> {
        final Subscription cancel;

        public FutureTaskCancel(Subscription cancel, Callable<T> callable) {
            super(callable);
            this.cancel = cancel;
        }

        public FutureTaskCancel(Subscription cancel, Runnable runnable, T result) {
            super(runnable, result);
            this.cancel = cancel;
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            this.cancel.unsubscribe();
            return super.cancel(mayInterruptIfRunning);
        }
    }
}

