package io.rsocket;

import io.netty.buffer.Unpooled;
import io.netty.util.collection.IntObjectHashMap;
import io.rsocket.Frame;
import io.rsocket.exceptions.ApplicationException;
import io.rsocket.frame.ErrorFrameFlyweight;
import io.rsocket.frame.FrameHeaderFlyweight;
import io.rsocket.internal.LimitableRequestPublisher;
import io.rsocket.internal.UnboundedProcessor;
import java.util.Collection;
import java.util.Iterator;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.UnicastProcessor;

/* loaded from: input_file:io/rsocket/RSocketServer.class */
class RSocketServer implements RSocket {
    // Transport this responder sends frames to and receives frames from; close()/onClose() delegate to it.
    private final DuplexConnection connection;
    // Handler that every decoded request (fire-and-forget, request/response, stream, channel, metadata push) is delegated to.
    private final RSocket requestHandler;
    // Converts an inbound Frame into the Payload handed to the request handler or to a channel processor.
    private final Function<Frame, ? extends Payload> frameDecoder;
    // Receives errors raised while handling frames, producing responses, or cancelling streams.
    private final Consumer<Throwable> errorConsumer;
    // Subscriptions of in-flight responses keyed by stream id; every mutation happens while holding this instance's monitor.
    private final IntObjectHashMap<Subscription> sendingSubscriptions = new IntObjectHashMap<>();
    // Processors receiving the requester's payloads for open channels, keyed by stream id; mutated under this instance's monitor.
    private final IntObjectHashMap<UnicastProcessor<Payload>> channelProcessors = new IntObjectHashMap<>();
    // Queue of outbound frames; it is passed to connection.send(...) in the constructor.
    private final UnboundedProcessor<Frame> sendProcessor = new UnboundedProcessor<>();
    // Handle of the inbound receive() subscription; disposed when the connection's onClose() terminates.
    private Disposable receiveDisposable;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: io.rsocket.RSocketServer$1, reason: invalid class name */
    /* loaded from: input_file:io/rsocket/RSocketServer$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$io$rsocket$FrameType = new int[FrameType.values().length];

        static {
            try {
                $SwitchMap$io$rsocket$FrameType[FrameType.FIRE_AND_FORGET.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$io$rsocket$FrameType[FrameType.REQUEST_RESPONSE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$io$rsocket$FrameType[FrameType.CANCEL.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$io$rsocket$FrameType[FrameType.KEEPALIVE.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$io$rsocket$FrameType[FrameType.REQUEST_N.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$io$rsocket$FrameType[FrameType.REQUEST_STREAM.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$io$rsocket$FrameType[FrameType.REQUEST_CHANNEL.ordinal()] = 7;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$io$rsocket$FrameType[FrameType.PAYLOAD.ordinal()] = 8;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$io$rsocket$FrameType[FrameType.METADATA_PUSH.ordinal()] = 9;
            } catch (NoSuchFieldError e9) {
            }
            try {
                $SwitchMap$io$rsocket$FrameType[FrameType.LEASE.ordinal()] = 10;
            } catch (NoSuchFieldError e10) {
            }
            try {
                $SwitchMap$io$rsocket$FrameType[FrameType.NEXT.ordinal()] = 11;
            } catch (NoSuchFieldError e11) {
            }
            try {
                $SwitchMap$io$rsocket$FrameType[FrameType.COMPLETE.ordinal()] = 12;
            } catch (NoSuchFieldError e12) {
            }
            try {
                $SwitchMap$io$rsocket$FrameType[FrameType.ERROR.ordinal()] = 13;
            } catch (NoSuchFieldError e13) {
            }
            try {
                $SwitchMap$io$rsocket$FrameType[FrameType.NEXT_COMPLETE.ordinal()] = 14;
            } catch (NoSuchFieldError e14) {
            }
            try {
                $SwitchMap$io$rsocket$FrameType[FrameType.SETUP.ordinal()] = 15;
            } catch (NoSuchFieldError e15) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Wires the responder to a connection and starts processing immediately.
     *
     * <p>Three subscriptions are started here: the outbound pipeline that feeds the send
     * processor into the connection, the inbound pipeline that dispatches each received frame,
     * and a close watcher that runs cleanup and disposes the inbound subscription.
     *
     * @param duplexConnection connection to send to and receive from
     * @param rSocket handler that requests are delegated to
     * @param function decoder turning a frame into a payload
     * @param consumer sink for errors raised by any of the pipelines
     */
    public RSocketServer(DuplexConnection duplexConnection, RSocket rSocket, Function<Frame, ? extends Payload> function, Consumer<Throwable> consumer) {
        this.connection = duplexConnection;
        this.requestHandler = rSocket;
        this.frameDecoder = function;
        this.errorConsumer = consumer;

        // Outbound: drain the send processor into the connection; when that pipeline ends,
        // cancel whatever is still in flight.
        duplexConnection
            .send(this.sendProcessor)
            .doOnError(this::handleSendProcessorError)
            .doFinally(this::handleSendProcessorCancel)
            .subscribe();

        // Inbound: a failure while handling one frame is reported and swallowed so that the
        // receive stream itself keeps running.
        this.receiveDisposable = duplexConnection
            .receive()
            .flatMapSequential(frame -> handleFrame(frame).onErrorResume(error -> {
                consumer.accept(error);
                return Mono.empty();
            }))
            .doOnError(consumer)
            .then()
            .subscribe();

        // Close watcher: release per-stream state and stop receiving once the connection ends.
        this.connection
            .onClose()
            .doOnError(consumer)
            .doFinally(signal -> {
                cleanup();
                this.receiveDisposable.dispose();
            })
            .subscribe();
    }

