/*
 * Decompiled with CFR 0.152.
 */
package uk.co.real_logic.artio.engine.logger;

import io.aeron.FragmentAssembler;
import io.aeron.logbuffer.FragmentHandler;
import io.aeron.logbuffer.Header;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import org.agrona.DirectBuffer;
import org.agrona.ExpandableArrayBuffer;
import uk.co.real_logic.artio.ArtioLogHeader;
import uk.co.real_logic.artio.engine.logger.FixMessageConsumer;
import uk.co.real_logic.artio.fixp.FixPMessageConsumer;
import uk.co.real_logic.artio.messages.FixMessageDecoder;
import uk.co.real_logic.artio.messages.FixPMessageDecoder;
import uk.co.real_logic.artio.messages.MessageHeaderDecoder;
import uk.co.real_logic.artio.messages.ReplayerTimestampDecoder;

/**
 * Polls several {@link Poller} streams and hands their FIX / FIXP messages to the supplied
 * consumers, using message timestamps to decide when a message may be delivered.
 *
 * <p>A message whose timestamp is greater than the lowest timestamp low water mark of the
 * other streams is not delivered immediately; it is copied into an internal reorder buffer
 * and delivered later, in timestamp order, once the other streams have caught up (or when
 * {@link #onClose()} flushes whatever is still buffered).
 */
public class StreamTimestampZipper {
    // Sort orders for the buffered positions: by timestamp for delivery, by offset for compaction.
    private static final TimestampComparator TIMESTAMP_COMPARATOR = new TimestampComparator();
    private static final OffsetComparator OFFSET_COMPARATOR = new OffsetComparator();
    // Compaction of the reorder buffer is attempted once reorderBufferOffset exceeds this value.
    private final int compactionSize;
    // One wrapper per Poller passed to the constructor, in the same order.
    private final StreamPoller[] pollers;
    // Wraps logEntryHandler; passed to every Poller.poll call.
    private final FragmentAssembler fragmentAssembler;
    // Decodes fragments and either delivers them or stores them in the reorder buffer.
    private final LogEntryHandler logEntryHandler;
    // Descriptors (owner, timestamp, offset, length) of the messages held in reorderBuffer.
    private final List<BufferedPosition> positions = new ArrayList<BufferedPosition>();
    // Backing storage for the messages that could not be delivered yet.
    private final ExpandableArrayBuffer reorderBuffer = new ExpandableArrayBuffer();
    // Offset in reorderBuffer at which the next buffered message will be written.
    private int reorderBufferOffset;

    public StreamTimestampZipper(FixMessageConsumer fixMessageConsumer, FixPMessageConsumer fixPMessageConsumer, int compactionSize, Poller ... pollers) {
        this.compactionSize = compactionSize;
        this.pollers = new StreamPoller[pollers.length];
        for (int i = 0; i < pollers.length; ++i) {
            this.pollers[i] = new StreamPoller(pollers[i]);
        }
        this.logEntryHandler = new LogEntryHandler(fixMessageConsumer, fixPMessageConsumer);
        this.fragmentAssembler = new FragmentAssembler((FragmentHandler)this.logEntryHandler);
    }

    /**
     * Polls every stream once, compacts the reorder buffer if it has grown past the
     * compaction size, and then delivers any buffered messages that have become eligible.
     *
     * @return the sum of the values returned by the individual pollers plus the number of
     *         buffered messages delivered during this call
     */
    public int poll() {
        final StreamPoller[] streamPollers = this.pollers;
        int workCount = 0;
        for (final StreamPoller streamPoller : streamPollers) {
            workCount += streamPoller.poll(streamPollers, this.fragmentAssembler);
        }
        this.compact();
        return workCount + this.processReorderBuffer(streamPollers);
    }

    /**
     * Delivers buffered messages in timestamp order for as long as the next message's
     * timestamp does not exceed the lowest low water mark of the streams other than its owner.
     *
     * @param pollers all stream pollers, used to compute the low water mark
     * @return the number of buffered messages delivered
     */
    private int processReorderBuffer(StreamPoller[] pollers) {
        final List<BufferedPosition> buffered = this.positions;
        buffered.sort(TIMESTAMP_COMPARATOR);

        int handled = 0;
        for (final Iterator<BufferedPosition> cursor = buffered.iterator(); cursor.hasNext(); ) {
            final BufferedPosition candidate = cursor.next();
            final StreamPoller owner = candidate.owner;
            final long timestamp = candidate.timestamp;

            // The list is sorted by timestamp, so the first entry that is still ahead of the
            // other streams ends this pass.
            if (timestamp > StreamTimestampZipper.findMinLowWaterMark(pollers, owner)) {
                break;
            }

            owner.handledTimestamp(timestamp);
            this.logEntryHandler.owner = owner;
            this.logEntryHandler.onBufferedMessage(candidate.offset, candidate.length);
            handled++;

            // Remove before refreshing the owner's state so the scan only sees what remains.
            cursor.remove();
            this.updateOwnerPosition(candidate);
        }
        return handled;
    }

