/*
 * Decompiled with CFR 0.152.
 */
package reactor.rx.action.transformation;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.reactivestreams.Subscriber;
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.core.queue.CompletableQueue;
import reactor.core.reactivestreams.SerializedSubscriber;
import reactor.core.support.Assert;
import reactor.fn.Function;
import reactor.rx.action.Action;
import reactor.rx.action.support.DefaultSubscriber;
import reactor.rx.stream.GroupedStream;
import reactor.rx.subscription.PushSubscription;
import reactor.rx.subscription.ReactiveSubscription;

/**
 * Action that partitions incoming values by a computed key.
 *
 * <p>Each value is passed through the key mapping function. The first time a key is seen, a new
 * {@link GroupedStream} for that key is broadcast downstream; the value that triggered it is
 * placed in the group's buffer. Later values with the same key are pushed to the subscription
 * currently registered for that key in {@link #groupByMap()}.
 *
 * @param <T> type of the values being grouped
 * @param <K> type of the grouping key
 */
public class GroupByAction<T, K> extends Action<T, GroupedStream<K, T>> {
    /** Computes the grouping key of each incoming value. */
    private final Function<? super T, ? extends K> fn;
    /** Environment reported by this action and by every grouped stream it creates. */
    private final Environment environment;
    /** Dispatcher reported by this action and by every grouped stream it creates. */
    private final Dispatcher dispatcher;
    /** Key to the subscription that currently receives the values of that key. */
    private final Map<K, ReactiveSubscription<T>> groupByMap = new ConcurrentHashMap<K, ReactiveSubscription<T>>();
    /**
     * Single entry point for demand: requests made on this action and on every group
     * subscription are pushed through it and forwarded to the upstream subscription.
     * NOTE(review): presumably SerializedSubscriber serializes concurrent onNext calls - confirm.
     */
    private final SerializedSubscriber<Long> serialized = SerializedSubscriber.create(new DefaultSubscriber<Long>() {
        @Override
        public void onNext(Long aLong) {
            Action.checkRequest(aLong);
            // Demand arriving while there is no upstream subscription is dropped.
            if (GroupByAction.this.upstreamSubscription != null) {
                GroupByAction.this.upstreamSubscription.request(aLong);
            }
        }
    });

    /**
     * Creates a group-by action.
     *
     * @param environment environment exposed by this action and its grouped streams
     * @param fn          key mapping function; must not be null
     * @param dispatcher  dispatcher exposed by this action and its grouped streams
     */
    public GroupByAction(Environment environment, Function<? super T, ? extends K> fn, Dispatcher dispatcher) {
        Assert.notNull(fn, "Key mapping function cannot be null.");
        this.dispatcher = dispatcher;
        this.fn = fn;
        this.environment = environment;
    }

    /**
     * Returns the live map of keys to group subscriptions.
     *
     * @return the internal map itself, not a copy
     */
    public Map<K, ReactiveSubscription<T>> groupByMap() {
        return this.groupByMap;
    }

    /**
     * Routes a value to the group of its key, creating and broadcasting the group when the key
     * has no registered subscription yet.
     *
     * <p>The map is a {@link ConcurrentHashMap}, so a key mapping function that returns
     * {@code null} makes the lookup throw {@link NullPointerException}.
     *
     * @param value the incoming value
     */
    @Override
    protected void doNext(T value) {
        final K key = this.fn.apply(value);
        ReactiveSubscription<T> child = this.groupByMap.get(key);
        if (child == null) {
            // Placeholder subscription (no publisher, no subscriber) that only holds the buffer.
            // The value is buffered before the placeholder is registered and before the group
            // is broadcast, so it is already queued when a subscriber attaches.
            child = new ReactiveSubscription<T>(null, null);
            child.getBuffer().add(value);
            this.groupByMap.put(key, child);
            final CompletableQueue<T> queue = child.getBuffer();
            GroupedStream<K, T> action = new GroupedStream<K, T>(key) {
                @Override
                public long getCapacity() {
                    return GroupByAction.this.getCapacity();
                }

                @Override
                public Dispatcher getDispatcher() {
                    return GroupByAction.this.dispatcher;
                }

                @Override
                public Environment getEnvironment() {
                    return GroupByAction.this.environment;
                }

                @Override
                public void subscribe(Subscriber<? super T> s) {
                    // Ensures the group is removed only once, whether it ends by cancel or complete.
                    final AtomicBoolean last = new AtomicBoolean();
                    // The real subscription reuses the placeholder's queue, so values buffered
                    // before subscription are shared with it.
                    ReactiveSubscription<T> finalSub = new ReactiveSubscription<T>(this, s, queue) {
                        @Override
                        public void cancel() {
                            super.cancel();
                            if (last.compareAndSet(false, true)) {
                                GroupByAction.this.removeGroupedStream(key);
                            }
                        }

                        @Override
                        public void onComplete() {
                            super.onComplete();
                            if (last.compareAndSet(false, true)) {
                                GroupByAction.this.removeGroupedStream(key);
                            }
                        }

                        @Override
                        protected void onRequest(long n) {
                            // Group demand is forwarded to the shared upstream request path.
                            GroupByAction.this.serialized.onNext(n);
                        }
                    };
                    // Replaces whatever was registered for this key (the placeholder, or the
                    // subscription of an earlier subscriber).
                    GroupByAction.this.groupByMap.put(key, finalSub);
                    s.onSubscribe(finalSub);
                }
            };
            this.broadcastNext(action);
        } else {
            child.onNext(value);
        }
    }

    /**
     * Unregisters the group of {@code key} and, when it was the last group and the upstream is
     * absent or complete, terminates this action.
     *
     * @param key key of the group that was cancelled or completed
     */
    private void removeGroupedStream(K key) {
        // Read upstream before removing, keeping the original evaluation order.
        PushSubscription<?> parentSub = this.upstreamSubscription;
        ReactiveSubscription<T> innerSub = this.groupByMap.remove(key);
        if (innerSub != null && this.groupByMap.isEmpty() && (parentSub == null || parentSub.isComplete())) {
            PushSubscription<?> childSub = this.downstreamSubscription;
            if (childSub == null || childSub.isComplete()) {
                this.cancel();
            }
            // Completion is only broadcast when the removed group has nothing left buffered.
            if (innerSub.getBufferSize() == 0L) {
                this.broadcastComplete();
            }
        }
    }

    /**
     * Completes every registered group subscription, then delegates to the superclass.
     */
    @Override
    protected void doComplete() {
        // A group's onComplete may remove its own entry from the map; ConcurrentHashMap
        // iteration tolerates such concurrent removal.
        for (ReactiveSubscription<T> stream : this.groupByMap.values()) {
            stream.onComplete();
        }
        super.doComplete();
    }

    /**
     * Requests {@code n} more elements from upstream through the shared request path.
     *
     * @param n number of elements requested
     */
    @Override
    public void requestMore(long n) {
        this.serialized.onNext(n);
    }

    /** Returns the dispatcher supplied at construction. */
    @Override
    public final Dispatcher getDispatcher() {
        return this.dispatcher;
    }

    /** Returns the environment supplied at construction. */
    @Override
    public final Environment getEnvironment() {
        return this.environment;
    }
}

