/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.netty;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.ProtocolException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FileRegionBuffer;
import org.apache.flink.runtime.io.network.buffer.FullyFilledBuffer;
import org.apache.flink.runtime.io.network.netty.NetworkBufferAllocator;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionIndexSet;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufOutputStream;
import org.apache.flink.shaded.netty4.io.netty.buffer.CompositeByteBuf;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOutboundHandlerAdapter;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOutboundInvoker;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPromise;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;

public abstract class NettyMessage {
    static final int FRAME_HEADER_LENGTH = 9;
    static final int MAGIC_NUMBER = -1159983106;

    abstract void write(ChannelOutboundInvoker var1, ChannelPromise var2, ByteBufAllocator var3) throws IOException;

    private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id) {
        return NettyMessage.allocateBuffer(allocator, id, -1);
    }

    private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id, int contentLength) {
        return NettyMessage.allocateBuffer(allocator, id, 0, contentLength, true);
    }

    private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id, int messageHeaderLength, int contentLength, boolean allocateForContent) {
        Preconditions.checkArgument(contentLength <= 0x7FFFFFF6);
        ByteBuf buffer = !allocateForContent ? allocator.directBuffer(9 + messageHeaderLength) : (contentLength != -1 ? allocator.directBuffer(9 + messageHeaderLength + contentLength) : allocator.directBuffer());
        buffer.writeInt(9 + messageHeaderLength + contentLength);
        buffer.writeInt(-1159983106);
        buffer.writeByte(id);
        return buffer;
    }

    void writeToChannel(ChannelOutboundInvoker out, ChannelPromise promise, ByteBufAllocator allocator, Consumer<ByteBuf> consumer, byte id, int length) throws IOException {
        ByteBuf byteBuf = null;
        try {
            byteBuf = NettyMessage.allocateBuffer(allocator, id, length);
            consumer.accept(byteBuf);
            out.write(byteBuf, promise);
        }
        catch (Throwable t) {
            this.handleException(byteBuf, null, t);
        }
    }

    void handleException(@Nullable ByteBuf byteBuf, @Nullable Buffer buffer, Throwable t) throws IOException {
        if (byteBuf != null) {
            byteBuf.release();
        }
        if (buffer != null) {
            buffer.recycleBuffer();
        }
        ExceptionUtils.rethrowIOException(t);
    }

    static class SegmentId
    extends NettyMessage {
        private static final byte ID = 11;
        final int subpartitionId;
        final int segmentId;
        final InputChannelID receiverId;

        SegmentId(int subpartitionId, int segmentId, InputChannelID receiverId) {
            this.subpartitionId = subpartitionId;
            Preconditions.checkArgument((long)segmentId > 0L, "The segmentId should be greater than 0");
            this.segmentId = segmentId;
            this.receiverId = receiverId;
        }

        @Override
        void write(ChannelOutboundInvoker out, ChannelPromise promise, ByteBufAllocator allocator) throws IOException {
            ByteBuf result = null;
            try {
                result = NettyMessage.allocateBuffer(allocator, (byte)11, 8 + InputChannelID.getByteBufLength());
                result.writeInt(this.subpartitionId);
                result.writeInt(this.segmentId);
                this.receiverId.writeTo(result);
                out.write(result, promise);
            }
            catch (Throwable t) {
                this.handleException(result, null, t);
            }
        }

        static SegmentId readFrom(ByteBuf buffer) {
            int subpartitionId = buffer.readInt();
            int segmentId = buffer.readInt();
            InputChannelID receiverId = InputChannelID.fromByteBuf(buffer);
            return new SegmentId(subpartitionId, segmentId, receiverId);
        }

        public String toString() {
            return String.format("SegmentId(%s : %d)", this.receiverId, this.segmentId);
        }
    }

    static class NewBufferSize
    extends NettyMessage {
        private static final byte ID = 10;
        final int bufferSize;
        final InputChannelID receiverId;

        NewBufferSize(int bufferSize, InputChannelID receiverId) {
            Preconditions.checkArgument(bufferSize > 0, "The new buffer size should be greater than 0");
            this.bufferSize = bufferSize;
            this.receiverId = receiverId;
        }

        @Override
        void write(ChannelOutboundInvoker out, ChannelPromise promise, ByteBufAllocator allocator) throws IOException {
            ByteBuf result = null;
            try {
                result = NettyMessage.allocateBuffer(allocator, (byte)10, 4 + InputChannelID.getByteBufLength());
                result.writeInt(this.bufferSize);
                this.receiverId.writeTo(result);
                out.write(result, promise);
            }
            catch (Throwable t) {
                this.handleException(result, null, t);
            }
        }

        static NewBufferSize readFrom(ByteBuf buffer) {
            int bufferSize = buffer.readInt();
            InputChannelID receiverId = InputChannelID.fromByteBuf(buffer);
            return new NewBufferSize(bufferSize, receiverId);
        }

        public String toString() {
            return String.format("NewBufferSize(%s : %d)", this.receiverId, this.bufferSize);
        }
    }

    static class BacklogAnnouncement
    extends NettyMessage {
        static final byte ID = 9;
        final int backlog;
        final InputChannelID receiverId;

        BacklogAnnouncement(int backlog, InputChannelID receiverId) {
            Preconditions.checkArgument(backlog > 0, "Must be positive.");
            Preconditions.checkArgument(receiverId != null, "Must be not null.");
            this.backlog = backlog;
            this.receiverId = receiverId;
        }

        @Override
        void write(ChannelOutboundInvoker out, ChannelPromise promise, ByteBufAllocator allocator) throws IOException {
            ByteBuf result = null;
            try {
                result = NettyMessage.allocateBuffer(allocator, (byte)9, 4 + InputChannelID.getByteBufLength());
                result.writeInt(this.backlog);
                this.receiverId.writeTo(result);
                out.write(result, promise);
            }
            catch (Throwable t) {
                this.handleException(result, null, t);
            }
        }

        static BacklogAnnouncement readFrom(ByteBuf buffer) {
            int backlog = buffer.readInt();
            InputChannelID receiverId = InputChannelID.fromByteBuf(buffer);
            return new BacklogAnnouncement(backlog, receiverId);
        }

        public String toString() {
            return String.format("BacklogAnnouncement(%d : %s)", this.backlog, this.receiverId);
        }
    }

    static class AckAllUserRecordsProcessed
    extends NettyMessage {
        private static final byte ID = 8;
        final InputChannelID receiverId;

        AckAllUserRecordsProcessed(InputChannelID receiverId) {
            this.receiverId = receiverId;
        }

        @Override
        void write(ChannelOutboundInvoker out, ChannelPromise promise, ByteBufAllocator allocator) throws IOException {
            this.writeToChannel(out, promise, allocator, this.receiverId::writeTo, (byte)8, InputChannelID.getByteBufLength());
        }

        static AckAllUserRecordsProcessed readFrom(ByteBuf buffer) {
            return new AckAllUserRecordsProcessed(InputChannelID.fromByteBuf(buffer));
        }

        public String toString() {
            return String.format("AckAllUserRecordsProcessed(%s)", this.receiverId);
        }
    }

    static class ResumeConsumption
    extends NettyMessage {
        private static final byte ID = 7;
        final InputChannelID receiverId;

        ResumeConsumption(InputChannelID receiverId) {
            this.receiverId = receiverId;
        }

        @Override
        void write(ChannelOutboundInvoker out, ChannelPromise promise, ByteBufAllocator allocator) throws IOException {
            this.writeToChannel(out, promise, allocator, this.receiverId::writeTo, (byte)7, InputChannelID.getByteBufLength());
        }

        static ResumeConsumption readFrom(ByteBuf buffer) {
            return new ResumeConsumption(InputChannelID.fromByteBuf(buffer));
        }

        public String toString() {
            return String.format("ResumeConsumption(%s)", this.receiverId);
        }
    }

    static class AddCredit
    extends NettyMessage {
        private static final byte ID = 6;
        final int credit;
        final InputChannelID receiverId;

        AddCredit(int credit, InputChannelID receiverId) {
            Preconditions.checkArgument(credit > 0, "The announced credit should be greater than 0");
            this.credit = credit;
            this.receiverId = receiverId;
        }

        @Override
        void write(ChannelOutboundInvoker out, ChannelPromise promise, ByteBufAllocator allocator) throws IOException {
            ByteBuf result = null;
            try {
                result = NettyMessage.allocateBuffer(allocator, (byte)6, 4 + InputChannelID.getByteBufLength());
                result.writeInt(this.credit);
                this.receiverId.writeTo(result);
                out.write(result, promise);
            }
            catch (Throwable t) {
                this.handleException(result, null, t);
            }
        }

        static AddCredit readFrom(ByteBuf buffer) {
            int credit = buffer.readInt();
            InputChannelID receiverId = InputChannelID.fromByteBuf(buffer);
            return new AddCredit(credit, receiverId);
        }

        public String toString() {
            return String.format("AddCredit(%s : %d)", this.receiverId, this.credit);
        }
    }

    static class CloseRequest
    extends NettyMessage {
        private static final byte ID = 5;

        CloseRequest() {
        }

        @Override
        void write(ChannelOutboundInvoker out, ChannelPromise promise, ByteBufAllocator allocator) throws IOException {
            this.writeToChannel(out, promise, allocator, ignored -> {}, (byte)5, 0);
        }

        static CloseRequest readFrom(ByteBuf buffer) throws Exception {
            return new CloseRequest();
        }
    }

    static class CancelPartitionRequest
    extends NettyMessage {
        private static final byte ID = 4;
        final InputChannelID receiverId;

        CancelPartitionRequest(InputChannelID receiverId) {
            this.receiverId = Preconditions.checkNotNull(receiverId);
        }

        @Override
        void write(ChannelOutboundInvoker out, ChannelPromise promise, ByteBufAllocator allocator) throws IOException {
            this.writeToChannel(out, promise, allocator, this.receiverId::writeTo, (byte)4, InputChannelID.getByteBufLength());
        }

        static CancelPartitionRequest readFrom(ByteBuf buffer) throws Exception {
            return new CancelPartitionRequest(InputChannelID.fromByteBuf(buffer));
        }
    }

    static class TaskEventRequest
    extends NettyMessage {
        private static final byte ID = 3;
        final TaskEvent event;
        final InputChannelID receiverId;
        final ResultPartitionID partitionId;

        TaskEventRequest(TaskEvent event, ResultPartitionID partitionId, InputChannelID receiverId) {
            this.event = Preconditions.checkNotNull(event);
            this.receiverId = Preconditions.checkNotNull(receiverId);
            this.partitionId = Preconditions.checkNotNull(partitionId);
        }

        @Override
        void write(ChannelOutboundInvoker out, ChannelPromise promise, ByteBufAllocator allocator) throws IOException {
            ByteBuffer serializedEvent = EventSerializer.toSerializedEvent(this.event);
            Consumer<ByteBuf> consumer = bb -> {
                bb.writeInt(serializedEvent.remaining());
                bb.writeBytes(serializedEvent);
                this.partitionId.getPartitionId().writeTo((ByteBuf)bb);
                this.partitionId.getProducerId().writeTo((ByteBuf)bb);
                this.receiverId.writeTo((ByteBuf)bb);
            };
            this.writeToChannel(out, promise, allocator, consumer, (byte)3, 4 + serializedEvent.remaining() + IntermediateResultPartitionID.getByteBufLength() + ExecutionAttemptID.getByteBufLength() + InputChannelID.getByteBufLength());
        }

        static TaskEventRequest readFrom(ByteBuf buffer, ClassLoader classLoader) throws IOException {
            int length = buffer.readInt();
            ByteBuffer serializedEvent = buffer.nioBuffer(buffer.readerIndex(), length);
            buffer.readerIndex(buffer.readerIndex() + length);
            TaskEvent event = (TaskEvent)EventSerializer.fromSerializedEvent(serializedEvent, classLoader);
            ResultPartitionID partitionId = new ResultPartitionID(IntermediateResultPartitionID.fromByteBuf(buffer), ExecutionAttemptID.fromByteBuf(buffer));
            InputChannelID receiverId = InputChannelID.fromByteBuf(buffer);
            return new TaskEventRequest(event, partitionId, receiverId);
        }
    }

    static class PartitionRequest
    extends NettyMessage {
        private static final byte ID = 2;
        final ResultPartitionID partitionId;
        final ResultSubpartitionIndexSet queueIndexSet;
        final InputChannelID receiverId;
        final int credit;

        PartitionRequest(ResultPartitionID partitionId, ResultSubpartitionIndexSet queueIndexSet, InputChannelID receiverId, int credit) {
            this.partitionId = Preconditions.checkNotNull(partitionId);
            this.queueIndexSet = queueIndexSet;
            this.receiverId = Preconditions.checkNotNull(receiverId);
            this.credit = credit;
        }

        @Override
        void write(ChannelOutboundInvoker out, ChannelPromise promise, ByteBufAllocator allocator) throws IOException {
            Consumer<ByteBuf> consumer = bb -> {
                this.partitionId.getPartitionId().writeTo((ByteBuf)bb);
                this.partitionId.getProducerId().writeTo((ByteBuf)bb);
                this.queueIndexSet.writeTo((ByteBuf)bb);
                this.receiverId.writeTo((ByteBuf)bb);
                bb.writeInt(this.credit);
            };
            this.writeToChannel(out, promise, allocator, consumer, (byte)2, IntermediateResultPartitionID.getByteBufLength() + ExecutionAttemptID.getByteBufLength() + ResultSubpartitionIndexSet.getByteBufLength(this.queueIndexSet) + InputChannelID.getByteBufLength() + 4);
        }

        static PartitionRequest readFrom(ByteBuf buffer) {
            ResultPartitionID partitionId = new ResultPartitionID(IntermediateResultPartitionID.fromByteBuf(buffer), ExecutionAttemptID.fromByteBuf(buffer));
            ResultSubpartitionIndexSet queueIndexSet = ResultSubpartitionIndexSet.fromByteBuf(buffer);
            InputChannelID receiverId = InputChannelID.fromByteBuf(buffer);
            int credit = buffer.readInt();
            return new PartitionRequest(partitionId, queueIndexSet, receiverId, credit);
        }

        public String toString() {
            return String.format("PartitionRequest(%s:%s:%d)", this.partitionId, this.queueIndexSet, this.credit);
        }
    }

    static class ErrorResponse
    extends NettyMessage {
        static final byte ID = 1;
        final Throwable cause;
        @Nullable
        final InputChannelID receiverId;

        ErrorResponse(Throwable cause) {
            this.cause = Preconditions.checkNotNull(cause);
            this.receiverId = null;
        }

        ErrorResponse(Throwable cause, InputChannelID receiverId) {
            this.cause = Preconditions.checkNotNull(cause);
            this.receiverId = receiverId;
        }

        boolean isFatalError() {
            return this.receiverId == null;
        }

        @Override
        void write(ChannelOutboundInvoker out, ChannelPromise promise, ByteBufAllocator allocator) throws IOException {
            ByteBuf result = NettyMessage.allocateBuffer(allocator, (byte)1);
            try (ObjectOutputStream oos = new ObjectOutputStream(new ByteBufOutputStream(result));){
                oos.writeObject(this.cause);
                if (this.receiverId != null) {
                    result.writeBoolean(true);
                    this.receiverId.writeTo(result);
                } else {
                    result.writeBoolean(false);
                }
                result.setInt(0, result.readableBytes());
                out.write(result, promise);
            }
            catch (Throwable t) {
                this.handleException(result, null, t);
            }
        }

        static ErrorResponse readFrom(ByteBuf buffer) throws Exception {
            try (ObjectInputStream ois = new ObjectInputStream(new ByteBufInputStream(buffer));){
                Object obj = ois.readObject();
                if (!(obj instanceof Throwable)) {
                    throw new ClassCastException("Read object expected to be of type Throwable, actual type is " + obj.getClass() + ".");
                }
                if (buffer.readBoolean()) {
                    InputChannelID receiverId = InputChannelID.fromByteBuf(buffer);
                    ErrorResponse errorResponse = new ErrorResponse((Throwable)obj, receiverId);
                    return errorResponse;
                }
                ErrorResponse errorResponse = new ErrorResponse((Throwable)obj);
                return errorResponse;
            }
        }
    }

    static class BufferResponse
    extends NettyMessage {
        static final byte ID = 0;
        static final int MESSAGE_HEADER_LENGTH = InputChannelID.getByteBufLength() + 4 + 4 + 4 + 4 + 1 + 1 + 4;
        final Buffer buffer;
        final InputChannelID receiverId;
        final int subpartitionId;
        final int sequenceNumber;
        final int backlog;
        final Buffer.DataType dataType;
        final boolean isCompressed;
        final int bufferSize;
        final int numOfPartialBuffers;
        private List<Integer> partialBufferSizes = new ArrayList<Integer>();

        private BufferResponse(@Nullable Buffer buffer, Buffer.DataType dataType, boolean isCompressed, int sequenceNumber, InputChannelID receiverId, int subpartitionId, int numOfPartialBuffers, int backlog, int bufferSize) {
            this.buffer = buffer;
            this.dataType = dataType;
            this.isCompressed = isCompressed;
            this.sequenceNumber = sequenceNumber;
            this.receiverId = Preconditions.checkNotNull(receiverId);
            this.subpartitionId = subpartitionId;
            this.backlog = backlog;
            this.bufferSize = bufferSize;
            this.numOfPartialBuffers = numOfPartialBuffers;
        }

        BufferResponse(Buffer buffer, int sequenceNumber, InputChannelID receiverId, int subpartitionId, int numOfPartialBuffers, int backlog) {
            this.buffer = Preconditions.checkNotNull(buffer);
            Preconditions.checkArgument(buffer.getDataType().ordinal() <= 127, "Too many data types defined!");
            Preconditions.checkArgument(backlog >= 0, "Must be non-negative.");
            this.dataType = buffer.getDataType();
            this.isCompressed = buffer.isCompressed();
            this.sequenceNumber = sequenceNumber;
            this.receiverId = Preconditions.checkNotNull(receiverId);
            this.subpartitionId = subpartitionId;
            this.backlog = backlog;
            this.bufferSize = buffer.getSize();
            this.numOfPartialBuffers = numOfPartialBuffers;
        }

        boolean isBuffer() {
            return this.dataType.isBuffer();
        }

        @Nullable
        public Buffer getBuffer() {
            return this.buffer;
        }

        void releaseBuffer() {
            if (this.buffer != null) {
                this.buffer.recycleBuffer();
            }
        }

        public List<Integer> getPartialBufferSizes() {
            return this.partialBufferSizes;
        }

        @Override
        void write(ChannelOutboundInvoker out, ChannelPromise promise, ByteBufAllocator allocator) throws IOException {
            ByteBuf headerBuf = null;
            try {
                this.buffer.setAllocator(allocator);
                headerBuf = this.fillHeader(allocator);
                out.write(headerBuf);
                if (this.buffer instanceof FileRegionBuffer) {
                    out.write(this.buffer, promise);
                } else {
                    out.write(this.buffer.asByteBuf(), promise);
                }
            }
            catch (Throwable t) {
                this.handleException(headerBuf, this.buffer, t);
            }
        }

        @VisibleForTesting
        ByteBuf write(ByteBufAllocator allocator) throws IOException {
            ByteBuf headerBuf = null;
            try {
                this.buffer.setAllocator(allocator);
                headerBuf = this.fillHeader(allocator);
                CompositeByteBuf composityBuf = allocator.compositeDirectBuffer();
                composityBuf.addComponent(headerBuf);
                composityBuf.addComponent(this.buffer.asByteBuf());
                composityBuf.writerIndex(headerBuf.writerIndex() + this.buffer.asByteBuf().writerIndex());
                return composityBuf;
            }
            catch (Throwable t) {
                this.handleException(headerBuf, this.buffer, t);
                return null;
            }
        }

        private ByteBuf fillHeader(ByteBufAllocator allocator) {
            ByteBuf headerBuf = NettyMessage.allocateBuffer(allocator, (byte)0, MESSAGE_HEADER_LENGTH + 4 * this.numOfPartialBuffers, this.bufferSize, false);
            this.receiverId.writeTo(headerBuf);
            headerBuf.writeInt(this.subpartitionId);
            headerBuf.writeInt(this.numOfPartialBuffers);
            headerBuf.writeInt(this.sequenceNumber);
            headerBuf.writeInt(this.backlog);
            headerBuf.writeByte(this.dataType.ordinal());
            headerBuf.writeBoolean(this.isCompressed);
            headerBuf.writeInt(this.buffer.readableBytes());
            if (this.numOfPartialBuffers > 0) {
                Preconditions.checkArgument(this.buffer instanceof FullyFilledBuffer, "Partial buffers are only supported for fully filled buffers.");
                List<Buffer> partialBuffers = ((FullyFilledBuffer)this.buffer).getPartialBuffers();
                Preconditions.checkArgument(partialBuffers.size() == this.numOfPartialBuffers, "Mismatched number of partial buffers");
                for (int i = 0; i < this.numOfPartialBuffers; ++i) {
                    int bytes = partialBuffers.get(i).readableBytes();
                    headerBuf.writeInt(bytes);
                }
            }
            return headerBuf;
        }

        static BufferResponse readFrom(ByteBuf messageHeader, NetworkBufferAllocator bufferAllocator) {
            Buffer dataBuffer;
            InputChannelID receiverId = InputChannelID.fromByteBuf(messageHeader);
            int subpartitionId = messageHeader.readInt();
            int numOfPartialBuffers = messageHeader.readInt();
            int sequenceNumber = messageHeader.readInt();
            int backlog = messageHeader.readInt();
            Buffer.DataType dataType = Buffer.DataType.values()[messageHeader.readByte()];
            boolean isCompressed = messageHeader.readBoolean();
            int size = messageHeader.readInt();
            if (dataType.isBuffer()) {
                dataBuffer = bufferAllocator.allocatePooledNetworkBuffer(receiverId);
                if (dataBuffer != null) {
                    dataBuffer.setDataType(dataType);
                }
            } else {
                dataBuffer = bufferAllocator.allocateUnPooledNetworkBuffer(size, dataType);
            }
            if (size == 0 && dataBuffer != null) {
                dataBuffer.recycleBuffer();
                dataBuffer = null;
            }
            if (dataBuffer != null) {
                dataBuffer.setCompressed(isCompressed);
            }
            return new BufferResponse(dataBuffer, dataType, isCompressed, sequenceNumber, receiverId, subpartitionId, numOfPartialBuffers, backlog, size);
        }
    }

    static class NettyMessageDecoder
    extends LengthFieldBasedFrameDecoder {
        NettyMessageDecoder() {
            super(Integer.MAX_VALUE, 0, 4, -4, 4);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
            ByteBuf msg = (ByteBuf)super.decode(ctx, in);
            if (msg == null) {
                return null;
            }
            try {
                NettyMessage decodedMsg;
                int magicNumber = msg.readInt();
                if (magicNumber != -1159983106) {
                    throw new IllegalStateException("Network stream corrupted: received incorrect magic number.");
                }
                byte msgId = msg.readByte();
                switch (msgId) {
                    case 2: {
                        decodedMsg = PartitionRequest.readFrom(msg);
                        break;
                    }
                    case 3: {
                        decodedMsg = TaskEventRequest.readFrom(msg, this.getClass().getClassLoader());
                        break;
                    }
                    case 4: {
                        decodedMsg = CancelPartitionRequest.readFrom(msg);
                        break;
                    }
                    case 5: {
                        decodedMsg = CloseRequest.readFrom(msg);
                        break;
                    }
                    case 6: {
                        decodedMsg = AddCredit.readFrom(msg);
                        break;
                    }
                    case 7: {
                        decodedMsg = ResumeConsumption.readFrom(msg);
                        break;
                    }
                    case 8: {
                        decodedMsg = AckAllUserRecordsProcessed.readFrom(msg);
                        break;
                    }
                    case 10: {
                        decodedMsg = NewBufferSize.readFrom(msg);
                        break;
                    }
                    case 11: {
                        decodedMsg = SegmentId.readFrom(msg);
                        break;
                    }
                    default: {
                        throw new ProtocolException("Received unknown message from producer: " + msg);
                    }
                }
                NettyMessage nettyMessage = decodedMsg;
                return nettyMessage;
            }
            finally {
                msg.release();
            }
        }
    }

    @ChannelHandler.Sharable
    static class NettyMessageEncoder
    extends ChannelOutboundHandlerAdapter {
        NettyMessageEncoder() {
        }

        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws IOException {
            if (msg instanceof NettyMessage) {
                ((NettyMessage)msg).write(ctx, promise, ctx.alloc());
            } else {
                ctx.write(msg, promise);
            }
        }
    }
}