    /**
     * Reclaims space in the reorder buffer once the write offset has passed the compaction size.
     *
     * <p>The remaining buffered messages are visited in ascending offset order and each one is
     * moved down so that the messages become contiguous starting at offset 0. The write offset
     * is then reset to the end of the last message.
     *
     * <p>An entry that already sits at its target offset is left where it is, but the pass
     * continues past it: later entries may still have gaps in front of them, and the write
     * offset must be lowered in every case. Stopping at the first in-place entry would leave
     * those gaps unreclaimed for as long as that entry stays buffered.
     */
    private void compact() {
        if (this.reorderBufferOffset <= this.compactionSize) {
            return;
        }

        this.positions.sort(OFFSET_COMPARATOR);

        int writeOffset = 0;
        for (final BufferedPosition position : this.positions) {
            final int readOffset = position.offset;
            final int length = position.length;
            if (readOffset != writeOffset) {
                // Entries are visited in ascending offset order, so the destination is never
                // above the source. NOTE(review): source and destination ranges may overlap;
                // this relies on the buffer's putBytes handling a same-buffer overlapping
                // copy, exactly as the previous implementation did - confirm against Agrona.
                position.offset = writeOffset;
                this.reorderBuffer.putBytes(writeOffset, (DirectBuffer)this.reorderBuffer, readOffset, length);
            }
            writeOffset += length;
        }
        this.reorderBufferOffset = writeOffset;
    }

    /**
     * @return the offset in the reorder buffer at which the next buffered message will be written
     */
    public int bufferPosition() {
        return reorderBufferOffset;
    }

    /**
     * @return the current capacity reported by the reorder buffer
     */
    public int bufferCapacity() {
        final ExpandableArrayBuffer buffer = this.reorderBuffer;
        return buffer.capacity();
    }

    /**
     * Refreshes the owner's minimum buffered timestamp after one of its messages was removed.
     *
     * <p>The owner's minimum becomes the timestamp of the first remaining position (in list
     * order) that belongs to the same owner; if none remains the owner is marked as having
     * nothing buffered.
     *
     * @param position the position that has just been removed from the list
     */
    private void updateOwnerPosition(BufferedPosition position) {
        final StreamPoller streamPoller = position.owner;
        final List<BufferedPosition> remaining = this.positions;
        final int count = remaining.size();
        for (int i = 0; i < count; i++) {
            final BufferedPosition candidate = remaining.get(i);
            if (candidate.owner == streamPoller) {
                streamPoller.minBufferedTimestamp = candidate.timestamp;
                return;
            }
        }
        streamPoller.nothingBuffered();
    }

    /**
     * Flushes every message still held in the reorder buffer to the consumers in timestamp
     * order, regardless of low water marks, then resets the buffer and closes all pollers.
     */
    public void onClose() {
        final List<BufferedPosition> pending = this.positions;
        final LogEntryHandler handler = this.logEntryHandler;
        final int pendingCount = pending.size();

        pending.sort(TIMESTAMP_COMPARATOR);
        int index = 0;
        while (index < pendingCount) {
            final BufferedPosition next = pending.get(index++);
            handler.owner = next.owner;
            handler.onBufferedMessage(next.offset, next.length);
        }

        pending.clear();
        this.reorderBufferOffset = 0;

        final StreamPoller[] streamPollers = this.pollers;
        for (int i = 0; i < streamPollers.length; i++) {
            streamPollers[i].close();
        }
    }

    /**
     * Finds the smallest timestamp low water mark among all pollers except {@code owner}.
     *
     * @param pollers all stream pollers
     * @param owner   the poller to leave out of the calculation
     * @return the minimum low water mark, or {@link Long#MAX_VALUE} when there is no other poller
     */
    private static long findMinLowWaterMark(StreamPoller[] pollers, StreamPoller owner) {
        long lowest = Long.MAX_VALUE;
        for (final StreamPoller candidate : pollers) {
            if (candidate != owner) {
                final long mark = candidate.timestampLowWaterMark();
                if (mark < lowest) {
                    lowest = mark;
                }
            }
        }
        return lowest;
    }