    /**
     * Called when the outbound send pipeline fails: cancels every tracked response subscription
     * and every channel processor.
     *
     * @param th the error that terminated the send pipeline (not inspected here)
     */
    private void handleSendProcessorError(Throwable th) {
        cancelInFlightStreams();
    }

    /**
     * Called when the outbound send pipeline terminates for any reason. Error terminations are
     * skipped because {@link #handleSendProcessorError(Throwable)} handles them; for every other
     * signal all tracked subscriptions and channel processors are cancelled.
     *
     * @param signalType the signal that ended the send pipeline
     */
    private void handleSendProcessorCancel(SignalType signalType) {
        if (SignalType.ON_ERROR == signalType) {
            return;
        }
        cancelInFlightStreams();
    }

    /**
     * Cancels everything currently registered in {@code sendingSubscriptions} and
     * {@code channelProcessors}; a failure from one cancel is reported to the error consumer and
     * does not stop the remaining cancels.
     */
    private void cancelInFlightStreams() {
        final Subscription[] subscriptions;
        final UnicastProcessor<?>[] processors;
        synchronized (this) {
            // values() is a view backed by the map, so it must be copied while the monitor is
            // held; iterating the view after leaving the block would race with concurrent
            // add/remove calls on the maps.
            subscriptions = this.sendingSubscriptions.values().toArray(new Subscription[0]);
            processors = this.channelProcessors.values().toArray(new UnicastProcessor<?>[0]);
        }
        // Cancel outside the monitor so no foreign callback runs while the lock is held.
        for (Subscription subscription : subscriptions) {
            try {
                subscription.cancel();
            } catch (Throwable cancelFailure) {
                this.errorConsumer.accept(cancelFailure);
            }
        }
        for (UnicastProcessor<?> processor : processors) {
            try {
                processor.cancel();
            } catch (Throwable cancelFailure) {
                this.errorConsumer.accept(cancelFailure);
            }
        }
    }

    /**
     * Delegates a fire-and-forget request to the request handler.
     *
     * @param payload the request payload
     * @return the handler's result, or an error Mono if the handler threw while being invoked
     */
    @Override // io.rsocket.RSocket
    public Mono<Void> fireAndForget(Payload payload) {
        Mono<Void> outcome;
        try {
            outcome = this.requestHandler.fireAndForget(payload);
        } catch (Throwable failure) {
            // A synchronous throw is surfaced through the reactive error channel instead.
            outcome = Mono.error(failure);
        }
        return outcome;
    }

