/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.fragmentation;

import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import io.rsocket.DuplexConnection;
import io.rsocket.Frame;
import io.rsocket.fragmentation.FrameFragmenter;
import io.rsocket.fragmentation.FrameReassembler;
import io.rsocket.util.AbstractionLeakingFrameUtils;
import io.rsocket.util.NumberUtils;
import java.util.Objects;
import org.jctools.maps.NonBlockingHashMapLong;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public final class FragmentationDuplexConnection
implements DuplexConnection {
    private final ByteBufAllocator byteBufAllocator;
    private final DuplexConnection delegate;
    private final FrameFragmenter frameFragmenter;
    private final NonBlockingHashMapLong<FrameReassembler> frameReassemblers = new NonBlockingHashMapLong();

    public FragmentationDuplexConnection(DuplexConnection delegate, int maxFragmentSize) {
        this((ByteBufAllocator)PooledByteBufAllocator.DEFAULT, delegate, maxFragmentSize);
    }

    public FragmentationDuplexConnection(ByteBufAllocator byteBufAllocator, DuplexConnection delegate, int maxFragmentSize) {
        this.byteBufAllocator = Objects.requireNonNull(byteBufAllocator, "byteBufAllocator must not be null");
        this.delegate = Objects.requireNonNull(delegate, "delegate must not be null");
        NumberUtils.requireNonNegative(maxFragmentSize, "maxFragmentSize must be positive");
        this.frameFragmenter = new FrameFragmenter(byteBufAllocator, maxFragmentSize);
        delegate.onClose().doFinally(signalType -> this.frameReassemblers.values().forEach(FrameReassembler::dispose)).subscribe();
    }

    @Override
    public double availability() {
        return this.delegate.availability();
    }

    public void dispose() {
        this.delegate.dispose();
    }

    public boolean isDisposed() {
        return this.delegate.isDisposed();
    }

    @Override
    public Mono<Void> onClose() {
        return this.delegate.onClose();
    }

    @Override
    public Flux<Frame> receive() {
        return this.delegate.receive().map(AbstractionLeakingFrameUtils::fromAbstractionLeakingFrame).concatMap(t2 -> this.toReassembledFrames((Integer)t2.getT1(), (io.rsocket.framing.Frame)t2.getT2()));
    }

    @Override
    public Mono<Void> send(Publisher<Frame> frames) {
        Objects.requireNonNull(frames, "frames must not be null");
        return this.delegate.send((Publisher<Frame>)Flux.from(frames).map(AbstractionLeakingFrameUtils::fromAbstractionLeakingFrame).concatMap(t2 -> this.toFragmentedFrames((Integer)t2.getT1(), (io.rsocket.framing.Frame)t2.getT2())));
    }

    private Flux<Frame> toFragmentedFrames(int streamId, io.rsocket.framing.Frame frame) {
        return this.frameFragmenter.fragment(frame).map(fragment -> AbstractionLeakingFrameUtils.toAbstractionLeakingFrame(this.byteBufAllocator, streamId, fragment));
    }

    private Mono<Frame> toReassembledFrames(int streamId, io.rsocket.framing.Frame fragment) {
        FrameReassembler frameReassembler = (FrameReassembler)this.frameReassemblers.computeIfAbsent((Object)streamId, i -> FrameReassembler.createFrameReassembler(this.byteBufAllocator));
        return Mono.justOrEmpty((Object)frameReassembler.reassemble(fragment)).map(frame -> AbstractionLeakingFrameUtils.toAbstractionLeakingFrame(this.byteBufAllocator, streamId, frame));
    }
}

