/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.collection.IntObjectHashMap;
import io.rsocket.DuplexConnection;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.ResponderRSocket;
import io.rsocket.exceptions.ApplicationErrorException;
import io.rsocket.frame.CancelFrameFlyweight;
import io.rsocket.frame.ErrorFrameFlyweight;
import io.rsocket.frame.FrameHeaderFlyweight;
import io.rsocket.frame.FrameType;
import io.rsocket.frame.PayloadFrameFlyweight;
import io.rsocket.frame.RequestChannelFrameFlyweight;
import io.rsocket.frame.RequestNFrameFlyweight;
import io.rsocket.frame.RequestStreamFrameFlyweight;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.internal.LimitableRequestPublisher;
import io.rsocket.internal.UnboundedProcessor;
import java.util.Collections;
import java.util.Map;
import java.util.function.Consumer;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Exceptions;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.UnicastProcessor;

/**
 * Responder side of a {@link DuplexConnection}: decodes inbound frames, dispatches them to the
 * application-supplied {@link RSocket} request handler and writes the handler's responses back
 * through a single outbound {@link UnboundedProcessor}.
 *
 * <p>Book-keeping is held in three maps keyed by stream id:
 * <ul>
 *   <li>{@code sendingSubscriptions} - upstream subscriptions of fire-and-forget and
 *       request-response handlers;</li>
 *   <li>{@code sendingLimitableSubscriptions} - response publishers of request-stream and
 *       request-channel handlers;</li>
 *   <li>{@code channelProcessors} - processors that receive the inbound payloads of a
 *       request-channel interaction.</li>
 * </ul>
 */
