/*
 * Decompiled with CFR 0.152.
 */
package reactor.netty.channel;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCounted;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Predicate;
import javax.annotation.Nullable;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Operators;
import reactor.netty.ByteBufFlux;
import reactor.netty.Connection;
import reactor.netty.ConnectionObserver;
import reactor.netty.FutureMono;
import reactor.netty.NettyInbound;
import reactor.netty.NettyOutbound;
import reactor.netty.ReactorNetty;
import reactor.netty.channel.AbortedException;
import reactor.netty.channel.ChannelOperationsHandler;
import reactor.netty.channel.FluxReceive;
import reactor.netty.channel.MonoSendMany;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.context.Context;

/**
 * Combined {@link NettyInbound}, {@link NettyOutbound} and {@link Connection} view over a
 * Netty {@link Channel}.
 *
 * <p>The instance also implements {@link CoreSubscriber}{@code <Void>}: terminal signals it
 * receives are routed to {@link #onOutboundComplete()} / {@link #onOutboundError(Throwable)},
 * and the subscription it is given is tracked in {@link #outboundSubscription}.
 *
 * <p>Most {@link Connection} operations delegate to the wrapped {@link #connection}.
 *
 * @param <INBOUND> inbound flavour; not used by the members of this class itself
 * @param <OUTBOUND> outbound flavour; not used by the members of this class itself
 */
