package io.rsocket.fragmentation;

import io.netty.util.collection.IntObjectHashMap;
import io.rsocket.DuplexConnection;
import io.rsocket.Frame;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:io/rsocket/fragmentation/FragmentationDuplexConnection.class */
public class FragmentationDuplexConnection implements DuplexConnection {
    private final DuplexConnection source;
    private final IntObjectHashMap<FrameReassembler> frameReassemblers = new IntObjectHashMap<>();
    private final FrameFragmenter frameFragmenter;

    public FragmentationDuplexConnection(DuplexConnection duplexConnection, int i) {
        this.source = duplexConnection;
        this.frameFragmenter = new FrameFragmenter(i);
    }

    public static int getDefaultMTU() {
        if (Boolean.getBoolean("io.rsocket.fragmentation.enable")) {
            return Integer.getInteger("io.rsocket.fragmentation.mtu", 1024).intValue();
        }
        return 0;
    }

    @Override // io.rsocket.Availability
    public double availability() {
        return this.source.availability();
    }

    @Override // io.rsocket.DuplexConnection
    public Mono<Void> send(Publisher<Frame> publisher) {
        return Flux.from(publisher).concatMap(this::sendOne).then();
    }

    @Override // io.rsocket.DuplexConnection
    public Mono<Void> sendOne(Frame frame) {
        return this.frameFragmenter.shouldFragment(frame) ? this.source.send(this.frameFragmenter.fragment(frame)) : this.source.sendOne(frame);
    }

    @Override // io.rsocket.DuplexConnection
    public Flux<Frame> receive() {
        return this.source.receive().concatMap(frame -> {
            if (128 == (frame.flags() & 128)) {
                getFrameReassembler(frame).append(frame);
                return Mono.empty();
            }
            if (!frameReassemblersContain(frame.getStreamId())) {
                return Mono.just(frame);
            }
            FrameReassembler removeFrameReassembler = removeFrameReassembler(frame.getStreamId());
            removeFrameReassembler.append(frame);
            return Mono.just(removeFrameReassembler.reassemble());
        });
    }

    @Override // io.rsocket.Closeable
    public Mono<Void> close() {
        return this.source.close();
    }

    private synchronized FrameReassembler getFrameReassembler(Frame frame) {
        return (FrameReassembler) this.frameReassemblers.computeIfAbsent(Integer.valueOf(frame.getStreamId()), num -> {
            return new FrameReassembler(frame);
        });
    }

    private synchronized FrameReassembler removeFrameReassembler(int i) {
        return (FrameReassembler) this.frameReassemblers.remove(i);
    }

    private synchronized boolean frameReassemblersContain(int i) {
        return this.frameReassemblers.containsKey(i);
    }

    @Override // io.rsocket.Closeable
    public Mono<Void> onClose() {
        return this.source.onClose().doFinally(signalType -> {
            synchronized (this) {
                this.frameReassemblers.values().forEach((v0) -> {
                    v0.dispose();
                });
                this.frameReassemblers.clear();
            }
        });
    }
}