class RSocketServer
implements ResponderRSocket {
    private final DuplexConnection connection;
    private final RSocket requestHandler;
    /** {@code requestHandler} viewed as a {@link ResponderRSocket}, or {@code null} if it is not one. */
    private final ResponderRSocket responderRSocket;
    private final PayloadDecoder payloadDecoder;
    private final Consumer<Throwable> errorConsumer;
    private final Map<Integer, LimitableRequestPublisher> sendingLimitableSubscriptions;
    private final Map<Integer, Subscription> sendingSubscriptions;
    private final Map<Integer, Processor<Payload, Payload>> channelProcessors;
    private final UnboundedProcessor<ByteBuf> sendProcessor;
    private final ByteBufAllocator allocator;

    /**
     * Wires the responder to the connection: starts sending whatever is pushed into the send
     * processor, starts consuming inbound frames, and registers clean-up for connection close.
     *
     * @param allocator      allocator used to encode outbound frames
     * @param connection     transport the frames are received from and sent to
     * @param requestHandler application handler every request is delegated to
     * @param payloadDecoder converts an inbound frame into a {@link Payload}
     * @param errorConsumer  sink for errors that cannot be reported to the peer
     */
    RSocketServer(ByteBufAllocator allocator, DuplexConnection connection, RSocket requestHandler, PayloadDecoder payloadDecoder, Consumer<Throwable> errorConsumer) {
        this.allocator = allocator;
        this.connection = connection;
        this.requestHandler = requestHandler;
        this.responderRSocket = requestHandler instanceof ResponderRSocket ? (ResponderRSocket)requestHandler : null;
        this.payloadDecoder = payloadDecoder;
        this.errorConsumer = errorConsumer;
        this.sendingLimitableSubscriptions = Collections.synchronizedMap(new IntObjectHashMap<>());
        this.sendingSubscriptions = Collections.synchronizedMap(new IntObjectHashMap<>());
        this.channelProcessors = Collections.synchronizedMap(new IntObjectHashMap<>());
        this.sendProcessor = new UnboundedProcessor<>();
        this.sendProcessor.doOnRequest(r -> {
            // Forward transport demand to every active response stream.
            // NOTE(review): this iterates a synchronizedMap view without holding the map's
            // monitor; confirm whether concurrent stream registration can race with it.
            for (LimitableRequestPublisher lrp : this.sendingLimitableSubscriptions.values()) {
                lrp.increaseInternalLimit(r);
            }
        }).transform(connection::send).doFinally(this::handleSendProcessorCancel).subscribe(null, this::handleSendProcessorError);
        Disposable receiveDisposable = connection.receive().subscribe(this::handleFrame, errorConsumer);
        this.connection.onClose().doFinally(s -> {
            this.cleanup();
            receiveDisposable.dispose();
        }).subscribe(null, errorConsumer);
    }

    /**
     * Invoked when the outbound pipeline fails: cancels every response subscription and
     * propagates the failure to every inbound channel processor.
     *
     * @param t the error that terminated the outbound pipeline
     */
    private void handleSendProcessorError(Throwable t) {
        this.cancelSendingSubscriptions();
        this.channelProcessors.values().forEach(processor -> {
            try {
                processor.onError(t);
            }
            catch (Throwable e) {
                this.errorConsumer.accept(e);
            }
        });
    }

    /**
     * Invoked when the outbound pipeline terminates. The error case is skipped here because
     * {@link #handleSendProcessorError(Throwable)} handles it; for every other signal the
     * response subscriptions are cancelled and the inbound channel processors are completed.
     *
     * @param t the signal that terminated the outbound pipeline
     */
    private void handleSendProcessorCancel(SignalType t) {
        if (SignalType.ON_ERROR == t) {
            return;
        }
        this.cancelSendingSubscriptions();
        this.channelProcessors.values().forEach(processor -> {
            try {
                processor.onComplete();
            }
            catch (Throwable e) {
                this.errorConsumer.accept(e);
            }
        });
    }

    /** Cancels every tracked response subscription, reporting individual failures to the error consumer. */
    private void cancelSendingSubscriptions() {
        this.sendingSubscriptions.values().forEach(this::cancelQuietly);
        this.sendingLimitableSubscriptions.values().forEach(this::cancelQuietly);
    }

    /** Cancels one subscription; a throwing {@code cancel()} is reported instead of propagated. */
    private void cancelQuietly(Subscription subscription) {
        try {
            subscription.cancel();
        }
        catch (Throwable e) {
            this.errorConsumer.accept(e);
        }
    }

    /** Delegates to the request handler; a synchronous throw is converted into an error {@link Mono}. */
    @Override
    public Mono<Void> fireAndForget(Payload payload) {
        try {
            return this.requestHandler.fireAndForget(payload);
        }
        catch (Throwable t) {
            return Mono.error(t);
        }
    }

    /** Delegates to the request handler; a synchronous throw is converted into an error {@link Mono}. */
    @Override
    public Mono<Payload> requestResponse(Payload payload) {
        try {
            return this.requestHandler.requestResponse(payload);
        }
        catch (Throwable t) {
            return Mono.error(t);
        }
    }

    /** Delegates to the request handler; a synchronous throw is converted into an error {@link Flux}. */
    @Override
    public Flux<Payload> requestStream(Payload payload) {
        try {
            return this.requestHandler.requestStream(payload);
        }
        catch (Throwable t) {
            return Flux.error(t);
        }
    }

    /** Delegates to the request handler; a synchronous throw is converted into an error {@link Flux}. */
    @Override
    public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
        try {
            return this.requestHandler.requestChannel(payloads);
        }
        catch (Throwable t) {
            return Flux.error(t);
        }
    }

    /**
     * Delegates to the handler's two-argument {@code requestChannel} when the handler is a
     * {@link ResponderRSocket}; otherwise falls back to the handler's
     * {@code requestChannel(payloads)} instead of failing on the absent responder.
     * A synchronous throw is converted into an error {@link Flux}.
     *
     * @param payload  the first payload of the channel
     * @param payloads the inbound payload stream of the channel
     */
    @Override
    public Flux<Payload> requestChannel(Payload payload, Publisher<Payload> payloads) {
        try {
            if (this.responderRSocket != null) {
                return this.responderRSocket.requestChannel(payload, payloads);
            }
            return this.requestHandler.requestChannel(payloads);
        }
        catch (Throwable t) {
            return Flux.error(t);
        }
    }

    /** Delegates to the request handler; a synchronous throw is converted into an error {@link Mono}. */
    @Override
    public Mono<Void> metadataPush(Payload payload) {
        try {
            return this.requestHandler.metadataPush(payload);
        }
        catch (Throwable t) {
            return Mono.error(t);
        }
    }

    /** Disposes the underlying connection. */
    public void dispose() {
        this.connection.dispose();
    }

    /** @return whether the underlying connection reports itself disposed */
    public boolean isDisposed() {
        return this.connection.isDisposed();
    }

    /** @return the underlying connection's close signal */
    @Override
    public Mono<Void> onClose() {
        return this.connection.onClose();
    }

    /** Releases everything tied to the connection; run from the connection's close signal. */
    private void cleanup() {
        this.cleanUpSendingSubscriptions();
        this.cleanUpChannelProcessors();
        this.requestHandler.dispose();
        this.sendProcessor.dispose();
    }

    /** Cancels and forgets every response subscription. */
    private synchronized void cleanUpSendingSubscriptions() {
        this.sendingSubscriptions.values().forEach(Subscription::cancel);
        this.sendingSubscriptions.clear();
        this.sendingLimitableSubscriptions.values().forEach(Subscription::cancel);
        this.sendingLimitableSubscriptions.clear();
    }

    /** Completes and forgets every inbound channel processor. */
    private synchronized void cleanUpChannelProcessors() {
        this.channelProcessors.values().forEach(Subscriber::onComplete);
        this.channelProcessors.clear();
    }

    /**
     * Dispatches one inbound frame by frame type. The frame is released exactly once when
     * dispatch finishes, whether it succeeded or threw; a failure is rethrown via
     * {@link Exceptions#propagate(Throwable)}.
     *
     * @param frame the raw inbound frame
     */
    private void handleFrame(ByteBuf frame) {
        try {
            int streamId = FrameHeaderFlyweight.streamId(frame);
            FrameType frameType = FrameHeaderFlyweight.frameType(frame);
            switch (frameType) {
                case REQUEST_FNF: {
                    this.handleFireAndForget(streamId, this.fireAndForget((Payload)this.payloadDecoder.apply(frame)));
                    break;
                }
                case REQUEST_RESPONSE: {
                    this.handleRequestResponse(streamId, this.requestResponse((Payload)this.payloadDecoder.apply(frame)));
                    break;
                }
                case CANCEL: {
                    this.handleCancelFrame(streamId);
                    break;
                }
                case KEEPALIVE: {
                    break;
                }
                case REQUEST_N: {
                    this.handleRequestN(streamId, frame);
                    break;
                }
                case REQUEST_STREAM: {
                    this.handleStream(streamId, this.requestStream((Payload)this.payloadDecoder.apply(frame)), RequestStreamFrameFlyweight.initialRequestN(frame));
                    break;
                }
                case REQUEST_CHANNEL: {
                    this.handleChannel(streamId, (Payload)this.payloadDecoder.apply(frame), RequestChannelFrameFlyweight.initialRequestN(frame));
                    break;
                }
                case METADATA_PUSH: {
                    // The returned Mono is lazy: without a subscription the handler's
                    // pipeline would never run.
                    this.metadataPush((Payload)this.payloadDecoder.apply(frame)).subscribe(null, this.errorConsumer);
                    break;
                }
                case PAYLOAD: {
                    break;
                }
                case LEASE: {
                    break;
                }
                case NEXT: {
                    Subscriber<Payload> receiver = this.channelProcessors.get(streamId);
                    if (receiver == null) break;
                    receiver.onNext((Payload)this.payloadDecoder.apply(frame));
                    break;
                }
                case COMPLETE: {
                    Subscriber<Payload> receiver = this.channelProcessors.get(streamId);
                    if (receiver == null) break;
                    receiver.onComplete();
                    break;
                }
                case ERROR: {
                    Subscriber<Payload> receiver = this.channelProcessors.get(streamId);
                    if (receiver == null) break;
                    receiver.onError(new ApplicationErrorException(ErrorFrameFlyweight.dataUtf8(frame)));
                    break;
                }
                case NEXT_COMPLETE: {
                    Subscriber<Payload> receiver = this.channelProcessors.get(streamId);
                    if (receiver == null) break;
                    receiver.onNext((Payload)this.payloadDecoder.apply(frame));
                    receiver.onComplete();
                    break;
                }
                case SETUP: {
                    this.handleError(streamId, new IllegalStateException("Setup frame received post setup."));
                    break;
                }
                default: {
                    this.handleError(streamId, new IllegalStateException("ServerRSocket: Unexpected frame type: " + frameType));
                }
            }
        }
        catch (Throwable t) {
            throw Exceptions.propagate(t);
        }
        finally {
            ReferenceCountUtil.safeRelease(frame);
        }
    }

    /**
     * Subscribes to a fire-and-forget result. Nothing is sent back to the peer; an error is
     * only reported to the error consumer.
     *
     * @param streamId stream the request arrived on
     * @param result   the handler's result
     */
    private void handleFireAndForget(final int streamId, Mono<Void> result) {
        result.subscribe(new BaseSubscriber<Void>(){

            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                RSocketServer.this.sendingSubscriptions.put(streamId, subscription);
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            protected void hookOnError(Throwable throwable) {
                RSocketServer.this.errorConsumer.accept(throwable);
            }

            @Override
            protected void hookFinally(SignalType type) {
                RSocketServer.this.sendingSubscriptions.remove(streamId);
            }
        });
    }

    /**
     * Subscribes to a request-response result and sends it as a next-complete frame, a bare
     * complete frame when the result is empty, or an error frame on failure.
     *
     * @param streamId stream the request arrived on
     * @param response the handler's response
     */
    private void handleRequestResponse(final int streamId, Mono<Payload> response) {
        response.subscribe(new BaseSubscriber<Payload>(){
            private boolean isEmpty = true;

            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                RSocketServer.this.sendingSubscriptions.put(streamId, subscription);
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            protected void hookOnNext(Payload payload) {
                this.isEmpty = false;
                ByteBuf byteBuf;
                try {
                    byteBuf = PayloadFrameFlyweight.encodeNextComplete(RSocketServer.this.allocator, streamId, payload);
                }
                catch (Throwable t) {
                    throw Exceptions.propagate(t);
                }
                finally {
                    // The payload is released whether or not encoding succeeded.
                    payload.release();
                }
                RSocketServer.this.sendProcessor.onNext(byteBuf);
            }

            @Override
            protected void hookOnError(Throwable throwable) {
                RSocketServer.this.handleError(streamId, throwable);
            }

            @Override
            protected void hookOnComplete() {
                // A non-empty response was already sent as next-complete in hookOnNext.
                if (this.isEmpty) {
                    RSocketServer.this.sendProcessor.onNext(PayloadFrameFlyweight.encodeComplete(RSocketServer.this.allocator, streamId));
                }
            }

            @Override
            protected void hookFinally(SignalType type) {
                RSocketServer.this.sendingSubscriptions.remove(streamId);
            }
        });
    }

    /**
     * Subscribes to a streaming response, wrapping it in a {@link LimitableRequestPublisher}
     * that is registered under the stream id and primed with the peer's initial demand.
     * Each payload is sent as a next frame, followed by a complete or error frame.
     *
     * @param streamId        stream the request arrived on
     * @param response        the handler's response stream
     * @param initialRequestN initial demand from the request frame;
     *                        {@code Integer.MAX_VALUE} is widened to {@code Long.MAX_VALUE}
     */
    private void handleStream(final int streamId, Flux<Payload> response, int initialRequestN) {
        response.transform(frameFlux -> {
            LimitableRequestPublisher<Payload> payloads = LimitableRequestPublisher.wrap(frameFlux, this.sendProcessor.available());
            this.sendingLimitableSubscriptions.put(streamId, payloads);
            payloads.request(initialRequestN >= Integer.MAX_VALUE ? Long.MAX_VALUE : (long)initialRequestN);
            return payloads;
        }).subscribe(new BaseSubscriber<Payload>(){

            @Override
            protected void hookOnNext(Payload payload) {
                ByteBuf byteBuf;
                try {
                    byteBuf = PayloadFrameFlyweight.encodeNext(RSocketServer.this.allocator, streamId, payload);
                }
                catch (Throwable t) {
                    throw Exceptions.propagate(t);
                }
                finally {
                    // The payload is released whether or not encoding succeeded.
                    payload.release();
                }
                RSocketServer.this.sendProcessor.onNext(byteBuf);
            }

            @Override
            protected void hookOnComplete() {
                RSocketServer.this.sendProcessor.onNext(PayloadFrameFlyweight.encodeComplete(RSocketServer.this.allocator, streamId));
            }

            @Override
            protected void hookOnError(Throwable throwable) {
                RSocketServer.this.handleError(streamId, throwable);
            }

            @Override
            protected void hookFinally(SignalType type) {
                RSocketServer.this.sendingLimitableSubscriptions.remove(streamId);
            }
        });
    }

    /**
     * Starts a request-channel interaction: registers a processor for the inbound payloads,
     * seeds it with the first payload, and handles the handler's response as a stream.
     * Cancellation and demand on the inbound side are sent to the peer as cancel and
     * request-n frames.
     *
     * @param streamId        stream the request arrived on
     * @param payload         first payload, carried by the request frame itself
     * @param initialRequestN initial demand from the request frame
     */
    private void handleChannel(int streamId, Payload payload, int initialRequestN) {
        UnicastProcessor<Payload> frames = UnicastProcessor.create();
        this.channelProcessors.put(streamId, frames);
        Flux<Payload> payloads = frames
            .doOnCancel(() -> this.sendProcessor.onNext(CancelFrameFlyweight.encode(this.allocator, streamId)))
            .doOnError(t -> this.handleError(streamId, t))
            .doOnRequest(l -> this.sendProcessor.onNext(RequestNFrameFlyweight.encode(this.allocator, streamId, l)))
            .doFinally(signalType -> this.channelProcessors.remove(streamId));
        // Enqueue the first payload before the handler gets a chance to subscribe.
        frames.onNext(payload);
        // requestChannel(payload, payloads) itself falls back to the plain
        // requestChannel(payloads) when the handler is not a ResponderRSocket.
        this.handleStream(streamId, this.requestChannel(payload, payloads), initialRequestN);
    }

    /**
     * Cancels the response subscription registered for the stream, if any, and removes it
     * from whichever map tracked it.
     *
     * @param streamId stream named by the cancel frame
     */
    private void handleCancelFrame(int streamId) {
        Subscription subscription = this.sendingSubscriptions.remove(streamId);
        if (subscription == null) {
            // remove, not get: a cancelled stream must not stay registered.
            subscription = this.sendingLimitableSubscriptions.remove(streamId);
        }
        if (subscription != null) {
            subscription.cancel();
        }
    }

    /**
     * Reports the error to the error consumer and sends it to the peer as an error frame.
     *
     * @param streamId stream the error belongs to
     * @param t        the error
     */
    private void handleError(int streamId, Throwable t) {
        this.errorConsumer.accept(t);
        this.sendProcessor.onNext(ErrorFrameFlyweight.encode(this.allocator, streamId, t));
    }

    /**
     * Forwards the demand carried by a request-n frame to the response subscription
     * registered for the stream, if any. {@code Integer.MAX_VALUE} is widened to
     * {@code Long.MAX_VALUE}.
     *
     * @param streamId stream named by the frame
     * @param frame    the request-n frame
     */
    private void handleRequestN(int streamId, ByteBuf frame) {
        Subscription subscription = this.sendingSubscriptions.get(streamId);
        if (subscription == null) {
            subscription = this.sendingLimitableSubscriptions.get(streamId);
        }
        if (subscription != null) {
            int n = RequestNFrameFlyweight.requestN(frame);
            subscription.request(n >= Integer.MAX_VALUE ? Long.MAX_VALUE : (long)n);
        }
    }
}

