/*
 * Decompiled with CFR 0.152.
 */
package reactor.netty.quic;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.handler.codec.quic.DefaultQuicStreamFrame;
import io.netty.handler.codec.quic.QuicStreamChannel;
import io.netty.handler.codec.quic.QuicStreamFrame;
import io.netty.handler.codec.quic.QuicStreamType;
import io.netty.util.ReferenceCounted;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Predicate;
import org.jspecify.annotations.Nullable;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.ConnectionObserver;
import reactor.netty.FutureMono;
import reactor.netty.NettyOutbound;
import reactor.netty.ReactorNetty;
import reactor.netty.channel.AbortedException;
import reactor.netty.channel.ChannelOperations;
import reactor.netty.quic.QuicInbound;
import reactor.netty.quic.QuicOutbound;
import reactor.util.Logger;
import reactor.util.Loggers;

class QuicStreamOperations
extends ChannelOperations<QuicInbound, QuicOutbound>
implements QuicInbound,
QuicOutbound {
    static final AtomicIntegerFieldUpdater<QuicStreamOperations> FIN_SENT = AtomicIntegerFieldUpdater.newUpdater(QuicStreamOperations.class, "finSent");
    volatile int finSent;
    static final Logger log = Loggers.getLogger(QuicStreamOperations.class);
    static final String INBOUND_CANCEL_LOG = "Quic inbound stream cancelled, sending WRITE_FIN.";

    QuicStreamOperations(Connection connection, ConnectionObserver listener) {
        super(connection, listener);
        this.markPersistent(false);
    }

    public String asLongText() {
        return this.asShortText() + ", " + this.channel().localAddress();
    }

    @Override
    public boolean isLocalStream() {
        return ((QuicStreamChannel)this.connection().channel()).isLocalCreated();
    }

    public NettyOutbound send(Publisher<? extends ByteBuf> dataStream, Predicate<ByteBuf> predicate) {
        Objects.requireNonNull(predicate, "predicate");
        if (!this.channel().isActive()) {
            return this.then((Publisher)Mono.error((Throwable)AbortedException.beforeSend()));
        }
        if (dataStream instanceof Mono) {
            return this.then((Publisher)((Mono)dataStream).flatMap(m -> {
                if (this.markFinSent()) {
                    return FutureMono.from((Future)this.channel().writeAndFlush((Object)new DefaultQuicStreamFrame(m, true)));
                }
                return FutureMono.from((Future)this.channel().writeAndFlush(m));
            }).doOnDiscard(ByteBuf.class, ReferenceCounted::release));
        }
        return super.send(dataStream, predicate);
    }

    public NettyOutbound sendObject(Object message) {
        if (!this.channel().isActive()) {
            ReactorNetty.safeRelease((Object)message);
            return this.then((Publisher)Mono.error((Throwable)AbortedException.beforeSend()));
        }
        if (!(message instanceof ByteBuf)) {
            return super.sendObject(message);
        }
        ByteBuf buffer = (ByteBuf)message;
        return this.then((Publisher)FutureMono.deferFuture(() -> {
            if (this.markFinSent()) {
                return this.connection().channel().writeAndFlush((Object)new DefaultQuicStreamFrame(buffer, true));
            }
            return this.connection().channel().writeAndFlush((Object)buffer);
        }), () -> ReactorNetty.safeRelease((Object)buffer));
    }

    @Override
    public long streamId() {
        return ((QuicStreamChannel)this.connection().channel()).streamId();
    }

    @Override
    public QuicStreamType streamType() {
        return ((QuicStreamChannel)this.connection().channel()).type();
    }

    protected void onInboundCancel() {
        if (log.isDebugEnabled()) {
            log.debug(ReactorNetty.format((Channel)this.channel(), (String)INBOUND_CANCEL_LOG));
        }
        this.sendFinNow(f -> this.terminate());
    }

    protected void onOutboundError(Throwable err) {
        if (log.isDebugEnabled()) {
            log.debug(ReactorNetty.format((Channel)this.channel(), (String)"Outbound error happened. Sending WRITE_FIN."), err);
        }
        this.sendFinNow(f -> this.terminate());
    }

    protected final void onInboundComplete() {
        super.onInboundComplete();
    }

    final boolean markFinSent() {
        return FIN_SENT.compareAndSet(this, 0, 1);
    }

    final void sendFinNow() {
        this.sendFinNow(null);
    }

    final void sendFinNow(@Nullable ChannelFutureListener listener) {
        if (this.markFinSent()) {
            ChannelFuture f = this.channel().writeAndFlush((Object)QuicStreamFrame.EMPTY_FIN);
            if (listener != null) {
                f.addListener((GenericFutureListener)listener);
            }
        }
    }

    static void callTerminate(Channel ch) {
        ChannelOperations ops = QuicStreamOperations.get((Channel)ch);
        if (ops == null) {
            return;
        }
        ((QuicStreamOperations)ops).terminate();
    }
}

