/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.fragmentation;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
import io.rsocket.DuplexConnection;
import io.rsocket.fragmentation.FrameFragmenter;
import io.rsocket.fragmentation.FrameReassembler;
import io.rsocket.frame.FrameHeaderFlyweight;
import io.rsocket.frame.FrameLengthFlyweight;
import io.rsocket.frame.FrameType;
import java.util.Objects;
import javax.annotation.Nullable;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SynchronousSink;

public final class FragmentationDuplexConnection
implements DuplexConnection {
    private static final int MIN_MTU_SIZE = 64;
    private static final Logger logger = LoggerFactory.getLogger(FragmentationDuplexConnection.class);
    private final DuplexConnection delegate;
    private final int mtu;
    private final ByteBufAllocator allocator;
    private final FrameReassembler frameReassembler;
    private final boolean encodeLength;
    private final String type;

    public FragmentationDuplexConnection(DuplexConnection delegate, ByteBufAllocator allocator, int mtu, boolean encodeLength, String type) {
        Objects.requireNonNull(delegate, "delegate must not be null");
        Objects.requireNonNull(allocator, "byteBufAllocator must not be null");
        this.encodeLength = encodeLength;
        this.allocator = allocator;
        this.delegate = delegate;
        this.mtu = FragmentationDuplexConnection.assertMtu(mtu);
        this.frameReassembler = new FrameReassembler(allocator);
        this.type = type;
        delegate.onClose().doFinally(s -> this.frameReassembler.dispose()).subscribe();
    }

    private boolean shouldFragment(FrameType frameType, int readableBytes) {
        return frameType.isFragmentable() && readableBytes > this.mtu;
    }

    @Nullable
    public static <T> Mono<T> checkMtu(int mtu) {
        if (FragmentationDuplexConnection.isInsufficientMtu(mtu)) {
            String msg = String.format("smallest allowed mtu size is %d bytes, provided: %d", 64, mtu);
            return Mono.error((Throwable)new IllegalArgumentException(msg));
        }
        return null;
    }

    private static int assertMtu(int mtu) {
        if (FragmentationDuplexConnection.isInsufficientMtu(mtu)) {
            String msg = String.format("smallest allowed mtu size is %d bytes, provided: %d", 64, mtu);
            throw new IllegalArgumentException(msg);
        }
        return mtu;
    }

    private static boolean isInsufficientMtu(int mtu) {
        return mtu > 0 && mtu < 64 || mtu < 0;
    }

    @Override
    public Mono<Void> send(Publisher<ByteBuf> frames) {
        return Flux.from(frames).concatMap(this::sendOne).then();
    }

    @Override
    public Mono<Void> sendOne(ByteBuf frame) {
        int readableBytes;
        FrameType frameType = FrameHeaderFlyweight.frameType(frame);
        if (this.shouldFragment(frameType, readableBytes = frame.readableBytes())) {
            if (logger.isDebugEnabled()) {
                return this.delegate.send((Publisher<ByteBuf>)Flux.from(FrameFragmenter.fragmentFrame(this.allocator, this.mtu, frame, frameType, this.encodeLength)).doOnNext(byteBuf -> {
                    ByteBuf f = this.encodeLength ? FrameLengthFlyweight.frame(byteBuf) : byteBuf;
                    logger.debug("{} - stream id {} - frame type {} - \n {}", new Object[]{this.type, FrameHeaderFlyweight.streamId(f), FrameHeaderFlyweight.frameType(f), ByteBufUtil.prettyHexDump((ByteBuf)f)});
                }));
            }
            return this.delegate.send((Publisher<ByteBuf>)Flux.from(FrameFragmenter.fragmentFrame(this.allocator, this.mtu, frame, frameType, this.encodeLength)));
        }
        return this.delegate.sendOne(this.encode(frame));
    }

    private ByteBuf encode(ByteBuf frame) {
        if (this.encodeLength) {
            return FrameLengthFlyweight.encode(this.allocator, frame.readableBytes(), frame);
        }
        return frame;
    }

    private ByteBuf decode(ByteBuf frame) {
        if (this.encodeLength) {
            return FrameLengthFlyweight.frame(frame).retain();
        }
        return frame;
    }

    @Override
    public Flux<ByteBuf> receive() {
        return this.delegate.receive().handle((byteBuf, sink) -> {
            ByteBuf decode = this.decode((ByteBuf)byteBuf);
            this.frameReassembler.reassembleFrame(decode, (SynchronousSink<ByteBuf>)sink);
        });
    }

    @Override
    public Mono<Void> onClose() {
        return this.delegate.onClose();
    }

    public void dispose() {
        this.delegate.dispose();
    }
}

