/*
 * Decompiled with CFR 0.152.
 */
package com.refinitiv.eta.valueadd.reactor;

import com.refinitiv.eta.codec.Buffer;
import com.refinitiv.eta.codec.CodecFactory;
import com.refinitiv.eta.codec.DecodeIterator;
import com.refinitiv.eta.codec.EncodeIterator;
import com.refinitiv.eta.codec.GenericMsg;
import com.refinitiv.eta.codec.Msg;
import com.refinitiv.eta.codec.RefreshMsg;
import com.refinitiv.eta.codec.State;
import com.refinitiv.eta.codec.StatusMsg;
import com.refinitiv.eta.transport.Error;
import com.refinitiv.eta.transport.TransportBuffer;
import com.refinitiv.eta.transport.TransportFactory;
import com.refinitiv.eta.valueadd.domainrep.rdm.queue.QueueMsgFactory;
import com.refinitiv.eta.valueadd.domainrep.rdm.queue.QueueRequest;
import com.refinitiv.eta.valueadd.reactor.QueueAckImpl;
import com.refinitiv.eta.valueadd.reactor.QueueCloseImpl;
import com.refinitiv.eta.valueadd.reactor.QueueDataExpiredImpl;
import com.refinitiv.eta.valueadd.reactor.QueueDataImpl;
import com.refinitiv.eta.valueadd.reactor.QueueRefreshImpl;
import com.refinitiv.eta.valueadd.reactor.QueueRequestImpl;
import com.refinitiv.eta.valueadd.reactor.QueueStatusImpl;
import com.refinitiv.eta.valueadd.reactor.TunnelStream;
import com.refinitiv.eta.valueadd.reactor.TunnelStreamBuffer;
import com.refinitiv.eta.valueadd.reactor.TunnelStreamMsg;
import com.refinitiv.eta.valueadd.reactor.TunnelStreamPersistenceBuffer;
import com.refinitiv.eta.valueadd.reactor.TunnelStreamPersistenceFile;
import com.refinitiv.eta.valueadd.reactor.TunnelStreamPersistenceFileV1;
import com.refinitiv.eta.valueadd.reactor.TunnelStreamPersistenceFileV2;
import com.refinitiv.eta.valueadd.reactor.TunnelStreamUtil;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.charset.Charset;

class TunnelSubstream {
    /** Tunnel stream this substream runs over; supplies buffers, protocol versions and the listener callbacks. */
    TunnelStream _tunnelStream;
    /** Persistence file for this queue; null when no file was opened or after a successful close(). */
    TunnelStreamPersistenceFile _persistFile;
    // Reusable queue message objects, created once in the constructor and cleared/decoded per message.
    QueueRequestImpl _queueRequest;
    QueueRefreshImpl _queueRefresh;
    QueueStatusImpl _queueStatus;
    QueueAckImpl _queueAck;
    QueueDataImpl _queueData;
    QueueDataExpiredImpl _queueDataExpired;
    QueueCloseImpl _queueClose;
    /** Message with msgClass 7 used by sendQueueAckToListener() to decode buffers. */
    GenericMsg _genericMsg;
    /** Scratch message handed to the persistence file (constructor and retransmitBuffers). */
    Msg _msg;
    /** Created in the constructor; not otherwise referenced in this class. */
    Msg _substreamMsg;
    /** Decode iterator handed to the persistence file (constructor and retransmitBuffers). */
    DecodeIterator _dIter;
    // Identity of the substream as passed to the constructor.
    int _streamId;
    int _domainType;
    int _serviceId;
    /** Private copy of the queue name given to the constructor. */
    Buffer _queueName;
    // Last outbound/inbound sequence numbers; the setters mirror them into the persistence file.
    int _lastOutSeqNum;
    int _lastInSeqNum;
    /** Encode iterator used for requests, acks and sequence-number replacement. */
    EncodeIterator _encIter;
    /** Decode iterator used for messages delivered to readMsg() and sendQueueAckToListener(). */
    DecodeIterator _decIter;
    /** Current lifecycle state; drives the dispatch in readMsg(). */
    TunnelSubstreamState _state;
    /** Message decoded from the encoded data body of each message delivered to readMsg(). */
    Msg _encSubMsg;
    /** Scratch error used by sendQueueAckToListener(), which has no caller-supplied Error. */
    Error _error;
    /** 4-byte direct buffer used to read the version field at offset 0 of the persistence file. */
    ByteBuffer _byteBuffer;

