/*
 * Decompiled with CFR 0.152.
 */
package uk.co.real_logic.artio.library;

import io.aeron.ControlledFragmentAssembler;
import io.aeron.Subscription;
import io.aeron.logbuffer.ControlledFragmentHandler;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BooleanSupplier;
import java.util.function.ToIntFunction;
import org.agrona.DirectBuffer;
import org.agrona.LangUtil;
import org.agrona.collections.ArrayUtil;
import org.agrona.collections.Long2ObjectHashMap;
import org.agrona.collections.LongHashSet;
import org.agrona.concurrent.EpochClock;
import org.agrona.concurrent.SystemEpochClock;
import org.agrona.concurrent.status.AtomicCounter;
import uk.co.real_logic.artio.DebugLogger;
import uk.co.real_logic.artio.FixCounters;
import uk.co.real_logic.artio.LivenessDetector;
import uk.co.real_logic.artio.LogTag;
import uk.co.real_logic.artio.Pressure;
import uk.co.real_logic.artio.Reply;
import uk.co.real_logic.artio.builder.SessionHeaderEncoder;
import uk.co.real_logic.artio.dictionary.FixDictionary;
import uk.co.real_logic.artio.library.FixLibrary;
import uk.co.real_logic.artio.library.FollowerSessionReply;
import uk.co.real_logic.artio.library.InitiateSessionReply;
import uk.co.real_logic.artio.library.LibraryConfiguration;
import uk.co.real_logic.artio.library.LibraryReply;
import uk.co.real_logic.artio.library.LibraryTransport;
import uk.co.real_logic.artio.library.ReleaseToGatewayReply;
import uk.co.real_logic.artio.library.RequestSessionReply;
import uk.co.real_logic.artio.library.SentPositionHandler;
import uk.co.real_logic.artio.library.SessionConfiguration;
import uk.co.real_logic.artio.library.SessionExistsHandler;
import uk.co.real_logic.artio.library.SessionSubscriber;
import uk.co.real_logic.artio.messages.ConnectionType;
import uk.co.real_logic.artio.messages.ControlNotificationDecoder;
import uk.co.real_logic.artio.messages.DisconnectReason;
import uk.co.real_logic.artio.messages.GatewayError;
import uk.co.real_logic.artio.messages.MessageStatus;
import uk.co.real_logic.artio.messages.SessionReplyStatus;
import uk.co.real_logic.artio.messages.SessionState;
import uk.co.real_logic.artio.messages.SessionStatus;
import uk.co.real_logic.artio.messages.SlowStatus;
import uk.co.real_logic.artio.protocol.GatewayPublication;
import uk.co.real_logic.artio.protocol.LibraryEndPointHandler;
import uk.co.real_logic.artio.protocol.LibraryProtocolSubscription;
import uk.co.real_logic.artio.protocol.NotConnectedException;
import uk.co.real_logic.artio.protocol.ProtocolHandler;
import uk.co.real_logic.artio.protocol.ProtocolSubscription;
import uk.co.real_logic.artio.session.AcceptorSession;
import uk.co.real_logic.artio.session.CompositeKey;
import uk.co.real_logic.artio.session.InitiatorSession;
import uk.co.real_logic.artio.session.InternalSession;
import uk.co.real_logic.artio.session.Session;
import uk.co.real_logic.artio.session.SessionIdStrategy;
import uk.co.real_logic.artio.session.SessionParser;
import uk.co.real_logic.artio.session.SessionProxy;
import uk.co.real_logic.artio.session.SessionWriter;
import uk.co.real_logic.artio.timing.LibraryTimers;
import uk.co.real_logic.artio.timing.Timer;
import uk.co.real_logic.artio.util.MutableAsciiBuffer;
import uk.co.real_logic.artio.validation.MessageValidationStrategy;

