/*
 * Decompiled with CFR 0.152.
 */
package org.apache.batchee.container.impl.controller;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import javax.batch.api.partition.PartitionPlan;
import javax.batch.api.partition.PartitionReducer;
import javax.batch.operations.JobExecutionAlreadyCompleteException;
import javax.batch.operations.JobExecutionNotMostRecentException;
import javax.batch.operations.JobRestartException;
import javax.batch.operations.JobStartException;
import javax.batch.runtime.BatchStatus;
import javax.batch.runtime.Metric;
import javax.batch.runtime.StepExecution;
import org.apache.batchee.container.exception.BatchContainerRuntimeException;
import org.apache.batchee.container.impl.StepContextImpl;
import org.apache.batchee.container.impl.controller.BaseStepController;
import org.apache.batchee.container.impl.controller.PartitionedStepBuilder;
import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
import org.apache.batchee.container.jsl.CloneUtility;
import org.apache.batchee.container.proxy.InjectionReferences;
import org.apache.batchee.container.proxy.PartitionAnalyzerProxy;
import org.apache.batchee.container.proxy.PartitionMapperProxy;
import org.apache.batchee.container.proxy.PartitionReducerProxy;
import org.apache.batchee.container.proxy.ProxyFactory;
import org.apache.batchee.container.proxy.StepListenerProxy;
import org.apache.batchee.container.services.ServicesManager;
import org.apache.batchee.container.util.BatchPartitionPlan;
import org.apache.batchee.container.util.BatchPartitionWorkUnit;
import org.apache.batchee.container.util.BatchWorkUnit;
import org.apache.batchee.container.util.PartitionDataWrapper;
import org.apache.batchee.container.util.PartitionsBuilderConfig;
import org.apache.batchee.jaxb.Analyzer;
import org.apache.batchee.jaxb.JSLJob;
import org.apache.batchee.jaxb.JSLProperties;
import org.apache.batchee.jaxb.PartitionMapper;
import org.apache.batchee.jaxb.PartitionReducer;
import org.apache.batchee.jaxb.Property;
import org.apache.batchee.jaxb.Step;
import org.apache.batchee.spi.BatchArtifactFactory;

/**
 * Step controller for a partitioned step. It resolves a partition plan, builds one
 * sub-job work unit per partition, submits the work units through the kernel service,
 * waits for them to finish and drives the optional partition analyzer and reducer.
 */
