/*
 * Decompiled with CFR 0.152.
 */
package uk.co.real_logic.artio.engine.framer;

import io.aeron.logbuffer.ControlledFragmentHandler;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.agrona.DirectBuffer;
import org.agrona.ErrorHandler;
import org.agrona.concurrent.status.AtomicCounter;
import uk.co.real_logic.artio.DebugLogger;
import uk.co.real_logic.artio.LogTag;
import uk.co.real_logic.artio.engine.ByteBufferUtil;
import uk.co.real_logic.artio.engine.MessageTimingHandler;
import uk.co.real_logic.artio.engine.SenderSequenceNumber;
import uk.co.real_logic.artio.engine.framer.BlockablePosition;
import uk.co.real_logic.artio.engine.framer.Framer;
import uk.co.real_logic.artio.engine.framer.TcpChannel;
import uk.co.real_logic.artio.engine.logger.ArchiveDescriptor;
import uk.co.real_logic.artio.messages.DisconnectReason;
import uk.co.real_logic.artio.protocol.GatewayPublication;

class FixSenderEndPoint {
    private static final int HEADER_LENGTH = 8;
    private static final int REPLAY_MESSAGE = -1;
    private final long connectionId;
    private final TcpChannel channel;
    private final AtomicCounter bytesInBuffer;
    private final AtomicCounter invalidLibraryAttempts;
    private final ErrorHandler errorHandler;
    private final Framer framer;
    private final int maxBytesInBuffer;
    private final long slowConsumerTimeoutInMs;
    private final StreamTracker outboundTracker;
    private final StreamTracker replayTracker;
    private final SenderSequenceNumber senderSequenceNumber;
    private final MessageTimingHandler messageTimingHandler;
    private int libraryId;
    private long sessionId;
    private long sendingTimeoutTimeInMs;
    private boolean replayPaused;

    FixSenderEndPoint(long connectionId, int libraryId, BlockablePosition outboundBlockablePosition, BlockablePosition replayBlockablePosition, TcpChannel channel, AtomicCounter bytesInBuffer, AtomicCounter invalidLibraryAttempts, ErrorHandler errorHandler, Framer framer, int maxBytesInBuffer, long slowConsumerTimeoutInMs, long timeInMs, SenderSequenceNumber senderSequenceNumber, MessageTimingHandler messageTimingHandler) {
        this.connectionId = connectionId;
        this.libraryId = libraryId;
        this.channel = channel;
        this.bytesInBuffer = bytesInBuffer;
        this.invalidLibraryAttempts = invalidLibraryAttempts;
        this.errorHandler = errorHandler;
        this.framer = framer;
        this.maxBytesInBuffer = maxBytesInBuffer;
        this.slowConsumerTimeoutInMs = slowConsumerTimeoutInMs;
        this.senderSequenceNumber = senderSequenceNumber;
        this.outboundTracker = new StreamTracker(outboundBlockablePosition);
        this.replayTracker = new StreamTracker(replayBlockablePosition);
        this.messageTimingHandler = messageTimingHandler;
        this.sendingTimeoutTimeInMs = timeInMs + slowConsumerTimeoutInMs;
    }

    void onOutboundMessage(int libraryId, DirectBuffer directBuffer, int offset, int bodyLength, int sequenceNumber, long position, long timeInMs) {
        if (this.isWrongLibraryId(libraryId)) {
            this.invalidLibraryAttempts.increment();
            return;
        }
        if (this.replayPaused) {
            this.dropFurtherBehind(bodyLength);
            return;
        }
        if (this.attemptFramedMessage(directBuffer, offset, bodyLength, timeInMs, position, this.outboundTracker) && this.messageTimingHandler != null) {
            this.messageTimingHandler.onMessage(sequenceNumber, this.connectionId);
        }
        this.senderSequenceNumber.onNewMessage(sequenceNumber);
    }

    ControlledFragmentHandler.Action onReplayMessage(DirectBuffer directBuffer, int offset, int bodyLength, long timeInMs, long position) {
        if (!this.isSlowConsumer()) {
            this.replayPaused = true;
        }
        this.attemptFramedMessage(directBuffer, offset, bodyLength, timeInMs, position, this.replayTracker);
        return ControlledFragmentHandler.Action.CONTINUE;
    }

    ControlledFragmentHandler.Action onSlowReplayMessage(DirectBuffer buffer, int offset, int bodyLength, long timeInMs, long position, int metaDataLength) {
        if (!this.outboundTracker.partiallySentMessage) {
            this.replayPaused = true;
        }
        int totalFrameSize = GatewayPublication.FRAME_SIZE + metaDataLength;
        int offsetAfterHeader = offset - totalFrameSize;
        int length = bodyLength + totalFrameSize;
        return this.attemptSlowMessage(buffer, offsetAfterHeader, length, position, bodyLength, timeInMs, this.replayTracker, metaDataLength, -1);
    }