    /**
     * Delegates a request/response interaction to the request handler.
     *
     * @param payload the request payload
     * @return the handler's response, or an error Mono if the handler threw while being invoked
     */
    @Override // io.rsocket.RSocket
    public Mono<Payload> requestResponse(Payload payload) {
        Mono<Payload> response;
        try {
            response = this.requestHandler.requestResponse(payload);
        } catch (Throwable failure) {
            // A synchronous throw is surfaced through the reactive error channel instead.
            response = Mono.error(failure);
        }
        return response;
    }

    /**
     * Delegates a request/stream interaction to the request handler.
     *
     * @param payload the request payload
     * @return the handler's stream, or an error Flux if the handler threw while being invoked
     */
    @Override // io.rsocket.RSocket
    public Flux<Payload> requestStream(Payload payload) {
        Flux<Payload> stream;
        try {
            stream = this.requestHandler.requestStream(payload);
        } catch (Throwable failure) {
            // A synchronous throw is surfaced through the reactive error channel instead.
            stream = Flux.error(failure);
        }
        return stream;
    }

    /**
     * Delegates a channel interaction to the request handler.
     *
     * @param publisher the payloads arriving from the requester
     * @return the handler's stream, or an error Flux if the handler threw while being invoked
     */
    @Override // io.rsocket.RSocket
    public Flux<Payload> requestChannel(Publisher<Payload> publisher) {
        Flux<Payload> stream;
        try {
            stream = this.requestHandler.requestChannel(publisher);
        } catch (Throwable failure) {
            // A synchronous throw is surfaced through the reactive error channel instead.
            stream = Flux.error(failure);
        }
        return stream;
    }

    /**
     * Delegates a metadata push to the request handler.
     *
     * @param payload the pushed payload
     * @return the handler's result, or an error Mono if the handler threw while being invoked
     */
    @Override // io.rsocket.RSocket
    public Mono<Void> metadataPush(Payload payload) {
        Mono<Void> outcome;
        try {
            outcome = this.requestHandler.metadataPush(payload);
        } catch (Throwable failure) {
            // A synchronous throw is surfaced through the reactive error channel instead.
            outcome = Mono.error(failure);
        }
        return outcome;
    }

    /**
     * Closes this responder by delegating to the underlying connection.
     *
     * @return whatever the connection's {@code close()} returns
     */
    @Override // io.rsocket.Closeable
    public Mono<Void> close() {
        final DuplexConnection transport = this.connection;
        return transport.close();
    }

    /**
     * Exposes the close signal of the underlying connection.
     *
     * @return whatever the connection's {@code onClose()} returns
     */
    @Override // io.rsocket.Closeable
    public Mono<Void> onClose() {
        final DuplexConnection transport = this.connection;
        return transport.onClose();
    }

    /**
     * Releases all per-stream state and then closes the request handler. The handler's close
     * Mono is subscribed without waiting for it to complete.
     */
    private void cleanup() {
        cleanUpSendingSubscriptions();
        cleanUpChannelProcessors();
        final Mono<Void> handlerClose = this.requestHandler.close();
        handlerClose.subscribe();
    }

    /** Cancels every tracked response subscription and empties the registry, under this instance's monitor. */
    private synchronized void cleanUpSendingSubscriptions() {
        for (Subscription inFlight : this.sendingSubscriptions.values()) {
            inFlight.cancel();
        }
        this.sendingSubscriptions.clear();
    }

    /** Cancels every registered channel processor and empties the registry, under this instance's monitor. */
    private synchronized void cleanUpChannelProcessors() {
        for (UnicastProcessor<Payload> openChannel : this.channelProcessors.values()) {
            openChannel.cancel();
        }
        this.channelProcessors.clear();
    }

