/*
 * Decompiled with CFR 0.152.
 */
package uk.co.real_logic.artio.engine.logger;

import io.aeron.Image;
import io.aeron.Subscription;
import io.aeron.archive.client.AeronArchive;
import io.aeron.archive.client.ArchiveException;
import io.aeron.logbuffer.ControlledFragmentHandler;
import io.aeron.logbuffer.FragmentHandler;
import io.aeron.logbuffer.Header;
import java.util.List;
import org.agrona.DirectBuffer;
import org.agrona.ErrorHandler;
import org.agrona.collections.CollectionUtil;
import org.agrona.concurrent.Agent;
import org.agrona.concurrent.AgentInvoker;
import org.agrona.concurrent.IdleStrategy;
import uk.co.real_logic.artio.CommonConfiguration;
import uk.co.real_logic.artio.DebugLogger;
import uk.co.real_logic.artio.LogTag;
import uk.co.real_logic.artio.dictionary.generation.Exceptions;
import uk.co.real_logic.artio.engine.CompletionPosition;
import uk.co.real_logic.artio.engine.logger.Index;

/**
 * Agent that polls a {@link Subscription} and hands every fragment to each configured
 * {@link Index}.
 *
 * <p>On construction each index is first caught up: if the archive reports a stop position
 * for a recording that is beyond the position the index last stored, the missing range is
 * replayed from the archive into that index before the constructor returns.
 */
public class Indexer implements Agent, ControlledFragmentHandler {
    /** Fragment limit used for each poll in {@link #doWork()} and during catch-up replay. */
    private static final int LIMIT = 20;

    /** Channel on which archive replays are requested during catch-up. */
    private static final String REPLAY_CHANNEL = "aeron:ipc";

    private final List<Index> indices;
    private final Subscription subscription;
    private final String agentNamePrefix;
    private final CompletionPosition completionPosition;
    private final int archiveReplayStream;
    private final boolean gracefulShutdown;

    /**
     * Creates the indexer and synchronously catches every index up with the archive.
     *
     * @param indices             indices that receive every polled fragment
     * @param subscription        subscription polled by {@link #doWork()}
     * @param agentNamePrefix     prefix prepended to {@code "Indexer"} in {@link #roleName()}
     * @param completionPosition  consulted by {@link #onClose()} when shutting down gracefully
     * @param aeronArchive        archive client used only for the catch-up replay
     * @param errorHandler        receives any {@link ArchiveException} raised during catch-up
     * @param archiveReplayStream stream id on which catch-up replays are requested
     * @param gracefulShutdown    whether {@link #onClose()} quiesces and closes resources
     */
    public Indexer(List<Index> indices, Subscription subscription, String agentNamePrefix, CompletionPosition completionPosition, AeronArchive aeronArchive, ErrorHandler errorHandler, int archiveReplayStream, boolean gracefulShutdown) {
        this.indices = indices;
        this.subscription = subscription;
        this.agentNamePrefix = agentNamePrefix;
        this.completionPosition = completionPosition;
        this.archiveReplayStream = archiveReplayStream;
        this.gracefulShutdown = gracefulShutdown;
        // Must run last: it reads the fields assigned above.
        this.catchIndexUp(aeronArchive, errorHandler);
    }

    /**
     * Polls up to {@link #LIMIT} fragments from the subscription and then lets every index do
     * its own work.
     *
     * @return fragments read from the subscription plus the sum of each index's work count
     */
    public int doWork() {
        return this.subscription.controlledPoll(this, LIMIT) + CollectionUtil.sum(this.indices, Index::doWork);
    }

    /**
     * For every index, replays from the archive whatever part of its recording lies between
     * the position the index last stored and the recording's stop position.
     *
     * <p>Replays are performed one at a time and this method only returns once all of them have
     * been consumed. An {@link ArchiveException} is reported to {@code errorHandler} and ends
     * the catch-up for that recording only.
     *
     * @param aeronArchive archive client used to query stop positions and start replays
     * @param errorHandler sink for archive errors
     */
    private void catchIndexUp(AeronArchive aeronArchive, ErrorHandler errorHandler) {
        IdleStrategy idleStrategy = CommonConfiguration.backoffIdleStrategy();
        // May be null; idle() checks before invoking it.
        AgentInvoker aeronInvoker = aeronArchive.context().aeron().conductorAgentInvoker();
        int size = this.indices.size();
        for (int i = 0; i < size; ++i) {
            Index index = this.indices.get(i);
            index.readLastPosition((aeronSessionId, recordingId, indexStoppedPosition) -> {
                try {
                    long recordingStoppedPosition = aeronArchive.getStopPosition(recordingId);
                    // Only replay when the recording holds data the index has not seen.
                    if (recordingStoppedPosition > indexStoppedPosition) {
                        DebugLogger.log(LogTag.INDEX, "Catchup [%s]: recordingId = %d, recordingStopped @ %d, indexStopped @ %d%n", index.getName(), recordingId, recordingStoppedPosition, indexStoppedPosition);
                        long length = recordingStoppedPosition - indexStoppedPosition;
                        try (Subscription replaySubscription = aeronArchive.replay(recordingId, indexStoppedPosition, length, REPLAY_CHANNEL, this.archiveReplayStream)) {
                            // Wait for the replay image to arrive, surfacing archive errors
                            // (as ArchiveException) while waiting.
                            while (replaySubscription.imageCount() != 1) {
                                this.idle(idleStrategy, aeronInvoker);
                                aeronArchive.checkForErrorResponse();
                            }
                            idleStrategy.reset();
                            Image replayImage = replaySubscription.imageAtIndex(0);
                            while (replayImage.position() < recordingStoppedPosition) {
                                // NOTE(review): cast retained from the decompiled output; likely
                                // redundant if Index extends FragmentHandler - confirm.
                                replayImage.poll((FragmentHandler)index, LIMIT);
                                this.idle(idleStrategy, aeronInvoker);
                            }
                            idleStrategy.reset();
                        }
                    }
                }
                catch (ArchiveException ex) {
                    errorHandler.onError(ex);
                }
            });
        }
    }

