/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.core;

import io.netty.buffer.ByteBuf;
import io.netty.util.collection.IntObjectMap;
import io.rsocket.DuplexConnection;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.core.FireAndForgetRequesterMono;
import io.rsocket.core.FrameHandler;
import io.rsocket.core.MetadataPushRequesterMono;
import io.rsocket.core.RequestChannelRequesterFlux;
import io.rsocket.core.RequestResponseRequesterMono;
import io.rsocket.core.RequestStreamRequesterFlux;
import io.rsocket.core.RequesterLeaseTracker;
import io.rsocket.core.RequesterResponderSupport;
import io.rsocket.core.SlowFireAndForgetRequesterMono;
import io.rsocket.core.StreamIdSupplier;
import io.rsocket.exceptions.ConnectionErrorException;
import io.rsocket.frame.ErrorFrameCodec;
import io.rsocket.frame.FrameHeaderCodec;
import io.rsocket.frame.FrameType;
import io.rsocket.frame.RequestNFrameCodec;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.keepalive.KeepAliveFramesAcceptor;
import io.rsocket.keepalive.KeepAliveHandler;
import io.rsocket.keepalive.KeepAliveSupport;
import io.rsocket.plugins.RequestInterceptor;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.util.annotation.Nullable;

class RSocketRequester
extends RequesterResponderSupport
implements RSocket {
    // Logger shared by all requester instances.
    private static final Logger LOGGER = LoggerFactory.getLogger(RSocketRequester.class);
    // Shared sentinel instance: terminate(...) compares against it by identity to decide
    // between an empty and an error signal on onThisSideClosedSink. Its stack trace is
    // cleared in the static initializer.
    private static final Exception CLOSED_CHANNEL_EXCEPTION = new ClosedChannelException();
    // null until the first successful compare-and-set via TERMINATION_ERROR; a non-null
    // value is what isDisposed() reports as "disposed".
    private volatile Throwable terminationError;
    // Atomic updater for terminationError; created in the static initializer.
    private static final AtomicReferenceFieldUpdater<RSocketRequester, Throwable> TERMINATION_ERROR;
    // May be null; fireAndForget(...) and availability() branch on its presence.
    @Nullable
    private final RequesterLeaseTracker requesterLeaseTracker;
    // Receives an empty or an error signal at the end of terminate(...).
    private final Sinks.Empty<Void> onThisSideClosedSink;
    // Returned as-is from onClose().
    private final Mono<Void> onAllClosed;
    // Assigned null by the constructor when keep-alive is not started (not annotated
    // @Nullable, but every use in this class null-checks it).
    private final KeepAliveFramesAcceptor keepAliveFramesAcceptor;

    /**
     * Creates a requester bound to {@code connection} and immediately subscribes to the
     * connection's close signal and inbound frame stream.
     *
     * @param connection connection whose {@code onClose()} and {@code receive()} publishers are
     *     subscribed to here; also forwarded to the superclass constructor
     * @param payloadDecoder forwarded to the superclass constructor
     * @param streamIdSupplier forwarded to the superclass constructor
     * @param mtu forwarded to the superclass constructor
     * @param maxFrameLength forwarded to the superclass constructor
     * @param maxInboundPayloadSize forwarded to the superclass constructor
     * @param keepAliveTickPeriod keep-alive tick period; {@code 0} disables keep-alive
     *     (time unit is not visible here - TODO confirm)
     * @param keepAliveAckTimeout keep-alive ack timeout handed to the keep-alive support
     *     (time unit is not visible here - TODO confirm)
     * @param keepAliveHandler keep-alive handler; {@code null} disables keep-alive
     * @param requestInterceptorFunction forwarded to the superclass constructor
     * @param requesterLeaseTracker optional lease tracker, may be {@code null}
     * @param onThisSideClosedSink sink signalled when this requester terminates
     * @param onAllClosed publisher returned from {@link #onClose()}
     */
    RSocketRequester(DuplexConnection connection, PayloadDecoder payloadDecoder, StreamIdSupplier streamIdSupplier, int mtu, int maxFrameLength, int maxInboundPayloadSize, int keepAliveTickPeriod, int keepAliveAckTimeout, @Nullable KeepAliveHandler keepAliveHandler, Function<RSocket, RequestInterceptor> requestInterceptorFunction, @Nullable RequesterLeaseTracker requesterLeaseTracker, Sinks.Empty<Void> onThisSideClosedSink, Mono<Void> onAllClosed) {
        super(mtu, maxFrameLength, maxInboundPayloadSize, payloadDecoder, connection, streamIdSupplier, requestInterceptorFunction);
        this.requesterLeaseTracker = requesterLeaseTracker;
        this.onThisSideClosedSink = onThisSideClosedSink;
        this.onAllClosed = onAllClosed;
        // Both the error callback and the completion callback of the close signal end up in
        // tryShutdown (the Throwable overload and the no-arg overload respectively).
        connection.onClose().subscribe(null, this::tryShutdown, this::tryShutdown);
        // Errors of the inbound frame stream are deliberately ignored by this subscriber.
        // NOTE(review): `this` escapes here before keepAliveFramesAcceptor is assigned below;
        // a frame delivered synchronously during subscribe would observe it as null - confirm
        // the connection never emits during subscription.
        connection.receive().subscribe(this::handleIncomingFrames, e -> {});
        // Keep-alive is started only when a non-zero tick period AND a handler are supplied.
        if (keepAliveTickPeriod != 0 && keepAliveHandler != null) {
            KeepAliveSupport.ClientKeepAliveSupport keepAliveSupport = new KeepAliveSupport.ClientKeepAliveSupport(this.getAllocator(), keepAliveTickPeriod, keepAliveAckTimeout);
            // Keep-alive frames are sent on stream 0; tryTerminateOnKeepAlive is the callback
            // handed to the handler for the timeout case.
            this.keepAliveFramesAcceptor = keepAliveHandler.start(keepAliveSupport, keepAliveFrame -> connection.sendFrame(0, (ByteBuf)keepAliveFrame), this::tryTerminateOnKeepAlive);
        } else {
            this.keepAliveFramesAcceptor = null;
        }
    }

    /**
     * Creates the fire-and-forget request publisher for {@code payload}.
     *
     * <p>When a lease tracker is configured the {@link SlowFireAndForgetRequesterMono} variant is
     * used; otherwise the plain {@link FireAndForgetRequesterMono} is returned.
     *
     * @param payload request payload
     * @return publisher representing the request
     */
    @Override
    public Mono<Void> fireAndForget(Payload payload) {
        if (this.requesterLeaseTracker != null) {
            return new SlowFireAndForgetRequesterMono(payload, this);
        }
        return new FireAndForgetRequesterMono(payload, this);
    }

    /**
     * Creates the request-response publisher for {@code payload}.
     *
     * @param payload request payload
     * @return a new {@link RequestResponseRequesterMono} bound to this requester
     */
    @Override
    public Mono<Payload> requestResponse(Payload payload) {
        final RequestResponseRequesterMono request = new RequestResponseRequesterMono(payload, this);
        return request;
    }

    /**
     * Creates the request-stream publisher for {@code payload}.
     *
     * @param payload request payload
     * @return a new {@link RequestStreamRequesterFlux} bound to this requester
     */
    @Override
    public Flux<Payload> requestStream(Payload payload) {
        final RequestStreamRequesterFlux request = new RequestStreamRequesterFlux(payload, this);
        return request;
    }

    /**
     * Creates the request-channel publisher for the given outbound {@code payloads}.
     *
     * @param payloads publisher of outbound payloads
     * @return a new {@link RequestChannelRequesterFlux} bound to this requester
     */
    @Override
    public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
        final RequestChannelRequesterFlux request = new RequestChannelRequesterFlux(payloads, this);
        return request;
    }

    /**
     * Creates the metadata-push publisher for {@code payload}.
     *
     * <p>If this requester has already recorded a termination error, the payload is released
     * and a {@link Mono} failing with that error is returned instead.
     *
     * @param payload metadata payload
     * @return publisher representing the push, or an error publisher after termination
     */
    @Override
    public Mono<Void> metadataPush(Payload payload) {
        final Throwable failure = this.terminationError;
        if (failure == null) {
            return new MetadataPushRequesterMono(payload, this);
        }
        // The payload will never be sent, so it is released before the error is reported.
        payload.release();
        return Mono.error(failure);
    }

    /**
     * Returns the lease tracker this requester was constructed with.
     *
     * @return the lease tracker, or {@code null} when none was supplied
     */
    @Override
    public RequesterLeaseTracker getRequesterLeaseTracker() {
        final RequesterLeaseTracker tracker = this.requesterLeaseTracker;
        return tracker;
    }

    /**
     * Obtains the next stream id from the superclass, then fails if this requester has
     * recorded a termination error.
     *
     * <p>The id is obtained BEFORE the termination check, exactly as callers have always seen.
     *
     * @return the next stream id
     * @throws RuntimeException the termination error, propagated via
     *     {@link Exceptions#propagate(Throwable)}
     */
    @Override
    public int getNextStreamId() {
        final int streamId = super.getNextStreamId();
        final Throwable failure = this.terminationError;
        if (failure == null) {
            return streamId;
        }
        throw Exceptions.propagate(failure);
    }

    /**
     * Registers {@code frameHandler} under a fresh stream id via the superclass, then undoes
     * the registration and fails if this requester has recorded a termination error.
     *
     * @param frameHandler handler to register
     * @return the stream id the handler was registered under
     * @throws RuntimeException the termination error, propagated via
     *     {@link Exceptions#propagate(Throwable)}
     */
    @Override
    public int addAndGetNextStreamId(FrameHandler frameHandler) {
        final int streamId = super.addAndGetNextStreamId(frameHandler);
        final Throwable failure = this.terminationError;
        if (failure == null) {
            return streamId;
        }
        // Termination was recorded: take the handler back out before reporting the error.
        super.remove(streamId, frameHandler);
        throw Exceptions.propagate(failure);
    }

    /**
     * Reports the availability of this requester.
     *
     * @return the connection's availability, or the smaller of the connection's and the lease
     *     tracker's availability when a lease tracker is configured
     */
    @Override
    public double availability() {
        final double connectionAvailability = this.getDuplexConnection().availability();
        final RequesterLeaseTracker tracker = this.requesterLeaseTracker;
        if (tracker == null) {
            return connectionAvailability;
        }
        return Math.min(connectionAvailability, tracker.availability());
    }

    /**
     * Asks the connection to send a "Disposed" connection error and close, unless a
     * termination error has already been recorded (in which case this is a no-op).
     */
    @Override
    public void dispose() {
        if (this.terminationError == null) {
            this.getDuplexConnection().sendErrorAndClose(new ConnectionErrorException("Disposed"));
        }
    }

    /**
     * Tells whether this requester has recorded a termination error.
     *
     * @return {@code true} once {@code terminationError} is non-null
     */
    @Override
    public boolean isDisposed() {
        final Throwable failure = this.terminationError;
        return failure != null;
    }

    /**
     * Returns the close-notification publisher supplied at construction time.
     *
     * @return the {@code onAllClosed} publisher
     */
    @Override
    public Mono<Void> onClose() {
        final Mono<Void> closed = this.onAllClosed;
        return closed;
    }

    /**
     * Entry point for every inbound frame: routes stream-0 frames to
     * {@code handleStreamZero} and all others to {@code handleFrame}.
     *
     * <p>Any {@link Throwable} raised while handling is logged and turned into a
     * {@link ConnectionErrorException} that is sent on the connection before closing it.
     *
     * @param frame inbound frame
     */
    private void handleIncomingFrames(ByteBuf frame) {
        try {
            final int streamId = FrameHeaderCodec.streamId(frame);
            final FrameType frameType = FrameHeaderCodec.frameType(frame);
            if (streamId != 0) {
                this.handleFrame(streamId, frameType, frame);
                return;
            }
            this.handleStreamZero(frameType, frame);
        }
        catch (Throwable cause) {
            LOGGER.error("Unexpected error during frame handling", cause);
            this.getDuplexConnection().sendErrorAndClose(new ConnectionErrorException("Unexpected error during frame handling", cause));
        }
    }

    /**
     * Handles a frame received on stream 0.
     *
     * <ul>
     *   <li>{@code ERROR} - attempts to terminate this requester with the decoded error.
     *   <li>{@code LEASE} - forwards the frame to the lease tracker.
     *   <li>{@code KEEPALIVE} - forwards the frame to the keep-alive acceptor, if one exists.
     *   <li>anything else - logged at info level and otherwise ignored.
     * </ul>
     *
     * @param type frame type decoded from the header
     * @param frame the frame itself
     * @throws IllegalStateException if a {@code LEASE} frame arrives while no lease tracker is
     *     configured; {@code handleIncomingFrames} catches it and closes the connection, which
     *     is the same outcome the previous accidental {@code NullPointerException} produced
     */
    private void handleStreamZero(FrameType type, ByteBuf frame) {
        switch (type) {
            case ERROR: {
                this.tryTerminateOnZeroError(frame);
                break;
            }
            case LEASE: {
                // The tracker is @Nullable: fail with an explicit message rather than an NPE.
                final RequesterLeaseTracker leaseTracker = this.requesterLeaseTracker;
                if (leaseTracker == null) {
                    throw new IllegalStateException("Requester received LEASE frame but no lease tracker is configured");
                }
                leaseTracker.handleLeaseFrame(frame);
                break;
            }
            case KEEPALIVE: {
                if (this.keepAliveFramesAcceptor != null) {
                    this.keepAliveFramesAcceptor.receive(frame);
                }
                break;
            }
            default: {
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("Requester received unsupported frame on stream 0: " + frame.toString());
                }
                break;
            }
        }
    }

    /**
     * Dispatches a frame for a non-zero stream to the handler registered for that stream.
     *
     * <p>When no handler is registered the frame is passed to
     * {@code handleMissingResponseProcessor} instead.
     *
     * @param streamId stream the frame belongs to
     * @param type frame type decoded from the header
     * @param frame the frame itself
     * @throws IllegalStateException if the frame type is not one a requester-side stream accepts
     */
    private void handleFrame(int streamId, FrameType type, ByteBuf frame) {
        final FrameHandler handler = this.get(streamId);
        if (handler == null) {
            this.handleMissingResponseProcessor(streamId, type, frame);
            return;
        }
        switch (type) {
            case NEXT_COMPLETE:
                handler.handleNext(frame, false, true);
                return;
            case NEXT:
                // Only plain NEXT frames consult the FOLLOWS flag.
                handler.handleNext(frame, FrameHeaderCodec.hasFollows(frame), false);
                return;
            case COMPLETE:
                handler.handleComplete();
                return;
            case ERROR:
                handler.handleError(io.rsocket.exceptions.Exceptions.from(streamId, frame));
                return;
            case CANCEL:
                handler.handleCancel();
                return;
            case REQUEST_N:
                handler.handleRequestN(RequestNFrameCodec.requestN(frame));
                return;
            default:
                throw new IllegalStateException("Requester received unsupported frame on stream " + streamId + ": " + frame.toString());
        }
    }

    /**
     * Handles a frame whose stream id has no registered handler.
     *
     * <p>Frames for ids the stream-id supplier reports as "before or current" are silently
     * ignored; any other id is treated as a protocol violation.
     *
     * @param streamId stream id carried by the frame
     * @param type frame type decoded from the header
     * @param frame the frame itself
     * @throws IllegalStateException if {@code streamId} is not before-or-current
     */
    private void handleMissingResponseProcessor(int streamId, FrameType type, ByteBuf frame) {
        if (this.streamIdSupplier.isBeforeOrCurrent(streamId)) {
            return;
        }
        if (type != FrameType.ERROR) {
            throw new IllegalStateException("Client received message for non-existent stream: " + streamId + ", frame type: " + type);
        }
        // For ERROR frames the UTF-8 error data is included in the message.
        final String errorMessage = ErrorFrameCodec.dataUtf8(frame);
        throw new IllegalStateException("Client received error for non-existent stream: " + streamId + " Message: " + errorMessage);
    }

    /**
     * Keep-alive callback: attempts to terminate this requester with a
     * {@link ConnectionErrorException} naming the keep-alive timeout, then disposes the
     * connection regardless of whether the termination attempt won.
     *
     * @param keepAlive keep-alive descriptor providing the timeout for the error message
     */
    private void tryTerminateOnKeepAlive(KeepAliveSupport.KeepAlive keepAlive) {
        // Built lazily: the message is only formatted if tryTerminate asks for the error.
        final Supplier<Throwable> timeoutError = () -> new ConnectionErrorException(String.format("No keep-alive acks for %d ms", keepAlive.getTimeout().toMillis()));
        this.tryTerminate(timeoutError);
        this.getDuplexConnection().dispose();
    }

    /**
     * Close-signal error callback: records {@code e} as the termination error if none is set
     * yet and, on success, terminates this requester.
     *
     * <p>Note the asymmetry kept from the original code: {@code e} is what gets stored in
     * {@code terminationError}, but {@code terminate} is invoked with the shared
     * {@code CLOSED_CHANNEL_EXCEPTION} sentinel.
     *
     * <p>All diagnostics are emitted at debug level; the branch that previously logged at info
     * level while guarded by {@code isDebugEnabled()} has been aligned with the other paths.
     *
     * @param e error delivered by the connection's close signal
     */
    private void tryShutdown(Throwable e) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("trying to close requester " + this.getDuplexConnection());
        }
        if (this.terminationError == null && TERMINATION_ERROR.compareAndSet(this, null, e)) {
            this.terminate(CLOSED_CHANNEL_EXCEPTION);
            return;
        }
        // Either an error was already recorded or another caller won the compare-and-set.
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("trying to close requester failed because of " + this.terminationError + " " + this.getDuplexConnection());
        }
    }

    /**
     * Attempts to terminate this requester with the error decoded from a stream-0 ERROR frame.
     *
     * @param errorFrame the ERROR frame received on stream 0
     */
    private void tryTerminateOnZeroError(ByteBuf errorFrame) {
        // Decoding is deferred to the supplier so it only happens if termination is attempted.
        final Supplier<Throwable> streamZeroError = () -> io.rsocket.exceptions.Exceptions.from(0, errorFrame);
        this.tryTerminate(streamZeroError);
    }

    /**
     * Attempts to terminate this requester with the error produced by {@code errorSupplier}.
     *
     * <p>The supplier is consulted only when no termination error is recorded yet; the
     * produced error is then installed with a compare-and-set and, if that succeeds, passed to
     * {@code terminate}. Losing attempts are only logged at debug level.
     *
     * @param errorSupplier lazily provides the termination error
     */
    private void tryTerminate(Supplier<Throwable> errorSupplier) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("trying to close requester " + this.getDuplexConnection());
        }
        if (this.terminationError == null) {
            final Throwable failure = errorSupplier.get();
            if (TERMINATION_ERROR.compareAndSet(this, null, failure)) {
                this.terminate(failure);
                return;
            }
        }
        // Already terminated, or a concurrent caller installed its error first.
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("trying to close requester failed because of " + this.terminationError + " " + this.getDuplexConnection());
        }
    }

    /**
     * Close-signal completion callback: records the shared {@code CLOSED_CHANNEL_EXCEPTION}
     * as the termination error if none is set yet and, on success, terminates this requester
     * with that same sentinel. Losing attempts are only logged at debug level.
     */
    private void tryShutdown() {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("trying to close requester " + this.getDuplexConnection());
        }
        if (this.terminationError == null && TERMINATION_ERROR.compareAndSet(this, null, CLOSED_CHANNEL_EXCEPTION)) {
            this.terminate(CLOSED_CHANNEL_EXCEPTION);
            return;
        }
        // Already terminated, or a concurrent caller installed its error first.
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("trying to close requester failed because of " + this.terminationError + " " + this.getDuplexConnection());
        }
    }

    /**
     * Tears this requester down and notifies everything that depends on it.
     *
     * <p>In order: disposes the keep-alive acceptor (if any), the request interceptor (if any)
     * and the lease tracker (if any, passing {@code e}); delivers {@code e} to every handler
     * currently registered in {@code activeStreams}; finally signals
     * {@code onThisSideClosedSink} - empty when {@code e} is the shared
     * {@code CLOSED_CHANNEL_EXCEPTION} instance (identity comparison), with {@code e} as the
     * error otherwise.
     *
     * <p>Failures thrown by individual stream handlers are best-effort: they are logged at
     * debug level and never prevent the remaining handlers or the sink from being notified.
     *
     * @param e the error to propagate to active streams and the lease tracker
     */
    private void terminate(Throwable e) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("closing requester " + this.getDuplexConnection() + " due to " + e);
        }
        if (this.keepAliveFramesAcceptor != null) {
            this.keepAliveFramesAcceptor.dispose();
        }
        final RequestInterceptor requestInterceptor = this.getRequestInterceptor();
        if (requestInterceptor != null) {
            requestInterceptor.dispose();
        }
        final RequesterLeaseTracker requesterLeaseTracker = this.requesterLeaseTracker;
        if (requesterLeaseTracker != null) {
            requesterLeaseTracker.dispose(e);
        }
        // Snapshot the handlers while holding this object's monitor, then notify them outside
        // of it so handler code never runs under the lock.
        final ArrayList<FrameHandler> activeStreamsCopy;
        synchronized (this) {
            activeStreamsCopy = new ArrayList<>(this.activeStreams.values());
        }
        for (FrameHandler handler : activeStreamsCopy) {
            if (handler == null) {
                continue;
            }
            try {
                handler.handleError(e);
            }
            catch (Throwable ignored) {
                // Deliberately best-effort: one misbehaving handler must not stop the others.
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("stream handler failed while requester was being closed", ignored);
                }
            }
        }
        if (e == CLOSED_CHANNEL_EXCEPTION) {
            this.onThisSideClosedSink.tryEmitEmpty();
        } else {
            this.onThisSideClosedSink.tryEmitError(e);
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("requester closed " + this.getDuplexConnection());
        }
    }

    // Static initialisation of the shared sentinel and the atomic field updater.
    static {
        // Empties the stack trace of the shared ClosedChannelException instance
        // (presumably because a trace captured at class-load time would be misleading - confirm).
        CLOSED_CHANNEL_EXCEPTION.setStackTrace(new StackTraceElement[0]);
        // Updater used for the compare-and-set on the volatile terminationError field.
        TERMINATION_ERROR = AtomicReferenceFieldUpdater.newUpdater(RSocketRequester.class, Throwable.class, "terminationError");
    }
}

