/*
 * Decompiled with CFR 0.152.
 */
package reactor.rx.action.error;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.reactivestreams.Publisher;
import reactor.core.Dispatcher;
import reactor.core.support.Assert;
import reactor.fn.Consumer;
import reactor.fn.Pausable;
import reactor.fn.timer.Timer;
import reactor.rx.action.error.FallbackAction;

/**
 * Action that arms a timer each time more data is requested or a value is
 * received, and reacts when the timer fires before being re-armed.
 *
 * <p>When the timer fires, the reaction is dispatched through the
 * {@link Dispatcher} given at construction: if the inherited {@code fallback}
 * is non-null, {@code doSwitch()} is invoked; otherwise {@code doError} is
 * invoked with a {@link TimeoutException}.
 *
 * <p>All access to the pending timer registration is guarded by this
 * instance's monitor.
 *
 * @param <T> the type of values flowing through this action
 */
public final class TimeoutAction<T>
extends FallbackAction<T> {
    private final Timer timer;

    /** Delay passed to the timer, in milliseconds. */
    private final long timeout;

    /** Task handed to the timer; hands the timeout reaction to the dispatcher. */
    private final Consumer<Long> timeoutTask;

    /** Reaction executed via the dispatcher once the timer has fired. */
    private final Consumer<Void> timeoutRequest = new Consumer<Void>(){

        @Override
        public void accept(Void aVoid) {
            if (TimeoutAction.this.fallback != null) {
                TimeoutAction.this.doSwitch();
            } else {
                TimeoutAction.this.doError(new TimeoutException("No data signaled for " + TimeoutAction.this.timeout + "ms"));
            }
        }
    };

    /** Currently pending timer registration, or {@code null}; guarded by {@code this}. */
    private Pausable timeoutRegistration;

    /**
     * Creates a timeout action.
     *
     * @param dispatcher dispatcher used to run the timeout reaction when the timer fires
     * @param fallback   publisher handed to the superclass; may be {@code null}, in which
     *                   case a timeout is reported through {@code doError}
     * @param timer      timer used to schedule the timeout; must not be {@code null}
     * @param timeout    delay in milliseconds before the timeout reaction is triggered
     * @throws IllegalStateException presumably raised by {@code Assert.state} when
     *                   {@code timer} is {@code null} (verify against {@code Assert})
     */
    public TimeoutAction(final Dispatcher dispatcher, Publisher<? extends T> fallback, Timer timer, long timeout) {
        super(fallback);
        Assert.state(timer != null, "Timer must be supplied");
        this.timeoutTask = new Consumer<Long>(){

            @Override
            public void accept(Long aLong) {
                dispatcher.dispatch(null, TimeoutAction.this.timeoutRequest, null);
            }
        };
        this.timer = timer;
        this.timeout = timeout;
    }

    /**
     * Re-arms the timeout, then forwards the request to the superclass.
     *
     * @param req the demand forwarded to {@code super.requestMore}
     */
    @Override
    public void requestMore(long req) {
        this.resetTimeout();
        super.requestMore(req);
    }

    /**
     * Re-arms the timeout, then broadcasts the received value.
     *
     * @param ev the value to broadcast
     */
    @Override
    protected void doNormalNext(T ev) {
        this.resetTimeout();
        this.broadcastNext(ev);
    }

    /**
     * Cancels any pending timeout registration, then delegates to the superclass.
     * The registration is cleared under the same monitor that guards re-arming,
     * so a concurrent re-arm cannot interleave with the cancel-and-clear step.
     */
    @Override
    protected void doShutdown() {
        synchronized (this) {
            if (this.timeoutRegistration != null) {
                this.timeoutRegistration.cancel();
                this.timeoutRegistration = null;
            }
        }
        // Delegate outside the monitor, matching how the other overrides call super.
        super.doShutdown();
    }

    /** Cancels the pending registration, if any, and submits a fresh one to the timer. */
    private void resetTimeout() {
        synchronized (this) {
            if (this.timeoutRegistration != null) {
                this.timeoutRegistration.cancel();
            }
            this.timeoutRegistration = this.timer.submit(this.timeoutTask, this.timeout, TimeUnit.MILLISECONDS);
        }
    }
}

