/*
 * Decompiled with CFR 0.152.
 */
package reactor.rx.action.aggregation;

import java.util.concurrent.TimeUnit;
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.core.dispatch.SynchronousDispatcher;
import reactor.fn.timer.Timer;
import reactor.rx.Stream;
import reactor.rx.action.aggregation.BatchAction;
import reactor.rx.broadcast.Broadcaster;
import reactor.rx.broadcast.SerializedBroadcaster;
import reactor.rx.subscription.ReactiveSubscription;

/**
 * A {@link BatchAction} whose downstream elements are themselves {@link Stream}s ("windows").
 *
 * <p>The first-event callback opens a new window and broadcasts it downstream, the next-event
 * callback pushes the event into the currently open window, and the flush callback completes
 * that window. When each callback fires is decided by {@link BatchAction}.
 *
 * <p>The {@code currentWindow} field is a plain, unsynchronized field.
 *
 * @param <T> type of the events routed into the windows
 */
public class WindowAction<T> extends BatchAction<T, Stream<T>> {

    /** Environment handed to every window broadcaster and exposed via {@link #getEnvironment()}. */
    private final Environment environment;

    /** Subscription feeding the currently open window, or {@code null} when no window is open. */
    private ReactiveSubscription<T> currentWindow;

    /**
     * Creates a window action without a time limit.
     *
     * @param environment environment used when creating window broadcasters
     * @param dispatcher  dispatcher forwarded to {@link BatchAction} and to each window broadcaster
     * @param backlog     forwarded to {@link BatchAction}; presumably the window size - confirm
     */
    public WindowAction(Environment environment, Dispatcher dispatcher, int backlog) {
        // NOTE(review): the three flags presumably enable the next/first/flush callbacks - confirm
        // against BatchAction's constructor.
        super(dispatcher, backlog, true, true, true);
        this.environment = environment;
    }

    /**
     * Creates a window action that is additionally given a time span and a timer.
     *
     * @param environment environment used when creating window broadcasters
     * @param dispatcher  dispatcher forwarded to {@link BatchAction} and to each window broadcaster
     * @param backlog     forwarded to {@link BatchAction}; presumably the window size - confirm
     * @param timespan    forwarded to {@link BatchAction}
     * @param unit        unit of {@code timespan}
     * @param timer       timer forwarded to {@link BatchAction}
     */
    public WindowAction(Environment environment, Dispatcher dispatcher, int backlog, long timespan, TimeUnit unit, Timer timer) {
        super(dispatcher, backlog, true, true, true, timespan, unit, timer);
        this.environment = environment;
    }

    /**
     * Returns the subscription of the currently open window.
     *
     * @return the current window subscription, or {@code null} if no window is open (none was
     *         created yet, or the last one was completed or failed)
     */
    public ReactiveSubscription<T> currentWindow() {
        return this.currentWindow;
    }

    /**
     * Opens a new window: creates a broadcaster, wires a fresh {@link ReactiveSubscription} to it
     * and records that subscription as the current window.
     *
     * @return the new window, to be emitted downstream
     */
    protected Stream<T> createWindowStream() {
        // A timer combined with the synchronous dispatcher selects the serialized broadcaster.
        // NOTE(review): presumably because timed flushes may run on the timer's thread - confirm.
        final boolean serialized = this.timer != null && this.dispatcher == SynchronousDispatcher.INSTANCE;
        final Broadcaster<T> window = serialized
                ? SerializedBroadcaster.<T>create(this.environment, this.dispatcher)
                : Broadcaster.<T>create(this.environment, this.dispatcher);

        final ReactiveSubscription<T> subscription = new ReactiveSubscription<T>(null, window);
        this.currentWindow = subscription;
        window.onSubscribe(subscription);
        return window;
    }

    /**
     * Propagates the error to the open window (if any), forgets that window, then delegates to
     * the parent error handling.
     *
     * @param ev the error to propagate
     */
    @Override
    protected void doError(Throwable ev) {
        if (this.currentWindow != null) {
            this.currentWindow.onError(ev);
            // The window is terminated; drop it so later callbacks cannot signal it again.
            this.currentWindow = null;
        }
        super.doError(ev);
    }

    /** Completes the open window (if any), then delegates to the parent completion handling. */
    @Override
    protected void doComplete() {
        this.completeCurrentWindow();
        super.doComplete();
    }

    /**
     * Opens a new window and broadcasts it downstream. The event itself is not used here.
     *
     * @param event the event that triggered the callback
     */
    @Override
    protected void firstCallback(T event) {
        this.broadcastNext(this.createWindowStream());
    }

    /**
     * Pushes the event into the open window; does nothing when no window is open.
     *
     * @param event the event to forward
     */
    @Override
    protected void nextCallback(T event) {
        if (this.currentWindow != null) {
            this.currentWindow.onNext(event);
        }
    }

    /**
     * Completes the open window (if any). The event itself is not used here.
     *
     * @param event the event that triggered the callback
     */
    @Override
    protected void flushCallback(T event) {
        this.completeCurrentWindow();
    }

    /**
     * Returns the environment this action was constructed with.
     *
     * @return the environment
     */
    @Override
    public final Environment getEnvironment() {
        return this.environment;
    }

    /** Signals completion to the open window, if there is one, and clears the reference. */
    private void completeCurrentWindow() {
        if (this.currentWindow != null) {
            this.currentWindow.onComplete();
            this.currentWindow = null;
        }
    }
}

