/*
 * Decompiled with CFR 0.152.
 */
package reactor.rx;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.core.dispatch.SynchronousDispatcher;
import reactor.core.reactivestreams.PublisherFactory;
import reactor.core.reactivestreams.SubscriberWithContext;
import reactor.fn.BiConsumer;
import reactor.fn.Consumer;
import reactor.fn.Function;
import reactor.fn.Supplier;
import reactor.fn.timer.Timer;
import reactor.fn.tuple.Tuple;
import reactor.fn.tuple.Tuple2;
import reactor.fn.tuple.Tuple3;
import reactor.fn.tuple.Tuple4;
import reactor.fn.tuple.Tuple5;
import reactor.fn.tuple.Tuple6;
import reactor.fn.tuple.Tuple7;
import reactor.fn.tuple.Tuple8;
import reactor.rx.Stream;
import reactor.rx.action.Action;
import reactor.rx.action.combination.CombineLatestAction;
import reactor.rx.action.combination.ConcatAction;
import reactor.rx.action.combination.DynamicMergeAction;
import reactor.rx.action.combination.MergeAction;
import reactor.rx.action.combination.SwitchAction;
import reactor.rx.action.combination.ZipAction;
import reactor.rx.action.support.DefaultSubscriber;
import reactor.rx.broadcast.Broadcaster;
import reactor.rx.stream.DeferredStream;
import reactor.rx.stream.ErrorStream;
import reactor.rx.stream.FutureStream;
import reactor.rx.stream.IterableStream;
import reactor.rx.stream.PeriodicTimerStream;
import reactor.rx.stream.PublisherStream;
import reactor.rx.stream.RangeStream;
import reactor.rx.stream.SingleTimerStream;
import reactor.rx.stream.SingleValueStream;
import reactor.rx.stream.SupplierStream;

public class Streams {
    private static final Stream NEVER = new Stream(){
        final Subscription NEVER_SUBSCRIPTION = new Subscription(){

            @Override
            public void request(long l) {
            }

            @Override
            public void cancel() {
            }
        };

        @Override
        public void subscribe(Subscriber subscriber) {
            if (subscriber != null) {
                subscriber.onSubscribe(this.NEVER_SUBSCRIPTION);
            }
        }
    };

    protected Streams() {
    }

    public static <T> Stream<T> create(Publisher<T> publisher) {
        if (Stream.class.isAssignableFrom(publisher.getClass())) {
            return (Stream)publisher;
        }
        return new PublisherStream<T>(publisher);
    }

    public static <T> Stream<T> createWith(BiConsumer<Long, SubscriberWithContext<T, Void>> requestConsumer) {
        return Streams.createWith(requestConsumer, null, null);
    }

    public static <T, C> Stream<T> createWith(BiConsumer<Long, SubscriberWithContext<T, C>> requestConsumer, Function<Subscriber<? super T>, C> contextFactory) {
        return Streams.createWith(requestConsumer, contextFactory, null);
    }

    public static <T, C> Stream<T> createWith(BiConsumer<Long, SubscriberWithContext<T, C>> requestConsumer, Function<Subscriber<? super T>, C> contextFactory, Consumer<C> shutdownConsumer) {
        return Streams.wrap(PublisherFactory.create(requestConsumer, contextFactory, shutdownConsumer));
    }

    public static <T> Stream<T> wrap(final Publisher<T> publisher) {
        if (Stream.class.isAssignableFrom(publisher.getClass())) {
            return (Stream)publisher;
        }
        return new Stream<T>(){

            @Override
            public void subscribe(Subscriber<? super T> s) {
                publisher.subscribe(s);
            }
        };
    }

    public static <T> Stream<T> defer(Supplier<? extends Publisher<T>> supplier) {
        return new DeferredStream(supplier);
    }

    public static <T> Stream<T> empty() {
        return SingleValueStream.EMPTY;
    }

    public static <T> Stream<T> never() {
        return NEVER;
    }

    public static <O, T extends Throwable> Stream<O> fail(T throwable) {
        return new ErrorStream(throwable);
    }