    TunnelSubstream(Buffer queueName, int streamId, int domainType, int serviceId, TunnelStream tunnelStreamHandler, Error error) {
        this._tunnelStream = tunnelStreamHandler;
        this._queueName = CodecFactory.createBuffer();
        this._queueName.data(ByteBuffer.allocateDirect(queueName.length()));
        queueName.copy(this._queueName);
        this._streamId = streamId;
        this._domainType = domainType;
        this._serviceId = serviceId;
        this._queueRequest = (QueueRequestImpl)QueueMsgFactory.createQueueRequest();
        this._queueRefresh = (QueueRefreshImpl)QueueMsgFactory.createQueueRefresh();
        this._queueStatus = (QueueStatusImpl)QueueMsgFactory.createQueueStatus();
        this._queueAck = (QueueAckImpl)QueueMsgFactory.createQueueAck();
        this._queueData = (QueueDataImpl)QueueMsgFactory.createQueueData();
        this._queueDataExpired = (QueueDataExpiredImpl)QueueMsgFactory.createQueueDataExpired();
        this._queueClose = (QueueCloseImpl)QueueMsgFactory.createQueueClose();
        this._substreamMsg = CodecFactory.createMsg();
        this._msg = CodecFactory.createMsg();
        this._dIter = CodecFactory.createDecodeIterator();
        error.errorId(0);
        this._state = TunnelSubstreamState.NOT_OPEN;
        this._encIter = CodecFactory.createEncodeIterator();
        this._decIter = CodecFactory.createDecodeIterator();
        this._encSubMsg = CodecFactory.createMsg();
        this._error = TransportFactory.createError();
        this._genericMsg = (GenericMsg)CodecFactory.createMsg();
        this._genericMsg.msgClass(7);
        this._byteBuffer = ByteBuffer.allocateDirect(4);
    }

    TunnelSubstream(Buffer queueName, int streamId, int domainType, int serviceId, String filePath, TunnelStream tunnelStreamHandler, Error error) {
        this(queueName, streamId, domainType, serviceId, tunnelStreamHandler, error);
        int fileVersion;
        FileLock fileLock;
        FileChannel fileChannel;
        RandomAccessFile file;
        if (error.errorId() != 0) {
            return;
        }
        this._tunnelStream = tunnelStreamHandler;
        byte[] queueBytes = new byte[queueName.length()];
        int queuePos = queueName.position();
        boolean reset = this._tunnelStream.forceFileReset();
        for (int i = 0; i < queueName.length(); ++i) {
            queueBytes[i] = queueName.data().get(queuePos + i);
        }
        String queueString = new String(queueBytes, Charset.forName("UTF-8"));
        File queueFile = new File(filePath, queueString);
        if (!queueFile.exists()) {
            reset = true;
        }
        try {
            file = new RandomAccessFile(queueFile, "rw");
            fileChannel = file.getChannel();
            try {
                fileLock = fileChannel.tryLock();
                if (fileLock == null) {
                    fileChannel.close();
                    file.close();
                    error.errorId(-1);
                    error.text("Persistence file already in use.");
                    return;
                }
            }
            catch (OverlappingFileLockException e) {
                fileChannel.close();
                file.close();
                error.errorId(-1);
                error.text("Persistence file already in use.");
                return;
            }
        }
        catch (IOException e) {
            error.errorId(-1);
            error.text("Failed to open persistence file.");
            return;
        }
        if (!reset) {
            this._byteBuffer.position(0);
            try {
                fileChannel.read(this._byteBuffer, 0L);
            }
            catch (IOException e) {
                error.errorId(-1);
                error.text("Caught IOException while reading persistence file version.");
            }
            fileVersion = this._byteBuffer.getInt(0);
        } else {
            fileVersion = TunnelStreamPersistenceFile.defaultPersistenceVersion();
        }
        switch (fileVersion) {
            case 0x2000000: {
                this._persistFile = new TunnelStreamPersistenceFileV2(this, file, fileChannel, fileLock, this._msg, this._encIter, this._dIter, reset, error);
                if (error.errorId() == 0) break;
                this._persistFile = null;
                return;
            }
            case 1: {
                this._persistFile = new TunnelStreamPersistenceFileV1(this, file, fileChannel, fileLock, this._msg, this._encIter, this._dIter, reset, error);
                if (error.errorId() == 0) break;
                this._persistFile = null;
                return;
            }
            default: {
                try {
                    fileChannel.close();
                    file.close();
                    error.text("Invalid persistence file version.");
                    error.errorId(-1);
                    return;
                }
                catch (IOException e) {
                    error.text("Caught IOException while closing persistence file (due to Invalid persistence file version).");
                    error.errorId(-1);
                    return;
                }
            }
        }
    }