    /**
     * Dispatches one inbound frame according to its type.
     *
     * <ul>
     *   <li>Request frames (fire-and-forget, request/response, stream, channel, metadata push)
     *       are decoded and passed to the matching handler.
     *   <li>{@code CANCEL}, {@code KEEPALIVE} and {@code REQUEST_N} go to their dedicated handlers.
     *   <li>{@code NEXT}, {@code COMPLETE}, {@code ERROR} and {@code NEXT_COMPLETE} are forwarded to
     *       the channel processor registered for the stream id, if there is one.
     *   <li>{@code PAYLOAD} and {@code LEASE} are ignored.
     *   <li>{@code SETUP} and any other type are answered with an error frame.
     * </ul>
     *
     * <p>The frame is released when this method returns, on both the normal and the exceptional
     * path, so anything a handler needs from the frame has to be read before it returns.
     *
     * @param frame the received frame; released by this method
     * @return a Mono representing the handling of the frame
     */
    private Mono<Void> handleFrame(Frame frame) {
        try {
            final int streamId = frame.getStreamId();
            // Switch on the enum itself rather than on an ordinal lookup table, so each case
            // label names the frame type it handles.
            switch (frame.getType()) {
                case FIRE_AND_FORGET:
                    return handleFireAndForget(streamId, fireAndForget(this.frameDecoder.apply(frame)));
                case REQUEST_RESPONSE:
                    return handleRequestResponse(streamId, requestResponse(this.frameDecoder.apply(frame)));
                case CANCEL:
                    return handleCancelFrame(streamId);
                case KEEPALIVE:
                    return handleKeepAliveFrame(frame);
                case REQUEST_N:
                    return handleRequestN(streamId, frame);
                case REQUEST_STREAM:
                    return handleStream(streamId, requestStream(this.frameDecoder.apply(frame)), Frame.Request.initialRequestN(frame));
                case REQUEST_CHANNEL:
                    return handleChannel(streamId, frame);
                case PAYLOAD:
                case LEASE:
                    // Nothing is done with these frame types on this side.
                    return Mono.empty();
                case METADATA_PUSH:
                    return metadataPush(this.frameDecoder.apply(frame));
                case NEXT: {
                    final UnicastProcessor<Payload> channel = getChannelProcessor(streamId);
                    if (channel != null) {
                        channel.onNext(this.frameDecoder.apply(frame));
                    }
                    return Mono.empty();
                }
                case COMPLETE: {
                    final UnicastProcessor<Payload> channel = getChannelProcessor(streamId);
                    if (channel != null) {
                        channel.onComplete();
                    }
                    return Mono.empty();
                }
                case ERROR: {
                    final UnicastProcessor<Payload> channel = getChannelProcessor(streamId);
                    if (channel != null) {
                        channel.onError(new ApplicationException(Frame.Error.message(frame)));
                    }
                    return Mono.empty();
                }
                case NEXT_COMPLETE: {
                    final UnicastProcessor<Payload> channel = getChannelProcessor(streamId);
                    if (channel != null) {
                        channel.onNext(this.frameDecoder.apply(frame));
                        channel.onComplete();
                    }
                    return Mono.empty();
                }
                case SETUP:
                    return handleError(streamId, new IllegalStateException("Setup frame received post setup."));
                default:
                    return handleError(streamId, new IllegalStateException("ServerRSocket: Unexpected frame type: " + frame.getType()));
            }
        } finally {
            // Single release point for every exit path.
            frame.release();
        }
    }

    /**
     * Tracks a fire-and-forget execution: registers its subscription under the stream id while it
     * runs, reports a failure to the error consumer, and unregisters it on termination.
     *
     * @param i stream id of the request
     * @param mono the handler's result for the request
     * @return the decorated Mono
     */
    private Mono<Void> handleFireAndForget(int i, Mono<Void> mono) {
        final Mono<Void> registered = mono.doOnSubscribe(subscription -> addSubscription(i, subscription));
        final Mono<Void> reported = registered.doOnError(this.errorConsumer);
        final Mono<Void> unregistered = reported.doFinally(signal -> removeSubscription(i));
        return unregistered.ignoreElement();
    }

    /**
     * Turns the handler's response into an outbound frame and queues it on the send processor.
     *
     * <p>A successful payload becomes a {@code NEXT_COMPLETE} frame, and the payload is released
     * once the frame has been built. A failure is reported to the error consumer and becomes an
     * error frame for the same stream. The subscription is registered under the stream id while
     * the response is pending and unregistered on termination.
     *
     * @param i stream id of the request
     * @param mono the handler's response
     * @return a Mono that terminates once the response has been handled
     */
    private Mono<Void> handleRequestResponse(int i, Mono<Payload> mono) {
        return mono
            .doOnSubscribe(subscription -> addSubscription(i, subscription))
            .map(payload -> {
                // 64 is the base flag set for the frame — presumably the "complete" flag inlined
                // by the compiler; TODO confirm against FrameHeaderFlyweight.
                int flags = 64;
                if (payload.hasMetadata()) {
                    flags = Frame.setFlag(flags, FrameHeaderFlyweight.FLAGS_M);
                }
                final Frame responseFrame = Frame.PayloadFrame.from(i, FrameType.NEXT_COMPLETE, payload, flags);
                payload.release();
                return responseFrame;
            })
            .doOnError(this.errorConsumer)
            .onErrorResume(failure -> Mono.just(Frame.Error.from(i, failure)))
            // Method reference on the field replaces the decompiler's dangling register variable.
            .doOnNext(this.sendProcessor::onNext)
            .doFinally(signal -> removeSubscription(i))
            .then();
    }

