/*
 * Decompiled with CFR 0.152.
 */
package uk.co.real_logic.artio.engine.logger;

import io.aeron.Aeron;
import io.aeron.FragmentAssembler;
import io.aeron.Subscription;
import io.aeron.logbuffer.FragmentHandler;
import io.aeron.logbuffer.Header;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import org.agrona.DirectBuffer;
import org.agrona.ExpandableArrayBuffer;
import uk.co.real_logic.artio.engine.logger.FixMessageConsumer;
import uk.co.real_logic.artio.ilink.ILinkMessageConsumer;
import uk.co.real_logic.artio.messages.FixMessageDecoder;
import uk.co.real_logic.artio.messages.ILinkMessageDecoder;
import uk.co.real_logic.artio.messages.MessageHeaderDecoder;
import uk.co.real_logic.artio.messages.ReplayerTimestampDecoder;

public class StreamTimestampZipper {
    private static final TimestampComparator TIMESTAMP_COMPARATOR = new TimestampComparator();
    private static final OffsetComparator OFFSET_COMPARATOR = new OffsetComparator();
    private final int compactionSize;
    private final StreamPoller[] pollers;
    private final FragmentAssembler fragmentAssembler;
    private final LogEntryHandler logEntryHandler;
    private final List<BufferedPosition> positions = new ArrayList<BufferedPosition>();
    private final ExpandableArrayBuffer reorderBuffer = new ExpandableArrayBuffer();
    private int reorderBufferOffset;

    public StreamTimestampZipper(Aeron aeron, String libraryAeronChannel, FixMessageConsumer fixMessageConsumer, ILinkMessageConsumer iLinkMessageConsumer, int compactionSize, int ... streamIds) {
        this.compactionSize = compactionSize;
        this.pollers = new StreamPoller[streamIds.length];
        for (int i = 0; i < streamIds.length; ++i) {
            int streamId = streamIds[i];
            this.pollers[i] = new StreamPoller(aeron.addSubscription(libraryAeronChannel, streamId));
        }
        this.logEntryHandler = new LogEntryHandler(fixMessageConsumer, iLinkMessageConsumer);
        this.fragmentAssembler = new FragmentAssembler((FragmentHandler)this.logEntryHandler);
    }

    public int poll() {
        int read = 0;
        StreamPoller[] pollers = this.pollers;
        int size = pollers.length;
        for (int i = 0; i < size; ++i) {
            read += pollers[i].poll(pollers, this.fragmentAssembler);
        }
        this.compact();
        return read += this.processReorderBuffer(pollers);
    }

    private int processReorderBuffer(StreamPoller[] pollers) {
        int read = 0;
        this.positions.sort(TIMESTAMP_COMPARATOR);
        Iterator<BufferedPosition> it = this.positions.iterator();
        while (it.hasNext()) {
            BufferedPosition position = it.next();
            long timestamp = position.timestamp;
            long minHandleTimestamp = StreamTimestampZipper.findMinOtherTimestamp(pollers, position.owner);
            if (timestamp > minHandleTimestamp) break;
            this.logEntryHandler.onBufferedMessage(position.offset, position.length);
            ++read;
            it.remove();
            this.updateOwnerPosition(position);
        }
        return read;
    }

    private void compact() {
        if (this.reorderBufferOffset > this.compactionSize) {
            this.positions.sort(OFFSET_COMPARATOR);
            int reorderBufferOffset = 0;
            for (BufferedPosition position : this.positions) {
                int offset = position.offset;
                if (offset == reorderBufferOffset) {
                    return;
                }
                int length = position.length;
                this.reorderBuffer.putBytes(reorderBufferOffset, (DirectBuffer)this.reorderBuffer, offset, length);
                reorderBufferOffset += length;
            }
            this.reorderBufferOffset = reorderBufferOffset;
        }
    }

    public int bufferPosition() {
        return this.reorderBufferOffset;
    }

    public int bufferCapacity() {
        return this.reorderBuffer.capacity();
    }

    private void updateOwnerPosition(BufferedPosition position) {
        StreamPoller owner = position.owner;
        for (BufferedPosition remainingPosition : this.positions) {
            if (remainingPosition.owner != owner) continue;
            owner.minBufferedTimestamp = remainingPosition.timestamp;
            return;
        }
        owner.minBufferedTimestamp = position.timestamp;
    }

    private static long findMinOtherTimestamp(StreamPoller[] pollers, StreamPoller owner) {
        long minOtherTimestamp = Long.MAX_VALUE;
        for (int i = 0; i < pollers.length; ++i) {
            StreamPoller poller = pollers[i];
            if (poller == owner) continue;
            minOtherTimestamp = Math.min(minOtherTimestamp, poller.minBufferedTimestamp);
        }
        return minOtherTimestamp;
    }