final class LibraryPoller
implements LibraryEndPointHandler,
ProtocolHandler,
AutoCloseable {
    // Values taken by the state field. The method bodies in this decompiled source compare
    // against the literal numbers (0..5) rather than these names.
    // CONNECTED: poll() only performs the regular polling work.
    private static final int CONNECTED = 0;
    // ATTEMPT_CONNECT: poll() calls startConnecting(); set by the private onDisconnect() callback.
    private static final int ATTEMPT_CONNECT = 1;
    // CONNECTING: poll() calls nextConnectingStep(); this is also the initial state.
    private static final int CONNECTING = 2;
    // ATTEMPT_CURRENT_NODE: poll() calls connectToNewEngine() and then moves to CONNECTING.
    private static final int ATTEMPT_CURRENT_NODE = 3;
    // CLOSED: reported by isClosed(); poll() does nothing and returns 0 in this state.
    private static final int CLOSED = 4;
    // ENGINE_CLOSE: reported by isAtEndOfDay(); poll() keeps calling attemptEngineCloseBasedLogout().
    private static final int ENGINE_CLOSE = 5;
    // Session subscribers keyed by connection id; consulted when routing inbound messages,
    // disconnects and slow-status notifications.
    private final Long2ObjectHashMap<SessionSubscriber> connectionIdToSession = new Long2ObjectHashMap();
    // Sessions polled on every duty cycle and exposed through sessions(). The array is replaced,
    // never mutated in place, whenever a session is added or removed.
    private InternalSession[] sessions = new InternalSession[0];
    // Initiator sessions that were not yet ACTIVE when handed over; they are polled separately
    // and moved into sessions once they report ACTIVE.
    private InternalSession[] pendingInitiatorSessions = new InternalSession[0];
    // Live, read-only list view over the current sessions array. Only get() and size() are
    // overridden, so the mutators inherited from AbstractList are left at their defaults.
    private final List<Session> unmodifiableSessions = new AbstractList<Session>(){

        @Override
        public Session get(int index) {
            return LibraryPoller.this.sessions[index];
        }

        @Override
        public int size() {
            return LibraryPoller.this.sessions.length;
        }
    };
    // Scratch set reused by onControlNotification() to hold the session ids listed by the engine.
    private final LongHashSet sessionIds = new LongHashSet();
    private final int libraryId;
    private final EpochClock epochClock;
    private final LibraryConfiguration configuration;
    private final SessionIdStrategy sessionIdStrategy;
    private final Timer sessionTimer;
    private final Timer receiveTimer;
    private final SessionExistsHandler sessionExistsHandler;
    private final SentPositionHandler sentPositionHandler;
    // True when the configuration lists more than one library Aeron channel.
    private final boolean enginesAreClustered;
    private final FixCounters fixCounters;
    // Outstanding replies keyed by the correlation id handed out by register().
    private final Long2ObjectHashMap<LibraryReply<?>> correlationIdToReply = new Long2ObjectHashMap();
    // Deferred actions retried by checkReplies() on each poll until they return true.
    private final List<BooleanSupplier> tasks = new ArrayList<BooleanSupplier>();
    private final LibraryTransport transport;
    private final FixLibrary fixLibrary;
    // Callback handed to each newly created LivenessDetector.
    private final Runnable onDisconnectFunc = this::onDisconnect;
    // Seeded with a random positive value; pre-incremented for every correlation id that is issued.
    private long currentCorrelationId = ThreadLocalRandom.current().nextLong(1L, Long.MAX_VALUE);
    // Starts in the CONNECTING state (2).
    private int state = 2;
    private LivenessDetector livenessDetector;
    private Subscription inboundSubscription;
    private GatewayPublication outboundPublication;
    private String currentAeronChannel;
    // Time after which nextConnectingStep() re-sends the library connect message.
    private long nextSendLibraryConnectTime;
    // Time after which nextConnectingStep() moves on to the next engine attempt.
    private long nextEngineAttemptTime;
    // Correlation id of the most recent library connect message that was saved successfully.
    private long connectCorrelationId = 0L;
    // Written by postExceptionToLibraryThread() and rethrown from poll(); volatile because the
    // writer is presumably a different thread from the poller -- confirm against callers.
    private volatile Throwable remoteThrowable;
    // Index of the next session to log out in attemptEngineCloseBasedLogout().
    private int sessionLogoutIndex = 0;
    // Fragment handler passed to inboundSubscription.controlledPoll(); the assembler wraps the
    // protocol subscriptions that dispatch back into this poller.
    private final ControlledFragmentHandler outboundSubscription = new ControlledFragmentAssembler(ProtocolSubscription.of(this, new LibraryProtocolSubscription(this)));

    LibraryPoller(LibraryConfiguration configuration, LibraryTimers timers, FixCounters fixCounters, LibraryTransport transport, FixLibrary fixLibrary, EpochClock epochClock) {
        this.libraryId = configuration.libraryId();
        this.fixCounters = fixCounters;
        this.transport = transport;
        this.fixLibrary = fixLibrary;
        this.sessionTimer = timers.sessionTimer();
        this.receiveTimer = timers.receiveTimer();
        this.configuration = configuration;
        this.sessionIdStrategy = configuration.sessionIdStrategy();
        this.sessionExistsHandler = configuration.sessionExistsHandler();
        this.sentPositionHandler = configuration.sentPositionHandler();
        this.epochClock = epochClock;
        this.enginesAreClustered = configuration.libraryAeronChannels().size() > 1;
    }

    /** @return {@code true} when the poller is in the {@code CONNECTED} state. */
    boolean isConnected() {
        return CONNECTED == state;
    }

    /** @return {@code true} when the poller is in the {@code CLOSED} state. */
    boolean isClosed() {
        return CLOSED == state;
    }

    /** @return {@code true} when the poller is in the {@code ENGINE_CLOSE} state. */
    boolean isAtEndOfDay() {
        return ENGINE_CLOSE == state;
    }

    /** @return the library id read from the configuration at construction time. */
    int libraryId() {
        return libraryId;
    }

    /** @return the correlation id of the last library connect message that was saved successfully. */
    long connectCorrelationId() {
        return connectCorrelationId;
    }

    /** @return the live list view backed by the current sessions array. */
    List<Session> sessions() {
        return unmodifiableSessions;
    }

    /**
     * Starts initiating a session described by the given configuration.
     *
     * @param configuration session configuration; must not be {@code null}
     * @return the reply tracking the initiation
     * @throws NullPointerException  if {@code configuration} is {@code null}
     * @throws IllegalStateException if the end of day process is running
     */
    Reply<Session> initiate(SessionConfiguration configuration) {
        Objects.requireNonNull(configuration, "configuration");
        validateEndOfDay();
        final long nowPlusTimeoutInMs = timeInMs() + configuration.timeoutInMs();
        return new InitiateSessionReply(this, nowPlusTimeoutInMs, configuration);
    }

    /**
     * Starts releasing the given session back to the gateway.
     *
     * @param session     session to release; must not be {@code null} and must be an {@code InternalSession}
     * @param timeoutInMs timeout added to the current time and passed to the reply
     * @return the reply tracking the release
     * @throws NullPointerException  if {@code session} is {@code null}
     * @throws IllegalStateException if the end of day process is running
     */
    Reply<SessionReplyStatus> releaseToGateway(Session session, long timeoutInMs) {
        Objects.requireNonNull(session, "session");
        validateEndOfDay();
        final long nowPlusTimeoutInMs = timeInMs() + timeoutInMs;
        final InternalSession internalSession = (InternalSession)session;
        return new ReleaseToGatewayReply(this, nowPlusTimeoutInMs, internalSession);
    }

    /**
     * Starts requesting ownership of a session from the gateway.
     *
     * @param sessionId                id of the session being requested
     * @param resendFromSequenceNumber sequence number forwarded to the reply
     * @param resendFromSequenceIndex  sequence index forwarded to the reply
     * @param timeoutInMs              timeout added to the current time and passed to the reply
     * @return the reply tracking the request
     * @throws IllegalStateException if the end of day process is running
     */
    Reply<SessionReplyStatus> requestSession(long sessionId, int resendFromSequenceNumber, int resendFromSequenceIndex, long timeoutInMs) {
        validateEndOfDay();
        final long nowPlusTimeoutInMs = timeInMs() + timeoutInMs;
        return new RequestSessionReply(
            this,
            nowPlusTimeoutInMs,
            sessionId,
            resendFromSequenceNumber,
            resendFromSequenceIndex);
    }

    /**
     * Creates a writer for a follower session after checking the poller state.
     *
     * @param id            session id given to the writer
     * @param connectionId  connection id given to the writer
     * @param sequenceIndex sequence index given to the writer
     * @return a new writer bound to this library's outbound publication
     */
    SessionWriter followerSession(long id, long connectionId, int sequenceIndex) {
        checkState();
        return new SessionWriter(
            libraryId,
            id,
            connectionId,
            sessionBuffer(),
            outboundPublication,
            sequenceIndex);
    }

    /**
     * Starts an asynchronous request for a follower session.
     *
     * @param headerEncoder header encoder forwarded to the reply
     * @param timeoutInMs   timeout added to the current time and passed to the reply
     * @return the reply tracking the request
     * @throws IllegalStateException if the end of day process is running
     */
    Reply<SessionWriter> followerSession(SessionHeaderEncoder headerEncoder, long timeoutInMs) {
        validateEndOfDay();
        final long nowPlusTimeoutInMs = timeInMs() + timeoutInMs;
        return new FollowerSessionReply(this, nowPlusTimeoutInMs, headerEncoder);
    }

    /**
     * Removes the session from the polled sessions array and then disables it.
     *
     * @param session session to stop polling and disable
     */
    void disableSession(InternalSession session) {
        final InternalSession[] remaining = ArrayUtil.remove(sessions, session);
        sessions = remaining;
        session.disable();
    }

    /**
     * Writes a release-session message describing the current state of the given session.
     *
     * @param session       session whose ids, state and sequence numbers are written
     * @param correlationId correlation id written with the message
     * @return the value returned by the outbound publication
     */
    long saveReleaseSession(Session session, long correlationId) {
        checkState();
        final GatewayPublication publication = outboundPublication;
        return publication.saveReleaseSession(
            libraryId,
            session.connectionId(),
            session.id(),
            correlationId,
            session.state(),
            session.awaitingResend(),
            session.heartbeatIntervalInMs(),
            session.lastSentMsgSeqNum(),
            session.lastReceivedMsgSeqNum(),
            session.username(),
            session.password());
    }

    /**
     * Writes an initiate-connection message built from the session configuration.
     *
     * @param host          host to connect to
     * @param port          port to connect to
     * @param correlationId correlation id written with the message
     * @param configuration session configuration supplying the remaining message fields
     * @return the value returned by the outbound publication
     */
    long saveInitiateConnection(String host, int port, long correlationId, SessionConfiguration configuration) {
        checkState();
        final GatewayPublication publication = outboundPublication;
        return publication.saveInitiateConnection(
            libraryId,
            host,
            port,
            configuration.senderCompId(),
            configuration.senderSubId(),
            configuration.senderLocationId(),
            configuration.targetCompId(),
            configuration.targetSubId(),
            configuration.targetLocationId(),
            configuration.sequenceNumberType(),
            configuration.resetSeqNum(),
            configuration.initialReceivedSequenceNumber(),
            configuration.initialSentSequenceNumber(),
            configuration.closedResendInterval(),
            configuration.resendRequestChunkSize(),
            configuration.sendRedundantResendRequests(),
            configuration.enableLastMsgSeqNumProcessed(),
            configuration.username(),
            configuration.password(),
            configuration.fixDictionary(),
            // The default heartbeat interval comes from the library configuration, not the session one.
            this.configuration.defaultHeartbeatIntervalInS(),
            correlationId);
    }

    /**
     * Requests a disconnect for an initiator session that timed out.
     *
     * A connection id of {@code -1} selects a mid-connection disconnect keyed by the correlation
     * id; any other value selects a {@code NO_LOGON} disconnect for that connection. If the first
     * attempt does not succeed, the same action is queued in {@code tasks} for retry.
     *
     * @param correlationId correlation id of the initiate request
     * @param connectionId  connection id, or {@code -1}
     */
    void onInitiatorSessionTimeout(long correlationId, long connectionId) {
        checkState();
        final BooleanSupplier disconnect = connectionId == -1L
            ? () -> saveMidConnectionDisconnect(correlationId)
            : () -> saveNoLogonRequestDisconnect(connectionId);
        if (!disconnect.getAsBoolean()) {
            tasks.add(disconnect);
        }
    }

    /**
     * Writes a mid-connection disconnect message.
     *
     * @param correlationId correlation id written with the message
     * @return {@code true} when the publication returned a positive value
     */
    private boolean saveMidConnectionDisconnect(long correlationId) {
        final long position = outboundPublication.saveMidConnectionDisconnect(libraryId, correlationId);
        return position > 0L;
    }

    /**
     * Writes a request-disconnect message with reason {@code NO_LOGON}.
     *
     * @param connectionId connection to disconnect
     * @return {@code true} when the publication returned a positive value
     */
    private boolean saveNoLogonRequestDisconnect(long connectionId) {
        final long position = outboundPublication.saveRequestDisconnect(
            libraryId, connectionId, DisconnectReason.NO_LOGON);
        return position > 0L;
    }

    /**
     * Writes a request-session message after checking the poller state.
     *
     * @param sessionId                  id of the requested session
     * @param correlationId              correlation id written with the message
     * @param lastReceivedSequenceNumber sequence number written with the message
     * @param sequenceIndex              sequence index written with the message
     * @return the value returned by the outbound publication
     */
    long saveRequestSession(long sessionId, long correlationId, int lastReceivedSequenceNumber, int sequenceIndex) {
        checkState();
        final GatewayPublication publication = outboundPublication;
        return publication.saveRequestSession(
            libraryId, sessionId, correlationId, lastReceivedSequenceNumber, sequenceIndex);
    }

    /**
     * Writes a follower-session request carrying the given buffer region.
     *
     * @param correlationId correlation id written with the message
     * @param buffer        buffer holding the data to send
     * @param offset        offset of the data within {@code buffer}
     * @param length        length of the data
     * @return the value returned by the outbound publication
     */
    long saveFollowerSessionRequest(long correlationId, MutableAsciiBuffer buffer, int offset, int length) {
        checkState();
        final DirectBuffer source = buffer;
        return outboundPublication.saveFollowerSessionRequest(libraryId, correlationId, source, offset, length);
    }

    /**
     * Runs one duty cycle: rethrows any posted exception, performs the connection step that
     * matches the current state, then does the regular polling work.
     *
     * @param fragmentLimit maximum number of fragments passed to the inbound subscription poll
     * @return the amount of work done, or 0 for any state without a case below (for example CLOSED)
     */
    int poll(int fragmentLimit) {
        final long now = timeInMs();
        if (remoteThrowable != null) {
            LangUtil.rethrowUnchecked(remoteThrowable);
        }

        // State-specific step first; every handled state then falls into the common poll below.
        switch (state) {
            case CONNECTED:
                break;

            case ATTEMPT_CONNECT:
                startConnecting();
                break;

            case CONNECTING:
                nextConnectingStep(now);
                break;

            case ATTEMPT_CURRENT_NODE:
                connectToNewEngine(now);
                state = CONNECTING;
                break;

            case ENGINE_CLOSE:
                attemptEngineCloseBasedLogout();
                break;

            default:
                return 0;
        }

        return pollWithoutReconnect(now, fragmentLimit);
    }

    /**
     * Polls the inbound subscription, liveness detector, sessions, pending initiator sessions
     * and outstanding replies, in that order.
     *
     * @param timeInMs      current time passed to every poller
     * @param fragmentLimit maximum number of fragments for the subscription poll
     * @return the sum of the work counts reported by each step
     */
    private int pollWithoutReconnect(long timeInMs, int fragmentLimit) {
        int workCount = inboundSubscription.controlledPoll(outboundSubscription, fragmentLimit);
        workCount += livenessDetector.poll(timeInMs);
        workCount += pollSessions(timeInMs);
        workCount += pollPendingInitiatorSessions(timeInMs);
        workCount += checkReplies(timeInMs);
        return workCount;
    }

    /**
     * Stores a throwable in the volatile field that {@code poll} checks and rethrows.
     *
     * @param t throwable to be rethrown from the next {@code poll}
     */
    void postExceptionToLibraryThread(Throwable t) {
        remoteThrowable = t;
    }

    /** Begins a connection attempt using the current clock reading. */
    void startConnecting() {
        final long now = timeInMs();
        startConnecting(now);
    }

    /**
     * Moves to the CONNECTING state, selects the first configured channel, sets up the streams
     * and liveness detector, and sends the first library connect message.
     *
     * On any exception the poller is closed via {@code closeWithParent()}; a failure while
     * closing is attached as a suppressed exception before the original is rethrown.
     *
     * @param timeInMs current time used for the timers
     */
    private void startConnecting(long timeInMs) {
        try {
            state = CONNECTING;
            final List<String> channels = configuration.libraryAeronChannels();
            currentAeronChannel = channels.get(0);
            DebugLogger.log(LogTag.LIBRARY_CONNECT, "%d: Attempting to connect to %s%n", (long)libraryId, (Object)currentAeronChannel);
            initStreams();
            newLivenessDetector();
            resetNextEngineTimer(timeInMs);
            sendLibraryConnect(timeInMs);
        }
        catch (Exception connectFailure) {
            try {
                closeWithParent();
            }
            catch (Exception closeFailure) {
                // Keep the original failure as the primary exception.
                connectFailure.addSuppressed(closeFailure);
            }
            LangUtil.rethrowUnchecked(connectFailure);
        }
    }

    /**
     * Advances a connection attempt that is in progress. Once the engine attempt time has passed
     * it moves on to the next engine; otherwise, once the re-send time has passed, it re-sends
     * the library connect message.
     *
     * @param timeInMs current time
     */
    private void nextConnectingStep(long timeInMs) {
        if (timeInMs > nextEngineAttemptTime) {
            attemptNextEngine();
            connectToNewEngine(timeInMs);
            return;
        }

        if (timeInMs > nextSendLibraryConnectTime) {
            sendLibraryConnect(timeInMs);
        }
    }

    /**
     * Sets up the streams and a fresh liveness detector, sends a library connect message and
     * then restarts the engine attempt timer.
     *
     * @param timeInMs current time
     */
    private void connectToNewEngine(long timeInMs) {
        initStreams();
        newLivenessDetector();
        sendLibraryConnect(timeInMs);
        // Done last, so it overwrites any "try the next engine now" time set by a failed send.
        resetNextEngineTimer(timeInMs);
    }

    /**
     * When several channels are configured, advances {@code currentAeronChannel} to the next one
     * in the list, wrapping around at the end. Does nothing for a single channel.
     */
    private void attemptNextEngine() {
        if (!enginesAreClustered) {
            return;
        }

        final List<String> channels = configuration.libraryAeronChannels();
        final int currentIndex = channels.indexOf(currentAeronChannel);
        final int nextIndex = (currentIndex + 1) % channels.size();
        currentAeronChannel = channels.get(nextIndex);
        DebugLogger.log(LogTag.LIBRARY_CONNECT, "%d: Attempting connect to next engine (%s) in round-robin%n", (long)libraryId, (Object)currentAeronChannel);
    }

    /**
     * Schedules the next engine attempt one reply timeout after the given time.
     *
     * @param timeInMs current time
     */
    private void resetNextEngineTimer(long timeInMs) {
        final long replyTimeoutInMs = configuration.replyTimeoutInMs();
        nextEngineAttemptTime = replyTimeoutInMs + timeInMs;
    }

    /**
     * (Re)creates the transport streams for the current channel and caches the resulting
     * subscription and publication. With a single channel this happens only on the first connect.
     */
    private void initStreams() {
        if (!enginesAreClustered && !isFirstConnect()) {
            return;
        }

        transport.initStreams(currentAeronChannel);
        inboundSubscription = transport.inboundSubscription();
        outboundPublication = transport.outboundPublication();
    }

    /** Replaces the liveness detector with a new one bound to the current outbound publication. */
    private void newLivenessDetector() {
        final long replyTimeoutInMs = configuration.replyTimeoutInMs();
        livenessDetector = LivenessDetector.forLibrary(
            outboundPublication, libraryId, replyTimeoutInMs, onDisconnectFunc);
    }

    /** @return {@code true} when the transport does not report a reconnect. */
    private boolean isFirstConnect() {
        final boolean reconnecting = transport.isReconnect();
        return !reconnecting;
    }

    /**
     * Sends a library connect message under a fresh correlation id.
     *
     * On success the correlation id is recorded and the next re-send time is scheduled. A
     * negative result or a {@code NotConnectedException} makes the next engine attempt due
     * immediately.
     *
     * @param timeInMs current time
     */
    private void sendLibraryConnect(long timeInMs) {
        try {
            final long correlationId = ++currentCorrelationId;
            final long position = outboundPublication.saveLibraryConnect(
                libraryId, configuration.libraryName(), correlationId);
            if (position >= 0L) {
                connectCorrelationId = correlationId;
                nextSendLibraryConnectTime = configuration.connectAttemptTimeoutInMs() + timeInMs;
            } else {
                connectToNextEngineNow(timeInMs);
            }
        }
        catch (NotConnectedException notConnected) {
            connectToNextEngineNow(timeInMs);
        }
    }

    /**
     * Sets the next engine attempt time to the given time, so the attempt is due as soon as the
     * clock moves past it.
     *
     * @param timeInMs current time
     */
    private void connectToNextEngineNow(long timeInMs) {
        nextEngineAttemptTime = timeInMs;
    }

    /** Logs the connection, notifies the configured connect handler, then marks every session as connected. */
    private void onConnect() {
        DebugLogger.log(LogTag.LIBRARY_CONNECT, "%d: Connected to [%s]%n", (long)libraryId, (Object)currentAeronChannel);
        // The user callback runs before the sessions are flagged as connected.
        configuration.libraryConnectHandler().onConnect(fixLibrary);
        setLibraryConnected(true);
    }

    /**
     * Logs the disconnection, notifies the configured connect handler, marks every session as
     * disconnected and moves to the ATTEMPT_CONNECT state so the next poll reconnects.
     */
    private void onDisconnect() {
        DebugLogger.log(LogTag.LIBRARY_CONNECT, "%d: Disconnected from [%s]%n", (long)libraryId, (Object)currentAeronChannel);
        // The user callback runs before the sessions are flagged as disconnected.
        configuration.libraryConnectHandler().onDisconnect(fixLibrary);
        setLibraryConnected(false);
        state = ATTEMPT_CONNECT;
    }

    /**
     * Passes the library connection status to every session in the sessions array.
     *
     * @param libraryConnected the status to pass on
     */
    private void setLibraryConnected(boolean libraryConnected) {
        final InternalSession[] snapshot = sessions;
        for (int i = 0; i < snapshot.length; i++) {
            snapshot[i].libraryConnected(libraryConnected);
        }
    }

    /** @return the channel currently selected for connecting to an engine. */
    String currentAeronChannel() {
        return currentAeronChannel;
    }

    /**
     * Polls every session in the sessions array as it was when the call started.
     *
     * @param timeInMs current time passed to each session
     * @return the sum of the values returned by the sessions
     */
    private int pollSessions(long timeInMs) {
        final InternalSession[] snapshot = sessions;
        int workCount = 0;
        for (int i = 0; i < snapshot.length; i++) {
            workCount += snapshot[i].poll(timeInMs);
        }
        return workCount;
    }

    /**
     * Polls the initiator sessions that have not yet become active and promotes those that have.
     *
     * A session that reports {@code SessionState.ACTIVE} after being polled is removed from
     * {@code pendingInitiatorSessions} and appended to {@code sessions}.
     *
     * @param timeInMs current time passed to each session
     * @return the sum of the values returned by the polled sessions
     */
    private int pollPendingInitiatorSessions(long timeInMs) {
        // Typed as InternalSession[] so that element reads need no cast; the decompiled source
        // declared this as Object[] and did not type-check.
        InternalSession[] pendingSessions = this.pendingInitiatorSessions;
        int total = 0;
        int i = 0;
        int size = pendingSessions.length;
        while (i < size) {
            final InternalSession session = pendingSessions[i];
            total += session.poll(timeInMs);
            if (session.state() == SessionState.ACTIVE) {
                // Removal shifts the following element into slot i, so i is not advanced here.
                pendingSessions = ArrayUtil.remove(pendingSessions, i);
                this.pendingInitiatorSessions = pendingSessions;
                --size;
                this.sessions = ArrayUtil.add(this.sessions, session);
            } else {
                ++i;
            }
        }
        return total;
    }

    /** @return the current reading of the epoch clock. */
    private long timeInMs() {
        final long now = epochClock.time();
        return now;
    }

    /**
     * Polls every outstanding reply, dropping those whose poll returns {@code true}, and then
     * runs every deferred task, dropping those that return {@code true}.
     *
     * @param timeInMs current time passed to each reply
     * @return the number of replies removed; tasks are not counted
     */
    private int checkReplies(long timeInMs) {
        int removedReplies = 0;
        for (Iterator<LibraryReply<?>> replies = correlationIdToReply.values().iterator(); replies.hasNext();) {
            final LibraryReply<?> reply = replies.next();
            if (reply.poll(timeInMs)) {
                replies.remove();
                ++removedReplies;
            }
        }

        for (Iterator<BooleanSupplier> pending = tasks.iterator(); pending.hasNext();) {
            if (pending.next().getAsBoolean()) {
                pending.remove();
            }
        }

        return removedReplies;
    }

    /**
     * Issues the next correlation id and records the reply under it.
     *
     * @param reply reply to track
     * @return the correlation id assigned to the reply
     */
    long register(LibraryReply<?> reply) {
        final long correlationId = currentCorrelationId + 1L;
        currentCorrelationId = correlationId;
        correlationIdToReply.put(correlationId, reply);
        return correlationId;
    }

    /**
     * Stops tracking the reply registered under the given correlation id, if there is one.
     *
     * @param correlationId id previously returned by {@code register}
     */
    void deregister(long correlationId) {
        correlationIdToReply.remove(correlationId);
    }

    /**
     * Handles a manage-session message. It is ignored unless the poller is CONNECTED. A message
     * with library id 0 is reported to the session-exists handler; one addressed to this library
     * is passed on to {@code onHandoverSession}.
     *
     * @return always {@code Action.CONTINUE}
     */
    @Override
    public ControlledFragmentHandler.Action onManageSession(int libraryId, long connectionId, long sessionId, int lastSentSeqNum, int lastRecvSeqNum, long logonTime, SessionStatus sessionStatus, SlowStatus slowStatus, ConnectionType connectionType, SessionState sessionState, int heartbeatIntervalInS, boolean closedResendInterval, int resendRequestChunkSize, boolean sendRedundantResendRequests, boolean enableLastMsgSeqNumProcessed, long correlationId, int sequenceIndex, boolean awaitingResend, int lastResentMsgSeqNo, int lastResendChunkMsgSeqNum, int endOfResendRequestRange, boolean awaitingHeartbeat, String localCompId, String localSubId, String localLocationId, String remoteCompId, String remoteSubId, String remoteLocationId, String address, String username, String password, Class<? extends FixDictionary> fixDictionaryType) {
        if (state != CONNECTED) {
            return ControlledFragmentHandler.Action.CONTINUE;
        }

        // Resolved up front, before the library id is examined.
        final FixDictionary fixDictionary = FixDictionary.of(fixDictionaryType);

        if (libraryId == 0) {
            sessionExistsHandler.onSessionExists(
                fixLibrary,
                sessionId,
                localCompId,
                localSubId,
                localLocationId,
                remoteCompId,
                remoteSubId,
                remoteLocationId);
        } else if (libraryId == this.libraryId) {
            onHandoverSession(
                libraryId,
                connectionId,
                sessionId,
                lastSentSeqNum,
                lastRecvSeqNum,
                logonTime,
                sessionStatus,
                slowStatus,
                connectionType,
                sessionState,
                heartbeatIntervalInS,
                closedResendInterval,
                resendRequestChunkSize,
                sendRedundantResendRequests,
                enableLastMsgSeqNumProcessed,
                correlationId,
                sequenceIndex,
                awaitingResend,
                lastResentMsgSeqNo,
                lastResendChunkMsgSeqNum,
                endOfResendRequestRange,
                awaitingHeartbeat,
                localCompId,
                localSubId,
                localLocationId,
                remoteCompId,
                remoteSubId,
                remoteLocationId,
                address,
                username,
                password,
                fixDictionary);
        }

        return ControlledFragmentHandler.Action.CONTINUE;
    }

    /**
     * Processes a manage-session message addressed to this library.
     *
     * For status {@code SESSION_HANDOVER} a local session object is created (initiator or
     * acceptor flavour, depending on {@code connectionType}), populated from the message fields,
     * given a session subscriber and inserted into the poller's session arrays. For any other
     * status the session-exists handler is notified instead.
     */
    private void onHandoverSession(int libraryId, long connection, long sessionId, int lastSentSeqNum, int lastRecvSeqNum, long logonTime, SessionStatus sessionStatus, SlowStatus slowStatus, ConnectionType connectionType, SessionState sessionState, int heartbeatIntervalInS, boolean closedResendInterval, int resendRequestChunkSize, boolean sendRedundantResendRequests, boolean enableLastMsgSeqNumProcessed, long correlationId, int sequenceIndex, boolean awaitingResend, int lastResentMsgSeqNo, int lastResendChunkMsgSeqNum, int endOfResendRequestRange, boolean awaitingHeartbeat, String localCompId, String localSubId, String localLocationId, String remoteCompId, String remoteSubId, String remoteLocationId, String address, String username, String password, FixDictionary fixDictionary) {
        // Stays null unless this handover answers an InitiateSessionReply registered by this poller.
        InitiateSessionReply reply = null;
        if (sessionStatus == SessionStatus.SESSION_HANDOVER) {
            InternalSession session;
            if (connectionType == ConnectionType.INITIATOR) {
                DebugLogger.log(LogTag.FIX_CONNECTION, "Init Connect: %d, %d%n", connection, (long)libraryId);
                // The reply is looked up but not removed from the map here.
                LibraryReply task = (LibraryReply)this.correlationIdToReply.get(correlationId);
                boolean isReply = task instanceof InitiateSessionReply;
                if (isReply) {
                    reply = (InitiateSessionReply)task;
                    reply.onTcpConnected(connection);
                }
                // Without a matching reply the session is created with a null configuration.
                session = this.newInitiatorSession(connection, lastSentSeqNum, lastRecvSeqNum, sessionState, isReply ? reply.configuration() : null, sequenceIndex, enableLastMsgSeqNumProcessed, fixDictionary);
            } else {
                DebugLogger.log(LogTag.FIX_CONNECTION, "Acct Connect: %d, %d%n", connection, (long)libraryId);
                session = this.acceptSession(connection, address, sessionState, heartbeatIntervalInS, sequenceIndex, enableLastMsgSeqNumProcessed, fixDictionary);
                // Acceptor sessions have their sequence numbers set after construction.
                session.lastSentMsgSeqNum(lastSentSeqNum);
                session.initialLastReceivedMsgSeqNum(lastRecvSeqNum);
            }
            CompositeKey compositeKey = this.sessionIdStrategy.onInitiateLogon(localCompId, localSubId, localLocationId, remoteCompId, remoteSubId, remoteLocationId);
            // Copy the remaining message fields onto the session before it is subscribed and inserted.
            session.username(username);
            session.password(password);
            session.setupSession(sessionId, compositeKey);
            session.logonTime(logonTime);
            session.closedResendInterval(closedResendInterval);
            session.resendRequestChunkSize(resendRequestChunkSize);
            session.sendRedundantResendRequests(sendRedundantResendRequests);
            session.awaitingResend(awaitingResend);
            session.lastResentMsgSeqNo(lastResentMsgSeqNo);
            session.lastResendChunkMsgSeqNum(lastResendChunkMsgSeqNum);
            session.endOfResendRequestRange(endOfResendRequestRange);
            session.awaitingHeartbeat(awaitingHeartbeat);
            this.createSessionSubscriber(connection, session, reply, slowStatus, fixDictionary);
            this.insertSession(session, connectionType, sessionState);
            DebugLogger.log(LogTag.GATEWAY_MESSAGE, "onSessionExists: conn=%d, sess=%d, sentSeqNo=%d, recvSeqNo=%d%n", connection, sessionId, (long)lastSentSeqNum, (long)lastRecvSeqNum);
        } else {
            // Not a handover: only report that the session exists.
            this.sessionExistsHandler.onSessionExists(this.fixLibrary, sessionId, localCompId, localSubId, localLocationId, remoteCompId, remoteSubId, remoteLocationId);
        }
    }

    /**
     * Adds the session to the polled sessions, or to the pending initiator sessions when it is
     * an initiator session that is not yet ACTIVE.
     *
     * @param session        session to insert
     * @param connectionType connection type of the session
     * @param sessionState   state of the session at handover
     */
    private void insertSession(InternalSession session, ConnectionType connectionType, SessionState sessionState) {
        final boolean readyToPoll =
            connectionType != ConnectionType.INITIATOR || sessionState == SessionState.ACTIVE;
        if (readyToPoll) {
            sessions = ArrayUtil.add(sessions, session);
        } else {
            pendingInitiatorSessions = ArrayUtil.add(pendingInitiatorSessions, session);
        }
    }

    /**
     * Routes an inbound message to the subscriber registered for its connection. A message for
     * another library, or for a connection without a subscriber, is skipped.
     *
     * @return the subscriber's action, or {@code Action.CONTINUE} when nothing handled the message
     */
    @Override
    public ControlledFragmentHandler.Action onMessage(DirectBuffer buffer, int offset, int length, int libraryId, long connectionId, long sessionId, int sequenceIndex, int messageType, long timestamp, MessageStatus status, int sequenceNumber, long position) {
        if (libraryId != this.libraryId) {
            return ControlledFragmentHandler.Action.CONTINUE;
        }

        DebugLogger.log(LogTag.FIX_MESSAGE, "(%d) Received %s %n", libraryId, buffer, offset, length);

        final SessionSubscriber subscriber = connectionIdToSession.get(connectionId);
        if (subscriber == null) {
            return ControlledFragmentHandler.Action.CONTINUE;
        }

        return subscriber.onMessage(
            buffer,
            offset,
            length,
            libraryId,
            sessionId,
            sequenceIndex,
            messageType,
            timestamp,
            status,
            position);
    }

    /**
     * Handles a disconnect notification for one of this library's connections.
     *
     * The subscriber is removed from the connection map and asked to process the disconnect. If
     * it answers {@code ABORT}, it is put back so the message can be handled again later.
     * Otherwise its session is closed and removed from both session arrays.
     *
     * @param libraryId    library the notification is addressed to
     * @param connectionId connection that disconnected
     * @param reason       reason for the disconnect
     * @return the subscriber's action, or {@code Action.CONTINUE} when the notification is for
     *         another library or an unknown connection
     */
    @Override
    public ControlledFragmentHandler.Action onDisconnect(int libraryId, long connectionId, DisconnectReason reason) {
        DebugLogger.log(LogTag.GATEWAY_MESSAGE, "%2$d: Library Disconnect %3$d, %1$s%n", reason, (long)libraryId, connectionId);
        if (libraryId != this.libraryId) {
            return ControlledFragmentHandler.Action.CONTINUE;
        }

        final SessionSubscriber subscriber = this.connectionIdToSession.remove(connectionId);
        if (subscriber == null) {
            return ControlledFragmentHandler.Action.CONTINUE;
        }

        final ControlledFragmentHandler.Action action = subscriber.onDisconnect(libraryId, reason);
        if (action == ControlledFragmentHandler.Action.ABORT) {
            // Restore the mapping. The value is passed with its real type; the decompiled
            // (Object) cast did not type-check against the map's value type.
            this.connectionIdToSession.put(connectionId, subscriber);
        } else {
            final InternalSession session = subscriber.session();
            session.close();
            this.pendingInitiatorSessions = ArrayUtil.remove(this.pendingInitiatorSessions, session);
            this.sessions = ArrayUtil.remove(this.sessions, session);
        }
        return action;
    }

    /**
     * Handles an error message from the gateway. When it is addressed to this library, the reply
     * registered under {@code replyToId} (if any) is removed and told about the error. The
     * configured gateway error handler is invoked in every case.
     *
     * @return the action returned by the gateway error handler
     */
    @Override
    public ControlledFragmentHandler.Action onError(int libraryId, GatewayError errorType, long replyToId, String message) {
        if (libraryId == this.libraryId) {
            final LibraryReply<?> reply = correlationIdToReply.remove(replyToId);
            if (reply != null) {
                reply.onError(errorType, message);
            }
        }

        return configuration.gatewayErrorHandler().onError(errorType, libraryId, message);
    }

    /**
     * Handles an application heartbeat from the engine. A heartbeat for this library is fed to
     * the liveness detector; if the poller was not connected and the detector now reports a
     * connection, the poller moves to CONNECTED and runs the connect callback.
     *
     * @return always {@code Action.CONTINUE}
     */
    @Override
    public ControlledFragmentHandler.Action onApplicationHeartbeat(int libraryId) {
        if (libraryId != this.libraryId) {
            return ControlledFragmentHandler.Action.CONTINUE;
        }

        final long now = timeInMs();
        DebugLogger.log(LogTag.APPLICATION_HEARTBEAT, "%d: Received Heartbeat from engine at timeInMs %d%n", (long)libraryId, now);
        livenessDetector.onHeartbeat(now);

        final boolean wasConnected = isConnected();
        if (!wasConnected && livenessDetector.isConnected()) {
            state = CONNECTED;
            onConnect();
        }

        return ControlledFragmentHandler.Action.CONTINUE;
    }

    /**
     * Completes the release-to-gateway reply registered under {@code replyToId}, if there is one.
     * The reply is removed from the map whether or not it is then completed.
     *
     * @return always {@code Action.CONTINUE}
     */
    @Override
    public ControlledFragmentHandler.Action onReleaseSessionReply(int libraryId, long replyToId, SessionReplyStatus status) {
        final LibraryReply<?> removed = correlationIdToReply.remove(replyToId);
        final ReleaseToGatewayReply reply = (ReleaseToGatewayReply)removed;
        if (reply != null) {
            reply.onComplete(status);
        }
        return ControlledFragmentHandler.Action.CONTINUE;
    }

    /**
     * Completes the request-session reply registered under {@code replyToId}, if there is one.
     * The reply is removed from the map whether or not it is then completed.
     *
     * @return always {@code Action.CONTINUE}
     */
    @Override
    public ControlledFragmentHandler.Action onRequestSessionReply(int libraryId, long replyToId, SessionReplyStatus status) {
        final LibraryReply<?> removed = correlationIdToReply.remove(replyToId);
        final RequestSessionReply reply = (RequestSessionReply)removed;
        if (reply != null) {
            reply.onComplete(status);
        }
        return ControlledFragmentHandler.Action.CONTINUE;
    }

    /**
     * Completes the follower-session reply registered under {@code replyToId}, if there is one,
     * with a new writer for the given session id (connection id {@code -1}, sequence index 0).
     *
     * @return always {@code Action.CONTINUE}
     */
    @Override
    public ControlledFragmentHandler.Action onFollowerSessionReply(int libraryId, long replyToId, long sessionId) {
        final LibraryReply<?> removed = correlationIdToReply.remove(replyToId);
        final FollowerSessionReply reply = (FollowerSessionReply)removed;
        if (reply != null) {
            final SessionWriter writer = followerSession(sessionId, -1L, 0);
            reply.onComplete(writer);
        }
        return ControlledFragmentHandler.Action.CONTINUE;
    }

    /**
     * Handles an engine-close message. When it is addressed to this library, the poller enters
     * the ENGINE_CLOSE state and immediately starts logging sessions out.
     *
     * @return always {@code Action.CONTINUE}
     */
    @Override
    public ControlledFragmentHandler.Action onEngineClose(int libraryId) {
        if (libraryId != this.libraryId) {
            return ControlledFragmentHandler.Action.CONTINUE;
        }

        state = ENGINE_CLOSE;
        attemptEngineCloseBasedLogout();
        return ControlledFragmentHandler.Action.CONTINUE;
    }

    /**
     * Logs out and disconnects the sessions one by one, resuming from {@code sessionLogoutIndex}.
     *
     * The method stops as soon as a session returns a negative value, leaving the index on that
     * session so a later call retries it. Once every session has been processed, the poller
     * moves to the CONNECTED state.
     */
    private void attemptEngineCloseBasedLogout() {
        // NOTE(review): nothing in the visible part of this class resets sessionLogoutIndex, so a
        // second engine-close would resume from the old index -- confirm whether that is intended.
        final InternalSession[] snapshot = sessions;
        for (final int count = snapshot.length; sessionLogoutIndex < count; sessionLogoutIndex++) {
            if (snapshot[sessionLogoutIndex].logoutAndDisconnect() < 0L) {
                return;
            }
        }
        state = CONNECTED;
    }

    /**
     * Rejects the calling operation while the poller is in the ENGINE_CLOSE state.
     *
     * @throws IllegalStateException if the end of day process is running
     */
    private void validateEndOfDay() {
        if (state != ENGINE_CLOSE) {
            return;
        }
        throw new IllegalStateException("Cannot perform operation whilst end of day process is running");
    }

    /**
     * Forwards a new sent position for this library to the configured sent-position handler.
     *
     * @return the handler's action, or {@code Action.CONTINUE} for another library's position
     */
    @Override
    public ControlledFragmentHandler.Action onNewSentPosition(int libraryId, long position) {
        return libraryId == this.libraryId
            ? sentPositionHandler.onSendCompleted(position)
            : ControlledFragmentHandler.Action.CONTINUE;
    }

    /**
     * Reconciles the locally held sessions with the session ids the engine lists for this library.
     *
     * The notification counts as a heartbeat and moves the poller to the CONNECTED state. Every
     * local session whose id is missing from the engine's list is timed out through its
     * subscriber (if one is registered), closed and dropped from {@code sessions}. Ids the engine
     * lists that match no local session are reported to the gateway error handler as
     * {@code UNKNOWN_SESSION}.
     *
     * @param libraryId       library the notification is addressed to
     * @param sessionsDecoder decoder positioned over the engine's list of session ids
     * @return {@code Action.BREAK} when the notification was for this library, otherwise
     *         {@code Action.CONTINUE}
     */
    @Override
    public ControlledFragmentHandler.Action onControlNotification(int libraryId, ControlNotificationDecoder.SessionsDecoder sessionsDecoder) {
        if (libraryId != this.libraryId) {
            return ControlledFragmentHandler.Action.CONTINUE;
        }

        final long timeInMs = this.timeInMs();
        this.livenessDetector.onHeartbeat(timeInMs);
        this.state = CONNECTED;
        DebugLogger.log(LogTag.LIBRARY_CONNECT, "%d: Received Control Notification from engine at timeInMs %d%n", (long)libraryId, timeInMs);

        final LongHashSet sessionIds = this.sessionIds;
        // Typed as InternalSession[]; the decompiled Object[] declaration did not type-check.
        InternalSession[] sessions = this.sessions;

        // Collect the ids the engine believes this library owns.
        sessionIds.clear();
        while (sessionsDecoder.hasNext()) {
            sessionsDecoder.next();
            sessionIds.add(sessionsDecoder.sessionId());
        }

        // Walk every local session exactly once. The index advances only when a session is kept;
        // after a removal the next candidate has shifted into slot i. The previous loop
        // incremented i in both the loop header and the body, so it skipped sessions.
        int size = sessions.length;
        int i = 0;
        while (i < size) {
            final InternalSession session = sessions[i];
            if (sessionIds.remove(session.id())) {
                ++i;
                continue;
            }

            final SessionSubscriber subscriber = this.connectionIdToSession.remove(session.connectionId());
            if (subscriber != null) {
                subscriber.onTimeout(libraryId);
            }
            session.close();
            sessions = ArrayUtil.remove(sessions, i);
            --size;
        }
        this.sessions = sessions;

        // Whatever is left was listed by the engine but is unknown locally.
        if (!sessionIds.isEmpty()) {
            final String msg = String.format("The gateway thinks that we own the following session ids: %s", sessionIds);
            this.configuration.gatewayErrorHandler().onError(GatewayError.UNKNOWN_SESSION, libraryId, msg);
        }
        return ControlledFragmentHandler.Action.BREAK;
    }

    /**
     * Passes a slow-status change on to the subscriber registered for the given
     * connection, provided the notification is addressed to this library and a
     * subscriber is registered for that connection.
     *
     * @param libraryId     id of the library the notification is addressed to.
     * @param connectionId  connection whose status changed.
     * @param hasBecomeSlow flag forwarded unchanged to the subscriber.
     * @return always {@code CONTINUE}.
     */
    @Override
    public ControlledFragmentHandler.Action onSlowStatusNotification(int libraryId, long connectionId, boolean hasBecomeSlow) {
        if (libraryId == this.libraryId) {
            SessionSubscriber target = (SessionSubscriber)this.connectionIdToSession.get(connectionId);
            if (target != null) {
                target.onSlowStatusNotification(libraryId, hasBecomeSlow);
            }
        }
        return ControlledFragmentHandler.Action.CONTINUE;
    }

    /**
     * Resets the sequence numbers of the first registered session whose id equals
     * {@code sessionId}, when the request is addressed to this library.
     *
     * @param libraryId id of the library the request is addressed to.
     * @param sessionId id of the session to reset.
     * @return the result of {@code Pressure.apply} on the reset outcome when a matching
     *         session is found, otherwise {@code CONTINUE}.
     */
    @Override
    public ControlledFragmentHandler.Action onResetLibrarySequenceNumber(int libraryId, long sessionId) {
        if (libraryId != this.libraryId) {
            return ControlledFragmentHandler.Action.CONTINUE;
        }
        for (SessionSubscriber candidate : this.connectionIdToSession.values()) {
            InternalSession owned = candidate.session();
            if (owned.id() == sessionId) {
                return Pressure.apply(owned.resetSequenceNumbers());
            }
        }
        return ControlledFragmentHandler.Action.CONTINUE;
    }

    /**
     * Builds the parser and subscriber for a session, attaches the pending reply and
     * the handler returned by the configured session acquire handler, then registers
     * the subscriber under the connection id.
     *
     * @param connectionId  key the subscriber is registered under.
     * @param session       session the subscriber wraps.
     * @param reply         reply attached to the subscriber (passed through as given).
     * @param slowStatus    the acquire handler is told the session is slow only when this is {@code SLOW}.
     * @param fixDictionary dictionary handed to the session parser.
     */
    private void createSessionSubscriber(long connectionId, InternalSession session, InitiateSessionReply reply, SlowStatus slowStatus, FixDictionary fixDictionary) {
        final boolean isSlow = slowStatus == SlowStatus.SLOW;
        SessionParser sessionParser = new SessionParser(session, this.configuration.messageValidationStrategy(), null, fixDictionary);
        SessionSubscriber created = new SessionSubscriber(sessionParser, session, this.receiveTimer, this.sessionTimer);
        created.reply(reply);
        created.handler(this.configuration.sessionAcquireHandler().onSessionAcquired(session, isSlow));
        this.connectionIdToSession.put(connectionId, created);
    }

    /**
     * Creates an {@code InitiatorSession} for the given connection.
     *
     * <p>The first sent and received sequence numbers are chosen by
     * {@code initiatorNewSequenceNumber}; the sent value goes to the constructor while
     * the received value, minus one, is installed as the initial last received
     * sequence number.
     *
     * @param connectionId                 connection the session is bound to.
     * @param lastSentSequenceNumber       last known sent sequence number.
     * @param lastReceivedSequenceNumber   last known received sequence number.
     * @param state                        initial session state.
     * @param sessionConfiguration         initiator configuration, may be {@code null}.
     * @param sequenceIndex                sequence index passed to the session.
     * @param enableLastMsgSeqNumProcessed flag passed through to the session.
     * @param fixDictionary                dictionary supplying the begin string and used for the session proxy.
     * @return the newly constructed session.
     */
    private InitiatorSession newInitiatorSession(long connectionId, int lastSentSequenceNumber, int lastReceivedSequenceNumber, SessionState state, SessionConfiguration sessionConfiguration, int sequenceIndex, boolean enableLastMsgSeqNumProcessed, FixDictionary fixDictionary) {
        int heartbeatIntervalInS = this.configuration.defaultHeartbeatIntervalInS();
        GatewayPublication outbound = this.transport.outboundPublication();
        MutableAsciiBuffer buffer = this.sessionBuffer();
        SessionProxy proxy = this.sessionProxy(connectionId, fixDictionary);

        int firstReceivedSequenceNumber = this.initiatorNewSequenceNumber(sessionConfiguration, SessionConfiguration::initialReceivedSequenceNumber, lastReceivedSequenceNumber);
        int firstSentSequenceNumber = this.initiatorNewSequenceNumber(sessionConfiguration, SessionConfiguration::initialSentSequenceNumber, lastSentSequenceNumber);

        // Looked up in the same order in which the constructor call used to evaluate them.
        long sendingTimeWindowInMs = this.configuration.sendingTimeWindowInMs();
        AtomicCounter receivedCounter = this.fixCounters.receivedMsgSeqNo(connectionId);
        AtomicCounter sentCounter = this.fixCounters.sentMsgSeqNo(connectionId);
        boolean resetSeqNum = sessionConfiguration != null && sessionConfiguration.resetSeqNum();

        InitiatorSession created = new InitiatorSession(
            heartbeatIntervalInS,
            connectionId,
            this.epochClock,
            proxy,
            outbound,
            this.sessionIdStrategy,
            sendingTimeWindowInMs,
            receivedCounter,
            sentCounter,
            this.libraryId,
            firstSentSequenceNumber,
            sequenceIndex,
            state,
            resetSeqNum,
            this.configuration.reasonableTransmissionTimeInMs(),
            buffer,
            enableLastMsgSeqNumProcessed,
            fixDictionary.beginString());
        created.initialLastReceivedMsgSeqNum(firstReceivedSequenceNumber - 1);
        return created;
    }

    /**
     * Allocates a fresh buffer for a session, sized by the configured session buffer size.
     *
     * @return a new buffer backed by its own byte array.
     */
    private MutableAsciiBuffer sessionBuffer() {
        final int capacity = this.configuration.sessionBufferSize();
        final byte[] backing = new byte[capacity];
        return new MutableAsciiBuffer(backing);
    }

    /**
     * Chooses the sequence number an initiator session starts from.
     *
     * <p>Resolution order:
     * <ol>
     *   <li>no configuration: {@code lastSequenceNumber + 1};</li>
     *   <li>a configured initial value other than -1: that value;</li>
     *   <li>persistent sequence numbers and a last value other than -1:
     *       {@code lastSequenceNumber + 1};</li>
     *   <li>otherwise: 1.</li>
     * </ol>
     *
     * @param sessionConfiguration        initiator configuration, may be {@code null}.
     * @param initialSequenceNumberGetter reads the configured initial sequence number
     *                                    (-1 is treated as "not configured").
     * @param lastSequenceNumber          last known sequence number (-1 is treated as "none").
     * @return the sequence number to start from.
     */
    private int initiatorNewSequenceNumber(SessionConfiguration sessionConfiguration, ToIntFunction<SessionConfiguration> initialSequenceNumberGetter, int lastSequenceNumber) {
        if (sessionConfiguration != null) {
            final int configured = initialSequenceNumberGetter.applyAsInt(sessionConfiguration);
            if (configured != -1) {
                return configured;
            }
            final boolean continueFromLast = sessionConfiguration.sequenceNumbersPersistent() && lastSequenceNumber != -1;
            return continueFromLast ? lastSequenceNumber + 1 : 1;
        }
        return lastSequenceNumber + 1;
    }

    /**
     * Creates an {@code AcceptorSession} for the given connection and records the
     * remote host and port parsed from {@code address}.
     *
     * <p>The address is split at its last {@code ':'}; the text after it is parsed as
     * the port and the text before it (minus one leading {@code '/'} if present) is
     * the host.
     *
     * @param connectionId                 connection the session is bound to.
     * @param address                      remote address in {@code [/]host:port} form.
     * @param state                        initial session state.
     * @param heartbeatIntervalInS         heartbeat interval passed to the session.
     * @param sequenceIndex                sequence index passed to the session.
     * @param enableLastMsgSeqNumProcessed flag passed through to the session.
     * @param fixDictionary                dictionary supplying the begin string and used for the session proxy.
     * @return the newly constructed session.
     */
    private InternalSession acceptSession(long connectionId, String address, SessionState state, int heartbeatIntervalInS, int sequenceIndex, boolean enableLastMsgSeqNumProcessed, FixDictionary fixDictionary) {
        GatewayPublication outbound = this.transport.outboundPublication();
        long sendingTimeWindowInMs = this.configuration.sendingTimeWindowInMs();
        AtomicCounter receivedCounter = this.fixCounters.receivedMsgSeqNo(connectionId);
        AtomicCounter sentCounter = this.fixCounters.sentMsgSeqNo(connectionId);
        MutableAsciiBuffer buffer = this.sessionBuffer();

        int portSeparator = address.lastIndexOf(':');
        int hostStart = 0;
        if (address.startsWith("/")) {
            hostStart = 1;
        }
        String host = address.substring(hostStart, portSeparator);
        int port = Integer.parseInt(address.substring(portSeparator + 1));

        SessionProxy proxy = this.sessionProxy(connectionId, fixDictionary);
        AcceptorSession accepted = new AcceptorSession(
            heartbeatIntervalInS,
            connectionId,
            this.epochClock,
            proxy,
            outbound,
            this.sessionIdStrategy,
            sendingTimeWindowInMs,
            receivedCounter,
            sentCounter,
            this.libraryId,
            1,
            sequenceIndex,
            state,
            this.configuration.reasonableTransmissionTimeInMs(),
            buffer,
            enableLastMsgSeqNumProcessed,
            fixDictionary.beginString());
        accepted.address(host, port);
        return accepted;
    }

    /**
     * Asks the configured session proxy factory for a proxy bound to the given connection.
     *
     * <p>The proxy is given a newly created {@code SystemEpochClock} and
     * {@code LangUtil::rethrowUnchecked} as its final argument.
     *
     * @param connectionId  connection the proxy is created for.
     * @param fixDictionary dictionary handed to the factory.
     * @return the proxy produced by the factory.
     */
    private SessionProxy sessionProxy(long connectionId, FixDictionary fixDictionary) {
        final EpochClock proxyClock = new SystemEpochClock();
        return this.configuration.sessionProxyFactory().make(
            this.configuration.sessionBufferSize(),
            this.transport.outboundPublication(),
            this.sessionIdStrategy,
            this.configuration.sessionCustomisationStrategy(),
            proxyClock,
            connectionId,
            this.libraryId,
            fixDictionary,
            LangUtil::rethrowUnchecked);
    }

    /**
     * Verifies that the library is in state 0 before an operation proceeds.
     *
     * @throws IllegalStateException if {@code state} holds any other value.
     */
    private void checkState() {
        final boolean usable = this.state == 0;
        if (usable) {
            return;
        }
        throw new IllegalStateException("Library has been closed or is performing end of day operation");
    }

    /**
     * Closes the owning {@code fixLibrary} via {@code internalClose()} and then this
     * object via {@link #close()}.
     */
    private void closeWithParent() {
        try {
            this.fixLibrary.internalClose();
        }
        finally {
            // Runs even when internalClose() throws, so close() is always attempted.
            this.close();
        }
    }

    /**
     * Disables every registered session and moves {@code state} to 4, but only when
     * the state is not already 4 and graceful shutdown is enabled in the configuration.
     *
     * <p>NOTE(review): when {@code gracefulShutdown()} is {@code false} this method
     * does nothing at all, so {@code state} is left unchanged. Confirm that this is
     * the intended behaviour for a non-graceful shutdown.
     */
    @Override
    public void close() {
        if (this.state == 4 || !this.configuration.gracefulShutdown()) {
            return;
        }
        this.connectionIdToSession.values().forEach(subscriber -> subscriber.session().disable());
        this.state = 4;
    }
}

