/*
 * Decompiled with CFR 0.152.
 */
package uk.co.real_logic.artio.engine.framer;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Collections;
import java.util.List;
import org.agrona.DirectBuffer;
import org.agrona.ExpandableArrayBuffer;
import org.agrona.collections.Long2ObjectHashMap;
import uk.co.real_logic.artio.DebugLogger;
import uk.co.real_logic.artio.LogTag;
import uk.co.real_logic.artio.engine.ReproductionMessageHandler;
import uk.co.real_logic.artio.engine.framer.ConnectionBackPressureEvent;
import uk.co.real_logic.artio.engine.framer.ReproductionLog;
import uk.co.real_logic.artio.engine.framer.TcpChannel;
import uk.co.real_logic.artio.engine.framer.TcpChannelSupplier;
import uk.co.real_logic.artio.messages.ConnectDecoder;
import uk.co.real_logic.artio.util.CharFormatter;

/**
 * A {@link TcpChannelSupplier} that performs no socket I/O of its own.
 *
 * <p>Connections are announced through {@link #enqueueConnect(ConnectDecoder)} and surfaced to the
 * caller on the next {@link #pollSelector(long, TcpChannelSupplier.NewChannelHandler)}. Inbound
 * bytes are supplied through {@link #enqueueMessage(long, DirectBuffer, int, int, int, boolean)}
 * and outbound writes are forwarded to the configured {@link ReproductionMessageHandler}.
 *
 * <p>The end operation registered through {@link #registerEndOperation(Runnable)} is invoked
 * unconditionally by the channels created here, so it must be registered before any channel is
 * read from or has a replay completed.
 */
public class ReproductionTcpChannelSupplier extends TcpChannelSupplier {
    /** Reusable formatter for the "missed back-pressure event" debug message. */
    private final CharFormatter missedEvent = new CharFormatter("Missed BP event: conn=%s,index=%s-%s");
    /** Channels handed out so far, keyed by connection id. */
    private final Long2ObjectHashMap<ReproductionTcpChannel> connectionIdToChannel = new Long2ObjectHashMap<>();
    private final ReproductionMessageHandler reproductionMessageHandler;
    /** May be {@code null}; in that case channels are created with no back-pressure events. */
    private final ReproductionLog reproductionLog;
    /** Connection id of the most recently enqueued connect. */
    private long connectionId;
    /** Address of the pending connect, or {@code null} when no connect is pending. */
    private String address;
    /** Callback run by channels after a non-resend-request read and on replay completion. */
    private Runnable endOperation;

    /**
     * @param reproductionMessageHandler receives every buffer written to a channel created by this supplier.
     * @param reproductionLog source of recorded back-pressure events per connection; may be {@code null}.
     */
    public ReproductionTcpChannelSupplier(ReproductionMessageHandler reproductionMessageHandler, ReproductionLog reproductionLog) {
        this.reproductionMessageHandler = reproductionMessageHandler;
        this.reproductionLog = reproductionLog;
    }

    /**
     * Registers the callback that channels run when an operation has finished.
     *
     * @param endOperation callback to run; channels invoke it without a null check.
     */
    public void registerEndOperation(Runnable endOperation) {
        this.endOperation = endOperation;
    }

    /** Intentionally a no-op: this supplier never initiates outbound connections. */
    @Override
    public void open(InetSocketAddress address, TcpChannelSupplier.InitiatedChannelHandler channelHandler) throws IOException {
    }

    /** Intentionally a no-op: there are no in-flight connection attempts to cancel. */
    @Override
    public void stopConnecting(InetSocketAddress address) throws IOException {
    }

    /**
     * Delivers the pending connect (if any) to {@code handler} as a new channel.
     *
     * @param timeInMs timestamp passed through to the handler.
     * @param handler  notified of the newly created channel.
     * @return always {@code 0}.
     * @throws IOException if the channel constructor or the handler throws it.
     */
    @Override
    public int pollSelector(long timeInMs, TcpChannelSupplier.NewChannelHandler handler) throws IOException {
        if (this.address == null) {
            return 0;
        }

        final long pendingConnectionId = this.connectionId;
        // The channel constructor reads this.address, so it must be created before the address is cleared.
        final ReproductionTcpChannel channel = new ReproductionTcpChannel(pendingConnectionId, getEvents());
        this.connectionIdToChannel.put(pendingConnectionId, channel);
        handler.onNewChannel(timeInMs, channel);
        this.address = null;
        return 0;
    }

