/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.keepalive;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.rsocket.DuplexConnection;
import io.rsocket.exceptions.ConnectionErrorException;
import io.rsocket.frame.FrameHeaderFlyweight;
import io.rsocket.frame.FrameType;
import io.rsocket.internal.KeepAliveData;
import io.rsocket.keepalive.KeepAliveHandler;
import io.rsocket.resume.ResumePositionsConnection;
import io.rsocket.resume.ResumeStateHolder;
import io.rsocket.util.DuplexConnectionProxy;
import io.rsocket.util.Function3;
import java.time.Duration;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;

/**
 * A {@link DuplexConnectionProxy} that adds keep-alive handling to the wrapped connection.
 *
 * <p>The first SETUP or RESUME frame observed in either direction (sent or received) is passed to
 * the configured {@code keepAliveData} function. If that function returns non-null data, a
 * {@link KeepAliveHandler} is created from its tick period and timeout and then started.
 * Received KEEPALIVE frames are handed to that handler; KEEPALIVE frames that arrive while no
 * handler exists are left untouched and simply flow downstream.
 */
public class KeepAliveConnection extends DuplexConnectionProxy implements ResumePositionsConnection {
    /** Emits the keep-alive handler once it has been created; later subscribers see the same handler. */
    private final MonoProcessor<KeepAliveHandler> keepAliveHandlerReady = MonoProcessor.create();
    private final ByteBufAllocator allocator;
    /** Extracts keep-alive settings from a SETUP/RESUME frame; may return {@code null}. */
    private final Function<ByteBuf, KeepAliveData> keepAliveData;
    /** Builds a handler from (allocator, tick period, timeout). */
    private final Function3<ByteBufAllocator, Duration, Duration, KeepAliveHandler> keepAliveHandlerFactory;
    private final Consumer<Throwable> errorConsumer;
    /** Stays {@code null} until {@link #startKeepAlives(KeepAliveHandler)} has run. */
    private volatile KeepAliveHandler keepAliveHandler;
    private volatile ResumeStateHolder resumeStateHolder;
    /** Set once the first SETUP/RESUME frame has been seen, so handler creation is attempted only once. */
    private volatile boolean keepAliveHandlerStarted;

    /**
     * Creates a connection whose keep-alive handler is built with {@code KeepAliveHandler::ofClient}.
     *
     * @param allocator allocator passed to the handler factory
     * @param duplexConnection connection to wrap
     * @param keepAliveData extracts keep-alive settings from a SETUP/RESUME frame
     * @param errorConsumer receives the error raised on keep-alive timeout
     * @return the wrapping connection
     */
    public static KeepAliveConnection ofClient(ByteBufAllocator allocator, DuplexConnection duplexConnection, Function<ByteBuf, KeepAliveData> keepAliveData, Consumer<Throwable> errorConsumer) {
        return new KeepAliveConnection(allocator, duplexConnection, keepAliveData, KeepAliveHandler::ofClient, errorConsumer);
    }

    /**
     * Creates a connection whose keep-alive handler is built with {@code KeepAliveHandler::ofServer}.
     *
     * @param allocator allocator passed to the handler factory
     * @param duplexConnection connection to wrap
     * @param keepAliveData extracts keep-alive settings from a SETUP/RESUME frame
     * @param errorConsumer receives the error raised on keep-alive timeout
     * @return the wrapping connection
     */
    public static KeepAliveConnection ofServer(ByteBufAllocator allocator, DuplexConnection duplexConnection, Function<ByteBuf, KeepAliveData> keepAliveData, Consumer<Throwable> errorConsumer) {
        return new KeepAliveConnection(allocator, duplexConnection, keepAliveData, KeepAliveHandler::ofServer, errorConsumer);
    }

    private KeepAliveConnection(ByteBufAllocator allocator, DuplexConnection duplexConnection, Function<ByteBuf, KeepAliveData> keepAliveData, Function3<ByteBufAllocator, Duration, Duration, KeepAliveHandler> keepAliveHandlerFactory, Consumer<Throwable> errorConsumer) {
        super(duplexConnection);
        this.allocator = allocator;
        this.keepAliveData = keepAliveData;
        this.keepAliveHandlerFactory = keepAliveHandlerFactory;
        this.errorConsumer = errorConsumer;
        // Keep-alives begin as soon as a handler is published on the processor.
        this.keepAliveHandlerReady.subscribe(this::startKeepAlives);
    }