    public static <T> Stream<T> from(Iterable<? extends T> values) {
        return IterableStream.create(values);
    }

    public static <T> Stream<T> from(T[] values) {
        return IterableStream.create(Arrays.asList(values));
    }

    public static <T> Stream<T> from(Future<? extends T> future) {
        return new FutureStream<T>(future);
    }

    public static <T> Stream<T> from(Future<? extends T> future, long time, TimeUnit unit) {
        return new FutureStream<T>(future, time, unit);
    }

    public static Stream<Long> range(long start, long end) {
        return RangeStream.create(start, end);
    }

    public static Stream<Long> timer(long delay) {
        return Streams.timer(Environment.timer(), delay, TimeUnit.SECONDS);
    }

    public static Stream<Long> timer(Timer timer, long delay) {
        return Streams.timer(timer, delay, TimeUnit.SECONDS);
    }

    public static Stream<Long> timer(long delay, TimeUnit unit) {
        return new SingleTimerStream(delay, unit, Environment.timer());
    }

    public static Stream<Long> timer(Timer timer, long delay, TimeUnit unit) {
        return new SingleTimerStream(delay, unit, timer);
    }

    public static Stream<Long> period(long period) {
        return Streams.period(Environment.timer(), -1L, period, TimeUnit.SECONDS);
    }

    public static Stream<Long> period(Timer timer, long period) {
        return Streams.period(timer, -1L, period, TimeUnit.SECONDS);
    }

    public static Stream<Long> period(long delay, long period) {
        return Streams.period(Environment.timer(), delay, period, TimeUnit.SECONDS);
    }

    public static Stream<Long> period(Timer timer, long delay, long period) {
        return Streams.period(timer, delay, period, TimeUnit.SECONDS);
    }

    public static Stream<Long> period(long period, TimeUnit unit) {
        return Streams.period(Environment.timer(), -1L, period, unit);
    }

    public static Stream<Long> period(Timer timer, long period, TimeUnit unit) {
        return Streams.period(timer, -1L, period, unit);
    }

    public static Stream<Long> period(long delay, long period, TimeUnit unit) {
        return Streams.period(Environment.timer(), delay, period, unit);
    }

    public static Stream<Long> period(Timer timer, long delay, long period, TimeUnit unit) {
        return new PeriodicTimerStream(TimeUnit.MILLISECONDS.convert(delay, unit), period, unit, timer);
    }

    public static <T> Stream<T> just(T value1) {
        return new SingleValueStream<T>(value1);
    }

    public static <T> Stream<T> just(T value1, T value2) {
        return Streams.from(Arrays.asList(value1, value2));
    }

    public static <T> Stream<T> just(T value1, T value2, T value3) {
        return Streams.from(Arrays.asList(value1, value2, value3));
    }

    public static <T> Stream<T> just(T value1, T value2, T value3, T value4) {
        return Streams.from(Arrays.asList(value1, value2, value3, value4));
    }

    public static <T> Stream<T> just(T value1, T value2, T value3, T value4, T value5) {
        return Streams.from(Arrays.asList(value1, value2, value3, value4, value5));
    }

    public static <T> Stream<T> just(T value1, T value2, T value3, T value4, T value5, T value6) {
        return Streams.from(Arrays.asList(value1, value2, value3, value4, value5, value6));
    }

    public static <T> Stream<T> just(T value1, T value2, T value3, T value4, T value5, T value6, T value7) {
        return Streams.from(Arrays.asList(value1, value2, value3, value4, value5, value6, value7));
    }

    public static <T> Stream<T> just(T value1, T value2, T value3, T value4, T value5, T value6, T value7, T value8) {
        return Streams.from(Arrays.asList(value1, value2, value3, value4, value5, value6, value7, value8));
    }

    public static <T> Stream<T> generate(Supplier<? extends T> value) {
        if (value == null) {
            throw new IllegalArgumentException("Supplier must be provided");
        }
        return new SupplierStream<T>(SynchronousDispatcher.INSTANCE, value);
    }

    public static <T> Action<Publisher<? extends T>, T> switchOnNext() {
        return Streams.switchOnNext(SynchronousDispatcher.INSTANCE);
    }

