/*
 * Decompiled with CFR 0.152.
 */
package com.marklogic.hub.flow.impl;

import com.fasterxml.jackson.core.TreeNode;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.marklogic.hub.FlowManager;
import com.marklogic.hub.HubClient;
import com.marklogic.hub.HubConfig;
import com.marklogic.hub.dataservices.JobService;
import com.marklogic.hub.flow.Flow;
import com.marklogic.hub.flow.FlowInputs;
import com.marklogic.hub.flow.FlowRunner;
import com.marklogic.hub.flow.FlowStatusListener;
import com.marklogic.hub.flow.RunFlowResponse;
import com.marklogic.hub.flow.impl.JobStatus;
import com.marklogic.hub.impl.FlowManagerImpl;
import com.marklogic.hub.impl.HubConfigImpl;
import com.marklogic.hub.step.RunStepResponse;
import com.marklogic.hub.step.StepRunner;
import com.marklogic.hub.step.StepRunnerFactory;
import com.marklogic.hub.step.impl.Step;
import com.marklogic.hub.step.impl.StepRunnerUtil;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class FlowRunnerImpl
implements FlowRunner {
    @Autowired
    private HubConfig hubConfig;
    @Autowired
    private FlowManager flowManager;
    private HubClient hubClient;
    final AtomicBoolean isRunning = new AtomicBoolean(false);
    final AtomicBoolean isJobCancelled = new AtomicBoolean(false);
    final AtomicBoolean isJobSuccess = new AtomicBoolean(true);
    final AtomicBoolean jobStoppedOnError = new AtomicBoolean(false);
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());
    String runningJobId;
    Step runningStep;
    Flow runningFlow;
    StepRunner stepRunner;
    final ConcurrentHashMap<String, Queue<String>> stepsMap = new ConcurrentHashMap();
    final Map<String, Flow> flowMap = new ConcurrentHashMap<String, Flow>();
    final Map<String, RunFlowResponse> flowResp = new ConcurrentHashMap<String, RunFlowResponse>();
    final Map<String, FlowContext> flowContextMap = new ConcurrentHashMap<String, FlowContext>();
    final Queue<String> jobQueue = new ConcurrentLinkedQueue<String>();
    final List<FlowStatusListener> flowStatusListeners = new ArrayList<FlowStatusListener>();
    ThreadPoolExecutor threadPool;

    public FlowRunnerImpl() {
    }

    public FlowRunnerImpl(HubConfig hubConfig, FlowManager flowManager) {
        this.hubConfig = hubConfig;
        this.flowManager = flowManager;
    }

    public FlowRunnerImpl(String host, String username, String password) {
        this(new HubConfigImpl(host, username, password).newHubClient());
    }

    public FlowRunnerImpl(HubClient hubClient) {
        this.hubClient = hubClient;
        this.flowManager = new FlowManagerImpl(hubClient);
    }

    @Deprecated
    public FlowRunnerImpl(HubConfig hubConfig) {
        this(hubConfig.newHubClient());
    }

    @Override
    public FlowRunner onStatusChanged(FlowStatusListener listener) {
        this.flowStatusListeners.add(listener);
        return this;
    }

    @Override
    public RunFlowResponse runFlow(FlowInputs flowInputs) {
        String flowName = flowInputs.getFlowName();
        if (StringUtils.isEmpty((CharSequence)flowName)) {
            throw new IllegalArgumentException("Cannot run flow; no flow name provided");
        }
        Flow flow = this.flowManager.getFullFlow(flowName);
        return this.runFlow(flow, flowInputs.getSteps(), flowInputs.getJobId(), flowInputs.getOptions(), flowInputs.getStepConfig());
    }

    protected RunFlowResponse runFlow(Flow flow, List<String> stepNumbers, String jobId, Map<String, Object> runtimeOptions, Map<String, Object> stepConfig) {
        FlowContext flowContext = new FlowContext(flow, runtimeOptions);
        if (flowContext.jobOutputIsEnabled) {
            flowContext.jobService = JobService.on(this.hubClient != null ? this.hubClient.getJobsClient() : this.hubConfig.newJobDbClient());
        }
        FlowRunnerImpl.configureStopOnError(flow, runtimeOptions);
        if (stepNumbers == null) {
            stepNumbers = new ArrayList<String>(flow.getSteps().keySet());
        }
        if (stepConfig != null && !stepConfig.isEmpty()) {
            flow.setOverrideStepConfig(stepConfig);
        }
        flow.setRuntimeOptions(runtimeOptions);
        Iterator<String> stepItr = stepNumbers.iterator();
        ConcurrentLinkedQueue<String> stepsQueue = new ConcurrentLinkedQueue<String>();
        while (stepItr.hasNext()) {
            String stepNum = stepItr.next();
            Step tmpStep = flow.getStep(stepNum);
            if (tmpStep == null) {
                throw new RuntimeException("Step " + stepNum + " not found in the flow");
            }
            stepsQueue.add(stepNum);
        }
        if (jobId == null) {
            jobId = UUID.randomUUID().toString();
        }
        RunFlowResponse response = new RunFlowResponse(jobId);
        response.setFlowName(flow.getName());
        this.flowResp.put(jobId, response);
        this.stepsMap.put(jobId, stepsQueue);
        this.flowMap.put(jobId, flow);
        this.flowContextMap.put(jobId, flowContext);
        this.jobQueue.add(jobId);
        if (!this.isRunning.get()) {
            StepRunnerFactory stepRunnerFactory = this.hubClient != null ? new StepRunnerFactory(this.hubClient) : new StepRunnerFactory(this.hubConfig);
            this.initializeFlow(stepRunnerFactory, jobId);
        }
        return response;
    }

    protected static void configureStopOnError(Flow flow, Map<String, Object> options) {
        Object value;
        if (options != null && (Boolean.TRUE.equals(value = options.get("stopOnError")) || "true".equals(value))) {
            flow.setStopOnError(true);
        }
    }

    void initializeFlow(StepRunnerFactory stepRunnerFactory, String jobId) {
        this.isRunning.set(true);
        this.isJobSuccess.set(true);
        this.isJobCancelled.set(false);
        this.jobStoppedOnError.set(false);
        this.runningJobId = jobId;
        this.runningFlow = this.flowMap.get(this.runningJobId);
        FlowContext flowContext = this.flowContextMap.get(jobId);
        if (flowContext.jobOutputIsEnabled) {
            flowContext.jobService.startJob(jobId, this.runningFlow.getName());
        }
        if (this.threadPool == null || this.threadPool.isTerminated()) {
            int maxPoolSize = Math.max(Runtime.getRuntime().availableProcessors() / 2, 2);
            this.threadPool = new CustomPoolExecutor(2, maxPoolSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        }
        this.threadPool.execute(new FlowRunnerTask(stepRunnerFactory, this.runningFlow, this.runningJobId));
    }

    @Override
    public void stopJob(String jobId) {
        if (this.stepsMap.containsKey(jobId)) {
            this.stepsMap.get(jobId).clear();
            this.stepsMap.remove(jobId);
        }
        this.isJobCancelled.set(true);
        if (jobId.equals(this.runningJobId) && this.stepRunner != null) {
            this.stepRunner.stop();
        }
    }

    protected static void copyJobDataToResponse(RunFlowResponse response, RunFlowResponse jobDocument) {
        response.setStartTime(jobDocument.getStartTime());
        response.setEndTime(jobDocument.getEndTime());
        response.setUser(jobDocument.getUser());
        response.setLastAttemptedStep(jobDocument.getLastAttemptedStep());
        response.setLastCompletedStep(jobDocument.getLastCompletedStep());
    }

    @Override
    public void awaitCompletion() {
        try {
            this.awaitCompletion(Long.MAX_VALUE, TimeUnit.DAYS);
        }
        catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void awaitCompletion(long timeout, TimeUnit unit) throws InterruptedException {
        if (this.threadPool != null) {
            this.threadPool.awaitTermination(timeout, unit);
        }
    }

    public List<String> getQueuedJobIdsFromFlow(String flowName) {
        return this.flowMap.entrySet().stream().filter(entry -> flowName.equals(((Flow)entry.getValue()).getName())).map(Map.Entry::getKey).collect(Collectors.toList());
    }

    public RunFlowResponse getJobResponseById(String jobId) {
        return this.flowResp.get(jobId);
    }

    public boolean isJobRunning() {
        return this.isRunning.get();
    }

    public String getRunningStepKey() {
        return this.runningFlow.getSteps().entrySet().stream().filter(entry -> Objects.equals(entry.getValue(), this.runningStep)).map(Map.Entry::getKey).collect(Collectors.joining());
    }

    public Flow getRunningFlow() {
        return this.runningFlow;
    }

    static class FlowContext {
        boolean jobOutputIsEnabled = true;
        JobService jobService;

        FlowContext(Flow flow, Map<String, Object> runtimeOptions) {
            this.calculateJobOutputIsEnabled(flow, runtimeOptions);
        }

        private void calculateJobOutputIsEnabled(Flow flow, Map<String, Object> runtimeOptions) {
            String optionName = "disableJobOutput";
            if (runtimeOptions != null && runtimeOptions.containsKey("disableJobOutput")) {
                this.jobOutputIsEnabled = !Boolean.parseBoolean(runtimeOptions.get("disableJobOutput").toString());
            } else {
                JsonNode flowOptions = flow.getOptions();
                if (flowOptions != null && flowOptions.has("disableJobOutput")) {
                    this.jobOutputIsEnabled = !flowOptions.get("disableJobOutput").asBoolean();
                }
            }
        }
    }

    class CustomPoolExecutor
    extends ThreadPoolExecutor {
        public CustomPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }

        @Override
        public void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            if (t == null && r instanceof Future) {
                try {
                    ((Future)((Object)r)).get();
                }
                catch (CancellationException e) {
                    t = e;
                }
                catch (ExecutionException e) {
                    t = e.getCause();
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            if (t != null) {
                FlowRunnerImpl.this.logger.error("Caught error while running FlowRunnerTask: " + t.getMessage());
                FlowRunnerTask flowRunnerTask = (FlowRunnerTask)r;
                if (flowRunnerTask.getStepQueue() == null || flowRunnerTask.getStepQueue().isEmpty() || FlowRunnerImpl.this.runningFlow.isStopOnError()) {
                    FlowRunnerImpl.this.jobQueue.remove();
                    if (!FlowRunnerImpl.this.jobQueue.isEmpty()) {
                        FlowRunnerImpl.this.initializeFlow(flowRunnerTask.stepRunnerFactory, FlowRunnerImpl.this.jobQueue.peek());
                    } else {
                        FlowRunnerImpl.this.isRunning.set(false);
                        FlowRunnerImpl.this.threadPool.shutdownNow();
                    }
                } else if (FlowRunnerImpl.this.threadPool != null && !FlowRunnerImpl.this.threadPool.isTerminating()) {
                    FlowRunnerImpl.this.threadPool.execute(new FlowRunnerTask(flowRunnerTask.stepRunnerFactory, FlowRunnerImpl.this.runningFlow, FlowRunnerImpl.this.runningJobId));
                }
            }
        }
    }

    private class FlowRunnerTask
    implements Runnable {
        final StepRunnerFactory stepRunnerFactory;
        private final String jobId;
        private final Flow flow;
        private Queue<String> stepQueue;

        public Queue<String> getStepQueue() {
            return this.stepQueue;
        }

        FlowRunnerTask(StepRunnerFactory stepRunnerFactory, Flow flow, String jobId) {
            this.stepRunnerFactory = stepRunnerFactory;
            this.jobId = jobId;
            this.flow = flow;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            Collection stepResps;
            long failedStepCount;
            RunFlowResponse resp = FlowRunnerImpl.this.flowResp.get(FlowRunnerImpl.this.runningJobId);
            resp.setFlowName(FlowRunnerImpl.this.runningFlow.getName());
            this.stepQueue = FlowRunnerImpl.this.stepsMap.get(this.jobId);
            FlowContext flowContext = FlowRunnerImpl.this.flowContextMap.get(this.jobId);
            HashMap<String, RunStepResponse> stepOutputs = new HashMap<String, RunStepResponse>();
            long[] currSuccessfulEvents = new long[]{0L};
            long[] currFailedEvents = new long[]{0L};
            int[] currPercentComplete = new int[]{0};
            resp.setStartTime(StepRunnerUtil.getCurrentTimeWithTimeZone());
            while (!this.stepQueue.isEmpty()) {
                String stepNum = this.stepQueue.poll();
                FlowRunnerImpl.this.runningStep = FlowRunnerImpl.this.runningFlow.getSteps().get(stepNum);
                HashMap<String, Object> runtimeOptions = this.flow.getRuntimeOptions() != null ? new HashMap<String, Object>(this.flow.getRuntimeOptions()) : new HashMap();
                AtomicLong errorCount = new AtomicLong();
                AtomicLong successCount = new AtomicLong();
                RunStepResponse stepResp2 = null;
                try {
                    boolean stepFailed;
                    FlowRunnerImpl.this.stepRunner = this.stepRunnerFactory.getStepRunner(FlowRunnerImpl.this.runningFlow, stepNum).withJobId(this.jobId).withRuntimeOptions(runtimeOptions).onItemComplete((jobID, itemID) -> successCount.incrementAndGet()).onItemFailed((jobId, itemId) -> {
                        errorCount.incrementAndGet();
                        if (this.flow.isStopOnError()) {
                            this.stopJobOnError(jobId);
                        }
                    }).onStatusChanged((jobId, percentComplete, jobStatus, successfulEvents, failedEvents, message) -> FlowRunnerImpl.this.flowStatusListeners.forEach(listener -> {
                        currSuccessfulEvents[0] = successfulEvents;
                        currFailedEvents[0] = failedEvents;
                        currPercentComplete[0] = percentComplete;
                        listener.onStatusChanged(jobId, FlowRunnerImpl.this.runningStep, jobStatus, percentComplete, successfulEvents, failedEvents, FlowRunnerImpl.this.runningStep.getName() + " : " + message);
                    }));
                    if (this.flow.getOverrideStepConfig() != null) {
                        FlowRunnerImpl.this.stepRunner.withStepConfig(this.flow.getOverrideStepConfig());
                    }
                    stepResp2 = FlowRunnerImpl.this.stepRunner.run();
                    FlowRunnerImpl.this.stepRunner.awaitCompletion();
                    boolean bl = stepFailed = stepResp2.getStatus() != null && stepResp2.getStatus().startsWith("failed");
                    if (stepFailed && FlowRunnerImpl.this.runningFlow.isStopOnError()) {
                        this.stopJobOnError(FlowRunnerImpl.this.runningJobId);
                    }
                    stepOutputs.put(stepNum, stepResp2);
                }
                catch (Exception e) {
                    try {
                        stepResp2 = RunStepResponse.withFlow(this.flow).withStep(stepNum);
                        stepResp2.withJobId(FlowRunnerImpl.this.runningJobId);
                        if (FlowRunnerImpl.this.stepRunner != null) {
                            stepResp2.setCounts(successCount.get() + errorCount.get(), successCount.get(), errorCount.get(), (long)Math.ceil((double)successCount.get() / (double)FlowRunnerImpl.this.stepRunner.getBatchSize()), (long)Math.ceil((double)errorCount.get() / (double)FlowRunnerImpl.this.stepRunner.getBatchSize()));
                        } else {
                            stepResp2.setCounts(0L, 0L, 0L, 0L, 0L);
                        }
                        StringWriter errors = new StringWriter();
                        e.printStackTrace(new PrintWriter(errors));
                        stepResp2.withStepOutput(errors.toString());
                        stepResp2.withSuccess(false);
                        if (successCount.get() > 0L) {
                            stepResp2.withStatus("completed with errors step " + stepNum);
                        } else {
                            stepResp2.withStatus("failed step " + stepNum);
                        }
                        RunStepResponse finalStepResp = stepResp2;
                        try {
                            FlowRunnerImpl.this.flowStatusListeners.forEach(listener -> listener.onStatusChanged(this.jobId, FlowRunnerImpl.this.runningStep, JobStatus.FAILED.toString(), currPercentComplete[0], currSuccessfulEvents[0], currFailedEvents[0], FlowRunnerImpl.this.runningStep.getName() + " " + Arrays.toString(finalStepResp.stepOutput.toArray())));
                        }
                        catch (Exception ex) {
                            FlowRunnerImpl.this.logger.error("Unable to invoke FlowStatusListener: " + ex.getMessage());
                        }
                        if (FlowRunnerImpl.this.runningFlow.isStopOnError()) {
                            this.stopJobOnError(FlowRunnerImpl.this.runningJobId);
                        }
                        stepOutputs.put(stepNum, stepResp2);
                    }
                    catch (Throwable throwable) {
                        stepOutputs.put(stepNum, stepResp2);
                        if (stepResp2 != null && !stepResp2.isSuccess()) {
                            FlowRunnerImpl.this.isJobSuccess.set(false);
                        }
                        throw throwable;
                    }
                    if (stepResp2 == null || stepResp2.isSuccess()) continue;
                    FlowRunnerImpl.this.isJobSuccess.set(false);
                    continue;
                }
                if (stepResp2 == null || stepResp2.isSuccess()) continue;
                FlowRunnerImpl.this.isJobSuccess.set(false);
            }
            resp.setStepResponses(stepOutputs);
            JobStatus jobStatus2 = FlowRunnerImpl.this.isJobCancelled.get() ? (FlowRunnerImpl.this.runningFlow.isStopOnError() && FlowRunnerImpl.this.jobStoppedOnError.get() ? JobStatus.STOP_ON_ERROR : JobStatus.CANCELED) : (!FlowRunnerImpl.this.isJobSuccess.get() ? ((failedStepCount = (stepResps = stepOutputs.values()).stream().filter(stepResp -> stepResp.getStatus().contains("failed step ")).count()) == (long)stepResps.size() ? JobStatus.FAILED : JobStatus.FINISHED_WITH_ERRORS) : JobStatus.FINISHED);
            resp.setJobStatus(jobStatus2.toString());
            try {
                if (flowContext.jobOutputIsEnabled) {
                    flowContext.jobService.finishJob(this.jobId, jobStatus2.toString());
                }
            }
            catch (Exception e) {
                FlowRunnerImpl.this.logger.error("Unable to finish job with ID: " + this.jobId + "; cause: " + e.getMessage());
            }
            finally {
                JsonNode jobNode = null;
                if (flowContext.jobOutputIsEnabled) {
                    try {
                        jobNode = flowContext.jobService.getJob(this.jobId);
                    }
                    catch (Exception e) {
                        FlowRunnerImpl.this.logger.error("Unable to get job document with ID: " + this.jobId + ": cause: " + e.getMessage());
                    }
                }
                if (jobNode != null) {
                    try {
                        RunFlowResponse jobDoc = (RunFlowResponse)new ObjectMapper().treeToValue((TreeNode)jobNode.get("job"), RunFlowResponse.class);
                        FlowRunnerImpl.copyJobDataToResponse(resp, jobDoc);
                    }
                    catch (Exception e) {
                        FlowRunnerImpl.this.logger.error("Unable to copy job data to RunFlowResponse, cause: " + e.getMessage());
                    }
                } else {
                    resp.setEndTime(StepRunnerUtil.getCurrentTimeWithTimeZone());
                }
                if (!FlowRunnerImpl.this.isJobSuccess.get()) {
                    try {
                        FlowRunnerImpl.this.flowStatusListeners.forEach(listener -> listener.onStatusChanged(this.jobId, FlowRunnerImpl.this.runningStep, jobStatus2.toString(), currPercentComplete[0], currSuccessfulEvents[0], currFailedEvents[0], JobStatus.FAILED.toString()));
                    }
                    catch (Exception ex) {
                        FlowRunnerImpl.this.logger.error("Unable to invoke FlowStatusListener: " + ex.getMessage());
                    }
                } else {
                    try {
                        FlowRunnerImpl.this.flowStatusListeners.forEach(listener -> listener.onStatusChanged(this.jobId, FlowRunnerImpl.this.runningStep, jobStatus2.toString(), currPercentComplete[0], currSuccessfulEvents[0], currFailedEvents[0], JobStatus.FINISHED.toString()));
                    }
                    catch (Exception ex) {
                        FlowRunnerImpl.this.logger.error("Unable to invoke FlowStatusListener: " + ex.getMessage());
                    }
                }
                FlowRunnerImpl.this.jobQueue.remove();
                if (FlowRunnerImpl.this.stepsMap.containsKey(this.jobId)) {
                    FlowRunnerImpl.this.stepsMap.get(this.jobId).clear();
                    FlowRunnerImpl.this.stepsMap.remove(this.jobId);
                }
                FlowRunnerImpl.this.flowMap.remove(this.jobId);
                FlowRunnerImpl.this.flowContextMap.remove(this.jobId);
                FlowRunnerImpl.this.flowResp.remove(FlowRunnerImpl.this.runningJobId);
                if (!FlowRunnerImpl.this.jobQueue.isEmpty()) {
                    FlowRunnerImpl.this.initializeFlow(this.stepRunnerFactory, FlowRunnerImpl.this.jobQueue.peek());
                } else {
                    FlowRunnerImpl.this.isRunning.set(false);
                    FlowRunnerImpl.this.threadPool.shutdownNow();
                    FlowRunnerImpl.this.runningFlow = null;
                }
            }
        }

        private void stopJobOnError(String jobId) {
            FlowRunnerImpl.this.jobStoppedOnError.set(true);
            FlowRunnerImpl.this.stopJob(jobId);
        }
    }
}

