/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.dataproxy.source;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.CorruptedFrameException;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.timeout.ReadTimeoutException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Event;
import org.apache.inlong.common.enums.DataProxyErrCode;
import org.apache.inlong.common.monitor.LogCounter;
import org.apache.inlong.common.msg.MsgType;
import org.apache.inlong.dataproxy.config.CommonConfigHolder;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.exception.ChannelUnWritableException;
import org.apache.inlong.dataproxy.exception.PkgParseException;
import org.apache.inlong.dataproxy.source.BaseSource;
import org.apache.inlong.dataproxy.source.v0msg.AbsV0MsgCodec;
import org.apache.inlong.dataproxy.source.v0msg.CodecBinMsg;
import org.apache.inlong.dataproxy.source.v0msg.CodecTextMsg;
import org.apache.inlong.dataproxy.source.v1msg.InlongTcpSourceCallback;
import org.apache.inlong.dataproxy.utils.AddressUtils;
import org.apache.inlong.sdk.commons.protocol.EventUtils;
import org.apache.inlong.sdk.commons.protocol.ProxyEvent;
import org.apache.inlong.sdk.commons.protocol.ProxyPackEvent;
import org.apache.inlong.sdk.commons.protocol.ProxySdk;
import org.apache.inlong.sdk.commons.protocol.SourceCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ServerMessageHandler
extends ChannelInboundHandlerAdapter {
    private static final Logger logger = LoggerFactory.getLogger(ServerMessageHandler.class);
    private static final LogCounter logCounter = new LogCounter(10, 100000, 30000);
    private static final LogCounter exceptLogCounter = new LogCounter(10, 50000, 20000);
    private static final int INLONG_MSG_V1 = 1;
    private static final ConfigManager configManager = ConfigManager.getInstance();
    private final BaseSource source;

    public ServerMessageHandler(BaseSource source) {
        this.source = source;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * WARNING - void declaration
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg == null) {
            this.source.fileMetricIncSumStats("msg.readable.empty");
            return;
        }
        ByteBuf cb = (ByteBuf)msg;
        try {
            void var11_16;
            int readableLength = cb.readableBytes();
            if (readableLength == 0 && this.source.isFilterEmptyMsg()) {
                cb.clear();
                this.source.fileMetricIncSumStats("msg.readable.empty");
                return;
            }
            if (readableLength > this.source.getMaxMsgLength()) {
                this.source.fileMetricIncSumStats("msg.readable.overmax");
                throw new PkgParseException("Error msg, readableLength(" + readableLength + ") > max allowed message length (" + this.source.getMaxMsgLength() + ")");
            }
            cb.markReaderIndex();
            int totalDataLen = cb.readInt();
            if (readableLength < totalDataLen + 4) {
                cb.resetReaderIndex();
                this.source.fileMetricIncSumStats("msg.readable.unfilled");
                return;
            }
            byte msgTypeValue = cb.readByte();
            if (msgTypeValue == 0) {
                msgTypeValue = cb.readByte();
                if (msgTypeValue == 1) {
                    int bodyLength = totalDataLen - 2;
                    this.processV1Msg(ctx, cb, bodyLength);
                    return;
                }
                this.source.fileMetricIncSumStats("msg.msgtype.v1.invalid");
                throw new PkgParseException("Unknown V1 message version, version = " + msgTypeValue);
            }
            Channel channel = ctx.channel();
            MsgType msgType = MsgType.valueOf((int)msgTypeValue);
            long msgRcvTime = System.currentTimeMillis();
            if (MsgType.MSG_UNKNOWN == msgType) {
                this.source.fileMetricIncSumStats("msg.msgtype.v0.invalid");
                if (!logger.isDebugEnabled()) throw new PkgParseException("Unknown V0 message type, type = " + msgTypeValue);
                logger.debug("Received unknown message, channel {}", (Object)channel);
                throw new PkgParseException("Unknown V0 message type, type = " + msgTypeValue);
            }
            if (MsgType.MSG_HEARTBEAT == msgType) {
                this.flushV0MsgPackage(this.source, channel, this.buildHeartBeatMsgRspPackage(), MsgType.MSG_HEARTBEAT.name());
                return;
            }
            if (MsgType.MSG_BIN_HEARTBEAT == msgType) {
                this.procBinHeartbeatMsg(this.source, channel, cb, totalDataLen);
                return;
            }
            String strRemoteIP = AddressUtils.getChannelRemoteIP(channel);
            if (MsgType.MSG_BIN_MULTI_BODY == msgType) {
                if (totalDataLen < 25) {
                    this.source.fileMetricIncSumStats("msg.bin.totallen.belowmin");
                    String errMsg = String.format("Malformed msg, totalDataLen(%d) < min bin7-msg length(%d)", totalDataLen, 25);
                    if (!logger.isDebugEnabled()) throw new PkgParseException(errMsg);
                    logger.debug(errMsg + ", channel {}", (Object)channel);
                    throw new PkgParseException(errMsg);
                }
                CodecBinMsg codecBinMsg = new CodecBinMsg(totalDataLen, msgTypeValue, msgRcvTime, strRemoteIP);
            } else {
                if (totalDataLen < 9) {
                    this.source.fileMetricIncSumStats("msg.txt.totallen.belowmin");
                    String errMsg = String.format("Malformed msg, totalDataLen(%d) < min txt-msg length(%d)", totalDataLen, 9);
                    if (!logger.isDebugEnabled()) throw new PkgParseException(errMsg);
                    logger.debug(errMsg + ", channel {}", (Object)channel);
                    throw new PkgParseException(errMsg);
                }
                CodecTextMsg codecTextMsg = new CodecTextMsg(totalDataLen, msgTypeValue, msgRcvTime, strRemoteIP);
            }
            this.processV0Msg(channel, cb, (AbsV0MsgCodec)var11_16);
            return;
        }
        finally {
            cb.release();
        }
    }

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        String strRemoteIp;
        if (ConfigManager.getInstance().needChkIllegalIP() && (strRemoteIp = AddressUtils.getChannelRemoteIP(ctx.channel())) != null && ConfigManager.getInstance().isIllegalIP(strRemoteIp)) {
            this.source.fileMetricIncSumStats("visit.illegal");
            ctx.channel().disconnect();
            ctx.channel().close();
            logger.error(strRemoteIp + " is Illegal IP, so refuse it !");
            return;
        }
        if (this.source.getAllChannels().size() >= this.source.getMaxConnections()) {
            this.source.fileMetricIncSumStats("visit.overmax");
            ctx.channel().disconnect();
            ctx.channel().close();
            logger.warn("{} refuse to connect = {} , connections = {}, maxConnections = {}", new Object[]{this.source.getCachedSrcName(), ctx.channel(), this.source.getAllChannels().size(), this.source.getMaxConnections()});
            return;
        }
        this.source.getAllChannels().add((Object)ctx.channel());
        ctx.fireChannelActive();
        this.source.fileMetricIncSumStats("visit.linkin");
        logger.info("{} added new channel {}, current connections = {}, maxConnections = {}", new Object[]{this.source.getCachedSrcName(), ctx.channel(), this.source.getAllChannels().size(), this.source.getMaxConnections()});
    }

    public void channelInactive(ChannelHandlerContext ctx) {
        this.source.fileMetricIncSumStats("visit.linkout");
        ctx.fireChannelInactive();
        this.source.getAllChannels().remove((Object)ctx.channel());
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (!(cause instanceof PkgParseException) && !(cause instanceof ChannelUnWritableException)) {
            if (cause instanceof ReadTimeoutException) {
                this.source.fileMetricIncSumStats("link.read.timeout");
            } else if (cause instanceof TooLongFrameException) {
                this.source.fileMetricIncSumStats("link.frame.overmax");
            } else if (cause instanceof CorruptedFrameException) {
                this.source.fileMetricIncSumStats("link.frame.corrupted");
            } else if (cause instanceof IOException) {
                this.source.fileMetricIncSumStats("link.io.exception");
            } else {
                this.source.fileMetricIncSumStats("link.unknown.exception");
            }
            if (exceptLogCounter.shouldPrint()) {
                logger.warn("{} received an exception from channel {}", new Object[]{this.source.getCachedSrcName(), ctx.channel(), cause});
            }
        }
        if (ctx.channel() != null) {
            this.source.getAllChannels().remove((Object)ctx.channel());
            try {
                ctx.channel().disconnect();
                ctx.channel().close();
            }
            catch (Exception exception) {
                // empty catch block
            }
        }
        ctx.close();
    }

    private void processV0Msg(Channel channel, ByteBuf cb, AbsV0MsgCodec msgCodec) throws Exception {
        block8: {
            StringBuilder strBuff = new StringBuilder(512);
            if (!msgCodec.descMsg(this.source, cb)) {
                this.responseV0Msg(channel, msgCodec, strBuff);
                return;
            }
            if (this.source.isRejectService()) {
                this.source.fileMetricIncSumStats("service.closed");
                msgCodec.setFailureInfo(DataProxyErrCode.SERVICE_CLOSED);
                this.responseV0Msg(channel, msgCodec, strBuff);
                return;
            }
            if (!ConfigManager.getInstance().isMqClusterReady()) {
                this.source.fileMetricIncSumStats("service.sink.unready");
                msgCodec.setFailureInfo(DataProxyErrCode.SINK_SERVICE_UNREADY);
                this.responseV0Msg(channel, msgCodec, strBuff);
                return;
            }
            if (!msgCodec.validAndFillFields(this.source, strBuff)) {
                this.responseV0Msg(channel, msgCodec, strBuff);
                return;
            }
            Event event = msgCodec.encEventPackage(this.source, channel);
            try {
                this.source.getCachedChProcessor().processEvent(event);
                this.source.fileMetricAddSuccStats(strBuff, msgCodec.getGroupId(), msgCodec.getStreamId(), msgCodec.getTopicName(), msgCodec.getStrRemoteIP(), msgCodec.getMsgProcType(), msgCodec.getDataTimeMs(), msgCodec.getMsgPkgTime(), msgCodec.getMsgCount(), 1, event.getBody().length);
                this.source.addMetric(true, event.getBody().length, event);
                if (msgCodec.isNeedResp() && !msgCodec.isOrderOrProxy()) {
                    msgCodec.setSuccessInfo();
                    this.responseV0Msg(channel, msgCodec, strBuff);
                }
            }
            catch (Throwable ex) {
                this.source.fileMetricAddFailStats(strBuff, msgCodec.getGroupId(), msgCodec.getStreamId(), msgCodec.getTopicName(), msgCodec.getStrRemoteIP(), msgCodec.getMsgProcType(), msgCodec.getDataTimeMs(), msgCodec.getMsgPkgTime(), 1);
                this.source.addMetric(false, event.getBody().length, event);
                if (msgCodec.isNeedResp()) {
                    msgCodec.setFailureInfo(DataProxyErrCode.PUT_EVENT_TO_CHANNEL_FAILURE, strBuff.append("Put msg event to channel failure: ").append(ex.getMessage()).toString());
                    strBuff.delete(0, strBuff.length());
                    this.responseV0Msg(channel, msgCodec, strBuff);
                }
                if (!logCounter.shouldPrint()) break block8;
                logger.error("Error writing msg event to channel failure, attrs={}", (Object)msgCodec.getAttr(), (Object)ex);
            }
        }
    }

    private void processV1Msg(ChannelHandlerContext ctx, ByteBuf cb, int bodyLength) throws Exception {
        byte[] msgBytes = new byte[bodyLength];
        cb.readBytes(msgBytes);
        ProxySdk.MessagePack packObject = ProxySdk.MessagePack.parseFrom((byte[])msgBytes);
        if (this.source.isRejectService()) {
            this.source.addMetric(false, 0L, null);
            this.source.fileMetricIncSumStats("service.closed");
            this.responsePackage(ctx, ProxySdk.ResultCode.ERR_REJECT, packObject);
            return;
        }
        List events = EventUtils.decodeSdkPack((ProxySdk.MessagePack)packObject);
        if (events.size() == 0) {
            this.responsePackage(ctx, ProxySdk.ResultCode.SUCCUSS, packObject);
        }
        if (CommonConfigHolder.getInstance().isResponseAfterSave()) {
            this.processAndWaitingSave(ctx, packObject, events);
        } else {
            this.processAndResponse(ctx, packObject, events);
        }
    }

    private void responsePackage(ChannelHandlerContext ctx, ProxySdk.ResultCode code, ProxySdk.MessagePack packObject) throws Exception {
        ProxySdk.ResponseInfo.Builder builder = ProxySdk.ResponseInfo.newBuilder();
        builder.setResult(code);
        ProxySdk.MessagePackHeader header = packObject.getHeader();
        builder.setPackId(header.getPackId());
        byte[] responseBytes = builder.build().toByteArray();
        ByteBuf buffer = Unpooled.wrappedBuffer((byte[])responseBytes);
        Channel remoteChannel = ctx.channel();
        if (!remoteChannel.isWritable()) {
            buffer.release();
            logger.warn("Send buffer2 is not writable, disconnect {}", (Object)remoteChannel);
            throw new ChannelUnWritableException("Send buffer2 is not writable, disconnect " + remoteChannel);
        }
        remoteChannel.write((Object)buffer);
    }

    private void processAndWaitingSave(ChannelHandlerContext ctx, ProxySdk.MessagePack packObject, List<ProxyEvent> events) throws Exception {
        block3: {
            ProxySdk.MessagePackHeader header = packObject.getHeader();
            InlongTcpSourceCallback callback = new InlongTcpSourceCallback(ctx, header);
            String inlongGroupId = header.getInlongGroupId();
            String inlongStreamId = header.getInlongStreamId();
            ProxyPackEvent packEvent = new ProxyPackEvent(inlongGroupId, inlongStreamId, events, (SourceCallback)callback);
            try {
                this.source.getCachedChProcessor().processEvent((Event)packEvent);
                events.forEach(event -> {
                    this.source.addMetric(true, event.getBody().length, (Event)event);
                    this.source.fileMetricIncSumStats("msg.post.v1.success");
                });
                boolean awaitResult = callback.getLatch().await(CommonConfigHolder.getInstance().getMaxResAfterSaveTimeout(), TimeUnit.MILLISECONDS);
                if (!awaitResult && !callback.getHasResponsed().getAndSet(true)) {
                    this.responsePackage(ctx, ProxySdk.ResultCode.ERR_REJECT, packObject);
                }
            }
            catch (Throwable ex) {
                logger.error("Process Controller Event error can't write event to channel.", ex);
                events.forEach(event -> {
                    this.source.addMetric(false, event.getBody().length, (Event)event);
                    this.source.fileMetricIncSumStats("msg.post.v1.dropped");
                });
                if (callback.getHasResponsed().getAndSet(true)) break block3;
                this.responsePackage(ctx, ProxySdk.ResultCode.ERR_REJECT, packObject);
            }
        }
    }

    private void processAndResponse(ChannelHandlerContext ctx, ProxySdk.MessagePack packObject, List<ProxyEvent> events) throws Exception {
        for (ProxyEvent event : events) {
            String topic = configManager.getTopicName(event.getInlongGroupId(), event.getInlongStreamId());
            if (StringUtils.isEmpty((CharSequence)topic)) {
                this.source.fileMetricIncWithDetailStats("source.topic.missing", event.getInlongGroupId());
                this.source.addMetric(false, event.getBody().length, (Event)event);
                this.responsePackage(ctx, ProxySdk.ResultCode.ERR_ID_ERROR, packObject);
                return;
            }
            event.setTopic(topic);
            try {
                this.source.getCachedChProcessor().processEvent((Event)event);
                this.source.addMetric(true, event.getBody().length, (Event)event);
                this.source.fileMetricIncSumStats("msg.post.v1.success");
            }
            catch (Throwable ex) {
                logger.error("Process Controller Event error can't write event to channel.", ex);
                this.source.addMetric(false, event.getBody().length, (Event)event);
                this.responsePackage(ctx, ProxySdk.ResultCode.ERR_REJECT, packObject);
                this.source.fileMetricIncSumStats("msg.post.v1.dropped");
                return;
            }
        }
        this.responsePackage(ctx, ProxySdk.ResultCode.SUCCUSS, packObject);
    }

    private void responseV0Msg(Channel channel, AbsV0MsgCodec msgObj, StringBuilder strBuff) throws Exception {
        MsgType msgType;
        if (channel == null || !channel.isWritable()) {
            this.source.fileMetricIncSumStats("link.unwritable");
            if (logCounter.shouldPrint()) {
                logger.warn("Prepare send msg but channel full, msgType={}, attr={}, channel={}", new Object[]{msgObj.getMsgType(), msgObj.getAttr(), channel});
            }
            throw new ChannelUnWritableException("Prepare send msg but channel full");
        }
        if (!msgObj.isNeedResp()) {
            return;
        }
        strBuff.append("dpIp").append("=").append(this.source.getSrcHost());
        if (msgObj.getErrCode() != DataProxyErrCode.SUCCESS) {
            strBuff.append("&").append("errCode").append("=").append(msgObj.getErrCode().getErrCodeStr());
            if (StringUtils.isNotEmpty((CharSequence)msgObj.getErrMsg())) {
                strBuff.append("&").append("errMsg").append("=").append(msgObj.getErrMsg());
            }
        }
        if (StringUtils.isNotEmpty((CharSequence)msgObj.getAttr())) {
            strBuff.append("&").append(msgObj.getAttr());
        }
        ByteBuf retData = MsgType.MSG_BIN_MULTI_BODY.equals((Object)(msgType = MsgType.valueOf((int)msgObj.getMsgType()))) ? ServerMessageHandler.buildBinMsgRspPackage(strBuff.toString(), msgObj.getUniq()) : this.buildTxtMsgRspPackage(msgType, strBuff.toString(), msgObj);
        strBuff.delete(0, strBuff.length());
        this.flushV0MsgPackage(this.source, channel, retData, msgObj.getAttr());
    }

    private void procBinHeartbeatMsg(BaseSource source, Channel channel, ByteBuf cb, int totalDataLen) throws Exception {
        short attrLen;
        int bodyLen;
        if (totalDataLen < 14) {
            source.fileMetricIncSumStats("msg.hb.totallen.belowmin");
            String errMsg = String.format("Malformed msg, totalDataLen(%d) < min hb-msg length(%d)", totalDataLen, 14);
            if (logger.isDebugEnabled()) {
                logger.debug(errMsg + ", channel {}", (Object)channel);
            }
            throw new PkgParseException(errMsg);
        }
        int msgHeadPos = cb.readerIndex() - 5;
        int msgMagic = cb.getUnsignedShort(msgHeadPos + 14 + (bodyLen = cb.getInt(msgHeadPos + 10)) + 2 + (attrLen = cb.getShort(msgHeadPos + 14 + bodyLen)));
        if (msgMagic != 60929) {
            source.fileMetricIncSumStats("msg.hb.magic.unequal");
            String errMsg = String.format("Malformed msg, msgMagic(%d) != %d", msgMagic, 60929);
            if (logger.isDebugEnabled()) {
                logger.debug(errMsg + ", channel {}", (Object)channel);
            }
            throw new PkgParseException(errMsg);
        }
        if (totalDataLen + 4 < bodyLen + attrLen + 18) {
            source.fileMetricIncSumStats("msg.hb.len.malformed");
            String errMsg = String.format("Malformed msg, bodyLen(%d) + attrLen(%d) > totalDataLen(%d)", bodyLen, (int)attrLen, totalDataLen);
            if (logger.isDebugEnabled()) {
                logger.debug(errMsg + ", channel {}", (Object)channel);
            }
            throw new PkgParseException(errMsg);
        }
        byte version = cb.getByte(msgHeadPos + 9);
        byte[] attrData = null;
        if (attrLen > 0) {
            attrData = new byte[attrLen];
            cb.getBytes(msgHeadPos + 14 + bodyLen + 2, attrData, 0, (int)attrLen);
        }
        this.flushV0MsgPackage(source, channel, this.buildHBRspPackage(attrData, version, 0), MsgType.MSG_BIN_HEARTBEAT.name());
    }

    public static ByteBuf buildBinMsgRspPackage(String attrs, long uniqVal) {
        int binTotalLen = 9;
        if (null != attrs) {
            binTotalLen += attrs.length();
        }
        ByteBuf binBuffer = ByteBufAllocator.DEFAULT.buffer(4 + binTotalLen);
        binBuffer.writeInt(binTotalLen);
        binBuffer.writeByte(MsgType.MSG_BIN_MULTI_BODY.getValue());
        byte[] uniq = new byte[]{(byte)(uniqVal >> 24 & 0xFFL), (byte)(uniqVal >> 16 & 0xFFL), (byte)(uniqVal >> 8 & 0xFFL), (byte)(uniqVal & 0xFFL)};
        binBuffer.writeBytes(uniq);
        if (null != attrs) {
            binBuffer.writeShort(attrs.length());
            binBuffer.writeBytes(attrs.getBytes(StandardCharsets.UTF_8));
        } else {
            binBuffer.writeShort(0);
        }
        binBuffer.writeShort(60929);
        return binBuffer;
    }

    public static ByteBuf buildTxtMsgRspPackage(MsgType msgType, String attrs) {
        int attrsLen = 0;
        int bodyLen = 0;
        if (attrs != null) {
            attrsLen = attrs.length();
        }
        int backTotalLen = 5 + bodyLen + 4 + attrsLen;
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(4 + backTotalLen);
        buffer.writeInt(backTotalLen);
        buffer.writeByte(msgType.getValue());
        buffer.writeInt(bodyLen);
        buffer.writeInt(attrsLen);
        if (attrsLen > 0) {
            buffer.writeBytes(attrs.getBytes(StandardCharsets.UTF_8));
        }
        return buffer;
    }

    private ByteBuf buildTxtMsgRspPackage(MsgType msgType, String attrs, AbsV0MsgCodec msgObj) {
        int attrsLen = 0;
        int bodyLen = 0;
        byte[] backBody = null;
        if (attrs != null) {
            attrsLen = attrs.length();
        }
        if (MsgType.MSG_ORIGINAL_RETURN.equals((Object)msgType) && (backBody = msgObj.getOrigBody()) != null) {
            bodyLen = backBody.length;
        }
        int backTotalLen = 5 + bodyLen + 4 + attrsLen;
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(4 + backTotalLen);
        buffer.writeInt(backTotalLen);
        buffer.writeByte(msgType.getValue());
        buffer.writeInt(bodyLen);
        if (bodyLen > 0) {
            buffer.writeBytes(backBody);
        }
        buffer.writeInt(attrsLen);
        if (attrsLen > 0) {
            buffer.writeBytes(attrs.getBytes(StandardCharsets.UTF_8));
        }
        return buffer;
    }

    private ByteBuf buildHBRspPackage(byte[] attrData, byte version, int loadValue) {
        int binTotalLen = 16;
        if (null != attrData) {
            binTotalLen += attrData.length;
        }
        if (loadValue == 0 || loadValue == -1) {
            loadValue = 65535;
        }
        ByteBuf binBuffer = ByteBufAllocator.DEFAULT.buffer(4 + binTotalLen);
        binBuffer.writeInt(binTotalLen);
        binBuffer.writeByte(MsgType.MSG_BIN_HEARTBEAT.getValue());
        binBuffer.writeInt((int)(System.currentTimeMillis() / 1000L));
        binBuffer.writeByte((int)version);
        binBuffer.writeInt(2);
        binBuffer.writeShort(loadValue);
        if (null != attrData) {
            binBuffer.writeShort(attrData.length);
            binBuffer.writeBytes(attrData);
        } else {
            binBuffer.writeShort(0);
        }
        binBuffer.writeShort(60929);
        return binBuffer;
    }

    private ByteBuf buildHeartBeatMsgRspPackage() {
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(5);
        buffer.writeBytes(new byte[]{0, 0, 0, 1, 1});
        return buffer;
    }

    private void flushV0MsgPackage(BaseSource source, Channel channel, ByteBuf binBuffer, String orgAttr) throws Exception {
        if (channel == null || !channel.isWritable()) {
            binBuffer.release();
            source.fileMetricIncSumStats("link.unwritable");
            if (logCounter.shouldPrint()) {
                logger.warn("Send msg but channel full, attr={}, channel={}", (Object)orgAttr, (Object)channel);
            }
            throw new ChannelUnWritableException("Send response but channel full");
        }
        channel.writeAndFlush((Object)binBuffer);
    }
}