    /**
     * Looks up the back-pressure events for the current {@link #connectionId}.
     *
     * @return an empty list when no reproduction log is configured, otherwise whatever the log returns.
     */
    private List<ConnectionBackPressureEvent> getEvents() {
        final ReproductionLog log = this.reproductionLog;
        return log == null ? Collections.emptyList() : log.lookupEvents(this.connectionId);
    }

    /** Intentionally a no-op: nothing is bound. */
    @Override
    public void unbind() throws IOException {
    }

    /** Intentionally a no-op: nothing is bound. */
    @Override
    public void bind() throws IOException {
    }

    /** Intentionally a no-op: this supplier holds no closeable resources. */
    @Override
    public void close() throws Exception {
    }

    /**
     * Records a connect so that the next poll surfaces it as a new channel. A later call made
     * before the poll overwrites the earlier one, because only a single pending connect is stored.
     *
     * @param connectDecoder decoder positioned on the connect message.
     */
    public void enqueueConnect(ConnectDecoder connectDecoder) {
        this.connectionId = connectDecoder.connection();
        this.address = connectDecoder.address();
    }

    /**
     * Offers an inbound message to the channel registered for {@code connectionId}.
     *
     * @param connectionId    connection whose channel should receive the bytes.
     * @param buffer          source buffer.
     * @param initialOffset   base offset into {@code buffer}.
     * @param messageOffset   additional offset added to {@code initialOffset}.
     * @param length          number of bytes to copy.
     * @param isResendRequest forwarded to the channel; controls whether the end operation runs on read.
     * @return {@code true} if the channel accepted the message; {@code false} if no channel is
     *         registered for the connection or the channel still holds an unread message.
     */
    public boolean enqueueMessage(long connectionId, DirectBuffer buffer, int initialOffset, int messageOffset, int length, boolean isResendRequest) {
        final ReproductionTcpChannel channel = this.connectionIdToChannel.get(connectionId);
        return channel != null && channel.enqueueMessage(buffer, initialOffset, messageOffset, length, isResendRequest);
    }

    /**
     * In-memory channel: reads drain a single buffered message, writes go to the enclosing
     * supplier's {@link ReproductionMessageHandler}, optionally truncated by recorded
     * back-pressure events.
     */
    class ReproductionTcpChannel extends TcpChannel {
        /** Holds at most one pending inbound message, starting at index 0. */
        private final ExpandableArrayBuffer reproductionBuffer = new ExpandableArrayBuffer();
        /** Recorded back-pressure events for this connection; may be {@code null}. */
        private final List<ConnectionBackPressureEvent> events;
        /** Index into {@link #events} of the next event still to be applied. */
        private int nextEvent;
        private final long connectionId;
        /** Length of the pending inbound message, or 0 when none is pending. */
        private int length;
        private boolean isResendRequest;

        ReproductionTcpChannel(long connectionId, List<ConnectionBackPressureEvent> events) throws IOException {
            super(ReproductionTcpChannelSupplier.this.address);
            this.connectionId = connectionId;
            this.events = events;
        }

        /** No selector is involved; always returns {@code null}. */
        @Override
        public SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException {
            return null;
        }

        /**
         * Forwards {@code src} (or a truncated duplicate of it when a back-pressure event applies)
         * to the message handler.
         *
         * @return the number of bytes that were remaining in the forwarded buffer before the handler ran.
         */
        @Override
        public int write(ByteBuffer src, int seqNum, boolean replay) throws IOException {
            final List<ConnectionBackPressureEvent> recordedEvents = this.events;
            final ByteBuffer toWrite = recordedEvents == null
                ? src
                : checkBackpressureEvent(src, seqNum, replay, recordedEvents);

            // Capture before the handler runs, in case it moves the buffer position.
            final int bytes = toWrite.remaining();
            ReproductionTcpChannelSupplier.this.reproductionMessageHandler.onMessage(this.connectionId, toWrite);
            return bytes;
        }

