/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.stateless.session;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.repository.FlowFileEvent;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.StandardProcessSession;
import org.apache.nifi.controller.repository.metrics.NopPerformanceTracker;
import org.apache.nifi.controller.repository.metrics.PerformanceTracker;
import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.stateless.engine.DataflowAbortedException;
import org.apache.nifi.stateless.engine.ExecutionProgress;
import org.apache.nifi.stateless.engine.ProcessContextFactory;
import org.apache.nifi.stateless.flow.FailurePortEncounteredException;
import org.apache.nifi.stateless.flow.StandardStatelessFlow;
import org.apache.nifi.stateless.queue.DrainableFlowFileQueue;
import org.apache.nifi.stateless.repository.RepositoryContextFactory;
import org.apache.nifi.stateless.session.AsynchronousCommitTracker;
import org.apache.nifi.stateless.session.StatelessProcessSessionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StatelessProcessSession
extends StandardProcessSession {
    private static final Logger logger = LoggerFactory.getLogger(StatelessProcessSession.class);
    private final Connectable connectable;
    private final RepositoryContextFactory repositoryContextFactory;
    private final ProcessContextFactory processContextFactory;
    private final ProvenanceEventRepository provenanceEventRepository;
    private final ExecutionProgress executionProgress;
    private final AsynchronousCommitTracker tracker;
    private boolean requireSynchronousCommits;

    public StatelessProcessSession(Connectable connectable, RepositoryContextFactory repositoryContextFactory, ProvenanceEventRepository provenanceEventRepository, ProcessContextFactory processContextFactory, ExecutionProgress progress, boolean requireSynchronousCommits, AsynchronousCommitTracker tracker) {
        super(repositoryContextFactory.createRepositoryContext(connectable, provenanceEventRepository), progress::isCanceled, (PerformanceTracker)new NopPerformanceTracker());
        this.connectable = connectable;
        this.repositoryContextFactory = repositoryContextFactory;
        this.provenanceEventRepository = provenanceEventRepository;
        this.processContextFactory = processContextFactory;
        this.executionProgress = progress;
        this.requireSynchronousCommits = requireSynchronousCommits;
        this.tracker = tracker;
    }

    public void commitAsync() {
        if (!this.requireSynchronousCommits) {
            super.commitAsync();
            return;
        }
        super.commit();
    }

    public void commitAsync(Runnable onSuccess) {
        this.commitAsync(onSuccess, null);
    }

    public void commitAsync(Runnable onSuccess, Consumer<Throwable> onFailure) {
        if (!this.requireSynchronousCommits) {
            super.commitAsync();
            this.tracker.addCallback(this.connectable, onSuccess, onFailure, (ProcessSession)this);
            return;
        }
        try {
            super.commit();
        }
        catch (Throwable t) {
            logger.error("Failed to commit Process Session {} for {}", new Object[]{this, this.connectable, t});
            onFailure.accept(t);
            return;
        }
        try {
            onSuccess.run();
        }
        catch (Exception e) {
            logger.error("Committed Process Session {} for {} but failed to trigger success callback", new Object[]{this, this.connectable, e});
        }
    }

    protected void commit(StandardProcessSession.Checkpoint checkpoint, boolean asynchronous) {
        this.assertProgressNotCanceled();
        this.requireSynchronousCommits = this.requireSynchronousCommits || !asynchronous;
        int flowFileCounts = checkpoint.getFlowFilesIn() + checkpoint.getFlowFilesOut() + checkpoint.getFlowFilesRemoved();
        if (flowFileCounts > 0) {
            this.tracker.recordProgress(checkpoint.getFlowFilesOut() + checkpoint.getFlowFilesRemoved(), checkpoint.getBytesOut() + checkpoint.getBytesRemoved());
        }
        super.commit(checkpoint, asynchronous);
        if (!this.requireSynchronousCommits) {
            this.queueFollowOnComponents();
            return;
        }
        long followOnStart = System.nanoTime();
        this.triggerFollowOnComponents();
        long followOnNanos = System.nanoTime() - followOnStart;
        this.registerProcessEvent(this.connectable, -followOnNanos);
        this.awaitAcknowledgment();
    }

    private void triggerFollowOnComponents() {
        if (this.executionProgress.isTerminalPort(this.connectable)) {
            return;
        }
        block0: for (Connection connection : this.connectable.getConnections()) {
            while (!connection.getFlowFileQueue().isEmpty()) {
                Connectable connectable = connection.getDestination();
                if (this.isFailurePortGuaranteed(connectable)) {
                    throw new FailurePortEncounteredException("FlowFile was transferred to Port " + connectable.getName() + ", which is marked as a Failure Port", connectable.getName());
                }
                if (StandardStatelessFlow.isTerminalPort(connectable)) continue block0;
                this.triggerNext(connectable);
            }
        }
    }

    private void queueFollowOnComponents() {
        if (this.executionProgress.isTerminalPort(this.connectable)) {
            return;
        }
        for (Connection connection : this.connectable.getConnections()) {
            if (connection.getFlowFileQueue().isEmpty()) continue;
            Connectable connectable = connection.getDestination();
            if (this.isFailurePortGuaranteed(connectable)) {
                throw new FailurePortEncounteredException("FlowFile was transferred to Port " + connectable.getName() + ", which is marked as a Failure Port", connectable.getName());
            }
            if (StandardStatelessFlow.isTerminalPort(connectable)) continue;
            this.tracker.addConnectable(connectable);
        }
    }

    private boolean isFailurePortGuaranteed(Connectable connectable) {
        ConnectableType connectableType = connectable.getConnectableType();
        if (connectableType != ConnectableType.OUTPUT_PORT && connectableType != ConnectableType.FUNNEL) {
            return false;
        }
        if (this.executionProgress.isFailurePort(connectable.getName())) {
            return true;
        }
        for (Connection outboundConnection : connectable.getConnections()) {
            if (!this.isFailurePortGuaranteed(outboundConnection.getDestination())) continue;
            return true;
        }
        return false;
    }

    private void triggerNext(Connectable connectable) {
        this.assertProgressNotCanceled();
        ProcessContext connectableContext = this.processContextFactory.createProcessContext(connectable);
        StatelessProcessSessionFactory connectableSessionFactory = new StatelessProcessSessionFactory(connectable, this.repositoryContextFactory, this.provenanceEventRepository, this.processContextFactory, this.executionProgress, this.requireSynchronousCommits, new AsynchronousCommitTracker(this.tracker.getRootGroup()));
        logger.debug("Triggering {}", (Object)connectable);
        long start = System.nanoTime();
        try {
            connectable.onTrigger(connectableContext, (ProcessSessionFactory)connectableSessionFactory);
        }
        catch (Throwable t) {
            this.abortProcessing(t);
            throw t;
        }
        long nanos = System.nanoTime() - start;
        this.registerProcessEvent(connectable, nanos);
    }

    private void assertProgressNotCanceled() {
        if (this.executionProgress.isCanceled()) {
            logger.info("Completed processing for {} but execution has been canceled. Will not commit session.", (Object)this.connectable);
            this.abortProcessing(null);
            throw new DataflowAbortedException();
        }
    }

    private void awaitAcknowledgment() {
        ExecutionProgress.CompletionAction completionAction;
        if (this.executionProgress.isDataQueued()) {
            logger.debug("Completed processing for {} but data is queued for processing so will allow Process Session to complete without waiting for acknowledgment", (Object)this.connectable);
            return;
        }
        logger.debug("Completed processing for {}; no data is queued for processing so will await acknowledgment of completion", (Object)this.connectable);
        try {
            completionAction = this.executionProgress.awaitCompletionAction();
        }
        catch (InterruptedException e) {
            logger.warn("Interrupted while waiting for dataflow completion to be acknowledged. Will roll back session.");
            this.abortProcessing(e);
            throw new DataflowAbortedException();
        }
        if (completionAction == ExecutionProgress.CompletionAction.CANCEL) {
            logger.info("Dataflow completed but action was canceled instead of being acknowledged. Will roll back session.");
            this.abortProcessing(null);
            throw new DataflowAbortedException();
        }
    }

    private void abortProcessing(Throwable cause) {
        if (cause == null) {
            this.executionProgress.notifyExecutionCanceled();
        } else {
            this.executionProgress.notifyExecutionFailed(cause);
        }
        try {
            this.rollback(false, true);
        }
        finally {
            this.purgeFlowFiles();
        }
    }

    private void purgeFlowFiles() {
        ProcessGroup rootGroup = this.getRootGroup();
        List allConnections = rootGroup.findAllConnections();
        for (Connection connection : allConnections) {
            DrainableFlowFileQueue flowFileQueue = (DrainableFlowFileQueue)connection.getFlowFileQueue();
            ArrayList<FlowFileRecord> flowFileRecords = new ArrayList<FlowFileRecord>(flowFileQueue.size().getObjectCount());
            flowFileQueue.drainTo(flowFileRecords);
            for (FlowFileRecord flowFileRecord : flowFileRecords) {
                this.getRepositoryContext().getContentRepository().decrementClaimantCount(flowFileRecord.getContentClaim());
            }
        }
    }

    private ProcessGroup getRootGroup() {
        ProcessGroup group = this.connectable.getProcessGroup();
        return this.getRootGroup(group);
    }

    private ProcessGroup getRootGroup(ProcessGroup group) {
        ProcessGroup parent = group.getParent();
        if (parent == null) {
            return group;
        }
        return this.getRootGroup(parent);
    }

    private void registerProcessEvent(Connectable connectable, long processingNanos) {
        try {
            StandardFlowFileEvent procEvent = new StandardFlowFileEvent();
            procEvent.setProcessingNanos(processingNanos);
            procEvent.setInvocations(1);
            this.getRepositoryContext().getFlowFileEventRepository().updateRepository((FlowFileEvent)procEvent, connectable.getIdentifier());
        }
        catch (IOException e) {
            logger.error("Unable to update FlowFileEvent Repository for {}; statistics may be inaccurate.", connectable.getRunnableComponent(), (Object)e);
        }
    }

    public String toString() {
        return "StatelessProcessSession[id=" + this.getSessionId() + "]";
    }
}

