/*
 * Decompiled with CFR 0.152.
 */
package org.redisson.reactive;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.redisson.api.RCollectionReactive;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Mono;

public class PublisherAdder<V> {
    private final RCollectionReactive<V> destination;

    public PublisherAdder(RCollectionReactive<V> destination) {
        this.destination = destination;
    }

    public Integer sum(Integer first, Integer second) {
        return first + second;
    }

    public Publisher<Integer> addAll(Publisher<? extends V> c) {
        final CompletableFuture promise = new CompletableFuture();
        c.subscribe(new BaseSubscriber<V>(){
            volatile boolean completed;
            AtomicLong values = new AtomicLong();
            Subscription s;
            Integer lastSize = 0;

            @Override
            protected void hookOnSubscribe(Subscription s) {
                this.s = s;
                s.request(1L);
            }

            @Override
            protected void hookOnNext(V o) {
                this.values.getAndIncrement();
                PublisherAdder.this.destination.add(o).subscribe((Subscriber<Integer>)new BaseSubscriber<Integer>(){

                    @Override
                    protected void hookOnSubscribe(Subscription s) {
                        s.request(1L);
                    }

                    @Override
                    protected void hookOnError(Throwable t) {
                        promise.completeExceptionally(t);
                    }

                    @Override
                    protected void hookOnNext(Integer o) {
                        lastSize = PublisherAdder.this.sum(lastSize, o);
                        s.request(1L);
                        if (values.decrementAndGet() == 0L && completed) {
                            promise.complete(lastSize);
                        }
                    }
                });
            }

            @Override
            protected void hookOnComplete() {
                this.completed = true;
                if (this.values.get() == 0L) {
                    promise.complete(this.lastSize);
                }
            }
        });
        return Mono.fromCompletionStage(promise);
    }
}

