/*
 * Decompiled with CFR 0.152.
 */
package com.marklogic.hub.step.impl;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.marklogic.client.DatabaseClient;
import com.marklogic.client.datamovement.Batcher;
import com.marklogic.client.datamovement.DataMovementManager;
import com.marklogic.client.datamovement.JobTicket;
import com.marklogic.client.datamovement.QueryBatcher;
import com.marklogic.client.ext.helper.LoggingObject;
import com.marklogic.hub.DatabaseKind;
import com.marklogic.hub.HubClient;
import com.marklogic.hub.dataservices.StepRunnerService;
import com.marklogic.hub.dataservices.StepService;
import com.marklogic.hub.error.DataHubConfigurationException;
import com.marklogic.hub.flow.Flow;
import com.marklogic.hub.step.ResponseHolder;
import com.marklogic.hub.step.RunStepResponse;
import com.marklogic.hub.step.StepDefinition;
import com.marklogic.hub.step.StepItemCompleteListener;
import com.marklogic.hub.step.StepItemFailureListener;
import com.marklogic.hub.step.StepRunner;
import com.marklogic.hub.step.StepStatusListener;
import com.marklogic.hub.step.impl.SourceQueryCollector;
import com.marklogic.hub.step.impl.StepMetrics;
import com.marklogic.hub.step.impl.StepRunnerUtil;
import com.marklogic.hub.util.DiskQueue;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Vector;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

/**
 * {@link StepRunner} that collects the items for one step of a flow and then
 * processes them in batches through a {@link QueryBatcher}.
 *
 * <p>Typical use: configure the runner through the {@code with...} methods, call
 * {@link #run()} (or {@link #run(Collection)}), then {@link #awaitCompletion()}.
 * {@code run} returns as soon as the batcher job has been started; a background
 * thread finalizes the returned {@link RunStepResponse} once every batch is done.
 */
