/*
 * Decompiled with CFR 0.152.
 */
package uk.co.real_logic.artio.engine.framer;

import io.aeron.logbuffer.BufferClaim;
import io.aeron.logbuffer.ControlledFragmentHandler;
import io.aeron.logbuffer.Header;
import org.agrona.DirectBuffer;
import org.agrona.ErrorHandler;
import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.EpochClock;
import uk.co.real_logic.artio.DebugLogger;
import uk.co.real_logic.artio.LogTag;
import uk.co.real_logic.artio.builder.AbstractSequenceResetEncoder;
import uk.co.real_logic.artio.builder.Encoder;
import uk.co.real_logic.artio.builder.SessionHeaderEncoder;
import uk.co.real_logic.artio.decoder.SessionHeaderDecoder;
import uk.co.real_logic.artio.engine.PossDupEnabler;
import uk.co.real_logic.artio.engine.framer.Continuation;
import uk.co.real_logic.artio.engine.framer.GatewaySession;
import uk.co.real_logic.artio.engine.framer.MessageTypeExtractor;
import uk.co.real_logic.artio.engine.logger.ReplayOperation;
import uk.co.real_logic.artio.engine.logger.ReplayQuery;
import uk.co.real_logic.artio.engine.logger.SequenceNumberIndexReader;
import uk.co.real_logic.artio.fields.UtcTimestampEncoder;
import uk.co.real_logic.artio.messages.FixMessageDecoder;
import uk.co.real_logic.artio.messages.FixMessageEncoder;
import uk.co.real_logic.artio.messages.MessageHeaderDecoder;
import uk.co.real_logic.artio.messages.MessageStatus;
import uk.co.real_logic.artio.messages.SessionReplyStatus;
import uk.co.real_logic.artio.protocol.GatewayPublication;
import uk.co.real_logic.artio.util.AsciiBuffer;
import uk.co.real_logic.artio.util.MutableAsciiBuffer;