    private boolean attemptFramedMessage(DirectBuffer directBuffer, int offset, int bodyLength, long timeInMs, long position, StreamTracker tracker) {
        if (this.isSlowConsumer()) {
            this.dropFurtherBehind(bodyLength);
            return false;
        }
        try {
            int written = this.writeFramedMessage(directBuffer, offset, bodyLength, timeInMs);
            if (written == bodyLength) {
                tracker.sentPosition = position;
                return true;
            }
            this.becomeSlowConsumer(written, bodyLength, position, tracker);
        }
        catch (IOException ex) {
            this.onError(ex);
        }
        return false;
    }

    private void dropFurtherBehind(int bodyLength) {
        long bytesInBuffer = this.bytesInBufferWeak() + (long)bodyLength;
        if (bytesInBuffer > (long)this.maxBytesInBuffer) {
            this.removeEndpoint(DisconnectReason.SLOW_CONSUMER);
        }
        this.bytesInBuffer.setOrdered(bytesInBuffer);
    }

    private int writeFramedMessage(DirectBuffer directBuffer, int offset, int length, long timeInMs) throws IOException {
        ByteBuffer buffer = directBuffer.byteBuffer();
        int startLimit = buffer.limit();
        int startPosition = buffer.position();
        ByteBufferUtil.limit(buffer, offset + length);
        ByteBufferUtil.position(buffer, offset);
        int written = this.channel.write(buffer);
        if (written > 0) {
            ByteBufferUtil.position(buffer, offset);
            DebugLogger.log(LogTag.FIX_MESSAGE_TCP, "Written  ", buffer, written);
            this.updateSendingTimeoutTimeInMs(timeInMs, written);
            buffer.limit(startLimit).position(startPosition);
        }
        return written;
    }

    private void updateSendingTimeoutTimeInMs(long timeInMs, int written) {
        if (written > 0) {
            this.sendingTimeoutTimeInMs = timeInMs + this.slowConsumerTimeoutInMs;
        }
    }

    private void onError(Exception ex) {
        this.errorHandler.onError((Throwable)new Exception(String.format("Exception reported for sessionId=%d,connectionId=%d", this.sessionId, this.connectionId), ex));
        this.removeEndpoint(DisconnectReason.EXCEPTION);
    }

    private void becomeSlowConsumer(int written, int bodyLength, long position, StreamTracker tracker) {
        int remainingBytes = bodyLength - written;
        this.bytesInBuffer.setOrdered((long)remainingBytes);
        this.sendSlowStatus(true);
        tracker.sentPosition = position - (long)remainingBytes;
        tracker.partiallySentMessage = true;
    }

    private void becomeNormalConsumer() {
        this.sendSlowStatus(false);
    }

    private void sendSlowStatus(boolean hasBecomeSlow) {
        this.framer.slowStatus(this.libraryId, this.connectionId, hasBecomeSlow);
    }

    private void removeEndpoint(DisconnectReason reason) {
        this.framer.onDisconnect(this.libraryId, this.connectionId, reason);
    }

    public long connectionId() {
        return this.connectionId;
    }

    public void libraryId(int libraryId, BlockablePosition blockablePosition) {
        this.libraryId = libraryId;
        this.outboundTracker.blockablePosition = blockablePosition;
    }

    public int libraryId() {
        return this.libraryId;
    }

    public void close() {
        this.senderSequenceNumber.close();
        this.bytesInBuffer.close();
        this.invalidLibraryAttempts.close();
    }

    ControlledFragmentHandler.Action onSlowOutboundMessage(DirectBuffer directBuffer, int offsetAfterHeader, int length, long position, int bodyLength, int libraryId, long timeInMs, int metaDataLength, int sequenceNumber) {
        if (this.isWrongLibraryId(libraryId)) {
            this.invalidLibraryAttempts.increment();
            return ControlledFragmentHandler.Action.CONTINUE;
        }
        if (this.replayPaused) {
            return this.blockPosition(position, length, this.outboundTracker);
        }
        return this.attemptSlowMessage(directBuffer, offsetAfterHeader, length, position, bodyLength, timeInMs, this.outboundTracker, metaDataLength, sequenceNumber);
    }

