/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.streaming.async;

import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.FastThreadLocalThread;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.AsyncStreamingInputPlus;
import org.apache.cassandra.streaming.StreamManager;
import org.apache.cassandra.streaming.StreamResultFuture;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.streaming.async.NettyStreamingMessageSender;
import org.apache.cassandra.streaming.messages.IncomingStreamMessage;
import org.apache.cassandra.streaming.messages.StreamInitMessage;
import org.apache.cassandra.streaming.messages.StreamMessage;
import org.apache.cassandra.streaming.messages.StreamMessageHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamingInboundHandler
extends ChannelInboundHandlerAdapter {
    /** Class-wide SLF4J logger. */
    private static final Logger logger = LoggerFactory.getLogger(StreamingInboundHandler.class);
    /** Default session lookup: resolves a {@link SessionIdentifier} via {@code StreamManager.instance.findSession}. */
    private static final Function<SessionIdentifier, StreamSession> DEFAULT_SESSION_PROVIDER = sid -> StreamManager.instance.findSession(sid.from, sid.planId, sid.sessionIndex);
    /** When true, handlers add themselves to {@code inboundHandlers} on construction and remove themselves in {@code close()}. */
    private static volatile boolean trackInboundHandlers = false;
    /** Registry of tracked handlers; only assigned by the static {@code trackInboundHandlers()} method, so it stays null until tracking is enabled. */
    private static Collection<StreamingInboundHandler> inboundHandlers;
    /** Address of the remote peer; used to name the deserializer thread started in {@code handlerAdded}. */
    private final InetAddressAndPort remoteAddress;
    /** Protocol version supplied at construction; exposed to the inner task through a synthetic accessor. */
    private final int protocolVersion;
    /** Session supplied at construction (may be null); handed to the {@code StreamDeserializingTask}. */
    private final StreamSession session;
    /** Sink for inbound {@code ByteBuf}s; assigned in {@code handlerAdded} or {@code setPendingBuffers}, so it is null before either runs. */
    private AsyncStreamingInputPlus buffers;
    /** Set by {@code close()}; once true, {@code channelRead} releases incoming messages instead of queueing them. */
    private volatile boolean closed;

    /**
     * Creates a handler for one inbound streaming channel.
     *
     * @param remoteAddress   address of the peer on the other end of the channel
     * @param protocolVersion protocol version stored for later use by the deserializing task
     * @param session         an existing session to use, or {@code null} if none is known yet
     */
    public StreamingInboundHandler(InetAddressAndPort remoteAddress, int protocolVersion, @Nullable StreamSession session) {
        this.session = session;
        this.protocolVersion = protocolVersion;
        this.remoteAddress = remoteAddress;

        // Registration comes last so the instance is fully initialised before it becomes
        // visible through the static registry.
        if (!trackInboundHandlers) {
            return;
        }
        inboundHandlers.add(this);
    }

    /**
     * Invoked when this handler is attached to a channel pipeline. Creates the buffer sink
     * for the channel and starts a daemon thread that runs a {@link StreamDeserializingTask}.
     *
     * @param ctx the context whose channel backs the buffer sink and the deserializing task
     */
    public void handlerAdded(ChannelHandlerContext ctx) {
        buffers = new AsyncStreamingInputPlus(ctx.channel());

        Runnable deserializer = new StreamDeserializingTask(DEFAULT_SESSION_PROVIDER, session, ctx.channel());
        String threadName = String.format("Stream-Deserializer-%s-%s", remoteAddress.toString(), ctx.channel().id());

        // Daemon thread: it must not keep the JVM alive on its own.
        FastThreadLocalThread worker = new FastThreadLocalThread(deserializer, threadName);
        worker.setDaemon(true);
        worker.start();
    }

    /**
     * Queues an inbound {@link ByteBuf} for the deserializing task.
     * The message is released instead when the handler is closed, when the message is not a
     * {@code ByteBuf}, or when the buffer sink refuses it ({@code append} returns false).
     *
     * @param ctx     the channel handler context (unused)
     * @param message the inbound message
     */
    public void channelRead(ChannelHandlerContext ctx, Object message) {
        // Short-circuit order matters: append() must only be attempted on an open handler
        // and only for ByteBuf messages.
        boolean queued = !closed
                         && message instanceof ByteBuf
                         && buffers.append((ByteBuf) message);
        if (!queued) {
            ReferenceCountUtil.release(message);
        }
    }

    /**
     * Closes this handler when the channel goes inactive, then forwards the event
     * to the next handler in the pipeline.
     *
     * @param ctx the channel handler context used to propagate the event
     */
    public void channelInactive(ChannelHandlerContext ctx) {
        close();
        ctx.fireChannelInactive();
    }

    /**
     * Marks this handler closed, asks the buffer sink to close, and removes the handler
     * from the static registry when tracking is enabled.
     *
     * <p>The buffer sink is only assigned in {@code handlerAdded} (or {@code setPendingBuffers}),
     * whereas the handler is registered for tracking in the constructor. {@code shutdown()} can
     * therefore reach a handler that has not yet been attached to a pipeline; the null guard
     * keeps that case from throwing and aborting the shutdown of the remaining handlers.
     */
    void close() {
        closed = true;

        // Read the field once so the null check and the call see the same reference.
        AsyncStreamingInputPlus pending = buffers;
        if (pending != null) {
            pending.requestClosure();
        }

        if (trackInboundHandlers) {
            inboundHandlers.remove(this);
        }
    }

    /**
     * Logs an exception raised on the channel and closes this handler.
     * {@link IOException}s are logged at trace level; anything else is logged as a warning.
     *
     * @param ctx   the channel handler context (unused)
     * @param cause the exception that was raised
     */
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        boolean connectionProblem = cause instanceof IOException;
        if (!connectionProblem) {
            logger.warn("exception occurred while in processing streaming data", cause);
        } else {
            logger.trace("connection problem while streaming", cause);
        }
        close();
    }

    /**
     * Replaces the buffer sink used by this handler.
     *
     * @param bufChannel the buffer sink to use from now on
     */
    void setPendingBuffers(AsyncStreamingInputPlus bufChannel) {
        buffers = bufChannel;
    }

    /**
     * Closes every tracked handler and empties the registry.
     * Requires that tracking was enabled beforehand (checked with an assertion).
     */
    @VisibleForTesting
    public static void shutdown() {
        assert trackInboundHandlers : "in-JVM tests required tracking of inbound streaming handlers";
        // close() removes the handler from the registry while we iterate; the registry
        // created by trackInboundHandlers() is backed by a ConcurrentHashMap, which allows that.
        for (StreamingInboundHandler handler : inboundHandlers) {
            handler.close();
        }
        inboundHandlers.clear();
    }

    /**
     * Enables tracking of inbound handlers: installs a fresh concurrent set as the registry
     * and then turns the tracking flag on.
     *
     * <p>The registry is assigned before the volatile flag is set, so any thread that
     * observes the flag as true also observes the registry.
     */
    public static void trackInboundHandlers() {
        // Diamond instead of a raw ConcurrentHashMap: keeps the set's element type checked.
        inboundHandlers = Collections.newSetFromMap(new ConcurrentHashMap<>());
        trackInboundHandlers = true;
    }

    /*
     * The access$NNN methods below are marked synthetic by the decompiler: they are
     * compiler-generated accessors that give nested classes access to this class's
     * private members. They are not hand-written API.
     * NOTE(review): presumably they are used by StreamDeserializingTask.run(), whose body
     * failed to decompile - confirm against the original sources.
     */

    /* Synthetic getter for the private 'buffers' field. */
    static /* synthetic */ AsyncStreamingInputPlus access$000(StreamingInboundHandler x0) {
        return x0.buffers;
    }

    /* Synthetic getter for the private volatile 'closed' flag. */
    static /* synthetic */ boolean access$100(StreamingInboundHandler x0) {
        return x0.closed;
    }

    /* Synthetic getter for the private 'protocolVersion' field. */
    static /* synthetic */ int access$200(StreamingInboundHandler x0) {
        return x0.protocolVersion;
    }

    /* Synthetic getter for the private static 'logger'. */
    static /* synthetic */ Logger access$300() {
        return logger;
    }

    /* Synthetic getter for the private 'remoteAddress' field. */
    static /* synthetic */ InetAddressAndPort access$400(StreamingInboundHandler x0) {
        return x0.remoteAddress;
    }

    /* Synthetic setter for the private volatile 'closed' flag; writes x1, then returns the value read back from the field. */
    static /* synthetic */ boolean access$102(StreamingInboundHandler x0, boolean x1) {
        x0.closed = x1;
        return x0.closed;
    }

    /**
     * Immutable lookup key handed to a session provider: the three values that
     * {@code StreamManager.instance.findSession} takes in the default provider.
     */
    static class SessionIdentifier {
        /** Peer address the session is associated with. */
        final InetAddressAndPort from;
        /** Identifier of the streaming plan. */
        final UUID planId;
        /** Index of the session within the plan. */
        final int sessionIndex;

        /**
         * @param from         peer address the session is associated with
         * @param planId       identifier of the streaming plan
         * @param sessionIndex index of the session within the plan
         */
        SessionIdentifier(InetAddressAndPort from, UUID planId, int sessionIndex) {
            this.sessionIndex = sessionIndex;
            this.planId = planId;
            this.from = from;
        }
    }

    class StreamDeserializingTask
    implements Runnable {
        /** Lookup used by {@code deriveSession} to resolve a session from a {@link SessionIdentifier}. */
        private final Function<SessionIdentifier, StreamSession> sessionProvider;
        /** Channel this task serves; passed to the receiving-side initialisation and attached to the derived session. */
        private final Channel channel;
        /** Session supplied at construction; {@code deriveSession} asserts it is null when a StreamInitMessage arrives. Non-private for tests. */
        @VisibleForTesting
        StreamSession session;

        /**
         * @param sessionProvider lookup used to resolve a session from a {@link SessionIdentifier}
         * @param session         an already-known session, or {@code null}
         * @param channel         the channel this task serves
         */
        StreamDeserializingTask(Function<SessionIdentifier, StreamSession> sessionProvider, StreamSession session, Channel channel) {
            this.channel = channel;
            this.session = session;
            this.sessionProvider = sessionProvider;
        }

        /*
         * NOTE(review): CFR could not decompile this method. The body below is only a
         * placeholder that throws unconditionally - it is NOT the real implementation.
         * With this stub, the thread started in handlerAdded() terminates immediately with
         * an IllegalStateException. Restore the real body from the original sources
         * before using this class.
         */
        @Override
        public void run() {
            /*
             * Failure reported by the decompiler (CFR 0.152):
             *
             *   org.benf.cfr.reader.util.ConfusedCFRException:
             *       Tried to end blocks [9[WHILELOOP]], but top level block is 2[TRYBLOCK]
             *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.processEndingBlocks(Op04StructuredStatement.java:435)
             *     ... (remaining CFR-internal frames omitted)
             *
             * The message shows that the original bytecode contains a while loop and a try
             * block that CFR could not nest.
             * NOTE(review): the synthetic accessors on the enclosing class suggest the real
             * body reads buffers, closed, protocolVersion, logger and remoteAddress and
             * writes closed - confirm against the original sources.
             */
            throw new IllegalStateException("Decompilation failed");
        }

        /**
         * Resolves the session a message belongs to and attaches this task's channel to it.
         *
         * <p>For a {@link StreamInitMessage} the receiving side is initialised first and the
         * session is then looked up from the message's own fields. For an
         * {@link IncomingStreamMessage} the session is looked up from the message header.
         * Any other message type yields no session.
         *
         * @param message the message to derive a session from
         * @return the resolved session, with the channel attached
         * @throws IllegalStateException if no session could be found for the message
         */
        StreamSession deriveSession(StreamMessage message) {
            SessionIdentifier identifier = null;

            if (message instanceof StreamInitMessage) {
                // 'session' is this task's own field, not the enclosing handler's.
                assert session == null : "initiator of stream session received a StreamInitMessage";
                StreamInitMessage init = (StreamInitMessage) message;
                // Must run before the lookup below: the lookup is made only after the receiving side is initialised.
                StreamResultFuture.initReceivingSide(init.sessionIndex, init.planId, init.streamOperation, init.from, channel, init.pendingRepair, init.previewKind);
                identifier = new SessionIdentifier(init.from, init.planId, init.sessionIndex);
            } else if (message instanceof IncomingStreamMessage) {
                StreamMessageHeader header = ((IncomingStreamMessage) message).header;
                identifier = new SessionIdentifier(header.sender, header.planId, header.sessionIndex);
            }

            StreamSession resolved = identifier == null ? null : sessionProvider.apply(identifier);
            if (resolved == null) {
                throw new IllegalStateException(NettyStreamingMessageSender.createLogTag(null, channel) + " no session found for message " + message);
            }

            resolved.attach(channel);
            return resolved;
        }
    }
}