    /**
     * Wires the given handler to this connection and starts it.
     *
     * <p>Frames produced by the handler are written through {@link #send(Publisher)}; if that send
     * fails the handler is disposed. When the handler signals a timeout, a
     * {@link ConnectionErrorException} is passed to the error consumer and {@code dispose()} is
     * called on this connection.
     */
    private void startKeepAlives(KeepAliveHandler keepAliveHandler) {
        this.keepAliveHandler = keepAliveHandler;
        this.send(keepAliveHandler.send()).subscribe(null, err -> keepAliveHandler.dispose());
        keepAliveHandler.timeout().subscribe(keepAlive -> {
            String message = String.format("No keep-alive acks for %d ms", keepAlive.getTimeoutMillis());
            ConnectionErrorException err = new ConnectionErrorException(message);
            this.errorConsumer.accept(err);
            this.dispose();
        });
        keepAliveHandler.start();
    }

    /**
     * Sends the frames through the wrapped connection, inspecting each one so that an outgoing
     * SETUP/RESUME frame can trigger keep-alive handler creation.
     */
    @Override
    public Mono<Void> send(Publisher<ByteBuf> frames) {
        return super.send(Flux.from(frames).doOnNext(this::startKeepAliveHandlerOnce));
    }

    /**
     * Returns the wrapped connection's inbound frames, inspecting each one on the way through.
     *
     * <p>KEEPALIVE frames are passed to the keep-alive handler; a positive position returned by the
     * handler is forwarded to the resume state holder when one has been set. All other frames are
     * checked for SETUP/RESUME to trigger handler creation. Frames are not consumed here.
     */
    @Override
    public Flux<ByteBuf> receive() {
        return super.receive().doOnNext(frame -> {
            if (!KeepAliveConnection.isKeepAliveFrame(frame)) {
                this.startKeepAliveHandlerOnce(frame);
                return;
            }
            // Read the volatile field once: it is null until a handler has been created and
            // wired, and a KEEPALIVE frame may arrive before that (or no handler may ever be
            // created if no keep-alive data was available). Skip handling instead of failing.
            KeepAliveHandler handler = this.keepAliveHandler;
            if (handler == null) {
                return;
            }
            long receivedPos = handler.receive(frame);
            ResumeStateHolder holder = this.resumeStateHolder;
            if (receivedPos > 0L && holder != null) {
                holder.onImpliedPosition(receivedPos);
            }
        });
    }

    /**
     * Returns the wrapped connection's close signal, additionally disposing the keep-alive handler
     * (if one was created) when that signal terminates.
     */
    @Override
    public Mono<Void> onClose() {
        return super.onClose().doFinally(signalType -> {
            KeepAliveHandler handler = this.keepAliveHandlerReady.peek();
            if (handler != null) {
                handler.dispose();
            }
        });
    }

    /**
     * Stores the resume state holder and hands it to the keep-alive handler once the handler is
     * available.
     */
    @Override
    public void acceptResumeState(ResumeStateHolder resumeStateHolder) {
        this.resumeStateHolder = resumeStateHolder;
        this.keepAliveHandlerReady.subscribe(h -> h.resumeState(resumeStateHolder));
    }

    /** Attempts handler creation on the first SETUP/RESUME frame seen; later frames are ignored. */
    private void startKeepAliveHandlerOnce(ByteBuf f) {
        if (!this.keepAliveHandlerStarted && KeepAliveConnection.isStartFrame(f)) {
            this.keepAliveHandlerStarted = true;
            this.startKeepAliveHandler(this.keepAliveData.apply(f));
        }
    }

    /** Returns {@code true} for SETUP and RESUME frames. */
    private static boolean isStartFrame(ByteBuf frame) {
        FrameType frameType = FrameHeaderFlyweight.frameType(frame);
        return frameType == FrameType.SETUP || frameType == FrameType.RESUME;
    }

    /** Returns {@code true} for KEEPALIVE frames. */
    private static boolean isKeepAliveFrame(ByteBuf frame) {
        return FrameHeaderFlyweight.frameType(frame) == FrameType.KEEPALIVE;
    }

    /**
     * Creates a handler from the given keep-alive data and publishes it on the processor;
     * does nothing when {@code kad} is {@code null}.
     */
    private void startKeepAliveHandler(@Nullable KeepAliveData kad) {
        if (kad != null) {
            KeepAliveHandler handler = this.keepAliveHandlerFactory.apply(this.allocator, kad.getTickPeriod(), kad.getTimeout());
            this.keepAliveHandlerReady.onNext(handler);
        }
    }
}