    /**
     * Starts streaming the handler's payloads to the requester.
     *
     * <p>Each payload becomes a {@code NEXT} frame and is released after the frame is built. The
     * frame stream is wrapped in a {@link LimitableRequestPublisher}, which is registered as the
     * stream's subscription and given the initial request limit. After the payloads a
     * {@code COMPLETE} frame is appended; a failure is replaced by an error frame. All resulting
     * frames are queued on the send processor. The pipeline is subscribed here, so the returned
     * Mono is always empty.
     *
     * @param i stream id of the request
     * @param flux the handler's response stream
     * @param i2 initial number of items requested by the peer
     * @return an empty Mono
     */
    private Mono<Void> handleStream(int i, Flux<Payload> flux, int i2) {
        flux
            .map(payload -> {
                final Frame nextFrame = Frame.PayloadFrame.from(i, FrameType.NEXT, payload);
                payload.release();
                return nextFrame;
            })
            .transform(frames -> {
                final LimitableRequestPublisher<Frame> limited = LimitableRequestPublisher.wrap(frames);
                synchronized (this) {
                    this.sendingSubscriptions.put(i, limited);
                }
                limited.increaseRequestLimit(i2);
                return limited;
            })
            .concatWith(Mono.just(Frame.PayloadFrame.from(i, FrameType.COMPLETE)))
            .onErrorResume(failure -> Mono.just(Frame.Error.from(i, failure)))
            // Method reference on the field replaces the decompiler's dangling register variable.
            .doOnNext(this.sendProcessor::onNext)
            .doFinally(signal -> removeSubscription(i))
            .subscribe();
        return Mono.empty();
    }

    /**
     * Opens a channel for the given stream id.
     *
     * <p>A processor is registered to receive the requester's payloads. Cancellation, errors and
     * demand observed on that processor are sent to the peer as {@code CANCEL}, {@code ERROR} and
     * {@code REQUEST_N} frames, and the processor is unregistered when it terminates. The payload
     * of the opening frame is pushed as the first element, then the handler's response is
     * streamed back with the frame's initial request count.
     *
     * @param i stream id of the channel
     * @param frame the frame that opened the channel
     * @return the result of streaming the handler's response
     */
    private Mono<Void> handleChannel(int i, Frame frame) {
        final UnicastProcessor<Payload> inbound = UnicastProcessor.create();
        addChannelProcessor(i, inbound);

        final Flux<Payload> requesterPayloads = inbound
            .doOnCancel(() -> this.sendProcessor.onNext(Frame.Cancel.from(i)))
            .doOnError(failure -> this.sendProcessor.onNext(Frame.Error.from(i, failure)))
            .doOnRequest(demand -> this.sendProcessor.onNext(Frame.RequestN.from(i, demand)))
            .doFinally(signal -> removeChannelProcessor(i));

        // The opening frame carries the first payload of the channel.
        inbound.onNext(this.frameDecoder.apply(frame));

        final Flux<Payload> responses = requestChannel(requesterPayloads);
        final int initialRequestN = Frame.Request.initialRequestN(frame);
        return handleStream(i, responses, initialRequestN);
    }