    /**
     * @return the last outbound sequence number recorded for this substream
     */
    int lastOutSeqNum() {
        return _lastOutSeqNum;
    }

    /**
     * Records the last outbound sequence number and, when a persistence file
     * is present, forwards it to that file as well.
     *
     * @param lastInSeqNum the new last <em>outbound</em> sequence number
     *        (the parameter name is historical)
     */
    void lastOutSeqNum(int lastInSeqNum) {
        _lastOutSeqNum = lastInSeqNum;
        if (_persistFile == null) {
            return;
        }
        _persistFile.lastOutSeqNum(lastInSeqNum);
    }

    /**
     * @return the last inbound sequence number recorded for this substream
     */
    int lastInSeqNum() {
        return _lastInSeqNum;
    }

    /**
     * Records the last inbound sequence number and, when a persistence file
     * is present, forwards it to that file as well.
     *
     * @param lastInSeqNum the new last inbound sequence number
     */
    void lastInSeqNum(int lastInSeqNum) {
        _lastInSeqNum = lastInSeqNum;
        if (_persistFile == null) {
            return;
        }
        _persistFile.lastInSeqNum(lastInSeqNum);
    }

    /**
     * Marks the substream as not open and closes its persistence file, if any.
     *
     * @param error receives details when closing the persistence file fails
     * @return 0 on success; otherwise the id stored in {@code error}. On
     *         failure the persistence file reference is kept.
     */
    int close(Error error) {
        _state = TunnelSubstreamState.NOT_OPEN;
        if (_persistFile == null) {
            return 0;
        }
        if (_persistFile.close(error) != 0) {
            return error.errorId();
        }
        _persistFile = null;
        return 0;
    }

    /**
     * Builds an acknowledgement for the queue data message held in
     * {@code tunnelBuffer} and hands it to the tunnel stream via
     * {@code queueMsgAcknowledged}.
     *
     * <p>The ack swaps the source and destination names of the data message,
     * is encoded into a temporary buffer, decoded back into the generic
     * message passed to the callback, and the temporary buffer is released.
     *
     * <p>NOTE(review): the buffer returned by {@code getBuffer} is not checked
     * for null and decode/encode return codes are ignored; this method has no
     * way to report such failures to its caller.
     *
     * @param tunnelBuffer buffer containing the encoded queue data message
     */
    void sendQueueAckToListener(TunnelStreamBuffer tunnelBuffer) {
        tunnelBuffer.setAsInnerReadBuffer();

        final int majorVersion = _tunnelStream.classOfService().common().protocolMajorVersion();
        final int minorVersion = _tunnelStream.classOfService().common().protocolMinorVersion();

        // Decode the original data message to learn who it was addressed to.
        _decIter.clear();
        _decIter.setBufferAndRWFVersion((TransportBuffer)tunnelBuffer, majorVersion, minorVersion);
        _genericMsg.decode(_decIter);
        _queueData.decode(_decIter, (Msg)_genericMsg);

        // The ack travels in the opposite direction: swap source and destination.
        Buffer dataDest = _queueData.destName();
        Buffer dataSource = _queueData.sourceName();
        _queueAck.clear();
        _queueAck.sourceName().data(dataDest.data(), dataDest.position(), dataDest.length());
        _queueAck.destName().data(dataSource.data(), dataSource.position(), dataSource.length());
        _queueAck.domainType(_queueData.domainType());
        _queueAck.identifier(_queueData.identifier());
        _queueAck.serviceId(_queueData.serviceId());
        _queueAck.streamId(_streamId);

        // Encode the ack into a temporary buffer ...
        TunnelStreamBuffer ackBuffer = _tunnelStream.getBuffer(_queueAck.ackMsgBufferSize(), false, false, _error);
        _encIter.clear();
        _encIter.setBufferAndRWFVersion((TransportBuffer)ackBuffer, majorVersion, minorVersion);
        _queueAck.encode(_encIter);

        // ... and decode it again so the listener also receives it as a Msg.
        _decIter.clear();
        _decIter.setBufferAndRWFVersion((TransportBuffer)ackBuffer, majorVersion, minorVersion);
        _genericMsg.clear();
        _genericMsg.decode(_decIter);

        _tunnelStream.queueMsgAcknowledged(_queueAck, (Msg)_genericMsg);
        _tunnelStream.releaseBuffer(ackBuffer, _error);
    }

