/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.rsocket.frame.KeepAliveFrameFlyweight;
import java.time.Duration;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.UnicastProcessor;

abstract class KeepAliveHandler
implements Disposable {
    private final KeepAlive keepAlive;
    private final UnicastProcessor<ByteBuf> sent = UnicastProcessor.create();
    private final MonoProcessor<KeepAlive> timeout = MonoProcessor.create();
    private Disposable intervalDisposable;
    private volatile long lastReceivedMillis;

    private KeepAliveHandler(KeepAlive keepAlive) {
        this.keepAlive = keepAlive;
        this.lastReceivedMillis = System.currentTimeMillis();
        this.intervalDisposable = Flux.interval((Duration)Duration.ofMillis(keepAlive.getTickPeriod())).subscribe(v -> this.onIntervalTick());
    }

    static KeepAliveHandler ofServer(KeepAlive keepAlive) {
        return new Server(keepAlive);
    }

    static KeepAliveHandler ofClient(KeepAlive keepAlive) {
        return new Client(keepAlive);
    }

    public void dispose() {
        this.sent.onComplete();
        this.timeout.onComplete();
        this.intervalDisposable.dispose();
    }

    public void receive(ByteBuf keepAliveFrame) {
        this.lastReceivedMillis = System.currentTimeMillis();
        if (KeepAliveFrameFlyweight.respondFlag(keepAliveFrame)) {
            this.doSend(KeepAliveFrameFlyweight.encode(ByteBufAllocator.DEFAULT, false, 0L, KeepAliveFrameFlyweight.data(keepAliveFrame).retain()));
        }
    }

    public Flux<ByteBuf> send() {
        return this.sent;
    }

    public Mono<KeepAlive> timeout() {
        return this.timeout;
    }

    abstract void onIntervalTick();

    void doSend(ByteBuf frame) {
        this.sent.onNext((Object)frame);
    }

    void doCheckTimeout() {
        long now = System.currentTimeMillis();
        if (now - this.lastReceivedMillis >= this.keepAlive.getTimeoutMillis()) {
            this.timeout.onNext((Object)this.keepAlive);
        }
    }

    static final class KeepAlive {
        private final long tickPeriod;
        private final long timeoutMillis;

        KeepAlive(Duration tickPeriod, Duration timeoutMillis, int maxTicks) {
            this.tickPeriod = tickPeriod.toMillis();
            this.timeoutMillis = timeoutMillis.toMillis() + (long)maxTicks * tickPeriod.toMillis();
        }

        KeepAlive(long tickPeriod, long timeoutMillis) {
            this.tickPeriod = tickPeriod;
            this.timeoutMillis = timeoutMillis;
        }

        public long getTickPeriod() {
            return this.tickPeriod;
        }

        public long getTimeoutMillis() {
            return this.timeoutMillis;
        }
    }

    private static final class Client
    extends KeepAliveHandler {
        Client(KeepAlive keepAlive) {
            super(keepAlive);
        }

        @Override
        void onIntervalTick() {
            this.doCheckTimeout();
            this.doSend(KeepAliveFrameFlyweight.encode(ByteBufAllocator.DEFAULT, true, 0L, Unpooled.EMPTY_BUFFER));
        }
    }

    private static class Server
    extends KeepAliveHandler {
        Server(KeepAlive keepAlive) {
            super(keepAlive);
        }

        @Override
        void onIntervalTick() {
            this.doCheckTimeout();
        }
    }
}