    /**
     * Answers a keep-alive frame: when the frame has the respond flag set, a keep-alive frame
     * carrying the same data and no respond flag is queued on the send processor.
     *
     * <p>The frame is read synchronously instead of inside a deferred {@code Runnable}. The
     * dispatcher in this class releases the frame as soon as this method returns, so a deferred
     * read would happen after the release. Failures are still delivered through the returned
     * Mono, so the per-frame error handling of the receive pipeline applies as before.
     *
     * @param frame the received keep-alive frame; only accessed during this call
     * @return an empty Mono, or an error Mono if building or queuing the reply failed
     */
    private Mono<Void> handleKeepAliveFrame(Frame frame) {
        try {
            if (Frame.Keepalive.hasRespondFlag(frame)) {
                // NOTE(review): assumes Frame.Keepalive.from copies the wrapped data into the new
                // frame — confirm, since the source frame is released right after this call.
                this.sendProcessor.onNext(Frame.Keepalive.from(Unpooled.wrappedBuffer(frame.getData()), false));
            }
            return Mono.empty();
        } catch (Throwable failure) {
            return Mono.error(failure);
        }
    }

    /**
     * Builds a Mono that, when subscribed, unregisters the subscription stored for the stream id
     * and cancels it if one was present.
     *
     * @param i stream id named by the cancel frame
     * @return a Mono performing the cancellation on subscribe
     */
    private Mono<Void> handleCancelFrame(int i) {
        final Runnable cancelStream = () -> {
            final Subscription removed;
            synchronized (this) {
                removed = this.sendingSubscriptions.remove(i);
            }
            if (removed == null) {
                return;
            }
            // Cancel outside the monitor.
            removed.cancel();
        };
        return Mono.fromRunnable(cancelStream);
    }

    /**
     * Builds a Mono that, when subscribed, reports the error to the error consumer and queues an
     * error frame for the stream on the send processor.
     *
     * @param i stream id the error belongs to
     * @param th the error to report and send
     * @return a Mono performing both actions on subscribe
     */
    private Mono<Void> handleError(int i, Throwable th) {
        final Runnable reportAndSend = () -> {
            this.errorConsumer.accept(th);
            final Frame errorFrame = Frame.Error.from(i, th);
            this.sendProcessor.onNext(errorFrame);
        };
        return Mono.fromRunnable(reportAndSend);
    }

    /**
     * Forwards the demand carried by a {@code REQUEST_N} frame to the subscription registered for
     * the stream id. A value of {@code Integer.MAX_VALUE} is widened to {@code Long.MAX_VALUE};
     * frames for unknown streams are ignored.
     *
     * @param i stream id named by the frame
     * @param frame the received {@code REQUEST_N} frame
     * @return an empty Mono
     */
    private Mono<Void> handleRequestN(int i, Frame frame) {
        final Subscription target = getSubscription(i);
        if (target == null) {
            return Mono.empty();
        }
        final int requested = Frame.RequestN.requestN(frame);
        final long demand;
        if (requested >= Integer.MAX_VALUE) {
            demand = Long.MAX_VALUE;
        } else {
            demand = requested;
        }
        target.request(demand);
        return Mono.empty();
    }

    /** Registers the subscription of an in-flight response under its stream id, holding this instance's monitor. */
    private void addSubscription(int i, Subscription subscription) {
        synchronized (this) {
            this.sendingSubscriptions.put(i, subscription);
        }
    }

    /** Looks up the subscription registered for a stream id; returns {@code null} when there is none. */
    @Nullable
    private Subscription getSubscription(int i) {
        synchronized (this) {
            return this.sendingSubscriptions.get(i);
        }
    }

    /** Unregisters the subscription stored for a stream id, if any, holding this instance's monitor. */
    private void removeSubscription(int i) {
        synchronized (this) {
            this.sendingSubscriptions.remove(i);
        }
    }

    /** Registers the processor receiving a channel's inbound payloads under its stream id, holding this instance's monitor. */
    private void addChannelProcessor(int i, UnicastProcessor<Payload> unicastProcessor) {
        synchronized (this) {
            this.channelProcessors.put(i, unicastProcessor);
        }
    }

    /** Looks up the channel processor registered for a stream id; returns {@code null} when there is none. */
    @Nullable
    private UnicastProcessor<Payload> getChannelProcessor(int i) {
        synchronized (this) {
            return this.channelProcessors.get(i);
        }
    }

    /** Unregisters the channel processor stored for a stream id, if any, holding this instance's monitor. */
    private void removeChannelProcessor(int i) {
        synchronized (this) {
            this.channelProcessors.remove(i);
        }
    }
}
