/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.net;

import com.google.common.annotations.VisibleForTesting;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoop;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.function.Consumer;
import org.apache.cassandra.concurrent.ExecutorLocals;
import org.apache.cassandra.exceptions.IncompatibleSchemaException;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.ChunkedInputPlus;
import org.apache.cassandra.net.ConnectionType;
import org.apache.cassandra.net.Crc;
import org.apache.cassandra.net.FrameDecoder;
import org.apache.cassandra.net.InboundMessageCallbacks;
import org.apache.cassandra.net.InvalidSerializedSizeException;
import org.apache.cassandra.net.ManyToOneConcurrentLinkedQueue;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.ResourceLimits;
import org.apache.cassandra.net.ShareableBytes;
import org.apache.cassandra.net.SocketFactory;
import org.apache.cassandra.tracing.TraceState;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.MonotonicClock;
import org.apache.cassandra.utils.NoSpamLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InboundMessageHandler
extends ChannelInboundHandlerAdapter
implements FrameDecoder.FrameProcessor {
    private static final Logger logger = LoggerFactory.getLogger(InboundMessageHandler.class);
    // Wrapper around `logger` configured with (1, SECONDS); presumably rate-limits repeated messages - confirm in NoSpamLogger.
    private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 1L, TimeUnit.SECONDS);
    private static final Message.Serializer serializer = Message.serializer;
    // Frame decoder feeding this handler; activated in handlerAdded() and reactivated once capacity is regained.
    private final FrameDecoder decoder;
    private final ConnectionType type;
    private final Channel channel;
    private final InetAddressAndPort self;
    private final InetAddressAndPort peer;
    // Messaging version handed to the serializer for header extraction and deserialization.
    private final int version;
    // Messages whose size is <= this threshold are deserialized inline; bigger ones go through LargeMessage.
    private final int largeThreshold;
    // Large message currently being assembled from non-self-contained frames; null when none is in progress.
    private LargeMessage largeMessage;
    // Capacity owned by this handler; only bytes above it are borrowed from the endpoint/global reserves.
    private final long queueCapacity;
    // Bytes currently accounted against this handler; mutated only through queueSizeUpdater.
    volatile long queueSize = 0L;
    private static final AtomicLongFieldUpdater<InboundMessageHandler> queueSizeUpdater = AtomicLongFieldUpdater.newUpdater(InboundMessageHandler.class, "queueSize");
    private final ResourceLimits.Limit endpointReserveCapacity;
    private final WaitQueue endpointWaitQueue;
    private final ResourceLimits.Limit globalReserveCapacity;
    private final WaitQueue globalWaitQueue;
    // Invoked from channelInactive() once the handler has been marked closed.
    private final OnHandlerClosed onClosed;
    private final InboundMessageCallbacks callbacks;
    // Receives each successfully deserialized, unexpired message (see ProcessMessage.run()).
    private final Consumer<Message<?>> consumer;
    // Wait-queue ticket held while blocked on reserve capacity; set in acquireCapacity(), cleared when capacity is regained.
    private WaitQueue.Ticket ticket = null;
    // Plain (non-volatile, non-atomic) statistics counters updated by the processing methods of this handler.
    long corruptFramesRecovered;
    long corruptFramesUnrecovered;
    long receivedCount;
    long receivedBytes;
    long throttledCount;
    long throttledNanos;
    // Set in channelInactive(); makes onReserveCapacityRegained() a no-op afterwards.
    private boolean isClosed;

    /**
     * Creates a handler bound to a single channel. Every argument is stored as-is; no validation
     * or other work is performed here.
     *
     * @param decoder                 frame decoder that will feed frames to this handler
     * @param type                    connection type, used when building the channel id
     * @param channel                 the channel this handler is attached to
     * @param self                    local endpoint address
     * @param peer                    remote endpoint address
     * @param version                 messaging version passed to the message serializer
     * @param largeThreshold          largest message size that is still deserialized inline
     * @param queueCapacity           capacity owned by this handler before reserves are borrowed from
     * @param endpointReserveCapacity endpoint reserve limit
     * @param globalReserveCapacity   global reserve limit
     * @param endpointWaitQueue       wait queue used when the endpoint reserve is insufficient
     * @param globalWaitQueue         wait queue used when the global reserve is insufficient
     * @param onClosed                callback invoked when the channel becomes inactive
     * @param callbacks               lifecycle callbacks for inbound messages
     * @param consumer                sink for deserialized messages
     */
    InboundMessageHandler(FrameDecoder decoder, ConnectionType type, Channel channel, InetAddressAndPort self, InetAddressAndPort peer, int version, int largeThreshold, long queueCapacity, ResourceLimits.Limit endpointReserveCapacity, ResourceLimits.Limit globalReserveCapacity, WaitQueue endpointWaitQueue, WaitQueue globalWaitQueue, OnHandlerClosed onClosed, InboundMessageCallbacks callbacks, Consumer<Message<?>> consumer) {
        // connection identity
        this.type = type;
        this.channel = channel;
        this.self = self;
        this.peer = peer;
        this.version = version;

        // decoding
        this.decoder = decoder;
        this.largeThreshold = largeThreshold;

        // capacity accounting
        this.queueCapacity = queueCapacity;
        this.endpointReserveCapacity = endpointReserveCapacity;
        this.globalReserveCapacity = globalReserveCapacity;
        this.endpointWaitQueue = endpointWaitQueue;
        this.globalWaitQueue = globalWaitQueue;

        // callbacks
        this.onClosed = onClosed;
        this.callbacks = callbacks;
        this.consumer = consumer;
    }

    /**
     * Always fails: this handler receives its input as frames through {@link #process}, so a
     * direct channel read indicates a mis-wired pipeline.
     *
     * @throws IllegalStateException unconditionally
     */
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        final String reason = "InboundMessageHandler doesn't expect channelRead() to be invoked";
        throw new IllegalStateException(reason);
    }

    /**
     * Called when this handler is added to the pipeline; registers it with the frame decoder as
     * the frame processor.
     */
    public void handlerAdded(ChannelHandlerContext ctx) {
        decoder.activate(this);
    }

    /**
     * Processes one decoded frame using the handler's regular endpoint and global reserves.
     *
     * @param frame an intact or corrupt frame
     * @return the result of intact-frame processing, or {@code true} after a corrupt frame has
     *         been handled without throwing
     * @throws IOException propagated from frame processing
     */
    @Override
    public boolean process(FrameDecoder.Frame frame) throws IOException {
        if (!(frame instanceof FrameDecoder.IntactFrame)) {
            // Anything that is not an intact frame is treated as a corrupt one.
            processCorruptFrame((FrameDecoder.CorruptFrame) frame);
            return true;
        }
        FrameDecoder.IntactFrame intact = (FrameDecoder.IntactFrame) frame;
        return processIntactFrame(intact, endpointReserveCapacity, globalReserveCapacity);
    }

    /**
     * Routes an intact frame: self-contained frames hold whole messages, while other frames are
     * either the first or a subsequent piece of a large message, depending on whether one is
     * already being assembled.
     *
     * @return whatever the selected processing method returns
     */
    private boolean processIntactFrame(FrameDecoder.IntactFrame frame, ResourceLimits.Limit endpointReserve, ResourceLimits.Limit globalReserve) throws IOException {
        if (!frame.isSelfContained) {
            return largeMessage == null
                 ? processFirstFrameOfLargeMessage(frame, endpointReserve, globalReserve)
                 : processSubsequentFrameOfLargeMessage(frame);
        }
        return processFrameOfContainedMessages(frame.contents, endpointReserve, globalReserve);
    }

    /**
     * Processes the messages held in a self-contained frame one at a time, stopping at the first
     * message that could not be processed.
     *
     * @return {@code true} if all remaining bytes were processed, {@code false} if processing
     *         stopped early
     */
    private boolean processFrameOfContainedMessages(ShareableBytes bytes, ResourceLimits.Limit endpointReserve, ResourceLimits.Limit globalReserve) throws IOException {
        boolean keepGoing = true;
        while (keepGoing && bytes.hasRemaining()) {
            keepGoing = processOneContainedMessage(bytes, endpointReserve, globalReserve);
        }
        return keepGoing;
    }

    /**
     * Processes the next message found at the current position of {@code bytes}.
     * <p>
     * An already-expired message is reported through the callbacks and skipped without acquiring
     * any capacity. A live message first needs capacity; if that cannot be acquired nothing is
     * reported or consumed and {@code false} is returned. Otherwise the message is handed to
     * small or large message processing according to {@code largeThreshold}.
     *
     * @return {@code false} only when capacity for a live message could not be acquired
     */
    private boolean processOneContainedMessage(ShareableBytes bytes, ResourceLimits.Limit endpointReserve, ResourceLimits.Limit globalReserve) throws IOException {
        ByteBuffer buffer = bytes.get();
        long now = MonotonicClock.approxTime.now();
        Message.Header header = serializer.extractHeader(buffer, peer, now, version);
        long elapsedNanos = now - header.createdAtNanos;
        int size = serializer.inferMessageSize(buffer, buffer.position(), buffer.limit(), version);

        boolean expired = MonotonicClock.approxTime.isAfter(now, header.expiresAtNanos);
        if (!expired && !acquireCapacity(endpointReserve, globalReserve, size, now, header.expiresAtNanos)) {
            return false;
        }

        callbacks.onHeaderArrived(size, header, elapsedNanos, TimeUnit.NANOSECONDS);
        if (expired) {
            callbacks.onArrivedExpired(size, header, false, elapsedNanos, TimeUnit.NANOSECONDS);
        } else {
            callbacks.onArrived(size, header, elapsedNanos, TimeUnit.NANOSECONDS);
        }
        ++receivedCount;
        receivedBytes += (long) size;

        if (expired) {
            // Nothing to deserialize; just step over the message body.
            bytes.skipBytes(size);
        } else if (size <= largeThreshold) {
            processSmallMessage(bytes, size, header);
        } else {
            processLargeMessage(bytes, size, header);
        }
        return true;
    }

    /**
     * Deserializes a message of {@code size} bytes in place from {@code bytes} and, on success,
     * dispatches it for execution.
     * <p>
     * Deserialization failures are not propagated: they are reported through
     * {@code callbacks.onFailedDeserialize}, logged, and the capacity held for the message is
     * released. Whatever the outcome, the buffer is left positioned just past this message with
     * its original limit restored.
     *
     * @param bytes  buffer positioned at the start of the message
     * @param size   serialized size of the message in bytes
     * @param header the already-extracted header of the message
     */
    private void processSmallMessage(ShareableBytes bytes, int size, Message.Header header) {
        ByteBuffer buf = bytes.get();
        int begin = buf.position();
        int end = buf.limit();
        // Cap the readable region to this one message.
        buf.limit(begin + size);
        Message message = null;
        try (DataInputBuffer in = new DataInputBuffer(buf, false);){
            Message m = serializer.deserialize((DataInputPlus)in, header, this.version);
            // Leftover bytes mean the deserializer consumed less than the declared size.
            if (in.available() > 0) {
                throw new InvalidSerializedSizeException(header.verb, size, size - in.available());
            }
            message = m;
        }
        catch (IncompatibleSchemaException e) {
            this.callbacks.onFailedDeserialize(size, header, e);
            noSpamLogger.info("{} incompatible schema encountered while deserializing a message", this.id(), e);
        }
        catch (Throwable t) {
            JVMStabilityInspector.inspectThrowable(t, false);
            this.callbacks.onFailedDeserialize(size, header, t);
            logger.error("{} unexpected exception caught while deserializing a message", (Object)this.id(), (Object)t);
        }
        finally {
            // message is still null here only if deserialization failed.
            if (null == message) {
                this.releaseCapacity(size);
            }
            // Always move to the start of the next message and restore the limit.
            buf.position(begin + size);
            buf.limit(end);
        }
        if (null != message) {
            this.dispatch(new ProcessSmallMessage(message, size));
        }
    }

    /**
     * Handles a message above {@code largeThreshold} that is nevertheless fully contained in the
     * current frame: its bytes are sliced off and shared, wrapped in a {@link LargeMessage}, and
     * the message is scheduled right away.
     */
    private void processLargeMessage(ShareableBytes bytes, int size, Message.Header header) {
        ShareableBytes payload = bytes.sliceAndConsume(size).share();
        LargeMessage message = new LargeMessage(size, header, payload);
        message.schedule();
    }

    /**
     * Starts assembling a large message from its first frame.
     * <p>
     * Capacity is only acquired when the message has not already expired. If it cannot be
     * acquired, nothing is recorded and {@code false} is returned. Otherwise the header arrival
     * is reported, a new {@link LargeMessage} becomes the current one, and the frame is supplied
     * to it.
     *
     * @return {@code false} only when capacity for a live message could not be acquired
     */
    private boolean processFirstFrameOfLargeMessage(FrameDecoder.IntactFrame frame, ResourceLimits.Limit endpointReserve, ResourceLimits.Limit globalReserve) throws IOException {
        ByteBuffer buffer = frame.contents.get();
        long now = MonotonicClock.approxTime.now();
        Message.Header header = serializer.extractHeader(buffer, peer, now, version);
        int size = serializer.inferMessageSize(buffer, buffer.position(), buffer.limit(), version);

        boolean expired = MonotonicClock.approxTime.isAfter(now, header.expiresAtNanos);
        boolean admitted = expired || acquireCapacity(endpointReserve, globalReserve, size, now, header.expiresAtNanos);
        if (!admitted) {
            return false;
        }

        callbacks.onHeaderArrived(size, header, now - header.createdAtNanos, TimeUnit.NANOSECONDS);
        receivedBytes += (long) buffer.remaining();

        LargeMessage assembling = new LargeMessage(size, header, expired);
        largeMessage = assembling;
        assembling.supply(frame);
        return true;
    }

    /**
     * Feeds a follow-up frame (intact or corrupt) to the large message being assembled. When the
     * message reports completion, it is counted as received and the assembly slot is cleared.
     *
     * @return always {@code true}
     */
    private boolean processSubsequentFrameOfLargeMessage(FrameDecoder.Frame frame) {
        receivedBytes += (long) frame.frameSize;
        boolean completed = largeMessage.supply(frame);
        if (completed) {
            ++receivedCount;
            largeMessage = null;
        }
        return true;
    }

    /**
     * Handles a frame that failed its CRC check.
     * <ul>
     *   <li>unrecoverable frame: counted as unrecovered, then rethrown as {@code InvalidCrc};</li>
     *   <li>recoverable self-contained frame: counted as recovered and logged;</li>
     *   <li>recoverable frame of a large message in progress: supplied to that message, counted
     *       as recovered and logged;</li>
     *   <li>recoverable first frame of a large message: counted as unrecovered, logged, then
     *       rethrown as {@code InvalidCrc}.</li>
     * </ul>
     *
     * @throws Crc.InvalidCrc when the corruption cannot be recovered from
     */
    private void processCorruptFrame(FrameDecoder.CorruptFrame frame) throws Crc.InvalidCrc {
        if (!frame.isRecoverable()) {
            ++corruptFramesUnrecovered;
            throw new Crc.InvalidCrc(frame.readCRC, frame.computedCRC);
        }

        if (frame.isSelfContained) {
            receivedBytes += (long) frame.frameSize;
            ++corruptFramesRecovered;
            noSpamLogger.warn("{} invalid, recoverable CRC mismatch detected while reading messages (corrupted self-contained frame)", id());
            return;
        }

        if (largeMessage != null) {
            // A large message is already being assembled, so this is one of its later frames.
            processSubsequentFrameOfLargeMessage(frame);
            ++corruptFramesRecovered;
            noSpamLogger.warn("{} invalid, recoverable CRC mismatch detected while reading a large message", id());
            return;
        }

        // No large message in progress: this would have been the first frame of one.
        receivedBytes += (long) frame.frameSize;
        ++corruptFramesUnrecovered;
        noSpamLogger.error("{} invalid, unrecoverable CRC mismatch detected while reading messages (corrupted first frame of a large message)", id());
        throw new Crc.InvalidCrc(frame.readCRC, frame.computedCRC);
    }

    /**
     * Resumes processing after endpoint reserve capacity was granted: the supplied limit stands
     * in for the endpoint reserve, paired with the handler's regular global reserve.
     */
    private void onEndpointReserveCapacityRegained(ResourceLimits.Limit endpointReserve, long elapsedNanos) {
        onReserveCapacityRegained(endpointReserve, globalReserveCapacity, elapsedNanos);
    }

    /**
     * Resumes processing after global reserve capacity was granted: the supplied limit stands in
     * for the global reserve, paired with the handler's regular endpoint reserve.
     */
    private void onGlobalReserveCapacityRegained(ResourceLimits.Limit globalReserve, long elapsedNanos) {
        onReserveCapacityRegained(endpointReserveCapacity, globalReserve, elapsedNanos);
    }

    /**
     * Common continuation once a wait-queue ticket has been served. Does nothing if the handler
     * is already closed. Otherwise it clears the ticket, adds the wait time to
     * {@code throttledNanos}, processes up to one backlogged message with the supplied reserves
     * and, if that succeeds, reactivates the decoder. Any failure is routed to
     * {@code exceptionCaught}.
     *
     * @param elapsedNanos time spent waiting for capacity
     */
    private void onReserveCapacityRegained(ResourceLimits.Limit endpointReserve, ResourceLimits.Limit globalReserve, long elapsedNanos) {
        if (!isClosed) {
            assert channel.eventLoop().inEventLoop();

            ticket = null;
            throttledNanos += elapsedNanos;
            try {
                boolean stillActive = processUpToOneMessage(endpointReserve, globalReserve);
                if (stillActive) {
                    decoder.reactivate();
                }
            }
            catch (Throwable failure) {
                exceptionCaught(failure);
            }
        }
    }

    /**
     * Runs the decoder's backlog through a one-shot processor built around the given reserves.
     *
     * @return the processor's {@code isActive} flag after the backlog run
     */
    private boolean processUpToOneMessage(ResourceLimits.Limit endpointReserve, ResourceLimits.Limit globalReserve) throws IOException {
        UpToOneMessageFrameProcessor oneShot = new UpToOneMessageFrameProcessor(endpointReserve, globalReserve);
        decoder.processBacklog(oneShot);
        return oneShot.isActive;
    }

    /**
     * Tries to acquire capacity for a message. On failure, a ticket is registered with the wait
     * queue that matches the reported shortage and the throttle counter is incremented.
     *
     * @param bytes            size of the message
     * @param currentTimeNanos time recorded as the ticket's registration time
     * @param expiresAtNanos   expiry time of the message, recorded on the ticket
     * @return {@code true} if capacity was acquired
     */
    private boolean acquireCapacity(ResourceLimits.Limit endpointReserve, ResourceLimits.Limit globalReserve, int bytes, long currentTimeNanos, long expiresAtNanos) {
        ResourceLimits.Outcome outcome = acquireCapacity(endpointReserve, globalReserve, bytes);
        if (outcome == ResourceLimits.Outcome.SUCCESS) {
            return true;
        }

        if (outcome == ResourceLimits.Outcome.INSUFFICIENT_ENDPOINT) {
            ticket = endpointWaitQueue.register(this, bytes, currentTimeNanos, expiresAtNanos);
        } else if (outcome == ResourceLimits.Outcome.INSUFFICIENT_GLOBAL) {
            ticket = globalWaitQueue.register(this, bytes, currentTimeNanos, expiresAtNanos);
        }
        ++throttledCount;
        return false;
    }

    /**
     * Tries to account {@code bytes} against this handler, borrowing from the reserves only for
     * the part that does not fit into the handler's own {@code queueCapacity}.
     * <p>
     * The outcome names the limit that could not be satisfied so the caller can wait on the
     * matching queue: {@code INSUFFICIENT_GLOBAL} when the global reserve refused the allocation,
     * {@code INSUFFICIENT_ENDPOINT} when the endpoint reserve refused it.
     *
     * @param endpointReserve endpoint reserve to borrow from
     * @param globalReserve   global reserve to borrow from
     * @param bytes           size of the message
     * @return {@code SUCCESS}, {@code INSUFFICIENT_GLOBAL} or {@code INSUFFICIENT_ENDPOINT}
     */
    private ResourceLimits.Outcome acquireCapacity(ResourceLimits.Limit endpointReserve, ResourceLimits.Limit globalReserve, int bytes) {
        long currentQueueSize = this.queueSize;

        // Fast path: the message fits entirely within the handler's own capacity.
        // NOTE(review): this check-then-add presumes queueSize is only increased from one thread - confirm.
        if (currentQueueSize + (long)bytes <= this.queueCapacity) {
            queueSizeUpdater.addAndGet(this, bytes);
            return ResourceLimits.Outcome.SUCCESS;
        }

        // Only the part that overflows the handler's own capacity is borrowed from the reserves.
        long allocatedExcess = Math.min(currentQueueSize + (long)bytes - this.queueCapacity, (long)bytes);
        if (!globalReserve.tryAllocate(allocatedExcess)) {
            return ResourceLimits.Outcome.INSUFFICIENT_GLOBAL;
        }
        if (!endpointReserve.tryAllocate(allocatedExcess)) {
            // Undo the global allocation and let global waiters retry with the returned permits.
            globalReserve.release(allocatedExcess);
            this.globalWaitQueue.signal();
            // The endpoint reserve is the limiting one, so report it: the caller then registers
            // with the endpoint wait queue rather than the global one.
            return ResourceLimits.Outcome.INSUFFICIENT_ENDPOINT;
        }

        long newQueueSize = queueSizeUpdater.addAndGet(this, bytes);
        long actualExcess = Math.max(0L, Math.min(newQueueSize - this.queueCapacity, (long)bytes));

        // queueSize may have shrunk (via releaseCapacity) since it was first read, in which case
        // less was needed from the reserves than was allocated; hand the difference back.
        if (actualExcess != allocatedExcess) {
            long excess = allocatedExcess - actualExcess;
            endpointReserve.release(excess);
            globalReserve.release(excess);
            this.endpointWaitQueue.signal();
            this.globalWaitQueue.signal();
        }
        return ResourceLimits.Outcome.SUCCESS;
    }

    /**
     * Returns {@code bytes} of previously acquired capacity. If the handler was above its own
     * {@code queueCapacity} before the release, the borrowed part (at most {@code bytes}) is
     * given back to both reserves and both wait queues are signalled.
     */
    private void releaseCapacity(int bytes) {
        long previousSize = queueSizeUpdater.getAndAdd(this, -bytes);
        if (previousSize <= queueCapacity) {
            // Nothing was borrowed from the reserves.
            return;
        }

        long borrowed = Math.min(previousSize - queueCapacity, (long) bytes);
        endpointReserveCapacity.release(borrowed);
        globalReserveCapacity.release(borrowed);
        endpointWaitQueue.signal();
        globalWaitQueue.signal();
    }

    /**
     * Releases the capacity held by a message that was processed successfully. This
     * implementation ignores {@code header} and simply releases {@code size} bytes.
     */
    @VisibleForTesting
    protected void releaseProcessedCapacity(int size, Message.Header header) {
        releaseCapacity(size);
    }

    /**
     * Pipeline exception hook. Delegates to the internal handler and logs, rather than
     * propagates, anything that handler itself throws.
     */
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        try {
            exceptionCaught(cause);
        }
        catch (Throwable secondary) {
            logger.error("Unexpected exception in {}.exceptionCaught", (Object) getClass().getSimpleName(), (Object) secondary);
        }
    }

    /**
     * Handles a failure during inbound processing: discards the decoder's state, runs the
     * throwable through {@code JVMStabilityInspector}, logs it, and closes the channel.
     */
    private void exceptionCaught(Throwable cause) {
        decoder.discard();
        JVMStabilityInspector.inspectThrowable(cause, false);

        boolean invalidLegacyMagic = cause instanceof Message.InvalidLegacyProtocolMagic;
        if (!invalidLegacyMagic) {
            logger.error("{} unexpected exception caught while processing inbound messages; terminating connection", (Object) id(), (Object) cause);
        } else {
            logger.error("{} invalid, unrecoverable CRC mismatch detected while reading messages - closing the connection", (Object) id());
        }

        channel.close();
    }

    /**
     * Called when the channel goes inactive. Marks the handler closed, aborts a partially
     * received large message, invalidates a pending wait-queue ticket, and finally notifies the
     * {@code onClosed} callback.
     */
    public void channelInactive(ChannelHandlerContext ctx) {
        isClosed = true;

        LargeMessage partial = largeMessage;
        if (partial != null) {
            partial.abort();
        }

        WaitQueue.Ticket pending = ticket;
        if (pending != null) {
            pending.invalidate();
        }

        onClosed.call(this);
    }

    /** Returns the event loop of this handler's channel. */
    private EventLoop eventLoop() {
        return channel.eventLoop();
    }

    /**
     * Builds the channel id used in log messages.
     *
     * @param includeReal when {@code true}, the channel's actual remote and local socket
     *                    addresses are included next to the configured peer and self addresses;
     *                    when {@code false}, this is the same as {@link #id()}
     */
    String id(boolean includeReal) {
        if (includeReal) {
            InetSocketAddress realPeer = (InetSocketAddress) channel.remoteAddress();
            InetSocketAddress realSelf = (InetSocketAddress) channel.localAddress();
            return SocketFactory.channelId(peer, realPeer, self, realSelf, type, channel.id().asShortText());
        }
        return id();
    }

    /** Builds the channel id from peer, self, connection type and the channel's short id text. */
    String id() {
        String shortChannelId = channel.id().asShortText();
        return SocketFactory.channelId(peer, self, type, shortChannelId);
    }

    /**
     * Hands a processing task to the stage of the message's verb. Tracing state is initialised
     * from the header first (and a trace event recorded when tracing is active), then the
     * dispatch callback is notified, and finally the task is submitted together with the
     * executor locals that carry the trace state.
     */
    private void dispatch(ProcessMessage task) {
        Message.Header header = task.header();

        TraceState traceState = Tracing.instance.initializeFromMessage(header);
        if (null != traceState) {
            traceState.trace("{} message received from {}", (Object) header.verb, (Object) header.from);
        }

        callbacks.onDispatched(task.size(), header);
        header.verb.stage.execute(task, ExecutorLocals.create(traceState));
    }

    public static interface OnHandlerClosed {
        public void call(InboundMessageHandler var1);
    }

    /**
     * Queue of handlers waiting for capacity from a single reserve limit (endpoint or global).
     * <p>
     * Handlers register a {@link Ticket}. Whenever the queue is signalled, tickets are taken
     * from the head for as long as the reserve can satisfy them, and the owning handlers are
     * reactivated on their own event loops with the pre-allocated capacity.
     */
    public static final class WaitQueue {
        // Values of `scheduled`: nobody is scheduling / one thread is scheduling /
        // one thread is scheduling and another pass was requested meanwhile.
        private static final int NOT_RUNNING = 0;
        private static final int RUNNING = 1;
        private static final int RUN_AGAIN = 2;
        private volatile int scheduled;
        private static final AtomicIntegerFieldUpdater<WaitQueue> scheduledUpdater = AtomicIntegerFieldUpdater.newUpdater(WaitQueue.class, "scheduled");
        private final Kind kind;
        private final ResourceLimits.Limit reserveCapacity;
        private final ManyToOneConcurrentLinkedQueue<Ticket> queue = new ManyToOneConcurrentLinkedQueue();

        private WaitQueue(Kind kind, ResourceLimits.Limit reserveCapacity) {
            this.kind = kind;
            this.reserveCapacity = reserveCapacity;
        }

        /** Creates a wait queue of kind ENDPOINT backed by the given reserve. */
        public static WaitQueue endpoint(ResourceLimits.Limit endpointReserveCapacity) {
            return new WaitQueue(Kind.ENDPOINT, endpointReserveCapacity);
        }

        /** Creates a wait queue of kind GLOBAL backed by the given reserve. */
        public static WaitQueue global(ResourceLimits.Limit globalReserveCapacity) {
            return new WaitQueue(Kind.GLOBAL, globalReserveCapacity);
        }

        /**
         * Enqueues a new ticket for {@code handler} and returns it. The queue is signalled when
         * there was no previous ticket or the previous one is no longer waiting.
         */
        private Ticket register(InboundMessageHandler handler, int bytesRequested, long registeredAtNanos, long expiresAtNanos) {
            Ticket ticket = new Ticket(this, handler, bytesRequested, registeredAtNanos, expiresAtNanos);
            // presumably returns the ticket that was last in the queue before this offer - confirm in ManyToOneConcurrentLinkedQueue
            Ticket previous = this.queue.relaxedPeekLastAndOffer(ticket);
            if (null == previous || !previous.isWaiting()) {
                this.signal();
            }
            return ticket;
        }

        /**
         * Requests a scheduling pass. The counter is bumped (capped at 2); only the caller that
         * moved it away from 0 runs schedule(), repeating while the counter was at 2 when
         * decremented, i.e. while another signal arrived during the pass.
         */
        private void signal() {
            if (this.queue.relaxedIsEmpty()) {
                return;
            }
            if (0 == scheduledUpdater.getAndUpdate(this, i -> Math.min(2, i + 1))) {
                do {
                    this.schedule();
                } while (2 == scheduledUpdater.getAndDecrement(this));
            }
        }

        /**
         * One scheduling pass: drains tickets from the head of the queue until a live ticket
         * cannot be given capacity, then submits one reactivation task per event loop.
         */
        private void schedule() {
            Ticket t;
            IdentityHashMap<EventLoop, ReactivateHandlers> tasks = null;
            long currentTimeNanos = MonotonicClock.approxTime.now();
            while ((t = this.queue.peek()) != null) {
                // call() fails when the ticket is not in the waiting state (e.g. invalidated): drop it.
                if (!t.call()) {
                    this.queue.remove();
                    continue;
                }
                boolean isLive = t.isLive(currentTimeNanos);
                if (isLive && !this.reserveCapacity.tryAllocate(t.bytesRequested)) {
                    // Not enough capacity: put the ticket back into the waiting state and stop,
                    // unless it was invalidated in the meantime, in which case drop it.
                    if (t.reset()) break;
                    this.queue.remove();
                    continue;
                }
                if (null == tasks) {
                    tasks = new IdentityHashMap<EventLoop, ReactivateHandlers>();
                }
                this.queue.remove();
                // Expired tickets are reactivated too, but contribute no pre-allocated capacity.
                tasks.computeIfAbsent(t.handler.eventLoop(), e -> new ReactivateHandlers()).add(t, isLive);
            }
            if (null != tasks) {
                // Runs each ReactivateHandlers task on the event loop it is keyed by.
                tasks.forEach(Executor::execute);
            }
        }

        /**
         * A handler's place in the wait queue. State moves between WAITING (0) and CALLED (1)
         * via compare-and-set; INVALIDATED (2) is written directly by invalidate().
         */
        private static final class Ticket {
            private static final int WAITING = 0;
            private static final int CALLED = 1;
            private static final int INVALIDATED = 2;
            private volatile int state;
            private static final AtomicIntegerFieldUpdater<Ticket> stateUpdater = AtomicIntegerFieldUpdater.newUpdater(Ticket.class, "state");
            private final WaitQueue waitQueue;
            private final InboundMessageHandler handler;
            private final int bytesRequested;
            private final long reigsteredAtNanos;
            private final long expiresAtNanos;

            private Ticket(WaitQueue waitQueue, InboundMessageHandler handler, int bytesRequested, long registeredAtNanos, long expiresAtNanos) {
                this.waitQueue = waitQueue;
                this.handler = handler;
                this.bytesRequested = bytesRequested;
                this.reigsteredAtNanos = registeredAtNanos;
                this.expiresAtNanos = expiresAtNanos;
            }

            /**
             * Notifies the handler that capacity was regained, through the callback matching
             * the queue's kind, passing the time elapsed since registration. Exceptions are
             * logged and not propagated.
             */
            private void reactivateHandler(ResourceLimits.Limit capacity) {
                long elapsedNanos = MonotonicClock.approxTime.now() - this.reigsteredAtNanos;
                try {
                    if (this.waitQueue.kind == Kind.ENDPOINT) {
                        this.handler.onEndpointReserveCapacityRegained(capacity, elapsedNanos);
                    } else {
                        this.handler.onGlobalReserveCapacityRegained(capacity, elapsedNanos);
                    }
                }
                catch (Throwable t) {
                    logger.error("{} exception caught while reactivating a handler", (Object)this.handler.id(), (Object)t);
                }
            }

            /** True while the ticket is in the WAITING state. */
            private boolean isWaiting() {
                return this.state == 0;
            }

            /** True unless the given time is after the ticket's expiry time. */
            private boolean isLive(long currentTimeNanos) {
                return !MonotonicClock.approxTime.isAfter(currentTimeNanos, this.expiresAtNanos);
            }

            /** Marks the ticket INVALIDATED and signals the queue so it can be cleaned up. */
            private void invalidate() {
                this.state = 2;
                this.waitQueue.signal();
            }

            /** Atomically moves WAITING to CALLED; false if the ticket was not waiting. */
            private boolean call() {
                return stateUpdater.compareAndSet(this, 0, 1);
            }

            /** Atomically moves CALLED back to WAITING; false if the state changed meanwhile. */
            private boolean reset() {
                return stateUpdater.compareAndSet(this, 1, 0);
            }
        }

        /**
         * Task that reactivates a batch of handlers. The batch is built by schedule(), which
         * keys one task per event loop and executes it there.
         */
        private class ReactivateHandlers
        implements Runnable {
            List<Ticket> tickets = new ArrayList<Ticket>();
            // Sum of bytesRequested over the live tickets in this batch.
            long capacity = 0L;

            private ReactivateHandlers() {
            }

            private void add(Ticket ticket, boolean isLive) {
                this.tickets.add(ticket);
                if (isLive) {
                    this.capacity += (long)ticket.bytesRequested;
                }
            }

            /**
             * Reactivates every handler in the batch against a temporary limit sized to the
             * pre-allocated capacity, then returns whatever was left unused to the reserve and
             * signals the queue again.
             */
            @Override
            public void run() {
                ResourceLimits.Basic limit = new ResourceLimits.Basic(this.capacity);
                try {
                    for (Ticket ticket : this.tickets) {
                        ticket.reactivateHandler(limit);
                    }
                }
                finally {
                    long remaining = limit.remaining();
                    if (remaining > 0L) {
                        WaitQueue.this.reserveCapacity.release(remaining);
                        WaitQueue.this.signal();
                    }
                }
            }
        }

        /** Which reserve a wait queue guards. */
        static enum Kind {
            ENDPOINT,
            GLOBAL;

        }
    }

    /**
     * Processing task for a {@link LargeMessage}: deserialization is deferred until the task is
     * run, and the message's buffers are released once the task is done.
     */
    private class ProcessLargeMessage
    extends ProcessMessage {
        private final LargeMessage message;

        ProcessLargeMessage(LargeMessage largeMessage) {
            message = largeMessage;
        }

        /** Declared size of the wrapped large message. */
        @Override
        int size() {
            return message.size;
        }

        /** Header of the wrapped large message. */
        @Override
        Message.Header header() {
            return message.header;
        }

        /** Deserializes the message from its accumulated buffers. */
        @Override
        Message provideMessage() {
            return message.deserialize();
        }

        /** Releases the buffers held by the wrapped large message. */
        @Override
        void releaseResources() {
            message.releaseBuffers();
        }
    }

    /**
     * Processing task for a message that has already been deserialized; it only carries the
     * message and its serialized size.
     */
    private class ProcessSmallMessage
    extends ProcessMessage {
        private final int size;
        private final Message message;

        ProcessSmallMessage(Message message, int size) {
            this.message = message;
            this.size = size;
        }

        /** Serialized size of the message in bytes. */
        @Override
        int size() {
            return size;
        }

        /** Header of the deserialized message. */
        @Override
        Message.Header header() {
            return message.header;
        }

        /** Returns the already-deserialized message. */
        @Override
        Message provideMessage() {
            return message;
        }
    }

    /**
     * Base class for tasks that execute one inbound message. Subclasses supply the size, header
     * and message; {@link #run()} drives the callbacks, hands the message to the consumer, and
     * always releases the capacity and resources held for the message.
     */
    private abstract class ProcessMessage
    implements Runnable {
        private ProcessMessage() {
        }

        /**
         * Executes the message unless it has expired by now.
         * <p>
         * {@code onExecuting} is reported first. An expired message is then only reported via
         * {@code onExpired}. Otherwise the message is obtained and, if non-null, passed to the
         * consumer and reported via {@code onProcessed}. In every case, including exceptions,
         * capacity is released, resources are released and {@code onExecuted} is reported with
         * the time spent in this method.
         */
        @Override
        public void run() {
            Message.Header header = header();
            long startedAtNanos = MonotonicClock.approxTime.now();
            boolean expired = MonotonicClock.approxTime.isAfter(startedAtNanos, header.expiresAtNanos);

            boolean processed = false;
            try {
                callbacks.onExecuting(size(), header, startedAtNanos - header.createdAtNanos, TimeUnit.NANOSECONDS);
                if (!expired) {
                    Message message = provideMessage();
                    if (message != null) {
                        consumer.accept(message);
                        processed = true;
                        callbacks.onProcessed(size(), header);
                    }
                } else {
                    callbacks.onExpired(size(), header, startedAtNanos - header.createdAtNanos, TimeUnit.NANOSECONDS);
                }
            }
            finally {
                // Successfully consumed messages go through the overridable release hook.
                if (!processed) {
                    releaseCapacity(size());
                } else {
                    releaseProcessedCapacity(size(), header);
                }
                releaseResources();
                callbacks.onExecuted(size(), header, MonotonicClock.approxTime.now() - startedAtNanos, TimeUnit.NANOSECONDS);
            }
        }

        /** Size of the message in bytes, as used for callbacks and capacity accounting. */
        abstract int size();

        /** Header of the message. */
        abstract Message.Header header();

        /** The message to execute; a {@code null} result is treated as nothing to process. */
        abstract Message provideMessage();

        /** Hook for releasing resources after execution; does nothing by default. */
        void releaseResources() {
        }
    }

    private class LargeMessage {
        private final int size;
        private final Message.Header header;
        private final List<ShareableBytes> buffers = new ArrayList<ShareableBytes>();
        private int received;
        private boolean isExpired;
        private boolean isCorrupt;

        private LargeMessage(int size, Message.Header header, boolean isExpired) {
            this.size = size;
            this.header = header;
            this.isExpired = isExpired;
        }

        private LargeMessage(int size, Message.Header header, ShareableBytes bytes) {
            this(size, header, false);
            this.buffers.add(bytes);
        }

        private void schedule() {
            InboundMessageHandler.this.dispatch(new ProcessLargeMessage(this));
        }

        private boolean supply(FrameDecoder.Frame frame) {
            if (frame instanceof FrameDecoder.IntactFrame) {
                this.onIntactFrame((FrameDecoder.IntactFrame)frame);
            } else {
                this.onCorruptFrame();
            }
            this.received += frame.frameSize;
            if (this.size == this.received) {
                this.onComplete();
            }
            return this.size == this.received;
        }

        private void onIntactFrame(FrameDecoder.IntactFrame frame) {
            boolean expires = MonotonicClock.approxTime.isAfter(this.header.expiresAtNanos);
            if (!this.isExpired && !this.isCorrupt) {
                if (!expires) {
                    this.buffers.add(frame.contents.sliceAndConsume(frame.frameSize).share());
                    return;
                }
                this.releaseBuffersAndCapacity();
            }
            frame.consume();
            this.isExpired |= expires;
        }

        private void onCorruptFrame() {
            if (!this.isExpired && !this.isCorrupt) {
                this.releaseBuffersAndCapacity();
            }
            this.isCorrupt = true;
            this.isExpired |= MonotonicClock.approxTime.isAfter(this.header.expiresAtNanos);
        }

        private void onComplete() {
            long timeElapsed = MonotonicClock.approxTime.now() - this.header.createdAtNanos;
            if (!this.isExpired && !this.isCorrupt) {
                InboundMessageHandler.this.callbacks.onArrived(this.size, this.header, timeElapsed, TimeUnit.NANOSECONDS);
                this.schedule();
            } else if (this.isExpired) {
                InboundMessageHandler.this.callbacks.onArrivedExpired(this.size, this.header, this.isCorrupt, timeElapsed, TimeUnit.NANOSECONDS);
            } else {
                InboundMessageHandler.this.callbacks.onArrivedCorrupt(this.size, this.header, timeElapsed, TimeUnit.NANOSECONDS);
            }
        }

        private void abort() {
            if (!this.isExpired && !this.isCorrupt) {
                this.releaseBuffersAndCapacity();
            }
            InboundMessageHandler.this.callbacks.onClosedBeforeArrival(this.size, this.header, this.received, this.isCorrupt, this.isExpired);
        }

        private void releaseBuffers() {
            this.buffers.forEach(ShareableBytes::release);
            this.buffers.clear();
        }

        private void releaseBuffersAndCapacity() {
            this.releaseBuffers();
            InboundMessageHandler.this.releaseCapacity(this.size);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Loose catch block
         */
        private Message deserialize() {
            try {
                Message message;
                Throwable throwable;
                ChunkedInputPlus input;
                block20: {
                    block21: {
                        input = ChunkedInputPlus.of(this.buffers);
                        throwable = null;
                        Message m = serializer.deserialize((DataInputPlus)input, this.header, InboundMessageHandler.this.version);
                        int remainder = input.remainder();
                        if (remainder > 0) {
                            throw new InvalidSerializedSizeException(this.header.verb, this.size, this.size - remainder);
                        }
                        message = m;
                        if (input == null) break block20;
                        if (throwable == null) break block21;
                        try {
                            input.close();
                        }
                        catch (Throwable throwable2) {
                            throwable.addSuppressed(throwable2);
                        }
                        break block20;
                    }
                    input.close();
                }
                return message;
                catch (Throwable throwable3) {
                    try {
                        try {
                            throwable = throwable3;
                            throw throwable3;
                        }
                        catch (Throwable throwable4) {
                            if (input != null) {
                                if (throwable != null) {
                                    try {
                                        input.close();
                                    }
                                    catch (Throwable throwable5) {
                                        throwable.addSuppressed(throwable5);
                                    }
                                } else {
                                    input.close();
                                }
                            }
                            throw throwable4;
                        }
                    }
                    catch (IncompatibleSchemaException e) {
                        InboundMessageHandler.this.callbacks.onFailedDeserialize(this.size, this.header, e);
                        noSpamLogger.info("{} incompatible schema encountered while deserializing a message", InboundMessageHandler.this.id(), e);
                    }
                    catch (Throwable t) {
                        JVMStabilityInspector.inspectThrowable(t, false);
                        InboundMessageHandler.this.callbacks.onFailedDeserialize(this.size, this.header, t);
                        logger.error("{} unexpected exception caught while deserializing a message", (Object)InboundMessageHandler.this.id(), (Object)t);
                    }
                }
            }
            finally {
                this.buffers.clear();
            }
            return null;
        }
    }

    /**
     * Frame processor that draws on the supplied endpoint and global reserve
     * limits when handing frames to the enclosing handler.
     *
     * The first frame it sees must be intact. A self-contained first frame is
     * processed as one message, after which {@link #process} returns
     * {@code false}. Otherwise the frame starts a large message, and later
     * frames are forwarded to the handler until it no longer has a large
     * message in progress.
     *
     * NOTE(review): the {@code process} return value presumably tells the
     * decoder whether to keep supplying frames - confirm against
     * FrameDecoder.FrameProcessor.
     */
    private class UpToOneMessageFrameProcessor
    implements FrameDecoder.FrameProcessor {
        private final ResourceLimits.Limit endpointReserve;
        private final ResourceLimits.Limit globalReserve;
        // Holds the result of the handler's first-frame processing call; starts out true.
        boolean isActive = true;
        // True until the first frame has been accepted as intact.
        boolean firstFrame = true;

        private UpToOneMessageFrameProcessor(ResourceLimits.Limit endpointReserve, ResourceLimits.Limit globalReserve) {
            this.endpointReserve = endpointReserve;
            this.globalReserve = globalReserve;
        }

        /**
         * Routes the frame to first-frame or subsequent-frame handling.
         *
         * @throws IllegalStateException if the very first frame is not intact
         */
        @Override
        public boolean process(FrameDecoder.Frame frame) throws IOException {
            if (!this.firstFrame) {
                return this.processSubsequentFrame(frame);
            }
            if (frame instanceof FrameDecoder.IntactFrame) {
                this.firstFrame = false;
                return this.processFirstFrame((FrameDecoder.IntactFrame)frame);
            }
            throw new IllegalStateException("First backlog frame must be intact");
        }

        /**
         * Handles the first frame. A self-contained frame always returns
         * {@code false}; the first frame of a large message returns the
         * handler's result, which is also stored in {@code isActive}.
         */
        private boolean processFirstFrame(FrameDecoder.IntactFrame frame) throws IOException {
            if (!frame.isSelfContained) {
                this.isActive = InboundMessageHandler.this.processFirstFrameOfLargeMessage(frame, this.endpointReserve, this.globalReserve);
                return this.isActive;
            }
            this.isActive = InboundMessageHandler.this.processOneContainedMessage(frame.contents, this.endpointReserve, this.globalReserve);
            return false;
        }

        /**
         * Forwards a later frame of the large message to the handler, as
         * either an intact or a corrupt frame, and returns whether the handler
         * still has a large message in progress.
         */
        private boolean processSubsequentFrame(FrameDecoder.Frame frame) throws IOException {
            boolean intact = frame instanceof FrameDecoder.IntactFrame;
            if (!intact) {
                InboundMessageHandler.this.processCorruptFrame((FrameDecoder.CorruptFrame)frame);
            } else {
                InboundMessageHandler.this.processSubsequentFrameOfLargeMessage(frame);
            }
            return InboundMessageHandler.this.largeMessage != null;
        }
    }
}