public class ScriptStepRunner
extends LoggingObject
implements StepRunner {
    /** Upper bound on the number of error messages kept in the step output. */
    private static final int MAX_ERROR_MESSAGES = 10;
    private Flow flow;
    private int batchSize;
    private int threadCount;
    private Map<String, Object> combinedOptions;
    private int previousPercentComplete;
    private boolean stopOnFailure = false;
    private String jobId;
    private boolean isFullOutput = false;
    private String step = "1";
    private final List<StepItemCompleteListener> stepItemCompleteListeners = new ArrayList<StepItemCompleteListener>();
    private final List<StepItemFailureListener> stepItemFailureListeners = new ArrayList<StepItemFailureListener>();
    private final List<StepStatusListener> stepStatusListeners = new ArrayList<StepStatusListener>();
    private Map<String, Object> stepConfig = new HashMap<String, Object>();
    private final HubClient hubClient;
    private Thread runningThread = null;
    private DataMovementManager dataMovementManager = null;
    private QueryBatcher queryBatcher = null;
    private final AtomicBoolean isStopped = new AtomicBoolean(false);
    private StepDefinition stepDef;

    /**
     * @param hubClient client used to reach the jobs, staging and final databases
     */
    public ScriptStepRunner(HubClient hubClient) {
        this.hubClient = hubClient;
    }

    /** Sets the flow that owns the step to run. */
    @Override
    public StepRunner withFlow(Flow flow) {
        this.flow = flow;
        return this;
    }

    /** Sets the identifier of the step to run; defaults to {@code "1"}. */
    @Override
    public StepRunner withStep(String step) {
        this.step = step;
        return this;
    }

    /** Sets the job id reported to the step service and to listeners. */
    @Override
    public StepRunner withJobId(String jobId) {
        this.jobId = jobId;
        return this;
    }

    /** Sets the step definition used when combining options in {@link #withRuntimeOptions(Map)}. */
    public StepRunner withStepDefinition(StepDefinition stepDefinition) {
        this.stepDef = stepDefinition;
        return this;
    }

    /** Sets the number of items per batch; a "batchSize" entry in the step config overrides it in {@link #run()}. */
    @Override
    public StepRunner withBatchSize(int batchSize) {
        this.batchSize = batchSize;
        return this;
    }

    /** Sets the batcher thread count; a "threadCount" entry in the step config overrides it in {@link #run()}. */
    @Override
    public StepRunner withThreadCount(int threadCount) {
        this.threadCount = threadCount;
        return this;
    }

    /** Sets whether the job is stopped as soon as a batch reports errors. */
    @Override
    public StepRunner withStopOnFailure(boolean stopOnFailure) {
        this.stopOnFailure = stopOnFailure;
        return this;
    }

    /**
     * Builds the combined options from the flow, step definition, step and the given runtime options.
     *
     * @throws DataHubConfigurationException if no flow has been set yet
     */
    @Override
    public StepRunner withRuntimeOptions(Map<String, Object> runtimeOptions) {
        if (this.flow == null) {
            throw new DataHubConfigurationException("Flow has to be set before setting options");
        }
        this.combinedOptions = StepRunnerUtil.makeCombinedOptions(this.flow, this.stepDef, this.step, runtimeOptions);
        return this;
    }

    /**
     * Sets the step configuration read by {@link #run()} ("batchSize", "threadCount", "stopOnFailure").
     * A {@code null} argument is treated as an empty configuration so that {@code run()} does not fail on it.
     */
    @Override
    public StepRunner withStepConfig(Map<String, Object> stepConfig) {
        this.stepConfig = stepConfig != null ? stepConfig : new HashMap<String, Object>();
        return this;
    }

    /** Registers a listener invoked for every completed item reported by a batch. */
    @Override
    public StepRunner onItemComplete(StepItemCompleteListener listener) {
        this.stepItemCompleteListeners.add(listener);
        return this;
    }

    /** Registers a listener invoked for every failed item reported by a batch. */
    @Override
    public StepRunner onItemFailed(StepItemFailureListener listener) {
        this.stepItemFailureListeners.add(listener);
        return this;
    }

    /** Registers a listener invoked on step status / progress changes. */
    @Override
    public StepRunner onStatusChanged(StepStatusListener listener) {
        this.stepStatusListeners.add(listener);
        return this;
    }

    /**
     * Waits for the background completion thread without a practical time limit.
     * If the calling thread is interrupted while waiting, its interrupt flag is restored
     * so that callers can still observe the interruption.
     */
    @Override
    public void awaitCompletion() {
        try {
            this.awaitCompletion(Long.MAX_VALUE, TimeUnit.DAYS);
        }
        catch (InterruptedException e) {
            // Do not lose the interruption; this method cannot propagate it.
            Thread.currentThread().interrupt();
        }
        catch (TimeoutException e) {
            // Not expected given the timeout requested above; logged rather than silently dropped.
            this.logger.warn("Timed out waiting for step '" + this.step + "' to complete: " + e.getMessage());
        }
    }

    /**
     * Waits up to the given time for the background completion thread to terminate.
     * A non-positive timeout does not wait at all (see {@link TimeUnit#timedJoin(Thread, long)}).
     *
     * @param timeout maximum time to wait
     * @param unit unit of {@code timeout}
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws TimeoutException if the thread has not terminated in time; the batcher job is stopped
     *         and the thread interrupted before this is thrown
     */
    @Override
    public void awaitCompletion(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        Thread thread = this.runningThread;
        if (thread == null) {
            return;
        }
        // timedJoin converts the timeout to the milliseconds/nanoseconds Thread.join expects.
        unit.timedJoin(thread, timeout);
        if (thread.getState() != Thread.State.TERMINATED) {
            if (this.dataMovementManager != null && this.queryBatcher != null) {
                this.dataMovementManager.stopJob(this.queryBatcher);
            }
            thread.interrupt();
            throw new TimeoutException("Timeout occurred after " + timeout + " " + unit);
        }
    }

    /** Job output is enabled unless the combined options contain a truthy "disableJobOutput". */
    private boolean jobOutputIsEnabled() {
        if (this.combinedOptions != null && this.combinedOptions.containsKey("disableJobOutput")) {
            return !Boolean.parseBoolean(this.combinedOptions.get("disableJobOutput").toString());
        }
        return true;
    }

    /**
     * Runs the collector for the configured step and then starts processing the collected items.
     *
     * @return the step response; when items were found it is finalized asynchronously, so call
     *         {@link #awaitCompletion()} before relying on its counts and status
     */
    @Override
    public RunStepResponse run() {
        DiskQueue<String> uris;
        this.runningThread = null;
        Object configuredBatchSize = this.stepConfig.get("batchSize");
        if (configuredBatchSize != null) {
            this.batchSize = ScriptStepRunner.toInt(configuredBatchSize);
        }
        Object configuredThreadCount = this.stepConfig.get("threadCount");
        if (configuredThreadCount != null) {
            this.threadCount = ScriptStepRunner.toInt(configuredThreadCount);
        }
        Object configuredStopOnFailure = this.stepConfig.get("stopOnFailure");
        if (configuredStopOnFailure != null) {
            this.withStopOnFailure(Boolean.parseBoolean(configuredStopOnFailure.toString()));
        }
        RunStepResponse runStepResponse = StepRunnerUtil.createStepResponse(this.flow, this.step, this.jobId);
        if (this.combinedOptions == null) {
            this.combinedOptions = new HashMap<String, Object>();
        } else if (this.combinedOptions.get("fullOutput") != null) {
            this.isFullOutput = Boolean.parseBoolean(this.combinedOptions.get("fullOutput").toString());
        }
        this.combinedOptions.put("flow", this.flow.getName());
        this.combinedOptions.put("jobId", this.jobId);
        runStepResponse.setStepStartTime(StepRunnerUtil.getCurrentTimeWithTimeZone());
        if (this.jobOutputIsEnabled()) {
            StepService.on(this.hubClient.getJobsClient()).startStep(this.jobId, this.step, this.flow.getName(), new ObjectMapper().valueToTree(this.combinedOptions));
        }
        try {
            String sourceDatabase = this.resolveSourceDatabase();
            this.logger.info(String.format("Collecting items for step '%s' in flow '%s'", this.step, this.flow.getName()));
            uris = this.runCollector(sourceDatabase);
        }
        catch (Exception e) {
            // Collector failure: report a failed step carrying the stack trace as its output.
            String failedStatus = "failed step " + this.step;
            runStepResponse.setCounts(0L, 0L, 0L, 0L, 0L).withStatus(failedStatus);
            StringWriter errors = new StringWriter();
            e.printStackTrace(new PrintWriter(errors));
            runStepResponse.withStepOutput(errors.toString());
            if (this.jobOutputIsEnabled()) {
                JsonNode jobDoc = StepService.on(this.hubClient.getJobsClient()).finishStep(this.jobId, this.step, failedStatus, runStepResponse.toObjectNode());
                try {
                    return StepRunnerUtil.getResponse(jobDoc, this.step);
                }
                catch (Exception ex) {
                    // Fall back to the locally built response below.
                    this.logger.warn("Unexpected error getting step response: " + ex.getMessage(), ex);
                }
            }
            runStepResponse.setStepEndTime(StepRunnerUtil.getCurrentTimeWithTimeZone());
            return runStepResponse;
        }
        return this.runHarmonizer(runStepResponse, uris);
    }

    /** Marks the runner as stopped and stops the batcher job if one has been created. */
    @Override
    public void stop() {
        this.isStopped.set(true);
        if (this.dataMovementManager != null && this.queryBatcher != null) {
            this.dataMovementManager.stopJob(this.queryBatcher);
        }
    }

    /**
     * Processes the given items directly, skipping the collector.
     *
     * @param uris items to process
     * @return the step response; see {@link #run()} for its asynchronous finalization
     */
    @Override
    public RunStepResponse run(Collection<String> uris) {
        this.runningThread = null;
        if (this.combinedOptions == null) {
            // Batch processing reads and copies the options, so they must never be null.
            this.combinedOptions = new HashMap<String, Object>();
        }
        if (this.jobOutputIsEnabled()) {
            StepService.on(this.hubClient.getJobsClient()).startStep(this.jobId, this.step, this.flow.getName(), new ObjectMapper().valueToTree(this.combinedOptions));
        }
        RunStepResponse runStepResponse = StepRunnerUtil.createStepResponse(this.flow, this.step, this.jobId);
        return this.runHarmonizer(runStepResponse, uris);
    }

    @Override
    public int getBatchSize() {
        return this.batchSize;
    }

    /** Converts a step-config value to an int, accepting any {@link Number} or a numeric string. */
    private static int toInt(Object value) {
        if (value instanceof Number) {
            return ((Number)value).intValue();
        }
        return Integer.parseInt(value.toString().trim());
    }

    /** Returns the "sourceDatabase" option if present, otherwise the staging database name. */
    private String resolveSourceDatabase() {
        Object configured = this.combinedOptions.get("sourceDatabase");
        return configured != null ? StepRunnerUtil.objectToString(configured) : this.hubClient.getDbName(DatabaseKind.STAGING);
    }

    /** Picks the final client, the default staging client, or a staging client bound to the named database. */
    private DatabaseClient selectExecuteClient(String sourceDatabase) {
        if (sourceDatabase.equals(this.hubClient.getDbName(DatabaseKind.FINAL))) {
            return this.hubClient.getFinalClient();
        }
        if (sourceDatabase.equals(this.hubClient.getDbName(DatabaseKind.STAGING))) {
            return this.hubClient.getStagingClient();
        }
        return this.hubClient.getStagingClient(sourceDatabase);
    }

    /** Closes the collection when it is a {@link DiskQueue}; other collections are left untouched. */
    private static void closeIfDiskQueue(Collection<String> uris) {
        if (uris instanceof DiskQueue) {
            ((DiskQueue<?>)uris).close();
        }
    }

    /** Notifies status listeners and runs the collector, or returns {@code null} if the runner was stopped. */
    private DiskQueue<String> runCollector(String sourceDatabase) {
        SourceQueryCollector collector = new SourceQueryCollector(this.hubClient, sourceDatabase);
        this.stepStatusListeners.forEach(listener -> listener.onStatusChange(this.jobId, 0, "running step " + this.step, 0L, 0L, "running collector"));
        return !this.isStopped.get() ? collector.run(this.flow.getName(), this.step, this.combinedOptions) : null;
    }

    /**
     * Starts batch processing of the given items and spawns the thread that finalizes the response.
     *
     * @param runStepResponse response to populate
     * @param uris items to process; may be {@code null} or empty
     * @return {@code runStepResponse}, or the response read back from the job document when there was nothing to process
     */
    private RunStepResponse runHarmonizer(RunStepResponse runStepResponse, Collection<String> uris) {
        StepMetrics stepMetrics = new StepMetrics();
        int urisCount = uris != null ? uris.size() : 0;
        this.stepStatusListeners.forEach(listener -> listener.onStatusChange(runStepResponse.getJobId(), 0, "running step " + this.step, 0L, 0L, "starting step execution"));
        if (urisCount == 0) {
            return this.finishWithNoItems(runStepResponse, uris);
        }
        double batchCount = Math.ceil((double)urisCount / (double)this.batchSize);
        if (batchCount == 1.0) {
            this.logger.info(String.format("Count of items collected: %d; will be processed in a single batch based on batchSize of %d", urisCount, this.batchSize));
        } else {
            this.logger.info(String.format("Count of items collected: %d; will be processed in %d batches based on batchSize of %d", urisCount, (int)batchCount, this.batchSize));
        }
        // Vector: appended to from the batcher's listener threads.
        Vector<String> errorMessages = new Vector<String>();
        this.dataMovementManager = this.selectExecuteClient(this.resolveSourceDatabase()).newDataMovementManager();
        ObjectMapper objectMapper = new ObjectMapper();
        Map<String, JobTicket> ticketWrapper = new HashMap<String, JobTicket>();
        // Plain HashMap; every write from a batch listener is guarded by synchronizing on the map.
        Map<String, Object> fullOutputMap = new HashMap<String, Object>();
        this.queryBatcher = this.dataMovementManager.newQueryBatcher(uris.iterator()).withBatchSize(this.batchSize).withThreadCount(this.threadCount).withJobId(runStepResponse.getJobId()).onUrisReady(batch -> {
            try {
                ObjectNode inputs = objectMapper.createObjectNode();
                inputs.put("flowName", this.flow.getName());
                inputs.put("stepNumber", this.step);
                inputs.put("jobId", runStepResponse.getJobId());
                Map<String, Object> batchOptions = new HashMap<String, Object>(this.combinedOptions);
                batchOptions.put("uris", batch.getItems());
                inputs.set("options", objectMapper.valueToTree(batchOptions));
                this.logger.debug(String.format("Processing %d items in batch %d of %d", batch.getItems().length, batch.getJobBatchNumber(), (int)batchCount));
                StepRunnerService stepRunner = StepRunnerService.on(batch.getClient());
                JsonNode jsonResponse = stepRunner.processBatch(stepRunner.newSessionState(), inputs);
                ResponseHolder response = (ResponseHolder)objectMapper.readerFor(ResponseHolder.class).readValue(jsonResponse);
                // Never let the written-document count go negative when errors exceed writes.
                stepMetrics.getDocumentWritten().addAndGet(Math.max(0L, response.documentWritten - response.errorCount));
                stepMetrics.getFailedEvents().addAndGet(response.errorCount);
                stepMetrics.getSuccessfulEvents().addAndGet(response.totalCount - response.errorCount);
                if (response.errors != null && errorMessages.size() < MAX_ERROR_MESSAGES) {
                    errorMessages.addAll(response.errors.stream().limit(MAX_ERROR_MESSAGES - errorMessages.size()).map(StepRunnerUtil::jsonToString).collect(Collectors.toList()));
                }
                if (this.isFullOutput && response.documents != null) {
                    try {
                        synchronized (fullOutputMap) {
                            for (JsonNode node : response.documents) {
                                if (!node.has("uri")) continue;
                                fullOutputMap.put(node.get("uri").asText(), node);
                            }
                        }
                    }
                    catch (Exception ex) {
                        this.logger.warn("Unable to add written documents to fullOutput map in RunStepResponse; cause: " + ex.getMessage());
                    }
                }
                if (response.errorCount < 1L) {
                    stepMetrics.getSuccessfulBatches().addAndGet(1L);
                } else {
                    stepMetrics.getFailedBatches().addAndGet(1L);
                }
                int percentComplete = (int)((double)stepMetrics.getSuccessfulBatchesCount() / batchCount * 100.0);
                // Only notify on a change that lands on a multiple of 5 percent.
                if (percentComplete != this.previousPercentComplete && percentComplete % 5 == 0) {
                    this.previousPercentComplete = percentComplete;
                    this.stepStatusListeners.forEach(listener -> listener.onStatusChange(runStepResponse.getJobId(), percentComplete, "running step " + this.step, stepMetrics.getSuccessfulEventsCount(), stepMetrics.getFailedEventsCount(), ""));
                }
                if (!this.stepItemCompleteListeners.isEmpty()) {
                    response.completedItems.forEach(item -> this.stepItemCompleteListeners.forEach(listener -> listener.processCompletion(runStepResponse.getJobId(), (String)item)));
                }
                if (!this.stepItemFailureListeners.isEmpty()) {
                    response.failedItems.forEach(item -> this.stepItemFailureListeners.forEach(listener -> listener.processFailure(runStepResponse.getJobId(), (String)item)));
                }
                if (this.stopOnFailure && response.errorCount > 0L) {
                    JobTicket jobTicket = ticketWrapper.get("jobTicket");
                    if (jobTicket != null) {
                        this.dataMovementManager.stopJob(jobTicket);
                    }
                }
            }
            catch (Exception e) {
                // The whole batch failed: count every item in it as a failed event.
                if (errorMessages.size() < MAX_ERROR_MESSAGES) {
                    errorMessages.add(e.toString());
                }
                stepMetrics.getFailedBatches().addAndGet(1L);
                stepMetrics.getFailedEvents().addAndGet(batch.getItems().length);
                if (this.flow != null && this.flow.isStopOnError()) {
                    JobTicket jobTicket = ticketWrapper.get("jobTicket");
                    if (jobTicket != null) {
                        this.dataMovementManager.stopJob(jobTicket);
                    }
                    this.stepItemFailureListeners.forEach(listener -> listener.processFailure(runStepResponse.getJobId(), null));
                }
            }
        }).onQueryFailure(failure -> {
            stepMetrics.getFailedBatches().addAndGet(1L);
            stepMetrics.getFailedEvents().addAndGet(this.batchSize);
        });
        if (!this.isStopped.get()) {
            this.logger.info(String.format("Starting processing of items for step '%s' in flow '%s'", this.step, this.flow.getName()));
            JobTicket jobTicket = this.dataMovementManager.startJob(this.queryBatcher);
            ticketWrapper.put("jobTicket", jobTicket);
        }
        this.runningThread = new Thread(() -> this.completeStep(runStepResponse, uris, urisCount, stepMetrics, errorMessages, fullOutputMap));
        this.runningThread.start();
        return runStepResponse;
    }

    /**
     * Completes a step for which there is nothing to process: status is "canceled" when the
     * runner was stopped, "completed" otherwise.
     */
    private RunStepResponse finishWithNoItems(RunStepResponse runStepResponse, Collection<String> uris) {
        // Same cleanup the completion thread performs after a normal run.
        ScriptStepRunner.closeIfDiskQueue(uris);
        this.logger.info("No items found to process");
        String stepStatus = this.isStopped.get() ? "canceled step " + this.step : "completed step " + this.step;
        this.stepStatusListeners.forEach(listener -> listener.onStatusChange(runStepResponse.getJobId(), 100, stepStatus, 0L, 0L, stepStatus.contains("completed step ") ? "collector returned 0 items" : "job was stopped"));
        runStepResponse.setCounts(0L, 0L, 0L, 0L, 0L);
        runStepResponse.withStatus(stepStatus);
        if (this.jobOutputIsEnabled()) {
            JsonNode jobDoc = StepService.on(this.hubClient.getJobsClient()).finishStep(this.jobId, this.step, stepStatus, runStepResponse.toObjectNode());
            try {
                return StepRunnerUtil.getResponse(jobDoc, this.step);
            }
            catch (Exception ex) {
                this.logger.warn("Unexpected error getting step response: " + ex.getMessage(), ex);
                return runStepResponse;
            }
        }
        runStepResponse.setStepEndTime(StepRunnerUtil.getCurrentTimeWithTimeZone());
        return runStepResponse;
    }

    /**
     * Body of the background thread: waits for the batcher, then records counts, status, errors,
     * full output and step times on the response, and reports the result to the step service
     * when job output is enabled.
     */
    private void completeStep(RunStepResponse runStepResponse, Collection<String> uris, int urisCount, StepMetrics stepMetrics, List<String> errorMessages, Map<String, Object> fullOutputMap) {
        this.queryBatcher.awaitCompletion();
        this.logger.info(String.format("Finished processing of items for step '%s' in flow '%s'", this.step, this.flow.getName()));
        ScriptStepRunner.closeIfDiskQueue(uris);
        String stepStatus = this.determineStepStatus(stepMetrics);
        this.stepStatusListeners.forEach(listener -> listener.onStatusChange(runStepResponse.getJobId(), 100, stepStatus, stepMetrics.getSuccessfulEventsCount(), stepMetrics.getFailedEventsCount(), ""));
        this.dataMovementManager.stopJob(this.queryBatcher);
        long documentsWritten = stepMetrics.getDocumentWritten().get();
        runStepResponse.setCounts(urisCount, documentsWritten, stepMetrics.getFailedEventsCount(), stepMetrics.getSuccessfulBatchesCount(), stepMetrics.getFailedBatchesCount());
        runStepResponse.withStatus(stepStatus);
        runStepResponse.setDocumentWritten(documentsWritten);
        if (!errorMessages.isEmpty()) {
            runStepResponse.withStepOutput(errorMessages);
        }
        if (this.isFullOutput) {
            runStepResponse.withFullOutput(fullOutputMap);
        }
        if (!this.jobOutputIsEnabled()) {
            runStepResponse.setStepEndTime(StepRunnerUtil.getCurrentTimeWithTimeZone());
            return;
        }
        JsonNode jobDoc = null;
        try {
            jobDoc = StepService.on(this.hubClient.getJobsClient()).finishStep(this.jobId, this.step, stepStatus, runStepResponse.toObjectNode());
        }
        catch (Exception e) {
            this.logger.error(e.getMessage(), e);
        }
        if (jobDoc != null) {
            try {
                // Take the step times from the job document returned by finishStep.
                RunStepResponse tempResp = StepRunnerUtil.getResponse(jobDoc, this.step);
                runStepResponse.setStepStartTime(tempResp.getStepStartTime());
                runStepResponse.setStepEndTime(tempResp.getStepEndTime());
            }
            catch (Exception ex) {
                this.logger.error(ex.getMessage(), ex);
            }
        }
    }

    /**
     * Derives the final status string from the metrics. Order matters: stop-on-failure wins over
     * cancellation, which wins over the completed / completed-with-errors / failed outcomes.
     */
    private String determineStepStatus(StepMetrics stepMetrics) {
        if (stepMetrics.getFailedEventsCount() > 0L && this.stopOnFailure) {
            return "stop on error in step " + this.step;
        }
        if (this.isStopped.get()) {
            return "canceled step " + this.step;
        }
        if (stepMetrics.getFailedEventsCount() > 0L && stepMetrics.getSuccessfulEventsCount() > 0L) {
            return "completed with errors step " + this.step;
        }
        if (stepMetrics.getFailedEventsCount() == 0L) {
            return "completed step " + this.step;
        }
        return "failed step " + this.step;
    }
}