    private ControlledFragmentHandler.Action attemptSlowMessage(DirectBuffer directBuffer, int offsetAfterHeader, int length, long position, int bodyLength, long timeInMs, StreamTracker tracker, int metaDataLength, int sequenceNumber) {
        if (!this.isSlowConsumer()) {
            return ControlledFragmentHandler.Action.CONTINUE;
        }
        long skipPosition = tracker.skipPosition;
        if (position > skipPosition) {
            return ControlledFragmentHandler.Action.CONTINUE;
        }
        long sentPosition = tracker.sentPosition;
        if (position <= sentPosition) {
            return ControlledFragmentHandler.Action.CONTINUE;
        }
        if (this.partiallySentOtherStream(tracker)) {
            return this.blockPosition(position, length, tracker);
        }
        try {
            int bytesPreviouslySent;
            int remainingLength;
            long startOfMessage = position - (long)length;
            if (sentPosition < startOfMessage) {
                remainingLength = bodyLength;
                bytesPreviouslySent = 0;
            } else {
                remainingLength = (int)(position - sentPosition);
                bytesPreviouslySent = bodyLength - remainingLength;
            }
            int dataOffset = offsetAfterHeader + GatewayPublication.FRAME_SIZE + metaDataLength + bytesPreviouslySent;
            ByteBuffer buffer = directBuffer.byteBuffer();
            ByteBufferUtil.limit(buffer, dataOffset + remainingLength);
            ByteBufferUtil.position(buffer, dataOffset);
            int written = this.channel.write(buffer);
            this.bytesInBuffer.getAndAddOrdered((long)(-written));
            this.updateSendingTimeoutTimeInMs(timeInMs, written);
            if (bodyLength > written + bytesPreviouslySent) {
                tracker.sentPosition = position - (long)remainingLength + (long)written;
                return this.blockPosition(position, length, tracker);
            }
            tracker.sentPosition = position;
            tracker.partiallySentMessage = false;
            tracker.skipPosition = Long.MAX_VALUE;
            if (sequenceNumber != -1 && this.messageTimingHandler != null) {
                this.messageTimingHandler.onMessage(sequenceNumber, this.connectionId);
            }
            if (!this.isSlowConsumer()) {
                this.becomeNormalConsumer();
            }
        }
        catch (IOException ex) {
            this.onError(ex);
        }
        return ControlledFragmentHandler.Action.CONTINUE;
    }

    private ControlledFragmentHandler.Action blockPosition(long messagePosition, int messageLength, StreamTracker tracker) {
        int frameLength = 32 + messageLength + 8;
        int alignedLength = ArchiveDescriptor.alignTerm(frameLength);
        long messageStartPosition = messagePosition - (long)alignedLength;
        tracker.blockablePosition.blockPosition(messageStartPosition);
        tracker.skipPosition = messagePosition;
        return ControlledFragmentHandler.Action.CONTINUE;
    }

    private boolean partiallySentOtherStream(StreamTracker tracker) {
        return tracker == this.outboundTracker ? this.replayTracker.partiallySentMessage : this.outboundTracker.partiallySentMessage;
    }

    private boolean isWrongLibraryId(int libraryId) {
        return libraryId != this.libraryId;
    }

    boolean isSlowConsumer() {
        return this.bytesInBufferWeak() > 0L;
    }

    long bytesInBuffer() {
        return this.bytesInBuffer.get();
    }

    private long bytesInBufferWeak() {
        return this.bytesInBuffer.getWeak();
    }

    void sessionId(long sessionId) {
        this.sessionId = sessionId;
    }

    long sessionId() {
        return this.sessionId;
    }

    boolean checkTimeouts(long timeInMs) {
        if (this.isSlowConsumer() && timeInMs > this.sendingTimeoutTimeInMs) {
            this.errorHandler.onError((Throwable)new IllegalStateException(String.format("Slow Consumer Disconnected conn=%d,sess=%d @ time %d, Due to not being able to write since %d", this.connectionId, this.sessionId, timeInMs, this.sendingTimeoutTimeInMs - this.slowConsumerTimeoutInMs)));
            this.removeEndpoint(DisconnectReason.SLOW_CONSUMER);
            return true;
        }
        return false;
    }

    ControlledFragmentHandler.Action onReplayComplete() {
        if (!this.replayTracker.partiallySentMessage) {
            this.replayPaused = false;
        }
        return ControlledFragmentHandler.Action.CONTINUE;
    }

    boolean replayPaused() {
        return this.replayPaused;
    }

    static class StreamTracker {
        private long sentPosition;
        private long skipPosition = Long.MAX_VALUE;
        private boolean partiallySentMessage = false;
        private BlockablePosition blockablePosition;

        StreamTracker(BlockablePosition blockablePosition) {
            this.blockablePosition = blockablePosition;
        }
    }
}