    public static interface Poller {
        public int poll(FragmentAssembler var1);

        public int streamId();

        public void close();
    }

    class LogEntryHandler
    implements FragmentHandler {
        private final MessageHeaderDecoder messageHeader = new MessageHeaderDecoder();
        private final FixMessageDecoder fixMessage = new FixMessageDecoder();
        private final FixPMessageDecoder iLinkMessage = new FixPMessageDecoder();
        private final ReplayerTimestampDecoder replayerTimestamp = new ReplayerTimestampDecoder();
        private final FixMessageConsumer fixHandler;
        private final FixPMessageConsumer fixPHandler;
        StreamPoller owner;
        long maxTimestampToHandle;

        LogEntryHandler(FixMessageConsumer fixHandler, FixPMessageConsumer fixPHandler) {
            this.fixHandler = fixHandler;
            this.fixPHandler = fixPHandler;
        }

        public void onFragment(DirectBuffer buffer, int start, int length, Header header) {
            int offset = start;
            this.messageHeader.wrap(buffer, offset);
            int templateId = this.messageHeader.templateId();
            int blockLength = this.messageHeader.blockLength();
            int version = this.messageHeader.version();
            if (templateId == 1) {
                long timestamp;
                this.fixMessage.wrap(buffer, offset += 8, blockLength, version);
                if (version >= FixMessageDecoder.metaDataSinceVersion()) {
                    offset += FixMessageDecoder.metaDataHeaderLength() + this.fixMessage.metaDataLength();
                    this.fixMessage.skipMetaData();
                }
                if ((timestamp = this.fixMessage.timestamp()) <= this.maxTimestampToHandle) {
                    this.owner.handledTimestamp(timestamp);
                    this.fixHandler.onMessage(this.fixMessage, buffer, offset, length, this.owner.header);
                } else {
                    this.putBufferedMessage(buffer, start, length, timestamp);
                }
            } else if (templateId == 61) {
                this.replayerTimestamp.wrap(buffer, offset += 8, blockLength, version);
                long timestamp = this.replayerTimestamp.timestamp();
                this.owner.handledTimestamp(timestamp);
            } else if (templateId == 58) {
                this.iLinkMessage.wrap(buffer, offset += 8, blockLength, version);
                offset += 24;
                long timestamp = this.iLinkMessage.enqueueTime();
                if (timestamp <= this.maxTimestampToHandle) {
                    this.owner.handledTimestamp(timestamp);
                    this.fixPHandler.onMessage(this.iLinkMessage, buffer, offset, this.owner.header);
                } else {
                    this.putBufferedMessage(buffer, start, length, timestamp);
                }
            }
        }

        private void putBufferedMessage(DirectBuffer buffer, int start, int length, long timestamp) {
            this.owner.bufferedTimestamp(timestamp);
            StreamTimestampZipper.this.reorderBuffer.putBytes(StreamTimestampZipper.this.reorderBufferOffset, buffer, start, length);
            StreamTimestampZipper.this.positions.add(new BufferedPosition(this.owner, timestamp, StreamTimestampZipper.this.reorderBufferOffset, length));
            StreamTimestampZipper.this.reorderBufferOffset = StreamTimestampZipper.this.reorderBufferOffset + length;
        }

        void reset(long minOtherTimestamp, StreamPoller owner) {
            this.maxTimestampToHandle = minOtherTimestamp;
            this.owner = owner;
        }

        public void onBufferedMessage(int start, int length) {
            int offset = start;
            ExpandableArrayBuffer buffer = StreamTimestampZipper.this.reorderBuffer;
            MessageHeaderDecoder messageHeader = this.messageHeader;
            messageHeader.wrap((DirectBuffer)buffer, offset);
            int templateId = messageHeader.templateId();
            int blockLength = messageHeader.blockLength();
            int version = messageHeader.version();
            if (templateId == 1) {
                this.fixMessage.wrap((DirectBuffer)buffer, offset += 8, blockLength, version);
                if (version >= FixMessageDecoder.metaDataSinceVersion()) {
                    offset += FixMessageDecoder.metaDataHeaderLength() + this.fixMessage.metaDataLength();
                    this.fixMessage.skipMetaData();
                }
                this.fixHandler.onMessage(this.fixMessage, (DirectBuffer)buffer, offset, length, this.owner.header);
            } else if (templateId == 58) {
                this.iLinkMessage.wrap((DirectBuffer)buffer, offset += 8, blockLength, version);
                this.fixPHandler.onMessage(this.iLinkMessage, (DirectBuffer)buffer, offset += 24, this.owner.header);
            }
        }
    }