    /**
     * @return this substream's own copy of the queue name
     */
    Buffer queueName() {
        return _queueName;
    }

    /**
     * @return the stream id given at construction
     */
    int streamId() {
        return _streamId;
    }

    /**
     * @return the domain type given at construction
     */
    int domainType() {
        return _domainType;
    }

    /**
     * @return the service id given at construction
     */
    int serviceId() {
        return _serviceId;
    }

    /**
     * Encodes a substream open request and queues it for transmission.
     *
     * <p>The request copies stream id, domain type, source name and opcode
     * from {@code queueRequest}, takes the service id from the tunnel stream
     * and carries this substream's last inbound/outbound sequence numbers.
     * On success the state becomes {@code WAITING_SUBSTREAM_REFRESH}.
     *
     * @param queueRequest request supplied by the application
     * @param error receives failure details; id 0 on success
     * @return 0 on success, the id in {@code error} when no buffer could be
     *         obtained, or -1 when encoding failed
     */
    int sendSubstreamRequest(QueueRequest queueRequest, Error error) {
        final TunnelStream tunnel = _tunnelStream;

        _queueRequest.clear();
        _queueRequest.streamId(queueRequest.streamId());
        _queueRequest.domainType(queueRequest.domainType());
        _queueRequest.serviceId(tunnel.serviceId());
        _queueRequest.lastOutSeqNum(lastOutSeqNum());
        _queueRequest.lastInSeqNum(lastInSeqNum());
        _queueRequest.sourceName(queueRequest.sourceName());
        _queueRequest.opCode(((QueueRequestImpl)queueRequest).opCode());

        TunnelStreamBuffer requestBuffer = tunnel.getBuffer(128 + _queueRequest.requestMsgBufferSize(), false, true, error);
        if (requestBuffer == null) {
            return error.errorId();
        }

        _encIter.clear();
        _encIter.setBufferAndRWFVersion((TransportBuffer)requestBuffer, tunnel.classOfService().common().protocolMajorVersion(), tunnel.classOfService().common().protocolMinorVersion());
        int encodeRet = _queueRequest.encode(_encIter);
        if (encodeRet < 0) {
            // Hand the buffer back before reporting; releaseBuffer may touch 'error'.
            tunnel.releaseBuffer(requestBuffer, error);
            error.errorId(encodeRet);
            error.text("Substream request encode failed");
            return -1;
        }
        requestBuffer.setCurrentPositionAsEndOfEncoding();

        boolean tracing = (tunnel._traceFlags & 1) > 0;
        if (tracing) {
            System.out.println("<!-- TunnelTrace: Sending substream request. -->");
        }

        tunnel._outboundTransmitList.push(requestBuffer, TunnelStreamBuffer.RETRANS_LINK);
        tunnel.tunnelStreamManager().addTunnelStreamToDispatchList(tunnel);
        _state = TunnelSubstreamState.WAITING_SUBSTREAM_REFRESH;
        error.errorId(0);
        return 0;
    }

    /**
     * Processes a message delivered on this substream according to the
     * current state.
     *
     * <ul>
     * <li>{@code WAITING_SUBSTREAM_REFRESH}: expects a refresh or status that
     *     answers the substream request.</li>
     * <li>{@code OPEN}: expects generic messages whose extended-header opcode
     *     selects data, data-expired or ack handling.</li>
     * <li>{@code NOT_OPEN}: the message is ignored.</li>
     * </ul>
     *
     * @param deliveredMsg message whose encoded data body contains the substream message
     * @param error receives failure details
     * @return 0 on success; -1 or another non-zero code from a callee on failure
     */
    int readMsg(Msg deliveredMsg, Error error) {
        switch (this._state) {
            case WAITING_SUBSTREAM_REFRESH: {
                return this.readMsgWhileEstablishing(deliveredMsg, error);
            }
            case OPEN: {
                return this.readMsgWhileOpen(deliveredMsg, error);
            }
            case NOT_OPEN: {
                error.errorId(0);
                return 0;
            }
        }
        error.errorId(-1);
        error.text("Unknown queue substream state.");
        return -1;
    }

