/*
 * Decompiled with CFR 0.152.
 */
package uk.co.real_logic.artio.engine.logger;

import io.aeron.FragmentAssembler;
import io.aeron.logbuffer.FragmentHandler;
import io.aeron.logbuffer.Header;
import java.util.ArrayList;
import java.util.Comparator;
import org.agrona.DirectBuffer;
import org.agrona.ExpandableArrayBuffer;
import uk.co.real_logic.artio.ArtioLogHeader;
import uk.co.real_logic.artio.engine.logger.FixMessageConsumer;
import uk.co.real_logic.artio.engine.logger.ReproductionFixProtocolConsumer;
import uk.co.real_logic.artio.fixp.FixPMessageConsumer;
import uk.co.real_logic.artio.messages.ApplicationHeartbeatDecoder;
import uk.co.real_logic.artio.messages.ConnectDecoder;
import uk.co.real_logic.artio.messages.FixMessageDecoder;
import uk.co.real_logic.artio.messages.FixPMessageDecoder;
import uk.co.real_logic.artio.messages.MessageHeaderDecoder;
import uk.co.real_logic.artio.messages.ReplayerTimestampDecoder;

public class StreamTimestampZipper
implements AutoCloseable {
    private static final TimestampComparator TIMESTAMP_COMPARATOR = new TimestampComparator();
    private static final UnstableReverseTimestampComparator REVERSE_TIMESTAMP_COMPARATOR = new UnstableReverseTimestampComparator();
    private static final OffsetComparator OFFSET_COMPARATOR = new OffsetComparator();
    private final int maximumBufferSize;
    private final int compactionSize;
    private final StreamPoller[] pollers;
    private final FragmentAssembler fragmentAssembler;
    private final LogEntryHandler logEntryHandler;
    private final ExpandableArrayBuffer reorderBuffer;
    private final boolean lazilyCompact;
    private final ArrayList<BufferedPosition> positions = new ArrayList();
    private int reorderBufferOffset;

    public StreamTimestampZipper(FixMessageConsumer fixMessageConsumer, FixPMessageConsumer fixPMessageConsumer, int compactionSize, int maximumBufferSize, boolean lazilyCompact, Poller ... pollers) {
        this.maximumBufferSize = maximumBufferSize;
        this.lazilyCompact = lazilyCompact;
        this.compactionSize = compactionSize;
        this.pollers = new StreamPoller[pollers.length];
        for (int i = 0; i < pollers.length; ++i) {
            this.pollers[i] = new StreamPoller(pollers[i]);
        }
        this.reorderBuffer = new ExpandableArrayBuffer(compactionSize);
        this.logEntryHandler = new LogEntryHandler(fixMessageConsumer, fixPMessageConsumer);
        this.fragmentAssembler = new FragmentAssembler((FragmentHandler)this.logEntryHandler);
    }

    /**
     * Polls each stream in turn and then, if anything was read, delivers buffered messages that
     * have become safe to hand out.
     *
     * @param fragmentLimit fragment budget; every poller is invoked with this full value and
     *                      polling stops once the running total reaches it.
     * @return the number of fragments read from the pollers plus the number of buffered messages
     *         delivered from the reorder buffer.
     */
    public int poll(int fragmentLimit) {
        final StreamPoller[] streamPollers = this.pollers;
        int fragmentsRead = 0;
        for (final StreamPoller streamPoller : streamPollers) {
            fragmentsRead += streamPoller.poll(streamPollers, fragmentAssembler, fragmentLimit);
            if (fragmentsRead >= fragmentLimit) {
                break;
            }
        }

        // The reorder buffer is only visited when this poll made progress without overshooting
        // the limit, and (in lazy mode) only once the buffer has outgrown the compaction size.
        final boolean madeProgressWithinLimit = fragmentsRead > 0 && fragmentsRead <= fragmentLimit;
        if (madeProgressWithinLimit && (!lazilyCompact || reorderBufferOffset > compactionSize)) {
            fragmentsRead += processReorderBuffer(streamPollers, fragmentLimit - fragmentsRead);
            compact();
        }
        return fragmentsRead;
    }

    /**
     * Delivers buffered messages, oldest first, for as long as their timestamp does not exceed
     * the minimum low-water mark of the other (non-drained) streams, then removes exactly the
     * delivered entries from {@code positions}.
     *
     * <p>Fix: previously, when the loop stopped because the fragment limit was reached, the entry
     * that had just been delivered was left in {@code positions}. It would then be delivered a
     * second time on a later call and the owner's {@code elementsInBuffer} would be decremented
     * twice. The lowest delivered index is now tracked explicitly so every delivered entry is
     * removed regardless of how the loop terminates.
     *
     * @param pollers       all stream pollers, used to compute the low-water mark.
     * @param fragmentLimit maximum number of buffered messages to deliver; as before, one eligible
     *                      message is still delivered when this is zero or negative because the
     *                      limit is checked after delivery.
     * @return the number of buffered messages delivered.
     */
    private int processReorderBuffer(StreamPoller[] pollers, int fragmentLimit) {
        final ArrayList<BufferedPosition> positions = this.positions;
        final LogEntryHandler logEntryHandler = this.logEntryHandler;

        // Descending timestamp order puts the oldest entries at the tail, so delivered entries
        // can be removed from the end of the list without shifting the remainder.
        positions.sort(REVERSE_TIMESTAMP_COMPARATOR);

        final int lastIndex = positions.size() - 1;
        // Lowest index that has been delivered; stays past the tail while nothing is delivered.
        int firstDeliveredIndex = lastIndex + 1;
        int read = 0;
        for (int i = lastIndex; i >= 0; --i) {
            final BufferedPosition position = positions.get(i);
            final long timestamp = position.timestamp;
            final StreamPoller owner = position.owner;
            final long timestampLowWaterMark = findMinLowWaterMark(pollers, owner);
            if (timestamp > timestampLowWaterMark) {
                // Another stream may still produce an earlier message: stop here.
                break;
            }
            owner.handledTimestamp(timestamp);
            owner.elementsInBuffer--;
            logEntryHandler.owner = owner;
            logEntryHandler.onBufferedMessage(position.offset, position.length);
            updateOwnerTimestamp(positions, i, owner);
            firstDeliveredIndex = i;
            if (++read >= fragmentLimit) {
                break;
            }
        }

        for (int j = lastIndex; j >= firstDeliveredIndex; --j) {
            positions.remove(j);
        }
        return read;
    }

    /**
     * Recomputes {@code owner.minBufferedTimestamp} after the entry at index {@code i} has been
     * handled: scans the entries below {@code i}, from the highest index downwards, for the first
     * one belonging to the same owner and adopts its timestamp. If none is found the owner is
     * marked as having nothing buffered.
     *
     * @param positions the buffered positions, in the order used by the caller.
     * @param i         index of the entry that has just been handled.
     * @param owner     the stream poller whose buffered timestamp is refreshed.
     */
    private void updateOwnerTimestamp(ArrayList<BufferedPosition> positions, int i, StreamPoller owner) {
        int index = i;
        while (--index >= 0) {
            final BufferedPosition candidate = positions.get(index);
            if (candidate.owner == owner) {
                owner.minBufferedTimestamp = candidate.timestamp;
                return;
            }
        }
        owner.nothingBuffered();
    }

    /**
     * Compacts the reorder buffer once it has grown beyond {@code compactionSize}: the remaining
     * entries are slid down, in offset order, so that they are contiguous from offset 0 and the
     * write offset is moved to the end of the last entry.
     *
     * <p>Fix: the previous implementation returned as soon as it met an entry that was already at
     * its target offset (typically the very first one), so gaps behind that entry and free space
     * at the tail were never reclaimed. In-place entries are now skipped and compaction continues
     * with the rest.
     *
     * @return {@code true} when no compaction was attempted or when space was reclaimed;
     *         {@code false} when compaction was attempted but the buffer was already compact.
     * @throws IllegalStateException if the computed write offset overflows to a negative value.
     */
    private boolean compact() {
        final int usedBefore = this.reorderBufferOffset;
        if (usedBefore <= this.compactionSize) {
            return true;
        }

        final ExpandableArrayBuffer reorderBuffer = this.reorderBuffer;
        final ArrayList<BufferedPosition> positions = this.positions;
        // Ascending offset order guarantees every move goes to a lower-or-equal offset and never
        // overwrites an entry that has not been relocated yet.
        positions.sort(OFFSET_COMPARATOR);

        int writeOffset = 0;
        for (int i = 0, size = positions.size(); i < size; i++) {
            final BufferedPosition position = positions.get(i);
            final int readOffset = position.offset;
            final int length = position.length;
            final int nextWriteOffset = writeOffset + length;
            validateReorderBufferOffset(length, writeOffset, nextWriteOffset);
            if (readOffset != writeOffset) {
                reorderBuffer.putBytes(writeOffset, reorderBuffer, readOffset, length);
                position.offset = writeOffset;
            }
            writeOffset = nextWriteOffset;
        }
        this.reorderBufferOffset = writeOffset;
        return writeOffset < usedBefore;
    }

    /**
     * @return the current write offset into the reorder buffer, i.e. the offset at which the next
     *         buffered message would be stored.
     */
    public int bufferPosition() {
        final int currentOffset = reorderBufferOffset;
        return currentOffset;
    }

    /**
     * @return the capacity currently reported by the reorder buffer.
     */
    public int bufferCapacity() {
        final ExpandableArrayBuffer buffer = reorderBuffer;
        return buffer.capacity();
    }

    /**
     * Delivers every buffered message to the consumers in ascending timestamp order, without
     * consulting the other streams' low-water marks, and then empties the reorder buffer.
     *
     * <p>Fixes:
     * <ul>
     * <li>The handler's {@code owner} is restored once the dump is finished. This method is
     *     invoked from {@code putBufferedMessage} in the middle of a poll, and previously left
     *     {@code owner} pointing at the owner of the last dumped entry, so the message being
     *     buffered (and any later fragment of that poll) was attributed to the wrong stream.</li>
     * <li>Each dumped timestamp is recorded as handled and every poller is marked as having
     *     nothing buffered, so low-water marks no longer refer to entries that were discarded
     *     from the buffer.</li>
     * </ul>
     */
    private void dumpBuffer() {
        final LogEntryHandler logEntryHandler = this.logEntryHandler;
        final ArrayList<BufferedPosition> positions = this.positions;
        final StreamPoller ownerBeforeDump = logEntryHandler.owner;

        positions.sort(TIMESTAMP_COMPARATOR);
        try {
            for (int i = 0, size = positions.size(); i < size; ++i) {
                final BufferedPosition position = positions.get(i);
                final StreamPoller positionOwner = position.owner;
                positionOwner.handledTimestamp(position.timestamp);
                logEntryHandler.owner = positionOwner;
                logEntryHandler.onBufferedMessage(position.offset, position.length);
            }
        } finally {
            logEntryHandler.owner = ownerBeforeDump;
        }

        positions.clear();
        for (final StreamPoller poller : this.pollers) {
            poller.elementsInBuffer = 0;
            poller.nothingBuffered();
        }
        this.reorderBufferOffset = 0;
    }

    /**
     * Flushes every message still held in the reorder buffer to the consumers and then closes
     * each underlying poller in array order.
     */
    @Override
    public void close() {
        dumpBuffer();

        final StreamPoller[] streamPollers = pollers;
        for (int i = 0; i < streamPollers.length; i++) {
            streamPollers[i].close();
        }
    }

    /**
     * Computes the smallest timestamp low-water mark among all pollers other than {@code owner}
     * that are not yet drained.
     *
     * @param pollers all stream pollers.
     * @param owner   the poller to exclude from the calculation.
     * @return the minimum low-water mark, or {@code Long.MAX_VALUE} when no other poller
     *         qualifies.
     */
    private long findMinLowWaterMark(StreamPoller[] pollers, StreamPoller owner) {
        long lowest = Long.MAX_VALUE;
        for (final StreamPoller candidate : pollers) {
            // The identity check comes first so isDrained() is never evaluated for the owner.
            if (candidate != owner && !candidate.isDrained()) {
                final long candidateMark = candidate.timestampLowWaterMark();
                if (candidateMark < lowest) {
                    lowest = candidateMark;
                }
            }
        }
        return lowest;
    }

    /**
     * Guards against a reorder buffer offset that has gone negative (for example through
     * {@code int} overflow).
     *
     * @param length                 length of the entry that was added.
     * @param reorderBufferOffset    offset before the entry was added.
     * @param newReorderBufferOffset offset after the entry was added; must not be negative.
     * @throws IllegalStateException if {@code newReorderBufferOffset} is negative.
     */
    private static void validateReorderBufferOffset(int length, int reorderBufferOffset, int newReorderBufferOffset) {
        if (newReorderBufferOffset >= 0) {
            return;
        }
        final String message = "Detected negative newReorderBufferOffset: " + newReorderBufferOffset
            + ", reorderBufferOffset=" + reorderBufferOffset
            + ", length=" + length;
        throw new IllegalStateException(message);
    }

    public static interface Poller {
        public int poll(FragmentAssembler var1, int var2);

        public int streamId();

        public void close();

        public boolean isComplete();
    }

    /**
     * Decodes log entries and either hands them straight to the consumers or parks them in the
     * reorder buffer when their timestamp is later than {@code maxTimestampToHandle}.
     *
     * <p>The numeric template ids and lengths below are the literal values present in this
     * (decompiled) source. NOTE(review): they are presumably the inlined {@code TEMPLATE_ID},
     * {@code BLOCK_LENGTH} and header-length constants of the generated decoders - confirm
     * against the decoder classes before replacing them with named constants.
     */
    class LogEntryHandler implements FragmentHandler {
        private final MessageHeaderDecoder messageHeader = new MessageHeaderDecoder();
        private final FixMessageDecoder fixMessage = new FixMessageDecoder();
        private final FixPMessageDecoder fixpMessage = new FixPMessageDecoder();
        private final ReplayerTimestampDecoder replayerTimestamp = new ReplayerTimestampDecoder();
        private final ApplicationHeartbeatDecoder applicationHeartbeat = new ApplicationHeartbeatDecoder();
        private final ConnectDecoder connect = new ConnectDecoder();
        private final FixMessageConsumer fixHandler;
        private final ReproductionFixProtocolConsumer reproductionFixProtocolHandler;
        private final FixPMessageConsumer fixPHandler;
        // Stream whose entry is currently being handled; set by reset() and by the zipper.
        StreamPoller owner;
        // Entries with a timestamp above this value are buffered instead of delivered.
        long maxTimestampToHandle;

        /**
         * @param fixHandler  consumer of FIX messages; also used for connect and heartbeat
         *                    events when it is a {@code ReproductionFixProtocolConsumer}.
         * @param fixPHandler consumer of FIXP messages.
         */
        LogEntryHandler(FixMessageConsumer fixHandler, FixPMessageConsumer fixPHandler) {
            this.fixHandler = fixHandler;
            this.fixPHandler = fixPHandler;
            if (fixHandler instanceof ReproductionFixProtocolConsumer) {
                this.reproductionFixProtocolHandler = (ReproductionFixProtocolConsumer)fixHandler;
            } else {
                this.reproductionFixProtocolHandler = null;
            }
        }

        /**
         * Dispatches a polled fragment according to the template id in its message header.
         * Unrecognised template ids are ignored.
         */
        @Override
        public void onFragment(DirectBuffer buffer, int start, int length, Header header) {
            final MessageHeaderDecoder messageHeader = this.messageHeader;
            messageHeader.wrap(buffer, start);
            final int templateId = messageHeader.templateId();
            final int blockLength = messageHeader.blockLength();
            final int version = messageHeader.version();

            switch (templateId) {
                case 1: // decoded with FixMessageDecoder
                    onFixMessage(buffer, start, length, start, blockLength, version);
                    break;
                case 61: // decoded with ReplayerTimestampDecoder
                    onReplayTimestamp(buffer, start, blockLength, version);
                    break;
                case 16: // decoded with ApplicationHeartbeatDecoder
                    onApplicationHeartbeatFragment(buffer, start, length, blockLength, version);
                    break;
                case 58: // decoded with FixPMessageDecoder
                    onFixPFragment(buffer, start, length, blockLength, version);
                    break;
                case 33: // decoded with ConnectDecoder
                    onConnectFragment(buffer, start, length, blockLength, version);
                    break;
                default:
                    break;
            }
        }

        /**
         * Handles a polled application heartbeat. Without a reproduction handler only the
         * timestamp is recorded; otherwise the heartbeat is delivered or buffered depending on
         * its timestamp.
         */
        private void onApplicationHeartbeatFragment(DirectBuffer buffer, int start, int length, int blockLength, int version) {
            final int bodyOffset = start + 8;
            final ApplicationHeartbeatDecoder heartbeat = this.applicationHeartbeat;
            heartbeat.wrap(buffer, bodyOffset, blockLength, version);
            final long timestampInNs = heartbeat.timestampInNs();

            final ReproductionFixProtocolConsumer reproductionHandler = this.reproductionFixProtocolHandler;
            if (reproductionHandler == null) {
                owner.handledTimestamp(timestampInNs);
                return;
            }
            if (timestampInNs > maxTimestampToHandle) {
                putBufferedMessage(buffer, start, length, timestampInNs);
                return;
            }
            owner.handledTimestamp(timestampInNs);
            reproductionHandler.onApplicationHeartbeat(heartbeat, buffer, bodyOffset, length);
        }

        /**
         * Handles a polled FIXP message: delivered when its enqueue time is within the allowed
         * timestamp, buffered otherwise.
         */
        private void onFixPFragment(DirectBuffer buffer, int start, int length, int blockLength, int version) {
            final int bodyOffset = start + 8;
            final FixPMessageDecoder fixpMessage = this.fixpMessage;
            fixpMessage.wrap(buffer, bodyOffset, blockLength, version);
            final long enqueueTime = fixpMessage.enqueueTime();
            if (enqueueTime > maxTimestampToHandle) {
                putBufferedMessage(buffer, start, length, enqueueTime);
                return;
            }
            final StreamPoller owner = this.owner;
            owner.handledTimestamp(enqueueTime);
            fixPHandler.onMessage(fixpMessage, buffer, bodyOffset + 24, owner.header);
        }

        /**
         * Handles a polled connect event; ignored entirely when there is no reproduction
         * handler.
         */
        private void onConnectFragment(DirectBuffer buffer, int start, int length, int blockLength, int version) {
            final ReproductionFixProtocolConsumer reproductionHandler = this.reproductionFixProtocolHandler;
            if (reproductionHandler == null) {
                return;
            }
            final ConnectDecoder connect = this.connect;
            connect.wrap(buffer, start + 8, blockLength, version);
            final long timestamp = connect.timestamp();
            if (timestamp > maxTimestampToHandle) {
                putBufferedMessage(buffer, start, length, timestamp);
                return;
            }
            owner.handledTimestamp(timestamp);
            reproductionHandler.onConnect(connect, buffer, start, length);
        }

        /**
         * Records the timestamp carried by a replayer timestamp entry as handled for the
         * current owner; nothing is delivered to the consumers.
         */
        private void onReplayTimestamp(DirectBuffer buffer, int start, int blockLength, int version) {
            final ReplayerTimestampDecoder replayerTimestamp = this.replayerTimestamp;
            replayerTimestamp.wrap(buffer, start + 8, blockLength, version);
            owner.handledTimestamp(replayerTimestamp.timestamp());
        }

        /**
         * Handles a polled FIX message: delivered when its timestamp is within the allowed
         * timestamp, buffered otherwise.
         */
        private void onFixMessage(DirectBuffer buffer, int start, int length, int prevOffset, int blockLength, int version) {
            final FixMessageDecoder fixMessage = this.fixMessage;
            final int bodyOffset = prevOffset + 8;
            fixMessage.wrap(buffer, bodyOffset, blockLength, version);
            final int offset = offsetAfterMetaData(fixMessage, bodyOffset, version);

            final long timestamp = fixMessage.timestamp();
            if (timestamp > maxTimestampToHandle) {
                putBufferedMessage(buffer, start, length, timestamp);
                return;
            }
            owner.handledTimestamp(timestamp);
            onFixMessage(offset, buffer, fixMessage);
        }

        /**
         * Skips the metadata field of a wrapped FIX message when the acting version carries one.
         *
         * @return {@code offset} advanced past the metadata header and content, or
         *         {@code offset} unchanged for versions without metadata.
         */
        private int offsetAfterMetaData(FixMessageDecoder fixMessage, int offset, int version) {
            if (version < FixMessageDecoder.metaDataSinceVersion()) {
                return offset;
            }
            final int advanced = offset + FixMessageDecoder.metaDataHeaderLength() + fixMessage.metaDataLength();
            fixMessage.skipMetaData();
            return advanced;
        }

        /**
         * Copies an entry into the reorder buffer and registers its position. If the entry would
         * take the buffer beyond {@code maximumBufferSize}, the whole buffer is dumped first.
         */
        private void putBufferedMessage(DirectBuffer buffer, int start, int length, long timestamp) {
            final StreamTimestampZipper zipper = StreamTimestampZipper.this;
            if (zipper.reorderBufferOffset + length > zipper.maximumBufferSize) {
                zipper.dumpBuffer();
            }

            final int writeOffset = zipper.reorderBufferOffset;
            // The owner field is read only after the potential dump above.
            final StreamPoller owner = this.owner;
            owner.bufferedTimestamp(timestamp);
            owner.elementsInBuffer++;
            zipper.reorderBuffer.putBytes(writeOffset, buffer, start, length);
            zipper.positions.add(new BufferedPosition(owner, timestamp, writeOffset, length));

            final int nextWriteOffset = writeOffset + length;
            zipper.reorderBufferOffset = nextWriteOffset;
            StreamTimestampZipper.validateReorderBufferOffset(length, writeOffset, nextWriteOffset);
        }

        /**
         * Prepares the handler for polling one stream.
         *
         * @param minOtherTimestamp highest timestamp that may be delivered immediately.
         * @param owner             the stream about to be polled.
         */
        void reset(long minOtherTimestamp, StreamPoller owner) {
            this.owner = owner;
            this.maxTimestampToHandle = minOtherTimestamp;
        }

        /**
         * Delivers an entry previously stored in the reorder buffer. No timestamp check is made
         * here; the caller decides when an entry is due.
         *
         * @param start  offset of the entry (including its message header) in the reorder buffer.
         * @param length length of the entry.
         */
        public void onBufferedMessage(int start, int length) {
            final DirectBuffer buffer = StreamTimestampZipper.this.reorderBuffer;
            final MessageHeaderDecoder messageHeader = this.messageHeader;
            messageHeader.wrap(buffer, start);
            final int templateId = messageHeader.templateId();
            final int blockLength = messageHeader.blockLength();
            final int version = messageHeader.version();
            final int bodyOffset = start + 8;

            switch (templateId) {
                case 1: {
                    final FixMessageDecoder fixMessage = this.fixMessage;
                    fixMessage.wrap(buffer, bodyOffset, blockLength, version);
                    onFixMessage(offsetAfterMetaData(fixMessage, bodyOffset, version), buffer, fixMessage);
                    break;
                }
                case 58: {
                    final FixPMessageDecoder fixpMessage = this.fixpMessage;
                    fixpMessage.wrap(buffer, bodyOffset, blockLength, version);
                    fixPHandler.onMessage(fixpMessage, buffer, bodyOffset + 24, owner.header);
                    break;
                }
                case 33: {
                    final ReproductionFixProtocolConsumer reproductionHandler = this.reproductionFixProtocolHandler;
                    if (reproductionHandler != null) {
                        final ConnectDecoder connect = this.connect;
                        connect.wrap(buffer, bodyOffset, blockLength, version);
                        reproductionHandler.onConnect(connect, buffer, start, length);
                    }
                    break;
                }
                case 16: {
                    final ReproductionFixProtocolConsumer reproductionHandler = this.reproductionFixProtocolHandler;
                    if (reproductionHandler != null) {
                        final ApplicationHeartbeatDecoder heartbeat = this.applicationHeartbeat;
                        heartbeat.wrap(buffer, bodyOffset, blockLength, version);
                        reproductionHandler.onApplicationHeartbeat(heartbeat, buffer, bodyOffset, length);
                    }
                    break;
                }
                default:
                    break;
            }
        }

        /**
         * Passes a decoded FIX message to the FIX consumer together with the owner's log header.
         *
         * @param offset     offset of the message block, after any metadata has been skipped.
         * @param buffer     buffer holding the message.
         * @param fixMessage decoder wrapped over the message.
         */
        private void onFixMessage(int offset, DirectBuffer buffer, FixMessageDecoder fixMessage) {
            final int bodyLength = fixMessage.bodyLength();
            final int bodyOffset = offset + 57 + FixMessageDecoder.bodyHeaderLength();
            fixHandler.onMessage(fixMessage, buffer, bodyOffset, bodyLength, owner.header);
        }
    }

    /**
     * Wraps one {@code Poller} together with the timestamp bookkeeping the zipper needs for
     * that stream.
     */
    class StreamPoller {
        // Sentinel stored in minBufferedTimestamp while this stream has nothing buffered.
        private static final long NOTHING_BUFFERED = -1L;
        private final ArtioLogHeader header;
        private final Poller poller;
        private long minBufferedTimestamp = NOTHING_BUFFERED;
        private long maxHandledTimestamp;
        private boolean isDrained;
        private int elementsInBuffer;

        /**
         * @param poller the underlying poller; its stream id is used to build the log header.
         */
        StreamPoller(Poller poller) {
            this.header = new ArtioLogHeader(poller.streamId());
            this.poller = poller;
        }

        /**
         * Primes the shared log entry handler with this stream and the minimum low-water mark
         * of the other streams, then polls the underlying poller.
         *
         * @return whatever the underlying poller's {@code poll} returns.
         */
        public int poll(StreamPoller[] pollers, FragmentAssembler fragmentAssembler, int fragmentLimit) {
            final StreamTimestampZipper zipper = StreamTimestampZipper.this;
            zipper.logEntryHandler.reset(zipper.findMinLowWaterMark(pollers, this), this);
            return poller.poll(fragmentAssembler, fragmentLimit);
        }

        /**
         * @return the smallest buffered timestamp when something is buffered, otherwise the
         *         largest timestamp handled so far.
         */
        long timestampLowWaterMark() {
            if (minBufferedTimestamp == NOTHING_BUFFERED) {
                return maxHandledTimestamp;
            }
            return minBufferedTimestamp;
        }

        /**
         * Raises the maximum handled timestamp if {@code timestamp} is larger.
         */
        void handledTimestamp(long timestamp) {
            if (timestamp > maxHandledTimestamp) {
                maxHandledTimestamp = timestamp;
            }
        }

        /**
         * Lowers the minimum buffered timestamp to {@code timestamp} when nothing was buffered
         * before or when {@code timestamp} is smaller.
         */
        void bufferedTimestamp(long timestamp) {
            if (minBufferedTimestamp == NOTHING_BUFFERED || timestamp < minBufferedTimestamp) {
                minBufferedTimestamp = timestamp;
            }
        }

        /**
         * Marks this stream as having no buffered entries.
         */
        public void nothingBuffered() {
            minBufferedTimestamp = NOTHING_BUFFERED;
        }

        @Override
        public String toString() {
            final StringBuilder text = new StringBuilder("StreamPoller{header=");
            text.append(header);
            text.append(", isDrained=").append(isDrained);
            text.append(", poller=").append(poller);
            return text.append('}').toString();
        }

        /**
         * Closes the underlying poller.
         */
        public void close() {
            poller.close();
        }

        /**
         * @return {@code true} once the underlying poller is complete and no entries of this
         *         stream remain buffered; the result is latched after it first becomes true.
         */
        boolean isDrained() {
            if (!isDrained && poller.isComplete() && elementsInBuffer <= 0) {
                isDrained = true;
            }
            return isDrained;
        }
    }

    /**
     * Orders buffered positions by ascending offset within the reorder buffer.
     */
    static class OffsetComparator implements Comparator<BufferedPosition> {
        @Override
        public int compare(BufferedPosition o1, BufferedPosition o2) {
            final int firstOffset = o1.offset;
            final int secondOffset = o2.offset;
            return Integer.compare(firstOffset, secondOffset);
        }
    }

    /**
     * Orders buffered positions by descending timestamp, breaking ties by descending offset.
     */
    static class UnstableReverseTimestampComparator implements Comparator<BufferedPosition> {
        @Override
        public int compare(BufferedPosition o1, BufferedPosition o2) {
            final int byTimestamp = Long.compare(o2.timestamp, o1.timestamp);
            return byTimestamp != 0 ? byTimestamp : Integer.compare(o2.offset, o1.offset);
        }
    }

    /**
     * Orders buffered positions by ascending timestamp; entries with equal timestamps compare
     * as equal.
     */
    static class TimestampComparator implements Comparator<BufferedPosition> {
        @Override
        public int compare(BufferedPosition o1, BufferedPosition o2) {
            final long firstTimestamp = o1.timestamp;
            final long secondTimestamp = o2.timestamp;
            return Long.compare(firstTimestamp, secondTimestamp);
        }
    }

    /**
     * Location of one buffered entry inside the reorder buffer, with the stream it came from
     * and its timestamp. Only {@code offset} is mutable, so the entry can be relocated when the
     * buffer is compacted.
     */
    static class BufferedPosition {
        final StreamPoller owner;
        final long timestamp;
        int offset;
        final int length;

        /**
         * @param owner     stream poller the entry was read from.
         * @param timestamp timestamp of the entry.
         * @param offset    offset of the entry in the reorder buffer.
         * @param length    length of the entry.
         */
        BufferedPosition(StreamPoller owner, long timestamp, int offset, int length) {
            this.owner = owner;
            this.timestamp = timestamp;
            this.offset = offset;
            this.length = length;
        }

        @Override
        public String toString() {
            final StringBuilder text = new StringBuilder("BufferedPosition{owner=");
            text.append(owner.poller.streamId());
            text.append(", timestamp=").append(timestamp);
            text.append(", offset=").append(offset);
            text.append(", length=").append(length);
            return text.append('}').toString();
        }
    }
}

