/*
 * Decompiled with CFR 0.152.
 */
package com.rabbitmq.stream.impl;

import com.rabbitmq.stream.ChunkChecksum;
import com.rabbitmq.stream.ChunkChecksumValidationException;
import com.rabbitmq.stream.Codec;
import com.rabbitmq.stream.Constants;
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.StreamException;
import com.rabbitmq.stream.compression.Compression;
import com.rabbitmq.stream.compression.CompressionCodec;
import com.rabbitmq.stream.impl.Client;
import com.rabbitmq.stream.impl.ParameterizedTypeReference;
import com.rabbitmq.stream.impl.Utils;
import com.rabbitmq.stream.metrics.MetricsCollector;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleStateHandler;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ServerFrameHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(ServerFrameHandler.class);
    private static final FrameHandler RESPONSE_FRAME_HANDLER = new ResponseFrameHandler();
    private static final FrameHandler[][] HANDLERS;

    ServerFrameHandler() {
    }

    static List<FrameHandlerInfo> commandVersions() {
        ArrayList<FrameHandlerInfo> infos = new ArrayList<FrameHandlerInfo>(HANDLERS.length);
        for (int i = 0; i < HANDLERS.length; ++i) {
            FrameHandler[] handlers = HANDLERS[i];
            if (handlers == null) continue;
            FrameHandler handler = null;
            int minVersion = Short.MAX_VALUE;
            int maxVersion = 0;
            for (int j = 1; j < handlers.length; j = (int)((short)(j + 1))) {
                if (handlers[j] == null || !handlers[j].isInitiatedByServer()) continue;
                minVersion = Math.min(minVersion, j);
                maxVersion = Math.max(maxVersion, j);
                handler = handlers[j];
            }
            if (handler == null) continue;
            infos.add(new FrameHandlerInfo((short)i, (short)minVersion, (short)maxVersion));
        }
        return infos;
    }

    static FrameHandler defaultHandler() {
        return RESPONSE_FRAME_HANDLER;
    }

    static FrameHandler lookup(short commandId, short version, ByteBuf message) {
        FrameHandler handler = HANDLERS[commandId][version];
        if (handler == null) {
            message.release();
            throw new StreamException("Unsupported command " + commandId);
        }
        return handler;
    }

    private static <T> Client.OutstandingRequest<T> remove(ConcurrentMap<Integer, Client.OutstandingRequest<?>> outstandingRequests, int correlationId, ParameterizedTypeReference<T> type) {
        return (Client.OutstandingRequest)outstandingRequests.remove(correlationId);
    }

    private static <T> Client.OutstandingRequest<T> remove(ConcurrentMap<Integer, Client.OutstandingRequest<?>> outstandingRequests, int correlationId, Class<T> clazz) {
        return (Client.OutstandingRequest)outstandingRequests.remove(correlationId);
    }

    private static String readString(ByteBuf bb) {
        short size = bb.readShort();
        byte[] bytes = new byte[size];
        bb.readBytes(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    static {
        short maxCommandKey = (short)Arrays.stream(Constants.class.getDeclaredFields()).filter(f -> f.getName().startsWith("COMMAND_")).mapToInt(field -> {
            try {
                return ((Number)field.get(null)).intValue();
            }
            catch (IllegalAccessException e) {
                LOGGER.info("Error while trying to access field Constants." + field.getName());
                return 0;
            }
        }).max().getAsInt();
        HashMap<Short, FrameHandler> handlers = new HashMap<Short, FrameHandler>();
        handlers.put((short)22, new CloseFrameHandler());
        handlers.put((short)7, RESPONSE_FRAME_HANDLER);
        handlers.put((short)12, RESPONSE_FRAME_HANDLER);
        handlers.put((short)1, RESPONSE_FRAME_HANDLER);
        handlers.put((short)6, RESPONSE_FRAME_HANDLER);
        handlers.put((short)13, RESPONSE_FRAME_HANDLER);
        handlers.put((short)14, RESPONSE_FRAME_HANDLER);
        handlers.put((short)21, new OpenFrameHandler());
        handlers.put((short)3, new ConfirmFrameHandler());
        handlers.put((short)4, new PublishErrorHandler());
        handlers.put((short)16, new MetadataUpdateFrameHandler());
        handlers.put((short)15, new MetadataFrameHandler());
        handlers.put((short)18, new SaslHandshakeFrameHandler());
        handlers.put((short)19, new SaslAuthenticateFrameHandler());
        handlers.put((short)20, new TuneFrameHandler());
        handlers.put((short)23, new HeartbeatFrameHandler());
        handlers.put((short)17, new PeerPropertiesFrameHandler());
        handlers.put((short)9, new CreditNotificationFrameHandler());
        handlers.put((short)11, new QueryOffsetFrameHandler());
        handlers.put((short)5, new QueryPublisherSequenceFrameHandler());
        handlers.put((short)24, new RouteFrameHandler());
        handlers.put((short)25, new PartitionsFrameHandler());
        handlers.put((short)26, new ConsumerUpdateFrameHandler());
        handlers.put((short)27, new ExchangeCommandVersionsFrameHandler());
        handlers.put((short)28, new StreamStatsFrameHandler());
        handlers.put((short)29, RESPONSE_FRAME_HANDLER);
        handlers.put((short)30, RESPONSE_FRAME_HANDLER);
        HANDLERS = new FrameHandler[maxCommandKey + 1][];
        handlers.entrySet().forEach(entry -> {
            ServerFrameHandler.HANDLERS[((Short)entry.getKey()).shortValue()] = new FrameHandler[2];
            ServerFrameHandler.HANDLERS[((Short)entry.getKey()).shortValue()][1] = (FrameHandler)entry.getValue();
        });
        ServerFrameHandler.HANDLERS[8] = new FrameHandler[3];
        ServerFrameHandler.HANDLERS[8][1] = new DeliverVersion1FrameHandler();
        ServerFrameHandler.HANDLERS[8][2] = new DeliverVersion2FrameHandler();
    }

    private static class StreamStatsFrameHandler
    extends BaseFrameHandler {
        private StreamStatsFrameHandler() {
        }

        @Override
        int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
            int correlationId = message.readInt();
            int read = 4;
            short responseCode = message.readShort();
            read += 2;
            int infoCount = message.readInt();
            read += 4;
            LinkedHashMap<String, Long> info = new LinkedHashMap<String, Long>(infoCount);
            for (int i = 0; i < infoCount; ++i) {
                String key = ServerFrameHandler.readString(message);
                read += 2 + key.length();
                long value = message.readLong();
                info.put(key, value);
                read += 8;
            }
            Client.OutstandingRequest<Client.StreamStatsResponse> outstandingRequest = ServerFrameHandler.remove(client.outstandingRequests, correlationId, Client.StreamStatsResponse.class);
            if (outstandingRequest == null) {
                this.logMissingOutstandingRequest(correlationId);
            } else {
                outstandingRequest.response().set(new Client.StreamStatsResponse(responseCode, info));
                outstandingRequest.countDown();
            }
            return read;
        }
    }

    private static class ExchangeCommandVersionsFrameHandler
    extends BaseFrameHandler {
        private ExchangeCommandVersionsFrameHandler() {
        }

        @Override
        int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
            Client.OutstandingRequest<List<FrameHandlerInfo>> outstandingRequest;
            List commandVersions;
            int correlationId = message.readInt();
            int read = 4;
            short responseCode = message.readShort();
            read += 2;
            int commandVersionsCount = message.readInt();
            read += 4;
            if (commandVersionsCount == 0) {
                commandVersions = Collections.emptyList();
            } else {
                commandVersions = new ArrayList(commandVersionsCount);
                for (int i = 0; i < commandVersionsCount; ++i) {
                    short key = message.readShort();
                    short minVersion = message.readShort();
                    short maxVersion = message.readShort();
                    read += 6;
                    commandVersions.add(new FrameHandlerInfo(key, minVersion, maxVersion));
                }
            }
            if (responseCode != 1) {
                LOGGER.info("Exchange command versions returned error: {}", (Object)Utils.formatConstant(responseCode));
            }
            if ((outstandingRequest = ServerFrameHandler.remove(client.outstandingRequests, correlationId, new ParameterizedTypeReference<List<FrameHandlerInfo>>(){})) == null) {
                this.logMissingOutstandingRequest(correlationId);
            } else {
                outstandingRequest.response().set(commandVersions);
                outstandingRequest.countDown();
            }
            return read;
        }
    }

    private static class PartitionsFrameHandler
    extends BaseFrameHandler {
        private PartitionsFrameHandler() {
        }

        @Override
        int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
            Client.OutstandingRequest<List<String>> outstandingRequest;
            List streams;
            int correlationId = message.readInt();
            int read = 4;
            short responseCode = message.readShort();
            read += 2;
            int streamCount = message.readInt();
            read += 4;
            if (streamCount == 0) {
                streams = Collections.emptyList();
            } else {
                streams = new ArrayList(streamCount);
                for (int i = 0; i < streamCount; ++i) {
                    String stream = ServerFrameHandler.readString(message);
                    read += 2 + stream.length();
                    streams.add(stream);
                }
            }
            if (responseCode != 1) {
                LOGGER.info("Route returned error: {}", (Object)Utils.formatConstant(responseCode));
            }
            if ((outstandingRequest = ServerFrameHandler.remove(client.outstandingRequests, correlationId, new ParameterizedTypeReference<List<String>>(){})) == null) {
                this.logMissingOutstandingRequest(correlationId);
            } else {
                outstandingRequest.response().set(streams);
                outstandingRequest.countDown();
            }
            return read;
        }
    }

    private static class RouteFrameHandler
    extends BaseFrameHandler {
        private RouteFrameHandler() {
        }

        @Override
        int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
            Client.OutstandingRequest<List<String>> outstandingRequest;
            List streams;
            int correlationId = message.readInt();
            int read = 4;
            short responseCode = message.readShort();
            read += 2;
            int streamCount = message.readInt();
            read += 4;
            if (streamCount == 0) {
                streams = Collections.emptyList();
            } else {
                streams = new ArrayList(streamCount);
                for (int i = 0; i < streamCount; ++i) {
                    String stream = ServerFrameHandler.readString(message);
                    read += 2 + stream.length();
                    streams.add(stream);
                }
            }
            if (responseCode != 1) {
                LOGGER.info("Route returned error: {}", (Object)Utils.formatConstant(responseCode));
            }
            if ((outstandingRequest = ServerFrameHandler.remove(client.outstandingRequests, correlationId, new ParameterizedTypeReference<List<String>>(){})) == null) {
                this.logMissingOutstandingRequest(correlationId);
            } else {
                outstandingRequest.response().set(streams);
                outstandingRequest.countDown();
            }
            return read;
        }
    }

    private static class ResponseFrameHandler
    extends BaseFrameHandler {
        private ResponseFrameHandler() {
        }

        @Override
        int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
            int correlationId = message.readInt();
            int read = 4;
            short responseCode = message.readShort();
            read += 2;
            Client.OutstandingRequest<Client.Response> outstandingRequest = ServerFrameHandler.remove(client.outstandingRequests, correlationId, Client.Response.class);
            if (outstandingRequest == null) {
                this.logMissingOutstandingRequest(correlationId);
            } else {
                Client.Response response = new Client.Response(responseCode);
                outstandingRequest.response().set(response);
                outstandingRequest.countDown();
            }
            return read;
        }
    }

    private static class MetadataFrameHandler
    extends BaseFrameHandler {
        private MetadataFrameHandler() {
        }

        @Override
        int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
            int correlationId = message.readInt();
            int read = 4;
            HashMap<Short, Client.Broker> brokers = new HashMap<Short, Client.Broker>();
            int brokersCount = message.readInt();
            read += 4;
            for (int i = 0; i < brokersCount; ++i) {
                short brokerReference = message.readShort();
                read += 2;
                String host = ServerFrameHandler.readString(message);
                read += 2 + host.length();
                int port = message.readInt();
                read += 4;
                brokers.put(brokerReference, new Client.Broker(host, port));
            }
            int streamsCount = message.readInt();
            LinkedHashMap<String, Client.StreamMetadata> results = new LinkedHashMap<String, Client.StreamMetadata>(streamsCount);
            read += 4;
            for (int i = 0; i < streamsCount; ++i) {
                List<Client.Broker> replicas;
                String stream = ServerFrameHandler.readString(message);
                read += 2 + stream.length();
                short responseCode = message.readShort();
                read += 2;
                short leaderReference = message.readShort();
                read += 2;
                int replicasCount = message.readInt();
                read += 4;
                if (replicasCount == 0) {
                    replicas = Collections.emptyList();
                } else {
                    replicas = new ArrayList(replicasCount);
                    for (int j = 0; j < replicasCount; ++j) {
                        short replicaReference = message.readShort();
                        read += 2;
                        replicas.add((Client.Broker)brokers.get(replicaReference));
                    }
                }
                Client.StreamMetadata streamMetadata = new Client.StreamMetadata(stream, responseCode, (Client.Broker)brokers.get(leaderReference), replicas);
                results.put(stream, streamMetadata);
            }
            Client.OutstandingRequest<Map<String, Client.StreamMetadata>> outstandingRequest = ServerFrameHandler.remove(client.outstandingRequests, correlationId, new ParameterizedTypeReference<Map<String, Client.StreamMetadata>>(){});
            if (outstandingRequest == null) {
                this.logMissingOutstandingRequest(correlationId);
            } else {
                outstandingRequest.response().set(results);
                outstandingRequest.countDown();
            }
            return read;
        }
    }

    private static class SaslHandshakeFrameHandler
    extends BaseFrameHandler {
        private SaslHandshakeFrameHandler() {
        }

        @Override
        int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
            int correlationId = message.readInt();
            int read = 4;
            short responseCode = message.readShort();
            read += 2;
            if (responseCode != 1) {
                while (message.isReadable()) {
                    message.readByte();
                }
                throw new StreamException("Unexpected response code for SASL handshake response: " + responseCode);
            }
            int mechanismsCount = message.readInt();
            read += 4;
            ArrayList<String> mechanisms = new ArrayList<String>(mechanismsCount);
            for (int i = 0; i < mechanismsCount; ++i) {
                String mechanism = ServerFrameHandler.readString(message);
                mechanisms.add(mechanism);
                read += 2 + mechanism.length();
            }
            Client.OutstandingRequest<List<String>> outstandingRequest = ServerFrameHandler.remove(client.outstandingRequests, correlationId, new ParameterizedTypeReference<List<String>>(){});
            if (outstandingRequest == null) {
                this.logMissingOutstandingRequest(correlationId);
            } else {
                outstandingRequest.response().set(mechanisms);
                outstandingRequest.countDown();
            }
            return read;
        }
    }

    private static class SaslAuthenticateFrameHandler
    extends BaseFrameHandler {
        private SaslAuthenticateFrameHandler() {
        }

        @Override
        int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
            byte[] challenge;
            int correlationId = message.readInt();
            int read = 4;
            short responseCode = message.readShort();
            read += 2;
            if (responseCode == 10) {
                int challengeSize = message.readInt();
                read += 4;
                challenge = new byte[challengeSize];
                message.readBytes(challenge);
                read += challenge.length;
            } else {
                challenge = null;
            }
            Client.SaslAuthenticateResponse response = new Client.SaslAuthenticateResponse(responseCode, challenge);
            Client.OutstandingRequest<Client.SaslAuthenticateResponse> outstandingRequest = ServerFrameHandler.remove(client.outstandingRequests, correlationId, Client.SaslAuthenticateResponse.class);
            if (outstandingRequest == null) {
                this.logMissingOutstandingRequest(correlationId);
            } else {
                outstandingRequest.response().set(response);
                outstandingRequest.countDown();
            }
            return read;
        }
    }

    private static class TuneFrameHandler
    extends BaseFrameHandler {
        private TuneFrameHandler() {
        }

        private static int negotiatedMaxValue(int clientValue, int serverValue) {
            return clientValue == 0 || serverValue == 0 ? Math.max(clientValue, serverValue) : Math.min(clientValue, serverValue);
        }

        @Override
        int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
            int serverMaxFrameSize = message.readInt();
            int read = 4;
            int serverHeartbeat = message.readInt();
            read += 4;
            int maxFrameSize = TuneFrameHandler.negotiatedMaxValue(client.tuneState.requestedMaxFrameSize(), serverMaxFrameSize);
            int heartbeat = TuneFrameHandler.negotiatedMaxValue(client.tuneState.requestedHeartbeat(), serverHeartbeat);
            int length = 12;
            ByteBuf byteBuf = client.allocateNoCheck(ctx.alloc(), length + 4);
            byteBuf.writeInt(length).writeShort((int)Utils.encodeResponseCode((short)20)).writeShort(1).writeInt(maxFrameSize).writeInt(heartbeat);
            ctx.writeAndFlush((Object)byteBuf);
            client.tuneState.maxFrameSize(maxFrameSize).heartbeat(heartbeat);
            if (heartbeat > 0) {
                client.channel.pipeline().addBefore(Client.NETTY_HANDLER_FRAME_DECODER, Client.NETTY_HANDLER_IDLE_STATE, (ChannelHandler)new IdleStateHandler(heartbeat * 2, heartbeat, 0));
            }
            client.tuneState.done();
            return read;
        }
    }

    private static class HeartbeatFrameHandler
    extends BaseFrameHandler {
        private HeartbeatFrameHandler() {
        }

        @Override
        int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
            return 0;
        }
    }

    private static class OpenFrameHandler
    extends BaseFrameHandler {
        private OpenFrameHandler() {
        }

        @Override
        int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
            Client.OutstandingRequest<Client.OpenResponse> outstandingRequest;
            Map<String, String> connectionProperties;
            int correlationId = message.readInt();
            int read = 4;
            short responseCode = message.readShort();
            read += 2;
            if (message.isReadable()) {
                int connectionPropertiesCount = message.readInt();
                read += 4;
                connectionProperties = new LinkedHashMap(connectionPropertiesCount);
                for (int i = 0; i < connectionPropertiesCount; ++i) {
                    String key = ServerFrameHandler.readString(message);
                    read += 2 + key.length();
                    String value = ServerFrameHandler.readString(message);
                    read += 2 + value.length();
                    connectionProperties.put(key, value);
                }
            } else {
                connectionProperties = Collections.emptyMap();
            }
            if ((outstandingRequest = ServerFrameHandler.remove(client.outstandingRequests, correlationId, Client.OpenResponse.class)) == null) {
                this.logMissingOutstandingRequest(correlationId);
            } else {
                outstandingRequest.response().set(new Client.OpenResponse(responseCode, connectionProperties));
                outstandingRequest.countDown();
            }
            return read;
        }
    }

    private static class PeerPropertiesFrameHandler
    extends BaseFrameHandler {
        private PeerPropertiesFrameHandler() {
        }

        @Override
        int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
            LOGGER.debug("Handling peer properties response for connection {}", (Object)client.clientConnectionName());
            int correlationId = message.readInt();
            LOGGER.debug("Handling peer properties response for connection {}, correlation ID is {}", (Object)client.clientConnectionName(), (Object)correlationId);
            int read = 4;
            short responseCode = message.readShort();
            read += 2;
            if (responseCode != 1) {
                while (message.isReadable()) {
                    message.readByte();
                }
                throw new StreamException("Unexpected response code for SASL handshake response: " + responseCode);
            }
            int serverPropertiesCount = message.readInt();
            read += 4;
            LinkedHashMap<String, String> serverProperties = new LinkedHashMap<String, String>(serverPropertiesCount);
            for (int i = 0; i < serverPropertiesCount; ++i) {
                String key = ServerFrameHandler.readString(message);
                read += 2 + key.length();
                String value = ServerFrameHandler.readString(message);
                read += 2 + value.length();
                serverProperties.put(key, value);
            }
            Client.OutstandingRequest<Map<String, String>> outstandingRequest = ServerFrameHandler.remove(client.outstandingRequests, correlationId, new ParameterizedTypeReference<Map<String, String>>(){});
            if (outstandingRequest == null) {
                this.logMissingOutstandingRequest(correlationId);
            } else {
                outstandingRequest.response().set(Collections.unmodifiableMap(serverProperties));
                outstandingRequest.countDown();
            }
            return read;
        }
    }

    private static class ConsumerUpdateFrameHandler
    extends BaseFrameHandler {
        private ConsumerUpdateFrameHandler() {
        }

        @Override
        int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
            int correlationId = message.readInt();
            int read = 4;
            byte subscriptionId = message.readByte();
            ++read;
            byte activeByte = message.readByte();
            OffsetSpecification offsetSpecification = client.consumerUpdateListener.handle(client, subscriptionId, activeByte == 1);
            client.consumerUpdateResponse(correlationId, (short)1, offsetSpecification);
            return ++read;
        }
    }

    private static class CreditNotificationFrameHandler
    extends BaseFrameHandler {
        private CreditNotificationFrameHandler() {
        }

        @Override
        int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
            short responseCode = message.readShort();
            int read = 2;
            byte subscriptionId = message.readByte();
            client.creditNotification.handle(subscriptionId, responseCode);
            return ++read;
        }
    }

    private static class QueryOffsetFrameHandler
    extends BaseFrameHandler {
        private QueryOffsetFrameHandler() {
        }

        @Override
        int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
            int correlationId = message.readInt();
            int read = 4;
            short responseCode = message.readShort();
            read += 2;
            long offset = message.readLong();
            read += 8;
            Client.OutstandingRequest<Client.QueryOffsetResponse> outstandingRequest = ServerFrameHandler.remove(client.outstandingRequests, correlationId, Client.QueryOffsetResponse.class);
            if (outstandingRequest == null) {
                this.logMissingOutstandingRequest(correlationId);
            } else {
                Client.QueryOffsetResponse response = new Client.QueryOffsetResponse(responseCode, offset);
                outstandingRequest.response().set(response);
                outstandingRequest.countDown();
            }
            return read;
        }
    }

    private static class QueryPublisherSequenceFrameHandler
    extends BaseFrameHandler {
        private QueryPublisherSequenceFrameHandler() {
        }

        @Override
        int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
            int correlationId = message.readInt();
            int read = 4;
            short responseCode = message.readShort();
            read += 2;
            long sequence = message.readLong();
            read += 8;
            Client.OutstandingRequest<Client.QueryPublisherSequenceResponse> outstandingRequest = ServerFrameHandler.remove(client.outstandingRequests, correlationId, Client.QueryPublisherSequenceResponse.class);
            if (outstandingRequest == null) {
                this.logMissingOutstandingRequest(correlationId);
            } else {
                Client.QueryPublisherSequenceResponse response = new Client.QueryPublisherSequenceResponse(responseCode, sequence);
                outstandingRequest.response().set(response);
                outstandingRequest.countDown();
            }
            return read;
        }
    }

    private static class CloseFrameHandler
    extends BaseFrameHandler {
        private CloseFrameHandler() {
        }

        @Override
        int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
            int correlationId = message.readInt();
            int read = 4;
            short closeCode = message.readShort();
            read += 2;
            String closeReason = ServerFrameHandler.readString(message);
            read += 2 + closeReason.length();
            LOGGER.info("Received close from server, reason: {} {}", (Object)closeCode, (Object)closeReason);
            int length = 10;
            ByteBuf byteBuf = client.allocate(ctx.alloc(), length + 4);
            byteBuf.writeInt(length).writeShort((int)Utils.encodeResponseCode((short)22)).writeShort(1).writeInt(correlationId).writeShort(1);
            client.shutdownReason(Client.ShutdownContext.ShutdownReason.SERVER_CLOSE);
            ctx.writeAndFlush((Object)byteBuf).addListener(future -> {
                if (client.closing.compareAndSet(false, true)) {
                    client.executorService.submit(() -> client.closingSequence(Client.ShutdownContext.ShutdownReason.SERVER_CLOSE));
                }
            });
            return read;
        }
    }

    private static class MetadataUpdateFrameHandler
    extends BaseFrameHandler {
        private MetadataUpdateFrameHandler() {
        }

        @Override
        int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
            short code = message.readShort();
            int read = 2;
            if (code != 6) {
                throw new IllegalArgumentException("Unsupported metadata update code " + code);
            }
            String stream = ServerFrameHandler.readString(message);
            LOGGER.debug("Stream {} is no longer available", (Object)stream);
            client.metadataListener.handle(stream, code);
            return read += 2 + stream.length();
        }
    }

    static class DeliverVersion2FrameHandler
    extends BaseFrameHandler {
        DeliverVersion2FrameHandler() {
        }

        @Override
        public boolean isInitiatedByServer() {
            return true;
        }

        @Override
        int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
            return DeliverVersion1FrameHandler.handleDeliver(message, client, client.chunkListener, client.messageListener, client.messageIgnoredListener, client.codec, client.subscriptionOffsets, client.chunkChecksum, client.metricsCollector, message.readByte(), message.readLong(), 9);
        }
    }

    static class DeliverVersion1FrameHandler
    extends BaseFrameHandler {
        DeliverVersion1FrameHandler() {
        }

        @Override
        public boolean isInitiatedByServer() {
            return true;
        }

        static int handleMessage(ByteBuf bb, int read, boolean ignore, Utils.MutableBoolean messageIgnored, long offset, long offsetLimit, long chunkTimestamp, long committedChunkId, Codec codec, Client.MessageListener messageListener, byte subscriptionId, Object chunkContext) {
            int entrySize = bb.readInt();
            read += 4;
            byte[] data = new byte[entrySize];
            bb.readBytes(data);
            read += entrySize;
            if (ignore && Long.compareUnsigned(offset, offsetLimit) < 0) {
                messageIgnored.set(true);
            } else {
                try {
                    Message message = codec.decode(data);
                    messageListener.handle(subscriptionId, offset, chunkTimestamp, committedChunkId, chunkContext, message);
                }
                catch (RuntimeException e) {
                    LOGGER.warn("Error while decoding message at offset {}", (Object)offset, (Object)e);
                    throw e;
                }
            }
            return read;
        }

        static int handleDeliverVersion1(ByteBuf message, Client client, Client.ChunkListener chunkListener, Client.MessageListener messageListener, Client.MessageIgnoredListener messageIgnoredListener, Codec codec, List<Client.SubscriptionOffset> subscriptionOffsets, ChunkChecksum chunkChecksum, MetricsCollector metricsCollector) {
            return DeliverVersion1FrameHandler.handleDeliver(message, client, chunkListener, messageListener, messageIgnoredListener, codec, subscriptionOffsets, chunkChecksum, metricsCollector, message.readByte(), 0L, 1);
        }

        static int handleDeliver(ByteBuf message, Client client, Client.ChunkListener chunkListener, Client.MessageListener messageListener, Client.MessageIgnoredListener messageIgnoredListener, Codec codec, List<Client.SubscriptionOffset> subscriptionOffsets, ChunkChecksum chunkChecksum, MetricsCollector metricsCollector, byte subscriptionId, long committedOffset, int read) {
            message.readByte();
            ++read;
            byte chunkType = message.readByte();
            if (chunkType != 0) {
                throw new IllegalStateException("Invalid chunk type: " + chunkType);
            }
            ++read;
            int numEntries = message.readUnsignedShort();
            read += 2;
            long numRecords = message.readUnsignedInt();
            read += 4;
            long chunkTimestamp = message.readLong();
            read += 8;
            message.readLong();
            read += 8;
            long offset = message.readLong();
            read += 8;
            long crc = message.readUnsignedInt();
            read += 4;
            long dataLength = message.readUnsignedInt();
            read += 4;
            message.readUnsignedInt();
            read += 4;
            message.readInt();
            read += 4;
            Object chunkContext = chunkListener.handle(client, subscriptionId, offset, numRecords, dataLength);
            long offsetLimit = -1L;
            if (!subscriptionOffsets.isEmpty()) {
                for (Client.SubscriptionOffset subscriptionOffset : subscriptionOffsets) {
                    if (subscriptionOffset.subscriptionId() != subscriptionId) continue;
                    subscriptionOffsets.remove(subscriptionOffset);
                    offsetLimit = subscriptionOffset.offset();
                    break;
                }
            }
            boolean ignore = offsetLimit != -1L;
            try {
                chunkChecksum.checksum(message, dataLength, crc);
            }
            catch (ChunkChecksumValidationException e) {
                LOGGER.warn("Checksum failure at offset {}, expecting {}, got {}", new Object[]{offset, e.getExpected(), e.getComputed()});
                throw e;
            }
            metricsCollector.chunk(numEntries);
            Utils.MutableBoolean messageIgnored = new Utils.MutableBoolean(false);
            while (numRecords != 0L) {
                byte entryType = message.readByte();
                if ((entryType & 0x80) == 0) {
                    message.readerIndex(message.readerIndex() - 1);
                    read = DeliverVersion1FrameHandler.handleMessage(message, read, ignore, messageIgnored, offset, offsetLimit, chunkTimestamp, committedOffset, codec, messageListener, subscriptionId, chunkContext);
                    if (messageIgnored.get()) {
                        messageIgnoredListener.ignored(subscriptionId, offset, chunkTimestamp, committedOffset, chunkContext);
                        messageIgnored.set(false);
                    } else {
                        metricsCollector.consume(1L);
                    }
                    --numRecords;
                    ++offset;
                    continue;
                }
                byte compression = (byte)((entryType & 0x70) >> 4);
                ++read;
                Compression comp = Compression.get(compression);
                int numRecordsInBatch = message.readUnsignedShort();
                read += 2;
                int uncompressedDataSize = message.readInt();
                read += 4;
                int dataSize = message.readInt();
                int readBeforeSubEntries = read += 4;
                ByteBuf bbToReadFrom = message;
                if (comp.code() != Compression.NONE.code()) {
                    CompressionCodec compressionCodec = client.compressionCodecFactory.get(comp);
                    ByteBuf outBb = client.channel.alloc().heapBuffer(uncompressedDataSize);
                    ByteBuf slice = message.slice(message.readerIndex(), dataSize);
                    InputStream inputStream = compressionCodec.decompress((InputStream)new ByteBufInputStream(slice));
                    byte[] inBuffer = new byte[Math.min(uncompressedDataSize, 1024)];
                    try {
                        int n;
                        while (-1 != (n = inputStream.read(inBuffer))) {
                            outBb.writeBytes(inBuffer, 0, n);
                        }
                    }
                    catch (IOException e) {
                        throw new StreamException("Error while uncompressing sub-entry", e);
                    }
                    message.readerIndex(message.readerIndex() + dataSize);
                    bbToReadFrom = outBb;
                }
                numRecords -= (long)numRecordsInBatch;
                while (numRecordsInBatch != 0) {
                    read = DeliverVersion1FrameHandler.handleMessage(bbToReadFrom, read, ignore, messageIgnored, offset, offsetLimit, chunkTimestamp, committedOffset, codec, messageListener, subscriptionId, chunkContext);
                    if (messageIgnored.get()) {
                        messageIgnoredListener.ignored(subscriptionId, offset, chunkTimestamp, committedOffset, chunkContext);
                        messageIgnored.set(false);
                    } else {
                        metricsCollector.consume(1L);
                    }
                    --numRecordsInBatch;
                    ++offset;
                }
                if (comp.code() == Compression.NONE.code()) continue;
                bbToReadFrom.release();
                read = readBeforeSubEntries + dataSize;
            }
            return read;
        }

        @Override
        int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
            return DeliverVersion1FrameHandler.handleDeliverVersion1(message, client, client.chunkListener, client.messageListener, client.messageIgnoredListener, client.codec, client.subscriptionOffsets, client.chunkChecksum, client.metricsCollector);
        }
    }

    private static class PublishErrorHandler
    extends BaseFrameHandler {
        private PublishErrorHandler() {
        }

        @Override
        int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
            int publishingErrorCount;
            byte publisherId = message.readByte();
            int read = 1;
            read += 4;
            client.metricsCollector.publishError(publishingErrorCount);
            for (publishingErrorCount = message.readInt(); publishingErrorCount != 0; --publishingErrorCount) {
                long publishingId = message.readLong();
                read += 8;
                short code = message.readShort();
                read += 2;
                client.publishErrorListener.handle(publisherId, publishingId, code);
            }
            return read;
        }
    }

    private static class ConfirmFrameHandler
    extends BaseFrameHandler {
        private ConfirmFrameHandler() {
        }

        @Override
        int doHandle(Client client, ChannelHandlerContext ctx, ByteBuf message) {
            int publishingIdCount;
            byte publisherId = message.readByte();
            int read = 1;
            read += 4;
            client.metricsCollector.publishConfirm(publishingIdCount);
            for (publishingIdCount = message.readInt(); publishingIdCount != 0; --publishingIdCount) {
                long publishingId = message.readLong();
                read += 8;
                client.publishConfirmListener.handle(publisherId, publishingId);
            }
            return read;
        }
    }

    private static abstract class BaseFrameHandler
    implements FrameHandler {
        private BaseFrameHandler() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void handle(Client client, int frameSize, ChannelHandlerContext ctx, ByteBuf message) {
            try {
                int read = this.doHandle(client, ctx, message) + 4;
                if (read != frameSize) {
                    LOGGER.warn("Read {} bytes in frame, expecting {}", (Object)read, (Object)frameSize);
                }
            }
            catch (Exception e) {
                LOGGER.warn("Error while handling response from server", (Throwable)e);
            }
            finally {
                message.release();
            }
        }

        abstract int doHandle(Client var1, ChannelHandlerContext var2, ByteBuf var3);

        protected void logMissingOutstandingRequest(int correlationId) {
            LOGGER.warn("Could not find outstanding request with correlation ID {} ({})", (Object)correlationId, (Object)this.getClass().getSimpleName());
        }
    }

    static interface FrameHandler {
        public void handle(Client var1, int var2, ChannelHandlerContext var3, ByteBuf var4);

        default public boolean isInitiatedByServer() {
            return false;
        }
    }

    static class FrameHandlerInfo {
        private final short key;
        private final short minVersion;
        private final short maxVersion;

        FrameHandlerInfo(short key, short minVersion, short maxVersion) {
            this.key = key;
            this.minVersion = minVersion;
            this.maxVersion = maxVersion;
        }

        short getKey() {
            return this.key;
        }

        short getMinVersion() {
            return this.minVersion;
        }

        short getMaxVersion() {
            return this.maxVersion;
        }

        public String toString() {
            return "FrameHandlerInfo{key=" + this.key + ", minVersion=" + this.minVersion + ", maxVersion=" + this.maxVersion + "}";
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            FrameHandlerInfo that = (FrameHandlerInfo)o;
            return this.key == that.key && this.minVersion == that.minVersion && this.maxVersion == that.maxVersion;
        }

        public int hashCode() {
            return Objects.hash(this.key, this.minVersion, this.maxVersion);
        }
    }
}