    /**
     * Decodes the substream message carried in the data body of
     * {@code deliveredMsg} into {@code _encSubMsg}, leaving {@code _decIter}
     * positioned for the queue-header decoders.
     *
     * @return 0 on success; -1 after storing the decoder's code and
     *         {@code failureText} in {@code error}
     */
    private int decodeSubstreamMsg(Msg deliveredMsg, String failureText, Error error) {
        this._decIter.clear();
        this._decIter.setBufferAndRWFVersion(deliveredMsg.encodedDataBody(), this._tunnelStream.classOfService().common().protocolMajorVersion(), this._tunnelStream.classOfService().common().protocolMinorVersion());
        int ret = this._encSubMsg.decode(this._decIter);
        if (ret < 0) {
            error.errorId(ret);
            error.text(failureText);
            return -1;
        }
        return 0;
    }

    /** Handles a delivered message while the substream request is still unanswered. */
    private int readMsgWhileEstablishing(Msg deliveredMsg, Error error) {
        // 141: the only container type accepted here (presumably DataTypes.MSG).
        if (deliveredMsg.containerType() != 141) {
            error.errorId(-1);
            error.text("Received unexpected container type " + deliveredMsg.containerType() + " while establishing substream.");
            return -1;
        }
        if (this.decodeSubstreamMsg(deliveredMsg, "Failed to decode substream Msg.", error) != 0) {
            return -1;
        }
        switch (this._encSubMsg.msgClass()) {
            case 2: {
                return this.handleEstablishingRefresh(deliveredMsg, error);
            }
            case 3: {
                return this.handleEstablishingStatus(deliveredMsg, error);
            }
            default: {
                error.errorId(-1);
                error.text("Received unexpected substream MsgClass " + this._encSubMsg.msgClass() + " while establishing substream.");
                return -1;
            }
        }
    }

    /**
     * Handles the refresh (msgClass 2) that answers the substream request:
     * opens or closes the substream depending on the refresh state, replays
     * persisted messages, synchronizes sequence numbers and notifies the
     * listener.
     */
    private int handleEstablishingRefresh(Msg deliveredMsg, Error error) {
        RefreshMsg refreshMsg = (RefreshMsg)this._encSubMsg;
        State state = refreshMsg.state();
        int ret;

        // Stream state 1 / data state 1 is the only combination that opens the
        // substream (presumably StreamStates.OPEN / DataStates.OK).
        if (state.streamState() == 1) {
            if (state.dataState() != 1) {
                // Stream still open but data not OK: keep waiting.
                return 0;
            }
            this._state = TunnelSubstreamState.OPEN;
        } else {
            this._tunnelStream._streamIdtoQueueSubstreamTable.remove(this._streamId);
            ret = this.close(error);
            if (ret != 0) {
                return ret;
            }
        }

        ret = this._queueRefresh.decode(this._decIter, (Msg)refreshMsg);
        if (ret != 0) {
            error.errorId(ret);
            error.text("Failed to decode substream refresh header.");
            return -1;
        }

        if (state.streamState() != 1) {
            // Substream was closed above; just tell the listener.
            this._tunnelStream.queueMsgReceived(this._queueRefresh, (Msg)refreshMsg);
            return 0;
        }

        // Replay persisted messages relative to the sequence number the peer last received.
        if (this._persistFile != null) {
            ret = this._persistFile.retransmitBuffers(this._queueRefresh.lastInSeqNum(), this._msg, this._encIter, this._dIter, error);
            if (ret != 0) {
                return ret;
            }
        }

        this.lastOutSeqNum(this._queueRefresh.lastInSeqNum());
        if (this.lastInSeqNum() == 0 || this._queueRefresh.lastOutSeqNum() == 0) {
            this.lastInSeqNum(this._queueRefresh.lastOutSeqNum());
        }

        this._tunnelStream.queueMsgReceived(this._queueRefresh, (Msg)refreshMsg);

        // Re-checked after the listener callback, as the original code does.
        if (state.streamState() != 1) {
            return 0;
        }

        if (this._persistFile != null) {
            ret = this._persistFile.sendLocalQueueAcks(this._encIter, this._decIter, error);
            if (ret != 0) {
                return ret;
            }
        }
        return this.substreamEstablished(deliveredMsg);
    }