    public static <T> Action<Publisher<? extends T>, T> switchOnNext(Dispatcher dispatcher) {
        SwitchAction switchAction = new SwitchAction(dispatcher);
        switchAction.onSubscribe(Broadcaster.HOT_SUBSCRIPTION);
        return switchAction;
    }

    public static <T> Stream<T> switchOnNext(Publisher<? extends Publisher<? extends T>> mergedPublishers) {
        return Streams.switchOnNext(mergedPublishers, SynchronousDispatcher.INSTANCE);
    }

    public static <T> Stream<T> switchOnNext(Publisher<? extends Publisher<? extends T>> mergedPublishers, Dispatcher dispatcher) {
        SwitchAction mergeAction = new SwitchAction(dispatcher);
        mergedPublishers.subscribe(mergeAction);
        return mergeAction;
    }

    public static <T> Stream<T> concat(Iterable<? extends Publisher<? extends T>> mergedPublishers) {
        ArrayList<Publisher<T>> publishers = new ArrayList<Publisher<T>>();
        for (Publisher<T> publisher : mergedPublishers) {
            publishers.add(publisher);
        }
        int size = publishers.size();
        if (size == 1) {
            return Streams.wrap((Publisher)publishers.get(0));
        }
        if (size == 0) {
            return Streams.empty();
        }
        ConcatAction concatAction = new ConcatAction();
        Streams.from(mergedPublishers).subscribe(concatAction);
        return concatAction;
    }

    public static <T> Stream<T> concat(Publisher<? extends Publisher<? extends T>> concatdPublishers) {
        ConcatAction concatAction = new ConcatAction();
        concatdPublishers.subscribe(concatAction);
        return concatAction;
    }

    public static <T> Stream<T> concat(Publisher<? extends T> source1, Publisher<? extends T> source2) {
        return Streams.concat(Arrays.asList(source1, source2));
    }

    public static <T> Stream<T> concat(Publisher<? extends T> source1, Publisher<? extends T> source2, Publisher<? extends T> source3) {
        return Streams.concat(Arrays.asList(source1, source2, source3));
    }

    public static <T> Stream<T> concat(Publisher<? extends T> source1, Publisher<? extends T> source2, Publisher<? extends T> source3, Publisher<? extends T> source4) {
        return Streams.concat(Arrays.asList(source1, source2, source3, source4));
    }

    public static <T> Stream<T> concat(Publisher<? extends T> source1, Publisher<? extends T> source2, Publisher<? extends T> source3, Publisher<? extends T> source4, Publisher<? extends T> source5) {
        return Streams.concat(Arrays.asList(source1, source2, source3, source4, source5));
    }

    public static <T> Stream<T> concat(Publisher<? extends T> source1, Publisher<? extends T> source2, Publisher<? extends T> source3, Publisher<? extends T> source4, Publisher<? extends T> source5, Publisher<? extends T> source6) {
        return Streams.concat(Arrays.asList(source1, source2, source3, source4, source5, source6));
    }

    public static <T> Stream<T> concat(Publisher<? extends T> source1, Publisher<? extends T> source2, Publisher<? extends T> source3, Publisher<? extends T> source4, Publisher<? extends T> source5, Publisher<? extends T> source6, Publisher<? extends T> source7) {
        return Streams.concat(Arrays.asList(source1, source2, source3, source4, source5, source6, source7));
    }

    public static <T> Stream<T> concat(Publisher<? extends T> source1, Publisher<? extends T> source2, Publisher<? extends T> source3, Publisher<? extends T> source4, Publisher<? extends T> source5, Publisher<? extends T> source6, Publisher<? extends T> source7, Publisher<? extends T> source8) {
        return Streams.concat(Arrays.asList(source1, source2, source3, source4, source5, source6, source7, source8));
    }

    public static <T> Stream<T> merge(Iterable<? extends Publisher<? extends T>> mergedPublishers) {
        ArrayList<Publisher<T>> publishers = new ArrayList<Publisher<T>>();
        for (Publisher<T> publisher : mergedPublishers) {
            publishers.add(publisher);
        }
        if (publishers.size() == 0) {
            return Streams.empty();
        }
        if (publishers.size() == 1) {
            return Streams.wrap((Publisher)publishers.get(0));
        }
        return new MergeAction((Dispatcher)SynchronousDispatcher.INSTANCE, publishers);
    }