public class CatchupReplayer
implements ControlledFragmentHandler,
Continuation {
    private static final int ENCODE_BUFFER_SIZE = 8192;
    public static final int FRAME_LENGTH = 61 + FixMessageEncoder.bodyHeaderLength();
    private static final int OUT_OF_RANGE = -1;
    private final MessageHeaderDecoder messageHeaderDecoder = new MessageHeaderDecoder();
    private final FixMessageDecoder messageDecoder = new FixMessageDecoder();
    private final FixMessageEncoder messageEncoder = new FixMessageEncoder();
    private final AsciiBuffer asciiBuffer = new MutableAsciiBuffer();
    private final BufferClaim bufferClaim = new BufferClaim();
    private final PossDupEnabler possDupEnabler;
    private final SequenceNumberIndexReader receivedSequenceNumberIndex;
    private final ReplayQuery inboundMessages;
    private final GatewayPublication inboundPublication;
    private final ErrorHandler errorHandler;
    private final long correlationId;
    private final long connectionId;
    private final int libraryId;
    private final int lastReceivedSeqNum;
    private final int currentSequenceIndex;
    private final GatewaySession session;
    private final long catchupEndTimeInMs;
    private final long requiredPosition;
    private final SessionHeaderDecoder headerDecoder;
    private int replayFromSequenceNumber;
    private int replayFromSequenceIndex;
    private State state = State.AWAITING_INDEX;
    private String missingMessagesReason;
    private AbstractSequenceResetEncoder sequenceResetEncoder;
    private UtcTimestampEncoder timestampEncoder;
    private MutableAsciiBuffer encodeBuffer;
    private int heartbeatRangeSequenceNumberStart = -1;
    private ReplayOperation replayOperation = null;

    CatchupReplayer(SequenceNumberIndexReader receivedSequenceNumberIndex, ReplayQuery inboundMessages, GatewayPublication inboundPublication, ErrorHandler errorHandler, long correlationId, long connectionId, int libraryId, int lastReceivedSeqNum, int currentSequenceIndex, int replayFromSequenceNumber, int replayFromSequenceIndex, GatewaySession session, long catchupTimeout, EpochClock clock) {
        this.receivedSequenceNumberIndex = receivedSequenceNumberIndex;
        this.inboundMessages = inboundMessages;
        this.inboundPublication = inboundPublication;
        this.errorHandler = errorHandler;
        this.correlationId = correlationId;
        this.connectionId = connectionId;
        this.libraryId = libraryId;
        this.lastReceivedSeqNum = lastReceivedSeqNum;
        this.currentSequenceIndex = currentSequenceIndex;
        this.replayFromSequenceNumber = replayFromSequenceNumber;
        this.replayFromSequenceIndex = replayFromSequenceIndex;
        this.session = session;
        this.catchupEndTimeInMs = clock.time() + catchupTimeout;
        this.requiredPosition = inboundPublication.position();
        this.headerDecoder = session.fixDictionary().makeHeaderDecoder();
        this.possDupEnabler = new PossDupEnabler(this.bufferClaim, this::claimBuffer, this::onPreCommit, this::onIllegalState, errorHandler, clock, inboundPublication.maxPayloadLength(), LogTag.CATCHUP);
    }

    private void onPreCommit(MutableDirectBuffer buffer, int offset) {
        int frameOffset = offset + 8;
        this.messageEncoder.wrap(buffer, frameOffset).connection(this.connectionId).libraryId(this.libraryId).status(MessageStatus.CATCHUP_REPLAY);
    }

    private void onIllegalState(String msg) {
        this.errorHandler.onError((Throwable)new IllegalStateException(msg));
    }

    private boolean claimBuffer(int length) {
        return this.inboundPublication.claim(length, this.bufferClaim) > 0L;
    }

    public ControlledFragmentHandler.Action onFragment(DirectBuffer srcBuffer, int srcOffset, int srcLength, Header header) {
        int messageLength = srcLength - FRAME_LENGTH;
        int messageOffset = srcOffset + FRAME_LENGTH;
        this.messageHeaderDecoder.wrap(srcBuffer, srcOffset);
        this.messageDecoder.wrap(srcBuffer, srcOffset + 8, this.messageHeaderDecoder.blockLength(), this.messageHeaderDecoder.version());
        this.asciiBuffer.wrap(srcBuffer, messageOffset, messageLength);
        this.headerDecoder.decode(this.asciiBuffer, 0, messageLength);
        long messageType = MessageTypeExtractor.getMessageType(this.messageDecoder);
        if (messageType == 48L) {
            if (this.heartbeatRangeSequenceNumberStart == -1) {
                this.heartbeatRangeSequenceNumberStart = this.headerDecoder.msgSeqNum();
            }
            return ControlledFragmentHandler.Action.CONTINUE;
        }
        if (this.heartbeatRangeSequenceNumberStart != -1 && !this.sendGapFill()) {
            return ControlledFragmentHandler.Action.ABORT;
        }
        return this.processNormalMessage(srcBuffer, srcOffset, srcLength, messageLength, messageOffset);
    }

    private boolean sendGapFill() {
        boolean sent;
        if (this.sequenceResetEncoder == null) {
            this.sequenceResetEncoder = this.session.fixDictionary().makeSequenceResetEncoder();
            this.timestampEncoder = new UtcTimestampEncoder();
            this.encodeBuffer = new MutableAsciiBuffer(new byte[8192]);
            this.sequenceResetEncoder.gapFillFlag(true);
            SessionHeaderEncoder header = this.sequenceResetEncoder.header().possDupFlag(true);
            header.senderCompID(this.headerDecoder.senderCompID());
            if (this.headerDecoder.hasSenderSubID()) {
                header.senderSubID(this.headerDecoder.senderSubID());
            }
            if (this.headerDecoder.hasSenderLocationID()) {
                header.senderLocationID(this.headerDecoder.senderLocationID());
            }
            header.targetCompID(this.headerDecoder.targetCompID());
            if (this.headerDecoder.hasTargetSubID()) {
                header.targetSubID(this.headerDecoder.targetSubID());
            }
            if (this.headerDecoder.hasTargetLocationID()) {
                header.targetLocationID(this.headerDecoder.targetLocationID());
            }
        }
        int heartbeatRangeSequenceNumberEnd = this.headerDecoder.msgSeqNum();
        this.sequenceResetEncoder.header().msgSeqNum(this.heartbeatRangeSequenceNumberStart);
        this.sequenceResetEncoder.newSeqNo(heartbeatRangeSequenceNumberEnd);
        this.sequenceResetEncoder.header().sendingTime(this.timestampEncoder.buffer(), this.timestampEncoder.encode(System.currentTimeMillis()));
        long result = this.sequenceResetEncoder.encode(this.encodeBuffer, 0);
        int encodedLength = Encoder.length((long)result);
        int encodedOffset = Encoder.offset((long)result);
        boolean bl = sent = this.inboundPublication.saveMessage((DirectBuffer)this.encodeBuffer, encodedOffset, encodedLength, this.libraryId, 52L, this.messageDecoder.session(), this.replayFromSequenceIndex, this.libraryId, MessageStatus.CATCHUP_REPLAY, heartbeatRangeSequenceNumberEnd) > 0L;
        if (sent) {
            this.heartbeatRangeSequenceNumberStart = -1;
        }
        return sent;
    }

    private ControlledFragmentHandler.Action processNormalMessage(DirectBuffer srcBuffer, int srcOffset, int srcLength, int messageLength, int messageOffset) {
        ControlledFragmentHandler.Action action = this.possDupEnabler.enablePossDupFlag(srcBuffer, messageOffset, messageLength, srcOffset, srcLength);
        if (action == ControlledFragmentHandler.Action.CONTINUE) {
            this.replayFromSequenceNumber = this.headerDecoder.msgSeqNum() + 1;
            this.replayFromSequenceIndex = this.messageDecoder.sequenceIndex();
        }
        return action;
    }

    @Override
    public long attempt() {
        DebugLogger.log(LogTag.CATCHUP, "Attempt replay for sessionId=%d%n", this.session.sessionId());
        switch (this.state) {
            case AWAITING_INDEX: {
                long indexedPosition = this.receivedSequenceNumberIndex.indexedPosition(this.inboundPublication.id());
                if (indexedPosition >= this.requiredPosition) {
                    this.state = State.REPLAY_QUERY;
                } else {
                    DebugLogger.log(LogTag.CATCHUP, "Awaiting index position: indexed=%d vs required=%d%n", indexedPosition, this.requiredPosition);
                }
                return -2L;
            }
            case REPLAY_QUERY: {
                if (this.notLoggingInboundMessages()) {
                    return this.switchToMissingMessages("Not logging inbound messages");
                }
                DebugLogger.log(LogTag.CATCHUP, "Querying for sessionId=%d, currently at (%d, %d)%n", this.session.sessionId(), (long)this.lastReceivedSeqNum, (long)this.currentSequenceIndex);
                this.replayOperation = this.inboundMessages.query(this, this.session.sessionId(), this.replayFromSequenceNumber, this.replayFromSequenceIndex, this.lastReceivedSeqNum, this.currentSequenceIndex, LogTag.CATCHUP);
                this.state = State.REPLAYING;
                return -2L;
            }
            case REPLAYING: {
                if (System.currentTimeMillis() > this.catchupEndTimeInMs) {
                    return this.switchToMissingMessages("Catchup operation timed out");
                }
                if (this.replayOperation.attemptReplay()) {
                    if (this.hasMissingMessages()) {
                        return this.switchToMissingMessages("Is missing messages from replay index query");
                    }
                    this.state = State.SEND_OK;
                    return this.sendOk(this.inboundPublication, this.correlationId, this.session);
                }
                return -2L;
            }
            case SEND_MISSING: {
                return this.sendMissingMessages();
            }
            case SEND_OK: {
                return this.sendOk(this.inboundPublication, this.correlationId, this.session);
            }
        }
        return 1L;
    }

    private long switchToMissingMessages(String reason) {
        this.state = State.SEND_MISSING;
        this.missingMessagesReason = reason;
        return this.sendMissingMessages();
    }

    private boolean hasMissingMessages() {
        return this.replayFromSequenceIndex < this.currentSequenceIndex || this.replayFromSequenceNumber < this.lastReceivedSeqNum;
    }

    private boolean notLoggingInboundMessages() {
        return this.inboundMessages == null;
    }

    private long sendOk(GatewayPublication publication, long correlationId, GatewaySession session) {
        return CatchupReplayer.sendOk(publication, correlationId, session, this.libraryId);
    }

    static long sendOk(GatewayPublication publication, long correlationId, GatewaySession session, int libraryId) {
        DebugLogger.log(LogTag.CATCHUP, "OK for sessionId=%d%n", session.sessionId());
        long position = publication.saveRequestSessionReply(libraryId, SessionReplyStatus.OK, correlationId);
        if (position >= 0L) {
            session.play();
        }
        return position;
    }

    private long sendMissingMessages() {
        DebugLogger.log(LogTag.CATCHUP, "Missing Messages for sessionId=%d%n", this.session.sessionId());
        long position = this.inboundPublication.saveRequestSessionReply(this.libraryId, SessionReplyStatus.MISSING_MESSAGES, this.correlationId);
        if (position > 0L) {
            this.errorHandler.onError((Throwable)new IllegalStateException(String.format("Failed to read correct number of messages for sessionId=%d, finished at [%d, %d] instead of [%d, %d] - %s", this.session.sessionId(), this.replayFromSequenceIndex, this.replayFromSequenceNumber, this.currentSequenceIndex, this.lastReceivedSeqNum, this.missingMessagesReason)));
            this.missingMessagesReason = null;
            this.session.play();
        }
        return position;
    }

    @Override
    public void close() {
        if (this.replayOperation != null) {
            this.replayOperation.close();
        }
    }

    private static enum State {
        AWAITING_INDEX,
        REPLAY_QUERY,
        REPLAYING,
        SEND_MISSING,
        SEND_OK;

    }
}