    /**
     * Performs one idle step: runs the Aeron conductor invoker when one is supplied, then
     * idles using the given strategy.
     *
     * @param idleStrategy strategy to idle with
     * @param aeronInvoker conductor invoker, or {@code null} if there is none to run
     */
    private void idle(IdleStrategy idleStrategy, AgentInvoker aeronInvoker) {
        if (aeronInvoker != null) {
            aeronInvoker.invoke();
        }
        idleStrategy.idle();
    }

    /**
     * Logs the fragment's end position, stream id and session id, then forwards the fragment
     * to every index in list order.
     *
     * @param buffer buffer containing the fragment
     * @param offset offset of the fragment within {@code buffer}
     * @param length length of the fragment
     * @param header header of the fragment
     * @return always {@link ControlledFragmentHandler.Action#CONTINUE}
     */
    public ControlledFragmentHandler.Action onFragment(DirectBuffer buffer, int offset, int length, Header header) {
        int streamId = header.streamId();
        int aeronSessionId = header.sessionId();
        long endPosition = header.position();
        // The explicit widening casts keep the call bound to the same DebugLogger.log overload.
        DebugLogger.log(LogTag.INDEX, "Indexing @ %d from [%d, %d]%n", endPosition, (long)streamId, (long)aeronSessionId);
        int size = this.indices.size();
        for (int i = 0; i < size; ++i) {
            this.indices.get(i).onFragment(buffer, offset, length, header);
        }
        return ControlledFragmentHandler.Action.CONTINUE;
    }

    /**
     * On graceful shutdown, quiesces and then closes all indices followed by the subscription.
     *
     * <p>NOTE(review): when {@code gracefulShutdown} is {@code false} nothing is closed here;
     * presumably deliberate - confirm with the owner of that flag.
     */
    public void onClose() {
        if (this.gracefulShutdown) {
            this.quiesce();
            Exceptions.closeAll(() -> Exceptions.closeAll(this.indices), this.subscription);
        }
    }

    /**
     * Yields until the completion position reports completion. Unless completion happened at
     * startup, it then polls the subscription once with no fragment limit, handing each
     * fragment to {@link #quiesceFragment}.
     */
    private void quiesce() {
        while (!this.completionPosition.hasCompleted()) {
            Thread.yield();
        }
        if (this.completionPosition.wasStartupComplete()) {
            return;
        }
        this.subscription.controlledPoll(this::quiesceFragment, Integer.MAX_VALUE);
    }

    /**
     * Fragment handler used while quiescing: indexes the fragment only when the completed
     * position recorded for its session is less than or equal to the fragment's position.
     *
     * <p>NOTE(review): comparison direction reproduced exactly from the original; confirm it
     * matches the intended cut-off.
     *
     * @return the result of {@link #onFragment} when indexed, otherwise
     *         {@link ControlledFragmentHandler.Action#CONTINUE}
     */
    private ControlledFragmentHandler.Action quiesceFragment(DirectBuffer buffer, int offset, int length, Header header) {
        if (this.completedPosition(header.sessionId()) <= header.position()) {
            return this.onFragment(buffer, offset, length, header);
        }
        return ControlledFragmentHandler.Action.CONTINUE;
    }

    /**
     * Looks up the completed position recorded for the given Aeron session.
     *
     * @param aeronSessionId session id to look up
     * @return the value stored under that session id in the completion positions
     */
    private long completedPosition(int aeronSessionId) {
        // The widening cast keeps the lookup on the long-keyed get, as in the original.
        return this.completionPosition.positions().get((long)aeronSessionId);
    }

    /**
     * @return the configured agent name prefix followed by {@code "Indexer"}
     */
    public String roleName() {
        return this.agentNamePrefix + "Indexer";
    }
}