    public static <T, E extends T> Stream<E> merge(Publisher<? extends Publisher<E>> mergedPublishers) {
        DynamicMergeAction mergeAction = new DynamicMergeAction(null);
        mergedPublishers.subscribe(mergeAction);
        return mergeAction;
    }

    public static <T> Stream<T> merge(Publisher<? extends T> source1, Publisher<? extends T> source2) {
        return Streams.merge(Arrays.asList(source1, source2));
    }

    public static <T> Stream<T> merge(Publisher<? extends T> source1, Publisher<? extends T> source2, Publisher<? extends T> source3) {
        return Streams.merge(Arrays.asList(source1, source2, source3));
    }

    public static <T> Stream<T> merge(Publisher<? extends T> source1, Publisher<? extends T> source2, Publisher<? extends T> source3, Publisher<? extends T> source4) {
        return Streams.merge(Arrays.asList(source1, source2, source3, source4));
    }

    public static <T> Stream<T> merge(Publisher<? extends T> source1, Publisher<? extends T> source2, Publisher<? extends T> source3, Publisher<? extends T> source4, Publisher<? extends T> source5) {
        return Streams.merge(Arrays.asList(source1, source2, source3, source4, source5));
    }

    public static <T> Stream<T> merge(Publisher<? extends T> source1, Publisher<? extends T> source2, Publisher<? extends T> source3, Publisher<? extends T> source4, Publisher<? extends T> source5, Publisher<? extends T> source6) {
        return Streams.merge(Arrays.asList(source1, source2, source3, source4, source5, source6));
    }

    public static <T> Stream<T> merge(Publisher<? extends T> source1, Publisher<? extends T> source2, Publisher<? extends T> source3, Publisher<? extends T> source4, Publisher<? extends T> source5, Publisher<? extends T> source6, Publisher<? extends T> source7) {
        return Streams.merge(Arrays.asList(source1, source2, source3, source4, source5, source6, source7));
    }

    public static <T> Stream<T> merge(Publisher<? extends T> source1, Publisher<? extends T> source2, Publisher<? extends T> source3, Publisher<? extends T> source4, Publisher<? extends T> source5, Publisher<? extends T> source6, Publisher<? extends T> source7, Publisher<? extends T> source8) {
        return Streams.merge(Arrays.asList(source1, source2, source3, source4, source5, source6, source7, source8));
    }