public class ChannelOperations<INBOUND extends NettyInbound, OUTBOUND extends NettyOutbound>
        implements NettyInbound, NettyOutbound, Connection, CoreSubscriber<Void> {

    static final Logger log = Loggers.getLogger(ChannelOperations.class);

    /** Sentinel object; not referenced by any member of this class. */
    static final Object TERMINATED_OPS = new Object();

    /** Factory handed out by {@link OnSetup#empty()}; always produces {@code null}. */
    static final OnSetup EMPTY_SETUP = (c, l, msg) -> null;

    /**
     * Atomic accessor for {@link #outboundSubscription}. The raw {@code ChannelOperations}
     * type argument is unavoidable because the updater is created from a class literal.
     */
    @SuppressWarnings("rawtypes")
    static final AtomicReferenceFieldUpdater<ChannelOperations, Subscription> OUTBOUND_CLOSE =
            AtomicReferenceFieldUpdater.newUpdater(ChannelOperations.class, Subscription.class, "outboundSubscription");

    /** Underlying connection that channel access, disposal and persistence checks delegate to. */
    final Connection connection;

    /** Inbound flux returned by {@link #receiveObject()}. */
    final FluxReceive inbound;

    /** Observer notified by {@link #terminate()} and used as the source of {@link #currentContext()}. */
    final ConnectionObserver listener;

    /** Completed by {@link #terminate()}; shared with replacement instances created by the copy constructor. */
    final MonoProcessor<Void> onTerminate;

    /** Subscription received through {@link #onSubscribe(Subscription)}; only accessed via {@link #OUTBOUND_CLOSE}. */
    volatile Subscription outboundSubscription;

    /**
     * Appends a {@link ChannelOperationsHandler} to the end of the channel pipeline under the
     * name {@code "reactor.right.reactiveBridge"}.
     *
     * @param ch channel whose pipeline is extended
     * @param opsFactory factory passed to the handler
     * @param listener observer passed to the handler
     */
    public static void addReactiveBridge(Channel ch, OnSetup opsFactory, ConnectionObserver listener) {
        ch.pipeline().addLast("reactor.right.reactiveBridge", (ChannelHandler) new ChannelOperationsHandler(opsFactory, listener));
    }

    /**
     * Looks up the {@code ChannelOperations} associated with a channel by resolving
     * {@link Connection#from(Channel)} and asking it for this type.
     *
     * @param ch channel to inspect
     * @return the operations instance, or {@code null} when the lookup yields none
     */
    @Nullable
    public static ChannelOperations<?, ?> get(Channel ch) {
        return Connection.from(ch).as(ChannelOperations.class);
    }

    /**
     * Creates a replacement for an existing instance: connection, listener and the terminate
     * processor are shared with {@code replaced}, while a fresh inbound flux is created.
     *
     * @param replaced instance whose state is taken over
     */
    protected ChannelOperations(ChannelOperations<INBOUND, OUTBOUND> replaced) {
        this.connection = replaced.connection;
        this.listener = replaced.listener;
        this.onTerminate = replaced.onTerminate;
        this.inbound = new FluxReceive(this);
    }

    /**
     * Creates operations bound to the given connection.
     *
     * @param connection underlying connection, must not be {@code null}
     * @param listener state observer, must not be {@code null}
     * @throws NullPointerException if either argument is {@code null}
     */
    public ChannelOperations(Connection connection, ConnectionObserver listener) {
        this.connection = Objects.requireNonNull(connection, "connection");
        this.listener = Objects.requireNonNull(listener, "listener");
        this.onTerminate = MonoProcessor.create();
        this.inbound = new FluxReceive(this);
    }

    /**
     * Returns this instance when exactly {@code ChannelOperations.class} is requested;
     * any other type is resolved by the default {@link Connection#as(Class)}.
     */
    @Override
    @Nullable
    public <T extends Connection> T as(Class<T> clazz) {
        if (clazz == ChannelOperations.class) {
            // Safe: T is ChannelOperations here because the class tokens are identical.
            @SuppressWarnings("unchecked")
            T self = (T) this;
            return self;
        }
        return Connection.super.as(clazz);
    }

    /** Returns the allocator of the underlying channel. */
    @Override
    public ByteBufAllocator alloc() {
        return connection.channel().alloc();
    }

    /** Returns this instance, which is its own inbound. */
    @Override
    public NettyInbound inbound() {
        return this;
    }

    /** Returns this instance, which is its own outbound. */
    @Override
    public NettyOutbound outbound() {
        return this;
    }

    /** Returns the channel of the underlying connection. */
    @Override
    public final Channel channel() {
        return connection.channel();
    }

    /**
     * Invokes the callback with this instance as the connection.
     *
     * @param withConnection callback to invoke
     * @return this instance
     */
    @Override
    public ChannelOperations<INBOUND, OUTBOUND> withConnection(Consumer<? super Connection> withConnection) {
        withConnection.accept(this);
        return this;
    }

    /** Cancels the inbound flux if it is not already disposed, then disposes the underlying connection. */
    @Override
    public void dispose() {
        if (!inbound.isDisposed()) {
            inbound.cancel();
        }
        connection.dispose();
    }

    /** Returns this instance as the subscriber. */
    @Override
    public CoreSubscriber<Void> disposeSubscriber() {
        return this;
    }

    /**
     * @return {@code true} when the channel is no longer active or
     *         {@link #isSubscriptionDisposed()} holds
     */
    @Override
    public final boolean isDisposed() {
        return !channel().isActive() || isSubscriptionDisposed();
    }

    /**
     * @return {@code true} when the tracked outbound subscription is the
     *         {@link Operators#cancelledSubscription()} marker
     */
    public final boolean isSubscriptionDisposed() {
        return OUTBOUND_CLOSE.get(this) == Operators.cancelledSubscription();
    }

    /** Returns the dispose signal of the underlying connection. */
    @Override
    public final Mono<Void> onDispose() {
        return connection.onDispose();
    }

    /**
     * Registers the disposable with the underlying connection.
     *
     * @return this instance
     */
    @Override
    public Connection onDispose(Disposable onDispose) {
        connection.onDispose(onDispose);
        return this;
    }

    /**
     * Completion signal of the subscribed publisher. Ignored when {@link #isDisposed()};
     * otherwise the tracked subscription is replaced by the cancelled marker and
     * {@link #onOutboundComplete()} is invoked.
     */
    @Override
    public final void onComplete() {
        if (isDisposed()) {
            return;
        }
        OUTBOUND_CLOSE.set(this, Operators.cancelledSubscription());
        onOutboundComplete();
    }

    /**
     * Error signal of the subscribed publisher. When {@link #isDisposed()} the error is only
     * logged at debug level; otherwise the tracked subscription is replaced by the cancelled
     * marker and {@link #onOutboundError(Throwable)} is invoked.
     */
    @Override
    public final void onError(Throwable t) {
        if (isDisposed()) {
            if (log.isDebugEnabled()) {
                log.debug(ReactorNetty.format(channel(), "An outbound error could not be processed"), t);
            }
            return;
        }
        OUTBOUND_CLOSE.set(this, Operators.cancelledSubscription());
        onOutboundError(t);
    }

    /** No-op: the subscribed publisher is {@code Void}-typed. */
    @Override
    public final void onNext(Void aVoid) {
    }

    /** Stores the subscription if none was set before and, in that case, requests an unbounded amount. */
    @Override
    public final void onSubscribe(Subscription s) {
        if (Operators.setOnce(OUTBOUND_CLOSE, this, s)) {
            s.request(Long.MAX_VALUE);
        }
    }

    /** Returns the inbound flux of this instance. */
    @Override
    public Flux<?> receiveObject() {
        return inbound;
    }

    /** Wraps {@link #receiveObject()} into a {@link ByteBufFlux} using the channel's allocator. */
    @Override
    public ByteBufFlux receive() {
        return ByteBufFlux.fromInbound(receiveObject(), connection.channel().alloc());
    }

    /**
     * Sends the buffers emitted by {@code dataStream}.
     *
     * <p>If the channel is already inactive the returned outbound fails with an
     * {@link AbortedException}. A {@link Mono} source is written and flushed directly, and any
     * {@link ByteBuf} discarded by that chain is released. Every other publisher is handed to
     * {@link MonoSendMany#byteBufSource}.
     *
     * @param dataStream source of buffers to write
     * @param predicate forwarded to {@code MonoSendMany} (NOTE(review): presumably the flush
     *        predicate; confirm against MonoSendMany)
     * @return the chained outbound
     */
    @Override
    public NettyOutbound send(Publisher<? extends ByteBuf> dataStream, Predicate<ByteBuf> predicate) {
        if (!channel().isActive()) {
            return then(Mono.error(new AbortedException("Connection has been closed")));
        }
        if (dataStream instanceof Mono) {
            // Wildcard (not raw) cast keeps the chain typed as Mono<Void>, so the discard hook type-checks.
            Mono<Void> written = ((Mono<?>) dataStream)
                    .flatMap(m -> FutureMono.from(channel().writeAndFlush(m)))
                    .doOnDiscard(ByteBuf.class, ReferenceCounted::release);
            return then(written);
        }
        return then(MonoSendMany.byteBufSource(dataStream, channel(), predicate));
    }

    /**
     * Sends the objects emitted by {@code dataStream}.
     *
     * <p>If the channel is already inactive the returned outbound fails with an
     * {@link AbortedException}. A {@link Mono} source is written and flushed directly, and any
     * {@link ReferenceCounted} discarded by that chain is released. Every other publisher is
     * handed to {@link MonoSendMany#objectSource}.
     *
     * @param dataStream source of objects to write
     * @param predicate forwarded to {@code MonoSendMany} (NOTE(review): presumably the flush
     *        predicate; confirm against MonoSendMany)
     * @return the chained outbound
     */
    @Override
    public NettyOutbound sendObject(Publisher<?> dataStream, Predicate<Object> predicate) {
        if (!channel().isActive()) {
            return then(Mono.error(new AbortedException("Connection has been closed")));
        }
        if (dataStream instanceof Mono) {
            // Wildcard (not raw) cast keeps the chain typed as Mono<Void>, so the discard hook type-checks.
            Mono<Void> written = ((Mono<?>) dataStream)
                    .flatMap(m -> FutureMono.from(channel().writeAndFlush(m)))
                    .doOnDiscard(ReferenceCounted.class, ReferenceCounted::release);
            return then(written);
        }
        return then(MonoSendMany.objectSource(dataStream, channel(), predicate));
    }

    /**
     * Sends a single message through {@code writeAndFlush} on the connection's channel.
     *
     * <p>The write is supplied to {@link FutureMono#deferFuture} (NOTE(review): presumably
     * deferred until subscription; confirm), and {@link ReactorNetty#safeRelease(Object)} on
     * the message is registered as the cleanup callback of the chained outbound.
     *
     * @param message object to write
     * @return the chained outbound
     */
    @Override
    public NettyOutbound sendObject(Object message) {
        return then(
                FutureMono.deferFuture(() -> connection.channel().writeAndFlush(message)),
                () -> ReactorNetty.safeRelease(message));
    }

    /**
     * Sends the object derived from a resource via {@link Mono#using}: {@code sourceInput}
     * provides the resource, {@code mappedInput} turns it (together with this connection) into
     * the object that is written and flushed, and {@code sourceCleanup} is the resource cleanup.
     *
     * @param sourceInput resource supplier, must not be {@code null}
     * @param mappedInput maps connection and resource to the object to write, must not be {@code null}
     * @param sourceCleanup resource cleanup, must not be {@code null}
     * @param <S> resource type
     * @return the chained outbound
     * @throws NullPointerException if any argument is {@code null}
     */
    @Override
    public <S> NettyOutbound sendUsing(Callable<? extends S> sourceInput, BiFunction<? super Connection, ? super S, ?> mappedInput, Consumer<? super S> sourceCleanup) {
        Objects.requireNonNull(sourceInput, "sourceInput");
        Objects.requireNonNull(mappedInput, "mappedInput");
        Objects.requireNonNull(sourceCleanup, "sourceCleanup");
        // The resource is passed as-is: widening it to Object does not satisfy "? super S".
        return then(Mono.using(
                sourceInput,
                s -> FutureMono.from(connection.channel().writeAndFlush(mappedInput.apply(this, s))),
                sourceCleanup));
    }

    /**
     * @return the connection's dispose signal when the connection is not persistent; otherwise
     *         the terminate processor combined via {@code or} with the connection's dispose signal
     */
    @Override
    public final Mono<Void> onTerminate() {
        if (!isPersistent()) {
            return connection.onDispose();
        }
        return onTerminate.or(connection.onDispose());
    }

    /** Returns the observer supplied at construction. */
    public final ConnectionObserver listener() {
        return listener;
    }

    @Override
    public String toString() {
        return "ChannelOperations{" + connection.toString() + "}";
    }

    /** Cancels the inbound flux. */
    public final void discard() {
        inbound.cancel();
    }

    /** @return whether the inbound flux reports itself as cancelled */
    public final boolean isInboundCancelled() {
        return inbound.isCancelled();
    }

    /** @return whether the inbound flux reports itself as disposed */
    public final boolean isInboundDisposed() {
        return inbound.isDisposed();
    }

    /**
     * Forwards an inbound message to the inbound flux.
     *
     * @param ctx handler context; unused by this base implementation
     * @param msg message to forward
     */
    protected void onInboundNext(ChannelHandlerContext ctx, Object msg) {
        inbound.onInboundNext(msg);
    }

    /** Hook with an empty base implementation. */
    protected void onInboundCancel() {
    }

    /** Signals completion to the inbound flux. */
    protected void onInboundComplete() {
        inbound.onInboundComplete();
    }

    /** Cancels the inbound flux when it has no receiver, then runs {@link #terminate()}. */
    protected void onInboundClose() {
        if (inbound.receiver == null) {
            inbound.cancel();
        }
        terminate();
    }

    /**
     * Invoked from {@link #onComplete()}: logs at debug level, calls
     * {@code markPersistent(false)} and runs {@link #terminate()}.
     */
    protected void onOutboundComplete() {
        if (log.isDebugEnabled()) {
            log.debug(ReactorNetty.format(channel(), "[{}] User Handler requesting close connection"), formatName());
        }
        markPersistent(false);
        terminate();
    }

    /**
     * Invoked from {@link #onError(Throwable)}: calls {@code markPersistent(false)} and runs
     * {@link #terminate()}.
     *
     * @param err the outbound error; not used by this base implementation
     */
    protected void onOutboundError(Throwable err) {
        markPersistent(false);
        terminate();
    }

    /**
     * Runs the termination sequence, but only when {@code rebind(connection)} returns
     * {@code true} (NOTE(review): presumably this succeeds once per channel binding; confirm
     * against {@code Connection#rebind}).
     *
     * <p>Sequence: terminate the outbound subscription, complete the inbound, complete the
     * terminate processor, then notify the listener with
     * {@link ConnectionObserver.State#DISCONNECTING}.
     */
    protected final void terminate() {
        if (!rebind(connection)) {
            return;
        }
        if (log.isTraceEnabled()) {
            // The exception is created only to capture the calling stack in the trace log.
            log.trace(ReactorNetty.format(channel(), "Disposing ChannelOperation from a channel"), new Exception("ChannelOperation terminal stack"));
        }
        Operators.terminate(OUTBOUND_CLOSE, this);
        onInboundComplete();
        onTerminate.onComplete();
        listener.onStateChange(this, ConnectionObserver.State.DISCONNECTING);
    }

    /** Forwards an error to the inbound flux. */
    protected final void onInboundError(Throwable err) {
        inbound.onInboundError(err);
    }

    /** Returns the underlying connection. */
    protected final Connection connection() {
        return connection;
    }

    /** Returns the simple class name of the runtime type with every {@code "Operations"} occurrence removed. */
    protected final String formatName() {
        return getClass().getSimpleName().replace("Operations", "");
    }

    /** Returns the persistence flag of the underlying connection. */
    @Override
    public boolean isPersistent() {
        return connection.isPersistent();
    }

    /** Returns the context of the listener. */
    @Override
    public Context currentContext() {
        return listener.currentContext();
    }

    /**
     * Factory for {@link ChannelOperations} instances.
     */
    @FunctionalInterface
    public static interface OnSetup {

        /** Returns the shared factory that always creates {@code null}. */
        public static OnSetup empty() {
            return EMPTY_SETUP;
        }

        /**
         * Creates operations for the given connection.
         *
         * @param connection connection to wrap
         * @param listener observer for the new operations
         * @param msg optional object supplied by the caller, may be {@code null}
         * @return the new operations, or {@code null}
         */
        @Nullable
        public ChannelOperations<?, ?> create(Connection connection, ConnectionObserver listener, @Nullable Object msg);
    }
}