    class LogEntryHandler
    implements FragmentHandler {
        private final MessageHeaderDecoder messageHeader = new MessageHeaderDecoder();
        private final FixMessageDecoder fixMessage = new FixMessageDecoder();
        private final ILinkMessageDecoder iLinkMessage = new ILinkMessageDecoder();
        private final ReplayerTimestampDecoder replayerTimestamp = new ReplayerTimestampDecoder();
        private final FixMessageConsumer fixHandler;
        private final ILinkMessageConsumer iLinkHandler;
        StreamPoller owner;
        long maxTimestampToHandle;
        long minBufferedTimestamp;

        LogEntryHandler(FixMessageConsumer fixHandler, ILinkMessageConsumer iLinkHandler) {
            this.fixHandler = fixHandler;
            this.iLinkHandler = iLinkHandler;
        }

        public void onFragment(DirectBuffer buffer, int start, int length, Header header) {
            int offset = start;
            this.messageHeader.wrap(buffer, offset);
            int templateId = this.messageHeader.templateId();
            int blockLength = this.messageHeader.blockLength();
            int version = this.messageHeader.version();
            if (templateId == 1) {
                long timestamp;
                this.fixMessage.wrap(buffer, offset += 8, blockLength, version);
                if (version >= FixMessageDecoder.metaDataSinceVersion()) {
                    offset += FixMessageDecoder.metaDataHeaderLength() + this.fixMessage.metaDataLength();
                    this.fixMessage.skipMetaData();
                }
                if ((timestamp = this.fixMessage.timestamp()) <= this.maxTimestampToHandle) {
                    this.fixHandler.onMessage(this.fixMessage, buffer, offset, length, header);
                } else {
                    if (this.minBufferedTimestamp == 0L) {
                        this.minBufferedTimestamp = timestamp;
                    }
                    StreamTimestampZipper.this.reorderBuffer.putBytes(StreamTimestampZipper.this.reorderBufferOffset, buffer, start, length);
                    StreamTimestampZipper.this.positions.add(new BufferedPosition(this.owner, timestamp, StreamTimestampZipper.this.reorderBufferOffset, length));
                    StreamTimestampZipper.this.reorderBufferOffset = StreamTimestampZipper.this.reorderBufferOffset + length;
                }
            } else if (templateId == 61) {
                this.replayerTimestamp.wrap(buffer, offset += 8, blockLength, version);
                long timestamp = this.replayerTimestamp.timestamp();
                if (this.minBufferedTimestamp == 0L) {
                    this.minBufferedTimestamp = timestamp;
                }
            } else if (templateId == 58) {
                this.iLinkMessage.wrap(buffer, offset += 8, blockLength, version);
                offset += 16;
                long timestamp = this.iLinkMessage.enqueueTime();
                if (timestamp <= this.maxTimestampToHandle) {
                    this.iLinkHandler.onBusinessMessage(this.iLinkMessage, buffer, offset, header);
                } else {
                    if (this.minBufferedTimestamp == 0L) {
                        this.minBufferedTimestamp = timestamp;
                    }
                    StreamTimestampZipper.this.reorderBuffer.putBytes(StreamTimestampZipper.this.reorderBufferOffset, buffer, start, length);
                    StreamTimestampZipper.this.positions.add(new BufferedPosition(this.owner, timestamp, StreamTimestampZipper.this.reorderBufferOffset, length));
                    StreamTimestampZipper.this.reorderBufferOffset = StreamTimestampZipper.this.reorderBufferOffset + length;
                }
            }
        }

        void reset(long minOtherTimestamp, StreamPoller owner) {
            this.maxTimestampToHandle = minOtherTimestamp;
            this.minBufferedTimestamp = 0L;
            this.owner = owner;
        }

        public void onBufferedMessage(int start, int length) {
            ExpandableArrayBuffer buffer = StreamTimestampZipper.this.reorderBuffer;
            int offset = start;
            this.messageHeader.wrap((DirectBuffer)buffer, offset);
            int templateId = this.messageHeader.templateId();
            int blockLength = this.messageHeader.blockLength();
            int version = this.messageHeader.version();
            if (templateId == 1) {
                this.fixMessage.wrap((DirectBuffer)buffer, offset += 8, blockLength, version);
                if (version >= FixMessageDecoder.metaDataSinceVersion()) {
                    offset += FixMessageDecoder.metaDataHeaderLength() + this.fixMessage.metaDataLength();
                    this.fixMessage.skipMetaData();
                }
                this.fixHandler.onMessage(this.fixMessage, (DirectBuffer)buffer, offset, length, null);
            } else if (templateId == 58) {
                this.iLinkMessage.wrap((DirectBuffer)buffer, offset += 8, blockLength, version);
                this.iLinkHandler.onBusinessMessage(this.iLinkMessage, (DirectBuffer)buffer, offset += 16, null);
            }
        }
    }

    class StreamPoller {
        private final Subscription subscription;
        private long minBufferedTimestamp;

        StreamPoller(Subscription subscription) {
            this.subscription = subscription;
        }

        public int poll(StreamPoller[] pollers, FragmentAssembler fragmentAssembler) {
            long minOtherTimestamp = StreamTimestampZipper.findMinOtherTimestamp(pollers, this);
            StreamTimestampZipper.this.logEntryHandler.reset(minOtherTimestamp, this);
            int read = this.subscription.poll((FragmentHandler)fragmentAssembler, 10);
            long minBufferedTimestampByPoll = ((StreamTimestampZipper)StreamTimestampZipper.this).logEntryHandler.minBufferedTimestamp;
            if (minBufferedTimestampByPoll != 0L) {
                this.minBufferedTimestamp = minBufferedTimestampByPoll;
            }
            return read;
        }
    }

    static class OffsetComparator
    implements Comparator<BufferedPosition> {
        OffsetComparator() {
        }

        @Override
        public int compare(BufferedPosition o1, BufferedPosition o2) {
            return Long.compare(o1.offset, o2.offset);
        }
    }

    static class TimestampComparator
    implements Comparator<BufferedPosition> {
        TimestampComparator() {
        }

        @Override
        public int compare(BufferedPosition o1, BufferedPosition o2) {
            return Long.compare(o1.timestamp, o2.timestamp);
        }
    }

    static class BufferedPosition {
        final StreamPoller owner;
        final long timestamp;
        final int offset;
        final int length;

        BufferedPosition(StreamPoller owner, long timestamp, int offset, int length) {
            this.owner = owner;
            this.timestamp = timestamp;
            this.offset = offset;
            this.length = length;
        }

        public String toString() {
            return "BufferedPosition{owner=" + this.owner + ", timestamp=" + this.timestamp + ", offset=" + this.offset + ", length=" + this.length + '}';
        }
    }
}