    /**
     * Wraps a {@link Poller} together with the timestamp bookkeeping the zipper needs for
     * that stream: the most recently handled timestamp and the smallest timestamp that is
     * currently waiting in the reorder buffer.
     */
    class StreamPoller {
        // Sentinel stored in minBufferedTimestamp when the stream has no buffered messages.
        private static final long NOTHING_BUFFERED = -1L;
        private final ArtioLogHeader header;
        private final Poller poller;
        private long minBufferedTimestamp = NOTHING_BUFFERED;
        private long maxHandledTimestamp;

        StreamPoller(Poller poller) {
            this.poller = poller;
            this.header = new ArtioLogHeader(poller.streamId());
        }

        /**
         * Polls the wrapped stream after pointing the shared handler at this stream and
         * telling it the lowest low water mark of all the other streams.
         *
         * @param pollers           all stream pollers, including this one
         * @param fragmentAssembler the assembler handed to the wrapped poller
         * @return whatever the wrapped poller returns
         */
        public int poll(StreamPoller[] pollers, FragmentAssembler fragmentAssembler) {
            final long lowestOther = StreamTimestampZipper.findMinLowWaterMark(pollers, this);
            StreamTimestampZipper.this.logEntryHandler.reset(lowestOther, this);
            return this.poller.poll(fragmentAssembler);
        }

        /**
         * @return the minimum buffered timestamp if anything is buffered, otherwise the last
         *         handled timestamp
         */
        long timestampLowWaterMark() {
            if (this.minBufferedTimestamp == NOTHING_BUFFERED) {
                return this.maxHandledTimestamp;
            }
            return this.minBufferedTimestamp;
        }

        /** Records the timestamp of the message most recently handled for this stream. */
        void handledTimestamp(long timestamp) {
            this.maxHandledTimestamp = timestamp;
        }

        /** Lowers the minimum buffered timestamp to {@code timestamp} if it is smaller or unset. */
        void bufferedTimestamp(long timestamp) {
            final long current = this.minBufferedTimestamp;
            if (current == NOTHING_BUFFERED || timestamp < current) {
                this.minBufferedTimestamp = timestamp;
            }
        }

        /** Marks this stream as having no messages in the reorder buffer. */
        public void nothingBuffered() {
            this.minBufferedTimestamp = NOTHING_BUFFERED;
        }

        @Override
        public String toString() {
            return "StreamPoller{header=" + this.header + '}';
        }

        /** Closes the wrapped poller. */
        public void close() {
            this.poller.close();
        }
    }

    /**
     * Orders buffered positions by ascending offset in the reorder buffer.
     */
    static class OffsetComparator
    implements Comparator<BufferedPosition> {
        OffsetComparator() {
        }

        @Override
        public int compare(BufferedPosition o1, BufferedPosition o2) {
            final int left = o1.offset;
            final int right = o2.offset;
            return Integer.compare(left, right);
        }
    }

    /**
     * Orders buffered positions by ascending timestamp.
     */
    static class TimestampComparator
    implements Comparator<BufferedPosition> {
        TimestampComparator() {
        }

        @Override
        public int compare(BufferedPosition o1, BufferedPosition o2) {
            final long left = o1.timestamp;
            final long right = o2.timestamp;
            if (left < right) {
                return -1;
            }
            return left == right ? 0 : 1;
        }
    }

    /**
     * Describes one message held in the reorder buffer: the stream it came from, its
     * timestamp, and where it is stored. Only {@code offset} is mutable, because compaction
     * moves stored messages within the buffer.
     */
    static class BufferedPosition {
        final StreamPoller owner;
        final long timestamp;
        int offset;
        final int length;

        BufferedPosition(StreamPoller owner, long timestamp, int offset, int length) {
            this.owner = owner;
            this.timestamp = timestamp;
            this.offset = offset;
            this.length = length;
        }

        @Override
        public String toString() {
            final StringBuilder text = new StringBuilder("BufferedPosition{owner=");
            text.append(this.owner.poller.streamId());
            text.append(", timestamp=").append(this.timestamp);
            text.append(", offset=").append(this.offset);
            text.append(", length=").append(this.length);
            text.append('}');
            return text.toString();
        }
    }
}