    /**
     * Handles a status (msgClass 3) received while establishing the
     * substream and forwards it to the listener.
     */
    private int handleEstablishingStatus(Msg deliveredMsg, Error error) {
        StatusMsg statusMsg = (StatusMsg)this._encSubMsg;
        State state = statusMsg.state();

        if (!statusMsg.checkHasState()) {
            // No state to act on: forward a minimal status to the listener.
            this._queueStatus.clear();
            this._queueStatus.streamId(statusMsg.streamId());
            this._queueStatus.domainType(statusMsg.domainType());
            this._tunnelStream.queueMsgReceived(this._queueStatus, (Msg)statusMsg);
            return this.substreamEstablished(deliveredMsg);
        }

        int ret;
        if (state.streamState() == 1) {
            if (state.dataState() != 1) {
                return 0;
            }
        } else {
            this._tunnelStream._streamIdtoQueueSubstreamTable.remove(this._streamId);
            ret = this.close(error);
            if (ret != 0) {
                return ret;
            }
        }

        ret = this._queueStatus.decode(this._decIter, (Msg)statusMsg);
        if (ret != 0) {
            error.errorId(ret);
            error.text("Failed to decode substream status header.");
            return -1;
        }

        this._tunnelStream.queueMsgReceived(this._queueStatus, (Msg)statusMsg);
        if (state.streamState() != 1) {
            return 0;
        }
        return this.substreamEstablished(deliveredMsg);
    }

    /**
     * Common tail of a successful establishment: optional trace output,
     * capture of the tunnel stream's current sequence number, and scheduling
     * the tunnel stream for dispatch.
     *
     * @return always 0
     */
    private int substreamEstablished(Msg deliveredMsg) {
        if ((this._tunnelStream._traceFlags & 1) > 0) {
            System.out.println("<!-- TunnelTrace: Substream established on stream " + deliveredMsg.streamId() + ", queue ready -->");
        }
        this._tunnelStream._recvLastSeqNum = ((TunnelStreamMsg.TunnelStreamData)((Object)this._tunnelStream._tunnelStreamMsg)).seqNum();
        this._tunnelStream.tunnelStreamManager().addTunnelStreamToDispatchList(this._tunnelStream);
        return 0;
    }

    /** Handles a delivered message on an open substream. */
    private int readMsgWhileOpen(Msg deliveredMsg, Error error) {
        if (deliveredMsg.containerType() != 141) {
            error.errorId(-1);
            error.text("Unexpected container type: " + deliveredMsg.containerType());
            return -1;
        }
        if (this.decodeSubstreamMsg(deliveredMsg, "Failed to decode substream message.", error) != 0) {
            return -1;
        }
        // Only generic messages (msgClass 7) are valid on an open substream.
        if (this._encSubMsg.msgClass() != 7) {
            error.errorId(-1);
            error.text("Unhandled substream message class.");
            return -1;
        }

        switch (this.getSubstreamOpcode(this._encSubMsg)) {
            case 1: {
                return this.handleQueueData(error);
            }
            case 4: {
                return this.handleQueueDataExpired(error);
            }
            case 2: {
                int ret = this.handleQueueAck(error);
                if (ret != 0) {
                    return ret;
                }
                break;
            }
            case 3: {
                // Accepted but requires no processing.
                break;
            }
            default: {
                error.errorId(-1);
                error.text("Unhandled substream header opcode.");
                return -1;
            }
        }
        this._tunnelStream.tunnelStreamManager().addTunnelStreamToDispatchList(this._tunnelStream);
        return 0;
    }

