/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.core;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.util.IllegalReferenceCountException;
import io.rsocket.DuplexConnection;
import io.rsocket.Payload;
import io.rsocket.core.PayloadValidationUtils;
import io.rsocket.core.ReassemblyUtils;
import io.rsocket.core.RequesterFrameHandler;
import io.rsocket.core.RequesterResponderSupport;
import io.rsocket.core.SendUtils;
import io.rsocket.core.StateUtils;
import io.rsocket.frame.CancelFrameCodec;
import io.rsocket.frame.FrameType;
import io.rsocket.frame.RequestNFrameCodec;
import io.rsocket.frame.decoder.PayloadDecoder;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Exceptions;
import reactor.core.Scannable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Operators;
import reactor.util.annotation.NonNull;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;

final class RequestStreamRequesterFlux
extends Flux<Payload>
implements RequesterFrameHandler,
Subscription,
Scannable {
    final ByteBufAllocator allocator;
    final Payload payload;
    final int mtu;
    final int maxFrameLength;
    final int maxInboundPayloadSize;
    final RequesterResponderSupport requesterResponderSupport;
    final DuplexConnection connection;
    final PayloadDecoder payloadDecoder;
    volatile long state;
    static final AtomicLongFieldUpdater<RequestStreamRequesterFlux> STATE = AtomicLongFieldUpdater.newUpdater(RequestStreamRequesterFlux.class, "state");
    int streamId;
    CoreSubscriber<? super Payload> inboundSubscriber;
    CompositeByteBuf frames;
    boolean done;

    RequestStreamRequesterFlux(Payload payload, RequesterResponderSupport requesterResponderSupport) {
        this.allocator = requesterResponderSupport.getAllocator();
        this.payload = payload;
        this.mtu = requesterResponderSupport.getMtu();
        this.maxFrameLength = requesterResponderSupport.getMaxFrameLength();
        this.maxInboundPayloadSize = requesterResponderSupport.getMaxInboundPayloadSize();
        this.requesterResponderSupport = requesterResponderSupport;
        this.connection = requesterResponderSupport.getDuplexConnection();
        this.payloadDecoder = requesterResponderSupport.getPayloadDecoder();
    }

    public void subscribe(CoreSubscriber<? super Payload> actual) {
        long previousState = StateUtils.markSubscribed(STATE, this);
        if (StateUtils.isSubscribedOrTerminated(previousState)) {
            Operators.error(actual, (Throwable)new IllegalStateException("RequestStreamFlux allows only a single Subscriber"));
            return;
        }
        Payload p = this.payload;
        try {
            if (!PayloadValidationUtils.isValid(this.mtu, this.maxFrameLength, p, false)) {
                StateUtils.lazyTerminate(STATE, this);
                Operators.error(actual, (Throwable)new IllegalArgumentException(String.format("The payload is too big to be send as a single frame with a max frame length %s. Consider enabling fragmentation.", this.maxFrameLength)));
                p.release();
                return;
            }
        }
        catch (IllegalReferenceCountException e) {
            StateUtils.lazyTerminate(STATE, this);
            Operators.error(actual, (Throwable)e);
            return;
        }
        this.inboundSubscriber = actual;
        actual.onSubscribe((Subscription)this);
    }

    public final void request(long n) {
        if (!Operators.validate((long)n)) {
            return;
        }
        long previousState = StateUtils.addRequestN(STATE, this, n);
        if (StateUtils.isTerminated(previousState)) {
            return;
        }
        if (StateUtils.hasRequested(previousState)) {
            if (StateUtils.isFirstFrameSent(previousState) && !StateUtils.isMaxAllowedRequestN(StateUtils.extractRequestN(previousState))) {
                int streamId = this.streamId;
                ByteBuf requestNFrame = RequestNFrameCodec.encode(this.allocator, streamId, n);
                this.connection.sendFrame(streamId, requestNFrame);
            }
            return;
        }
        this.sendFirstPayload(this.payload, n);
    }

    void sendFirstPayload(Payload payload, long initialRequestN) {
        int streamId;
        RequesterResponderSupport sm = this.requesterResponderSupport;
        DuplexConnection connection = this.connection;
        ByteBufAllocator allocator = this.allocator;
        try {
            this.streamId = streamId = sm.addAndGetNextStreamId(this);
        }
        catch (Throwable t) {
            this.done = true;
            long previousState = StateUtils.markTerminated(STATE, this);
            payload.release();
            if (!StateUtils.isTerminated(previousState)) {
                this.inboundSubscriber.onError(Exceptions.unwrap((Throwable)t));
            }
            return;
        }
        try {
            SendUtils.sendReleasingPayload(streamId, FrameType.REQUEST_STREAM, initialRequestN, this.mtu, payload, connection, allocator, false);
        }
        catch (Throwable e) {
            this.done = true;
            StateUtils.lazyTerminate(STATE, this);
            sm.remove(streamId, this);
            this.inboundSubscriber.onError(e);
            return;
        }
        long previousState = StateUtils.markFirstFrameSent(STATE, this);
        if (StateUtils.isTerminated(previousState)) {
            if (this.done) {
                return;
            }
            sm.remove(streamId, this);
            ByteBuf cancelFrame = CancelFrameCodec.encode(allocator, streamId);
            connection.sendFrame(streamId, cancelFrame);
            return;
        }
        if (StateUtils.isMaxAllowedRequestN(initialRequestN)) {
            return;
        }
        long requestN = StateUtils.extractRequestN(previousState);
        if (StateUtils.isMaxAllowedRequestN(requestN)) {
            ByteBuf requestNFrame = RequestNFrameCodec.encode(allocator, streamId, requestN);
            connection.sendFrame(streamId, requestNFrame);
            return;
        }
        if (requestN > initialRequestN) {
            ByteBuf requestNFrame = RequestNFrameCodec.encode(allocator, streamId, requestN - initialRequestN);
            connection.sendFrame(streamId, requestNFrame);
        }
    }

    public final void cancel() {
        long previousState = StateUtils.markTerminated(STATE, this);
        if (StateUtils.isTerminated(previousState)) {
            return;
        }
        if (StateUtils.isFirstFrameSent(previousState)) {
            int streamId = this.streamId;
            this.requesterResponderSupport.remove(streamId, this);
            ReassemblyUtils.synchronizedRelease(this, previousState);
            this.connection.sendFrame(streamId, CancelFrameCodec.encode(this.allocator, streamId));
        } else if (!StateUtils.hasRequested(previousState)) {
            this.payload.release();
        }
    }

    @Override
    public final void handlePayload(Payload p) {
        if (this.done) {
            p.release();
            return;
        }
        this.inboundSubscriber.onNext((Object)p);
    }

    @Override
    public final void handleComplete() {
        if (this.done) {
            return;
        }
        this.done = true;
        long previousState = StateUtils.markTerminated(STATE, this);
        if (StateUtils.isTerminated(previousState)) {
            return;
        }
        this.requesterResponderSupport.remove(this.streamId, this);
        this.inboundSubscriber.onComplete();
    }

    @Override
    public final void handleError(Throwable cause) {
        if (this.done) {
            Operators.onErrorDropped((Throwable)cause, (Context)this.inboundSubscriber.currentContext());
            return;
        }
        this.done = true;
        long previousState = StateUtils.markTerminated(STATE, this);
        if (StateUtils.isTerminated(previousState)) {
            Operators.onErrorDropped((Throwable)cause, (Context)this.inboundSubscriber.currentContext());
            return;
        }
        this.requesterResponderSupport.remove(this.streamId, this);
        ReassemblyUtils.synchronizedRelease(this, previousState);
        this.inboundSubscriber.onError(cause);
    }

    @Override
    public void handleNext(ByteBuf frame, boolean hasFollows, boolean isLastPayload) {
        ReassemblyUtils.handleNextSupport(STATE, this, this, this.inboundSubscriber, this.payloadDecoder, this.allocator, this.maxInboundPayloadSize, frame, hasFollows, isLastPayload);
    }

    @Override
    public CompositeByteBuf getFrames() {
        return this.frames;
    }

    @Override
    public void setFrames(CompositeByteBuf byteBuf) {
        this.frames = byteBuf;
    }

    @Nullable
    public Object scanUnsafe(Scannable.Attr key) {
        long state = this.state;
        if (key == Scannable.Attr.TERMINATED) {
            return StateUtils.isTerminated(state);
        }
        if (key == Scannable.Attr.REQUESTED_FROM_DOWNSTREAM) {
            return StateUtils.extractRequestN(state);
        }
        return null;
    }

    @NonNull
    public String stepName() {
        return "source(RequestStreamFlux)";
    }
}