public class PartitionedStepController
extends BaseStepController {
    // Same values as the initial values of the partitions / threads fields below.
    private static final int DEFAULT_PARTITION_INSTANCES = 1;
    private static final int DEFAULT_THREADS = 0;
    // Plan returned by generatePartitionPlan(); stored by invokeCoreStep().
    private PartitionPlan plan = null;
    // Partition count, thread count and per-partition properties copied from the plan by generatePartitionPlan().
    private int partitions = 1;
    private int threads = 0;
    private Properties[] partitionProperties = null;
    // Work units for this execution; assigned in buildSubJobBatchWorkUnits() and read by stop(),
    // which may run on another thread (hence volatile).
    private volatile List<BatchPartitionWorkUnit> parallelBatchWorkUnits;
    // Created in setupStepArtifacts() only when the step declares a reducer; null otherwise.
    private PartitionReducerProxy partitionReducerProxy = null;
    // Plan partition count minus the number of work units built for this execution.
    int numPreviouslyCompleted = 0;
    // Created in setupStepArtifacts() only when the step declares an analyzer; null otherwise.
    private PartitionAnalyzerProxy analyzerProxy = null;
    // One generated sub-job per partition; this list is also the monitor used by stop() and buildSubJobBatchWorkUnits().
    final List<JSLJob> subJobs = new ArrayList<JSLJob>();
    // Step listeners obtained from the listener factory in setupStepArtifacts().
    protected List<StepListenerProxy> stepListeners = null;
    // Finished work units collected by executeAndWaitForCompletion() and inspected by checkCompletedWork().
    List<BatchPartitionWorkUnit> completedWork = new ArrayList<BatchPartitionWorkUnit>();
    // Created in invokeCoreStep() and handed to the partition builder config; finished work units are taken from it.
    BlockingQueue<BatchPartitionWorkUnit> completedWorkQueue = null;
    // Artifact factory looked up from the ServicesManager in the constructor; passed to ProxyFactory calls.
    private final BatchArtifactFactory factory;

    protected PartitionedStepController(RuntimeJobExecution jobExecutionImpl, Step step, StepContextImpl stepContext, long rootJobExecutionId, ServicesManager servicesManager) {
        super(jobExecutionImpl, step, stepContext, rootJobExecutionId, servicesManager);
        this.factory = servicesManager.service(BatchArtifactFactory.class);
    }

    /**
     * Marks this step as {@link BatchStatus#STOPPING} and asks the kernel service to stop
     * every partition work unit that has been built so far.
     *
     * @throws IllegalStateException wrapping the first exception raised while stopping a
     *         partition; work units after the failing one are not asked to stop
     */
    @Override
    public void stop() {
        this.updateBatchStatus(BatchStatus.STOPPING);
        // Same monitor as buildSubJobBatchWorkUnits(): the work-unit list is either not built yet or complete.
        synchronized (this.subJobs) {
            final List<BatchPartitionWorkUnit> builtUnits = this.parallelBatchWorkUnits;
            if (builtUnits == null) {
                // Stop was requested before any partition was built; nothing to stop.
                return;
            }
            for (final BatchWorkUnit unit : builtUnits) {
                try {
                    this.kernelService.stopJob(unit.getJobExecutionImpl().getExecutionId());
                }
                catch (Exception stopFailure) {
                    throw new IllegalStateException(stopFailure);
                }
            }
        }
    }

    /**
     * Resolves the partition plan for this step and caches its partition count, thread
     * count and per-partition properties on this controller.
     *
     * <p>A declared partition mapper takes precedence over a static JSL plan.
     *
     * @return the resolved plan, never {@code null}
     * @throws IllegalArgumentException if the step declares neither a mapper nor a plan, or
     *         if the plan's {@code partitions}/{@code threads} attributes are not valid numbers
     * @throws BatchContainerRuntimeException if a plan properties list targets a partition
     *         index outside {@code [0, partitions)}
     */
    private PartitionPlan generatePartitionPlan() {
        final PartitionMapper partitionMapper = this.step.getPartition().getMapper();
        final BatchPartitionPlan resolvedPlan;
        if (partitionMapper != null) {
            resolvedPlan = this.buildPlanFromMapper(partitionMapper);
        } else if (this.step.getPartition().getPlan() != null) {
            resolvedPlan = this.buildPlanFromJsl();
        } else {
            // Fail with context instead of a bare NullPointerException further down.
            throw new IllegalArgumentException("Neither a partition mapper nor a partition plan is defined in stepId: " + this.step.getId());
        }
        this.partitions = resolvedPlan.getPartitions();
        this.threads = resolvedPlan.getThreads();
        this.partitionProperties = resolvedPlan.getPartitionProperties();
        return resolvedPlan;
    }

    /**
     * Invokes the partition mapper artifact and copies its plan into a {@link BatchPartitionPlan}.
     * The partition count recorded in the step status is reused unless the mapper requests
     * an override or no count has been recorded yet.
     */
    private BatchPartitionPlan buildPlanFromMapper(PartitionMapper partitionMapper) {
        final Integer previousNumPartitions = this.stepStatus.getNumPartitions();
        final List<Property> propertyList = partitionMapper.getProperties() == null ? null : partitionMapper.getProperties().getPropertyList();
        final InjectionReferences injectionRef = new InjectionReferences(this.jobExecutionImpl.getJobContext(), this.stepContext, propertyList);
        final PartitionMapperProxy partitionMapperProxy = ProxyFactory.createPartitionMapperProxy(this.factory, partitionMapper.getRef(), injectionRef, this.stepContext, this.jobExecutionImpl);
        final PartitionPlan mapperPlan = partitionMapperProxy.mapPartitions();

        final BatchPartitionPlan mapped = new BatchPartitionPlan();
        mapped.setPartitionsOverride(mapperPlan.getPartitionsOverride());
        if (mapperPlan.getPartitionsOverride() || previousNumPartitions == null) {
            mapped.setPartitions(mapperPlan.getPartitions());
        } else {
            mapped.setPartitions(previousNumPartitions);
        }
        // A thread count of 0 means "one thread per partition".
        if (mapperPlan.getThreads() == 0) {
            mapped.setThreads(mapped.getPartitions());
        } else {
            mapped.setThreads(mapperPlan.getThreads());
        }
        mapped.setPartitionProperties(mapperPlan.getPartitionProperties());
        return mapped;
    }

    /**
     * Builds a {@link BatchPartitionPlan} from the static plan element of the step.
     * A missing {@code partitions} attribute falls back to {@link #DEFAULT_PARTITION_INSTANCES};
     * a missing or zero {@code threads} attribute means one thread per partition.
     */
    private BatchPartitionPlan buildPlanFromJsl() {
        final String partitionsAttr = this.step.getPartition().getPlan().getPartitions();
        int numPartitions = DEFAULT_PARTITION_INSTANCES;
        if (partitionsAttr != null) {
            try {
                numPartitions = Integer.parseInt(partitionsAttr);
            }
            catch (NumberFormatException e) {
                throw new IllegalArgumentException("Could not parse partition instances value in stepId: " + this.step.getId() + ", with instances=" + partitionsAttr, e);
            }
            // Validate before sizing the array so a negative value reports this error
            // rather than a NegativeArraySizeException.
            if (numPartitions < 1) {
                throw new IllegalArgumentException("Partition instances value must be 1 or greater in stepId: " + this.step.getId() + ", with instances=" + partitionsAttr);
            }
        }
        final Properties[] partitionProps = new Properties[numPartitions];

        final String threadsAttr = this.step.getPartition().getPlan().getThreads();
        int numThreads = numPartitions;
        if (threadsAttr != null) {
            try {
                numThreads = Integer.parseInt(threadsAttr);
            }
            catch (NumberFormatException e) {
                throw new IllegalArgumentException("Could not parse partition threads value in stepId: " + this.step.getId() + ", with threads=" + threadsAttr, e);
            }
            if (numThreads < 0) {
                throw new IllegalArgumentException("Threads value must be 0 or greater in stepId: " + this.step.getId() + ", with threads=" + threadsAttr);
            }
            if (numThreads == 0) {
                numThreads = numPartitions;
            }
        }

        final List<JSLProperties> jslProperties = this.step.getPartition().getPlan().getProperties();
        if (jslProperties != null) {
            for (JSLProperties props : jslProperties) {
                final int targetPartition = Integer.parseInt(props.getPartition());
                if (targetPartition < 0 || targetPartition >= numPartitions) {
                    throw new BatchContainerRuntimeException("There are only " + numPartitions + " partition instances, but there are " + jslProperties.size() + " partition properties lists defined. Remember that partition indexing is 0 based like Java arrays.");
                }
                partitionProps[targetPartition] = CloneUtility.jslPropertiesToJavaProperties(props);
            }
        }

        final BatchPartitionPlan jslPlan = new BatchPartitionPlan();
        jslPlan.setPartitions(numPartitions);
        jslPlan.setThreads(numThreads);
        jslPlan.setPartitionProperties(partitionProps);
        jslPlan.setPartitionsOverride(false);
        return jslPlan;
    }

    /**
     * Runs the partitioned step: resolves the plan, records the partition count in the step
     * status, prepares the result queues, builds the partition work units, executes them and
     * finally evaluates the completed work.
     *
     * <p>If the plan requests a partitions override and a reducer is present, the reducer's
     * rollback callback is invoked before any work unit is built.
     */
    @Override
    protected void invokeCoreStep() throws JobRestartException, JobStartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException {
        final PartitionPlan resolvedPlan = this.generatePartitionPlan();
        this.plan = resolvedPlan;
        this.stepStatus.setNumPartitions(resolvedPlan.getPartitions());

        final boolean partitionsOverridden = resolvedPlan.getPartitionsOverride();
        if (partitionsOverridden && this.partitionReducerProxy != null) {
            this.partitionReducerProxy.rollbackPartitionedStep();
        }

        // The analyzer queue is only needed when an analyzer artifact was configured.
        if (this.analyzerProxy != null) {
            this.analyzerStatusQueue = new LinkedBlockingQueue();
        }
        this.completedWorkQueue = new LinkedBlockingQueue<BatchPartitionWorkUnit>();

        this.buildSubJobBatchWorkUnits();
        this.executeAndWaitForCompletion();
        this.checkCompletedWork();
    }

    /**
     * Generates one sub-job per partition and asks the kernel service to turn them into
     * work units, unless the job is already stopping.
     *
     * <p>Existing partitions are rebuilt for restart when the step has been started more than
     * once and the plan does not request a partitions override; otherwise new partitions are built.
     */
    private void buildSubJobBatchWorkUnits() throws JobRestartException, JobStartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException {
        // Same monitor as stop(), so a concurrent stop sees either no work units or all of them.
        synchronized (this.subJobs) {
            final boolean stopping = this.jobExecutionImpl.getJobContext().getBatchStatus().equals(BatchStatus.STOPPING);
            if (stopping) {
                return;
            }

            int partitionIndex = 0;
            while (partitionIndex < this.partitions) {
                final JSLJob subJob = PartitionedStepBuilder.buildPartitionSubJob(this.jobExecutionImpl.getInstanceId(), this.jobExecutionImpl.getJobContext(), this.step, partitionIndex);
                this.subJobs.add(subJob);
                partitionIndex++;
            }

            final PartitionsBuilderConfig builderConfig = new PartitionsBuilderConfig(this.subJobs, this.partitionProperties, this.analyzerStatusQueue, this.completedWorkQueue, this.jobExecutionImpl.getExecutionId());
            final boolean rebuildForRestart = this.stepStatus.getStartCount() > 1 && !this.plan.getPartitionsOverride();
            if (rebuildForRestart) {
                this.parallelBatchWorkUnits = this.kernelService.buildOnRestartParallelPartitions(builderConfig);
            } else {
                this.parallelBatchWorkUnits = this.kernelService.buildNewParallelPartitions(builderConfig, this.jobExecutionImpl.getJobContext(), this.stepContext);
            }
        }
    }

    /**
     * Submits the partition work units, at most {@code threads} at a time, and blocks until
     * every work unit of this execution has reported completion. Finished work units are
     * appended to {@code completedWork}.
     *
     * <p>Returns immediately when the job is stopping or when there is no work unit to run.
     *
     * @throws JobRestartException if the kernel service fails to restart a partition
     * @throws BatchContainerRuntimeException if the waiting thread is interrupted
     * @throws IllegalStateException if the analyzer queue delivers an unknown event type
     */
    private void executeAndWaitForCompletion() throws JobRestartException {
        if (this.jobExecutionImpl.getJobContext().getBatchStatus().equals(BatchStatus.STOPPING)) {
            return;
        }
        final List<BatchPartitionWorkUnit> workUnits = this.parallelBatchWorkUnits;
        final int numTotalForThisExecution = workUnits.size();
        this.numPreviouslyCompleted = this.partitions - numTotalForThisExecution;
        if (numTotalForThisExecution == 0) {
            // Nothing was submitted, so nothing will ever be queued; waiting would block forever.
            return;
        }

        // One decision for every submission, matching how the work units were built:
        // restart only on a re-run of the step that keeps the previous partitions.
        final boolean restartPartitions = this.stepStatus.getStartCount() > 1 && !this.plan.getPartitionsOverride();

        int numCurrentSubmitted = 0;
        while (numCurrentSubmitted < this.threads && numCurrentSubmitted < numTotalForThisExecution) {
            this.submitPartition(workUnits.get(numCurrentSubmitted++), restartPartitions);
        }

        int numCurrentCompleted = 0;
        while (numCurrentCompleted < numTotalForThisExecution) {
            this.awaitNextCompletedPartition();
            numCurrentCompleted++;
            // A slot was freed: hand out the next pending work unit, if any.
            if (numCurrentCompleted < numTotalForThisExecution && numCurrentSubmitted < numTotalForThisExecution) {
                this.submitPartition(workUnits.get(numCurrentSubmitted++), restartPartitions);
            }
        }
    }

    /** Starts or restarts a single partition work unit through the kernel service. */
    private void submitPartition(BatchWorkUnit workUnit, boolean restart) throws JobRestartException {
        if (restart) {
            this.kernelService.restartGeneratedJob(workUnit);
        } else {
            this.kernelService.startGeneratedJob(workUnit);
        }
    }

    /**
     * Blocks until one partition has finished and records it in {@code completedWork}.
     * With an analyzer configured, collector-data events are forwarded to it until the
     * status event of a finished partition arrives.
     */
    private void awaitNextCompletedPartition() {
        try {
            if (this.analyzerProxy != null) {
                while (true) {
                    final PartitionDataWrapper dataWrapper = (PartitionDataWrapper) this.analyzerStatusQueue.take();
                    if (PartitionDataWrapper.PartitionEventType.ANALYZE_COLLECTOR_DATA.equals(dataWrapper.getEventType())) {
                        this.analyzerProxy.analyzeCollectorData(dataWrapper.getCollectorData());
                        continue;
                    }
                    if (PartitionDataWrapper.PartitionEventType.ANALYZE_STATUS.equals(dataWrapper.getEventType())) {
                        this.analyzerProxy.analyzeStatus(dataWrapper.getBatchstatus(), dataWrapper.getExitStatus());
                        break;
                    }
                    throw new IllegalStateException("Invalid partition state");
                }
            }
            this.completedWork.add(this.completedWorkQueue.take());
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new BatchContainerRuntimeException(e);
        }
    }

    /**
     * Folds the metrics of every completed partition into the step context and checks
     * whether any partition failed.
     *
     * <p>If at least one partition has status {@link BatchStatus#FAILED}, the step context is
     * marked failed, the reducer (if any) is asked to roll back and an exception is thrown.
     * Otherwise the reducer's before-completion callback is invoked.
     *
     * @throws BatchContainerRuntimeException if one or more partitions failed
     */
    private void checkCompletedWork() {
        boolean anyPartitionFailed = false;
        for (final BatchWorkUnit finishedUnit : this.completedWork) {
            final List<StepExecution> stepExecutions = this.persistenceManagerService.getStepExecutionsForJobExecution(finishedUnit.getJobExecutionImpl().getExecutionId());
            // Metrics are only aggregated when the partition has exactly one step execution.
            if (stepExecutions.size() == 1) {
                final Metric[] partitionMetrics = stepExecutions.get(0).getMetrics();
                for (final Metric partitionMetric : partitionMetrics) {
                    this.stepContext.getMetric(partitionMetric.getType()).incValueBy(partitionMetric.getValue());
                }
            }
            final BatchStatus partitionStatus = finishedUnit.getJobExecutionImpl().getJobContext().getBatchStatus();
            if (partitionStatus.equals(BatchStatus.FAILED)) {
                anyPartitionFailed = true;
                this.stepContext.setBatchStatus(BatchStatus.FAILED);
            }
        }

        final PartitionReducerProxy reducer = this.partitionReducerProxy;
        if (anyPartitionFailed) {
            if (reducer != null) {
                reducer.rollbackPartitionedStep();
            }
            throw new BatchContainerRuntimeException("One or more partitions failed");
        }
        if (reducer != null) {
            reducer.beforePartitionedStepCompletion();
        }
    }

    /**
     * Creates the step-level artifacts: the step listeners and, when declared on the step's
     * partition element, the partition analyzer proxy and the partition reducer proxy.
     * Each proxy receives injection references carrying that artifact's own property list.
     */
    @Override
    protected void setupStepArtifacts() {
        // Listeners are resolved with injection references that carry no property list.
        final InjectionReferences listenerRefs = new InjectionReferences(this.jobExecutionImpl.getJobContext(), this.stepContext, null);
        this.stepListeners = this.jobExecutionImpl.getListenerFactory().getStepListeners(this.step, listenerRefs, this.stepContext, this.jobExecutionImpl);

        final Analyzer analyzer = this.step.getPartition().getAnalyzer();
        if (analyzer != null) {
            List<Property> analyzerProps = null;
            if (analyzer.getProperties() != null) {
                analyzerProps = analyzer.getProperties().getPropertyList();
            }
            final InjectionReferences analyzerRefs = new InjectionReferences(this.jobExecutionImpl.getJobContext(), this.stepContext, analyzerProps);
            this.analyzerProxy = ProxyFactory.createPartitionAnalyzerProxy(this.factory, analyzer.getRef(), analyzerRefs, this.stepContext, this.jobExecutionImpl);
        }

        // Fully qualified: this is the JSL (JAXB) reducer element, not the javax.batch reducer API.
        final org.apache.batchee.jaxb.PartitionReducer reducerElement = this.step.getPartition().getReducer();
        if (reducerElement != null) {
            List<Property> reducerProps = null;
            if (reducerElement.getProperties() != null) {
                reducerProps = reducerElement.getProperties().getPropertyList();
            }
            final InjectionReferences reducerRefs = new InjectionReferences(this.jobExecutionImpl.getJobContext(), this.stepContext, reducerProps);
            this.partitionReducerProxy = ProxyFactory.createPartitionReducerProxy(this.factory, reducerElement.getRef(), reducerRefs, this.stepContext, this.jobExecutionImpl);
        }
    }

    /**
     * Invokes {@code beforeStep()} on every step listener and then, if a reducer is
     * configured, its {@code beginPartitionedStep()} callback.
     */
    @Override
    protected void invokePreStepArtifacts() {
        final List<StepListenerProxy> listeners = this.stepListeners;
        if (listeners != null) {
            for (final StepListenerProxy listener : listeners) {
                listener.beforeStep();
            }
        }

        final PartitionReducerProxy reducer = this.partitionReducerProxy;
        if (reducer != null) {
            reducer.beginPartitionedStep();
        }
    }

    /**
     * Notifies the reducer (if any) of the step outcome and then invokes {@code afterStep()}
     * on every step listener. The reducer receives {@code COMMIT} when the step context status
     * is {@link BatchStatus#COMPLETED} and {@code ROLLBACK} otherwise.
     */
    @Override
    protected void invokePostStepArtifacts() {
        final PartitionReducerProxy reducer = this.partitionReducerProxy;
        if (reducer != null) {
            // Fully qualified: this is the javax.batch reducer API, not the JSL (JAXB) element.
            final javax.batch.api.partition.PartitionReducer.PartitionStatus outcome =
                    BatchStatus.COMPLETED.equals(this.stepContext.getBatchStatus())
                            ? javax.batch.api.partition.PartitionReducer.PartitionStatus.COMMIT
                            : javax.batch.api.partition.PartitionReducer.PartitionStatus.ROLLBACK;
            reducer.afterPartitionedStepCompletion(outcome);
        }

        final List<StepListenerProxy> listeners = this.stepListeners;
        if (listeners != null) {
            for (final StepListenerProxy listener : listeners) {
                listener.afterStep();
            }
        }
    }

    /**
     * Intentionally a no-op in this controller: the override does nothing.
     */
    @Override
    protected void sendStatusFromPartitionToAnalyzerIfPresent() {
    }
}