    public static <T1, T2, V> Stream<V> combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Function<Tuple2<T1, T2>, ? extends V> combinator) {
        return Streams.combineLatest(Arrays.asList(source1, source2), combinator);
    }

    public static <T1, T2, T3, V> Stream<V> combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Function<Tuple3<T1, T2, T3>, ? extends V> combinator) {
        return Streams.combineLatest(Arrays.asList(source1, source2, source3), combinator);
    }

    public static <T1, T2, T3, T4, V> Stream<V> combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Function<Tuple4<T1, T2, T3, T4>, V> combinator) {
        return Streams.combineLatest(Arrays.asList(source1, source2, source3, source4), combinator);
    }

    public static <T1, T2, T3, T4, T5, V> Stream<V> combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5, Function<Tuple5<T1, T2, T3, T4, T5>, V> combinator) {
        return Streams.combineLatest(Arrays.asList(source1, source2, source3, source4, source5), combinator);
    }

    public static <T1, T2, T3, T4, T5, T6, V> Stream<V> combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5, Publisher<? extends T6> source6, Function<Tuple6<T1, T2, T3, T4, T5, T6>, V> combinator) {
        return Streams.combineLatest(Arrays.asList(source1, source2, source3, source4, source5, source6), combinator);
    }

    public static <T1, T2, T3, T4, T5, T6, T7, V> Stream<V> combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5, Publisher<? extends T6> source6, Publisher<? extends T7> source7, Function<Tuple7<T1, T2, T3, T4, T5, T6, T7>, V> combinator) {
        return Streams.combineLatest(Arrays.asList(source1, source2, source3, source4, source5, source6, source7), combinator);
    }

    public static <T1, T2, T3, T4, T5, T6, T7, T8, V> Stream<V> combineLatest(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5, Publisher<? extends T6> source6, Publisher<? extends T7> source7, Publisher<? extends T8> source8, Function<Tuple8<T1, T2, T3, T4, T5, T6, T7, T8>, ? extends V> combinator) {
        return Streams.combineLatest(Arrays.asList(source1, source2, source3, source4, source5, source6, source7, source8), combinator);
    }

    public static <TUPLE extends Tuple, V> Stream<V> combineLatest(List<? extends Publisher<?>> sources, Function<TUPLE, ? extends V> combinator) {
        return new CombineLatestAction(SynchronousDispatcher.INSTANCE, combinator, sources);
    }

    public static <E, TUPLE extends Tuple, V> Stream<V> combineLatest(Publisher<? extends Publisher<E>> sources, Function<TUPLE, ? extends V> combinator) {
        DynamicMergeAction mergeAction = new DynamicMergeAction(new CombineLatestAction(SynchronousDispatcher.INSTANCE, combinator, null));
        sources.subscribe(mergeAction);
        return mergeAction;
    }

    public static <T1, T2, V> Stream<V> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Function<Tuple2<T1, T2>, ? extends V> combinator) {
        return Streams.zip(Arrays.asList(source1, source2), combinator);
    }

    public static <T1, T2, T3, V> Stream<V> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Function<Tuple3<T1, T2, T3>, ? extends V> combinator) {
        return Streams.zip(Arrays.asList(source1, source2, source3), combinator);
    }

    public static <T1, T2, T3, T4, V> Stream<V> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Function<Tuple4<T1, T2, T3, T4>, V> combinator) {
        return Streams.zip(Arrays.asList(source1, source2, source3, source4), combinator);
    }

    public static <T1, T2, T3, T4, T5, V> Stream<V> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5, Function<Tuple5<T1, T2, T3, T4, T5>, V> combinator) {
        return Streams.zip(Arrays.asList(source1, source2, source3, source4, source5), combinator);
    }

    public static <T1, T2, T3, T4, T5, T6, V> Stream<V> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5, Publisher<? extends T6> source6, Function<Tuple6<T1, T2, T3, T4, T5, T6>, V> combinator) {
        return Streams.zip(Arrays.asList(source1, source2, source3, source4, source5, source6), combinator);
    }

    public static <T1, T2, T3, T4, T5, T6, T7, V> Stream<V> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5, Publisher<? extends T6> source6, Publisher<? extends T7> source7, Function<Tuple7<T1, T2, T3, T4, T5, T6, T7>, V> combinator) {
        return Streams.zip(Arrays.asList(source1, source2, source3, source4, source5, source6, source7), combinator);
    }

    public static <T1, T2, T3, T4, T5, T6, T7, T8, V> Stream<V> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, Publisher<? extends T3> source3, Publisher<? extends T4> source4, Publisher<? extends T5> source5, Publisher<? extends T6> source6, Publisher<? extends T7> source7, Publisher<? extends T8> source8, Function<Tuple8<T1, T2, T3, T4, T5, T6, T7, T8>, ? extends V> combinator) {
        return Streams.zip(Arrays.asList(source1, source2, source3, source4, source5, source6, source7, source8), combinator);
    }

    public static <TUPLE extends Tuple, V> Stream<V> zip(List<? extends Publisher<?>> sources, Function<TUPLE, ? extends V> combinator) {
        return new ZipAction(SynchronousDispatcher.INSTANCE, combinator, sources);
    }

    public static <E, TUPLE extends Tuple, V> Stream<V> zip(Publisher<? extends Publisher<E>> sources, Function<TUPLE, ? extends V> combinator) {
        DynamicMergeAction mergeAction = new DynamicMergeAction(new ZipAction(SynchronousDispatcher.INSTANCE, combinator, null));
        sources.subscribe(mergeAction);
        return mergeAction;
    }

    public static <T> Stream<List<T>> join(Publisher<? extends T> source1, Publisher<? extends T> source2) {
        return Streams.join(Arrays.asList(source1, source2));
    }

    public static <T> Stream<List<T>> join(Publisher<? extends T> source1, Publisher<? extends T> source2, Publisher<? extends T> source3) {
        return Streams.join(Arrays.asList(source1, source2, source3));
    }

    public static <T> Stream<List<T>> join(Publisher<? extends T> source1, Publisher<? extends T> source2, Publisher<? extends T> source3, Publisher<? extends T> source4) {
        return Streams.join(Arrays.asList(source1, source2, source3, source4));
    }

    public static <T> Stream<List<T>> join(Publisher<? extends T> source1, Publisher<? extends T> source2, Publisher<? extends T> source3, Publisher<? extends T> source4, Publisher<? extends T> source5) {
        return Streams.join(Arrays.asList(source1, source2, source3, source4, source5));
    }

    public static <T> Stream<List<T>> join(Publisher<? extends T> source1, Publisher<? extends T> source2, Publisher<? extends T> source3, Publisher<? extends T> source4, Publisher<? extends T> source5, Publisher<? extends T> source6) {
        return Streams.join(Arrays.asList(source1, source2, source3, source4, source5, source6));
    }

    public static <T> Stream<List<T>> join(Publisher<? extends T> source1, Publisher<? extends T> source2, Publisher<? extends T> source3, Publisher<? extends T> source4, Publisher<? extends T> source5, Publisher<? extends T> source6, Publisher<? extends T> source7) {
        return Streams.join(Arrays.asList(source1, source2, source3, source4, source5, source6, source7));
    }

    public static <T> Stream<List<T>> join(Publisher<? extends T> source1, Publisher<? extends T> source2, Publisher<? extends T> source3, Publisher<? extends T> source4, Publisher<? extends T> source5, Publisher<? extends T> source6, Publisher<? extends T> source7, Publisher<? extends T> source8) {
        return Streams.join(Arrays.asList(source1, source2, source3, source4, source5, source6, source7, source8));
    }

    public static <T> Stream<List<T>> join(List<? extends Publisher<? extends T>> sources) {
        return (Action)Streams.zip(sources, ZipAction.joinZipper());
    }

    public static <T> Stream<List<T>> join(Publisher<? extends Publisher<T>> source) {
        return Streams.zip(source, ZipAction.joinZipper());
    }

    public static void await(Publisher<?> publisher) throws Throwable {
        long timeout = 30000L;
        if (Environment.alive()) {
            timeout = Environment.get().getLongProperty("reactor.await.defaultTimeout", 30000L);
        }
        Streams.await(publisher, timeout, TimeUnit.MILLISECONDS, true);
    }

    public static void await(Publisher<?> publisher, long timeout) throws Throwable {
        Streams.await(publisher, timeout, TimeUnit.SECONDS, true);
    }

    public static void await(Publisher<?> publisher, long timeout, TimeUnit unit) throws Throwable {
        Streams.await(publisher, timeout, unit, true);
    }

    public static void await(Publisher<?> publisher, long timeout, TimeUnit unit, final boolean request) throws Throwable {
        final AtomicReference exception = new AtomicReference();
        final CountDownLatch latch = new CountDownLatch(1);
        publisher.subscribe(new DefaultSubscriber<Object>(){
            Subscription s;

            @Override
            public void onSubscribe(Subscription subscription) {
                this.s = subscription;
                if (request) {
                    subscription.request(Long.MAX_VALUE);
                }
            }

            @Override
            public void onError(Throwable throwable) {
                exception.set(throwable);
                this.cancel();
                latch.countDown();
            }

            @Override
            public void onComplete() {
                this.cancel();
                latch.countDown();
            }

            void cancel() {
                if (this.s != null) {
                    try {
                        this.s.cancel();
                    }
                    catch (Throwable t) {
                        exception.set(t);
                    }
                }
            }
        });
        latch.await(timeout, unit);
        if (exception.get() != null) {
            throw (Throwable)exception.get();
        }
    }
}