    /**
     * Handles opcode 1 (queue data): records the inbound sequence number,
     * delivers the message to the listener and queues an ack addressed back
     * to the sender.
     */
    private int handleQueueData(Error error) {
        int ret = this._queueData.decode(this._decIter, this._encSubMsg);
        if (ret != 0) {
            error.errorId(ret);
            error.text("Substream header decode failed");
            return -1;
        }
        this.lastInSeqNum(this._queueData.seqNum());
        error.errorId(0);
        this._tunnelStream.queueMsgReceived(this._queueData, this._encSubMsg);

        this._queueAck.clear();
        this._queueAck.streamId(this.streamId());
        this._queueAck.domainType(this.domainType());
        this._queueAck.serviceId(this._tunnelStream.serviceId());
        this._queueAck.seqNum(this._queueData.seqNum());
        // Source and destination are swapped for the reply.
        this._queueAck.sourceName().data(this._queueData.destName().data().duplicate(), this._queueData.destName().position(), this._queueData.destName().length());
        this._queueAck.destName().data(this._queueData.sourceName().data().duplicate(), this._queueData.sourceName().position(), this._queueData.sourceName().length());
        this._queueAck.identifier(this._queueData.identifier());
        return this.transmitQueueAck(error);
    }

    /**
     * Handles opcode 4 (queue data expired): records the inbound sequence
     * number, delivers the message to the listener and queues an ack whose
     * source is this substream's queue name.
     */
    private int handleQueueDataExpired(Error error) {
        int ret = this._queueDataExpired.decode(this._decIter, this._encSubMsg);
        if (ret != 0) {
            error.errorId(ret);
            error.text("Substream header decode failed");
            return -1;
        }
        this.lastInSeqNum(this._queueDataExpired.seqNum());
        error.errorId(0);
        this._tunnelStream.queueMsgReceived(this._queueDataExpired, this._encSubMsg);

        this._queueAck.clear();
        this._queueAck.streamId(this.streamId());
        this._queueAck.domainType(this.domainType());
        this._queueAck.serviceId(this._tunnelStream.serviceId());
        this._queueAck.seqNum(this._queueDataExpired.seqNum());
        this._queueAck.sourceName().data(this.queueName().data());
        this._queueAck.destName().data(this._queueDataExpired.sourceName().data(), this._queueDataExpired.sourceName().position(), this._queueDataExpired.sourceName().length());
        this._queueAck.identifier(this._queueDataExpired.identifier());
        return this.transmitQueueAck(error);
    }

    /**
     * Handles opcode 2 (queue ack): notifies the listener and releases the
     * persisted messages covered by the acknowledged sequence number.
     */
    private int handleQueueAck(Error error) {
        int ret = this._queueAck.decode(this._decIter, this._encSubMsg);
        if (ret != 0) {
            error.errorId(ret);
            error.text("Substream header decode failed");
            return -1;
        }
        // Read before the callback, which receives the same ack object.
        int seqNum = this._queueAck.seqNum();
        this._tunnelStream.queueMsgAcknowledged(this._queueAck, this._encSubMsg);
        if (this._persistFile != null) {
            this._persistFile.releasePersistenceBuffers(seqNum);
        }
        return 0;
    }

    /**
     * Encodes the already populated {@code _queueAck} into a fresh tunnel
     * buffer and queues it for transmission.
     *
     * @return 0 on success, the id in {@code error} when no buffer could be
     *         obtained, or -1 when encoding failed
     */
    private int transmitQueueAck(Error error) {
        TunnelStreamBuffer ackBuffer = this._tunnelStream.getBuffer(this._queueAck.ackMsgBufferSize(), false, true, error);
        if (ackBuffer == null) {
            return error.errorId();
        }
        ackBuffer.isApplicationBuffer(false);
        this._encIter.clear();
        this._encIter.setBufferAndRWFVersion((TransportBuffer)ackBuffer, this._tunnelStream.classOfService().common().protocolMajorVersion(), this._tunnelStream.classOfService().common().protocolMinorVersion());
        int ret = this._queueAck.encode(this._encIter);
        if (ret != 0) {
            this._tunnelStream.releaseBuffer(ackBuffer, error);
            error.errorId(ret);
            error.text("Substream ack header encode failed.");
            return -1;
        }
        ackBuffer.setCurrentPositionAsEndOfEncoding();
        this._tunnelStream._outboundTransmitList.push(ackBuffer, TunnelStreamBuffer.RETRANS_LINK);
        this._tunnelStream.tunnelStreamManager().addTunnelStreamToDispatchList(this._tunnelStream);
        return 0;
    }