        /**
         * Applies the next recorded back-pressure event if it matches this write.
         *
         * <p>If the next event has the same replay flag but an older sequence number than this
         * write, the event was missed. The scan then moves forward to the first event with the
         * same replay flag whose sequence number is not older than {@code seqNum}, skipping
         * everything in between.
         *
         * @return {@code src} itself when no event applies, otherwise a duplicate of {@code src}
         *         whose limit is {@code position + event.written()}.
         */
        private ByteBuffer checkBackpressureEvent(ByteBuffer src, int seqNum, boolean replay, List<ConnectionBackPressureEvent> events) {
            final int eventCount = events.size();
            final int startIndex = this.nextEvent;
            if (startIndex >= eventCount) {
                return src;
            }

            ConnectionBackPressureEvent event = events.get(startIndex);
            if (event.replay() != replay) {
                return src;
            }

            int eventIndex = startIndex;
            if (seqNum > event.seqNum()) {
                // The event at startIndex is already known to be stale, so scanning starts after it.
                // The previous break condition (seqNum >= eventSeqNum) was satisfied by that stale
                // event itself, so the scan never advanced and the channel stayed stuck on it.
                eventIndex = startIndex + 1;
                while (eventIndex < eventCount) {
                    event = events.get(eventIndex);
                    if (replay == event.replay() && seqNum <= event.seqNum()) {
                        break;
                    }
                    eventIndex++;
                }
                this.nextEvent = eventIndex;

                if (DebugLogger.IS_REPLAY_LOG_TAG_ENABLED) {
                    ReproductionTcpChannelSupplier.this.missedEvent.clear().with(this.connectionId).with(startIndex).with(eventIndex);
                    DebugLogger.log(LogTag.REPRODUCTION, ReproductionTcpChannelSupplier.this.missedEvent);
                }

                if (eventIndex >= eventCount) {
                    // Ran off the end: 'event' is merely the last one examined, not a match.
                    return src;
                }
            }

            if (seqNum == event.seqNum()) {
                final ByteBuffer truncated = src.duplicate();
                truncated.limit(truncated.position() + event.written());
                // Advance past the event actually applied, not past the pre-scan index.
                this.nextEvent = eventIndex + 1;
                return truncated;
            }
            return src;
        }

        /**
         * Copies the pending message (if any) into {@code dst} and clears it. The supplier's end
         * operation is run afterwards unless the message was flagged as a resend request.
         *
         * @return the number of bytes copied, or {@code 0} when nothing was pending.
         */
        @Override
        public int read(ByteBuffer dst) throws IOException {
            final int pending = this.length;
            if (pending <= 0) {
                return 0;
            }

            this.reproductionBuffer.getBytes(0, dst, pending);
            this.length = 0;
            if (!this.isResendRequest) {
                ReproductionTcpChannelSupplier.this.endOperation.run();
            }
            return pending;
        }

        /** Intentionally a no-op. */
        @Override
        public void close() {
        }

        /** Runs the supplier's end operation; {@code correlationId} is not used. */
        @Override
        public void onReplayComplete(long correlationId) {
            ReproductionTcpChannelSupplier.this.endOperation.run();
        }

        /**
         * Buffers one inbound message for the next {@link #read(ByteBuffer)}.
         *
         * @return {@code false} if a previously enqueued message has not been read yet, in which
         *         case nothing is copied; {@code true} otherwise.
         */
        public boolean enqueueMessage(DirectBuffer buffer, int initialOffset, int messageOffset, int length, boolean isResendRequest) {
            if (this.length != 0) {
                return false;
            }
            this.reproductionBuffer.putBytes(0, buffer, initialOffset + messageOffset, length);
            this.isResendRequest = isResendRequest;
            this.length = length;
            return true;
        }
    }
}