    /**
     * Persists an outbound message when a persistence file is present.
     *
     * <p>Without a persistence file the buffer is only associated with this
     * substream (and no persistence buffer).
     *
     * @param buffer buffer holding the message to save
     * @param error receives failure details from the persistence file
     * @return 0 when there is no persistence file; otherwise the persistence
     *         file's result
     */
    int saveMsg(TunnelStreamBuffer buffer, Error error) {
        if (_persistFile == null) {
            buffer.persistenceBuffer(this, null);
            return 0;
        }
        return _persistFile.saveMsg(buffer, error);
    }

    /**
     * Prepares a queued message for transmission: assigns it the next
     * outbound sequence number, marks its persistence buffer (if any) as
     * transmitted, and rewrites the remaining timeout inside the encoded
     * message.
     *
     * @param tunnelBuffer buffer holding the encoded substream message
     * @param error receives details when the sequence number cannot be replaced
     * @return 0 on success, -1 on failure
     */
    int setBufferAsTransmitted(TunnelStreamBuffer tunnelBuffer, Error error) {
        // The message always takes the next outbound sequence number.
        final int seqNum = this.lastOutSeqNum() + 1;

        TunnelStreamPersistenceBuffer persistenceBuffer = tunnelBuffer.persistenceBuffer();
        if (persistenceBuffer != null) {
            // Guarded like the other persistence accessors: the file may already
            // have been closed while the buffer was still queued.
            if (this._persistFile != null) {
                this._persistFile.setBufferAsTransmitted(persistenceBuffer);
            }
            tunnelBuffer.persistenceBuffer(null, null);
        }
        this.lastOutSeqNum(seqNum);

        tunnelBuffer.setAsInnerReadBuffer();
        this._encIter.clear();
        this._encIter.setBufferAndRWFVersion((TransportBuffer)tunnelBuffer, this._tunnelStream.classOfService().common().protocolMajorVersion(), this._tunnelStream.classOfService().common().protocolMinorVersion());
        int ret = this._encIter.replaceSeqNum((long)seqNum);
        if (ret != 0) {
            error.errorId(ret);
            error.text("Failed to update sequence number on substream message.");
            return -1;
        }

        if (!tunnelBuffer.timeoutIsCode()) {
            // timeoutNsec() is compared against System.nanoTime(); the message carries
            // the remaining time in milliseconds, never less than 1 ms.
            final long nanosPerMilli = 1000000L;
            long remainingNsec = tunnelBuffer.timeoutNsec() - System.nanoTime();
            if (remainingNsec < nanosPerMilli) {
                remainingNsec = nanosPerMilli;
            }
            TunnelStreamUtil.replaceQueueDataTimeout(tunnelBuffer.data(), remainingNsec / nanosPerMilli);
        }
        tunnelBuffer.setToFullWritebuffer();
        return 0;
    }

    /**
     * Returns a persistence buffer to the persistence file; does nothing when
     * this substream has no persistence file.
     *
     * @param persistBuffer buffer to release
     */
    void releasePersistenceBuffer(TunnelStreamPersistenceBuffer persistBuffer) {
        if (_persistFile == null) {
            return;
        }
        _persistFile.releasePersistenceBuffer(persistBuffer);
    }

    /**
     * Reads the queue opcode, which is the first byte of the message's
     * extended header.
     *
     * @param substreamMsg decoded substream message
     * @return the opcode, or 0 when the message has no usable extended header
     *         (no buffer, no backing data, or zero length)
     */
    private int getSubstreamOpcode(Msg substreamMsg) {
        Buffer extendedHeader = substreamMsg.extendedHeader();
        // A non-null Buffer alone does not guarantee header bytes are present.
        if (extendedHeader == null || extendedHeader.data() == null || extendedHeader.length() < 1) {
            return 0;
        }
        return extendedHeader.data().get(extendedHeader.position());
    }

    /** Lifecycle states of a {@link TunnelSubstream}; drives the dispatch in {@code readMsg}. */
    enum TunnelSubstreamState {
        /** Initial state, and the state after {@code close}; delivered messages are ignored. */
        NOT_OPEN,
        /** A substream request has been queued; waiting for the answering refresh or status. */
        WAITING_SUBSTREAM_REFRESH,
        /** The answering refresh reported the substream as open; queue messages are processed. */
        OPEN
    }
}

