/*
 * Decompiled with CFR 0.152.
 */
package org.apache.commons.pipeline.driver.control;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.pipeline.Feeder;
import org.apache.commons.pipeline.Stage;
import org.apache.commons.pipeline.StageContext;
import org.apache.commons.pipeline.StageDriver;
import org.apache.commons.pipeline.StageException;
import org.apache.commons.pipeline.driver.FaultTolerance;
import org.apache.commons.pipeline.driver.control.AbstractPrioritizableStageDriver;

public class ExecutorStageDriver
extends AbstractPrioritizableStageDriver {
    private final Log log = LogFactory.getLog(ExecutorStageDriver.class);
    private final ThreadPoolExecutor threadPoolExecutor;
    private final Executor directExecutor = new Executor(){

        public void execute(Runnable r) {
            r.run();
        }
    };
    private final CountDownLatch startSignal;
    private volatile Executor executor;
    private int maxThreads;
    private int coreThreads;
    private final Feeder feeder = new Feeder(){

        public void feed(final Object obj) {
            if (ExecutorStageDriver.this.isInState(new StageDriver.State[]{StageDriver.State.ERROR})) {
                throw new IllegalStateException("Stage " + ExecutorStageDriver.this.stage + " is in state ERROR and is hence unable to process data.");
            }
            ExecutorStageDriver.this.executor.execute(new Runnable(){

                public void run() {
                    block5: {
                        try {
                            ExecutorStageDriver.this.startSignal.await();
                            ExecutorStageDriver.this.process(obj);
                        }
                        catch (InterruptedException e) {
                            throw new Error("Assertion failure: interrupted while awaiting start signal.", e);
                        }
                        catch (StageException e) {
                            ExecutorStageDriver.this.recordProcessingException(obj, e);
                            if (ExecutorStageDriver.this.faultTolerance == FaultTolerance.NONE) {
                                ExecutorStageDriver.this.setState(StageDriver.State.ERROR);
                            }
                        }
                        catch (RuntimeException e) {
                            ExecutorStageDriver.this.recordProcessingException(obj, e);
                            if (ExecutorStageDriver.this.faultTolerance != FaultTolerance.CHECKED && ExecutorStageDriver.this.faultTolerance != FaultTolerance.NONE) break block5;
                            throw e;
                        }
                    }
                }
            });
        }
    };

    public ExecutorStageDriver(Stage stage, StageContext context, FaultTolerance faultTolerance, int coreThreads, int maxThreads) {
        super(stage, context, faultTolerance);
        this.threadPoolExecutor = (ThreadPoolExecutor)Executors.newCachedThreadPool();
        this.threadPoolExecutor.setCorePoolSize(coreThreads);
        this.threadPoolExecutor.setMaximumPoolSize(maxThreads);
        this.threadPoolExecutor.setRejectedExecutionHandler(new RejectedExecutionHandler(){

            public void rejectedExecution(Runnable command, ThreadPoolExecutor exec) {
                ExecutorStageDriver.this.directExecutor.execute(command);
            }
        });
        this.executor = maxThreads == 0 ? this.directExecutor : this.threadPoolExecutor;
        this.startSignal = new CountDownLatch(1);
    }

    public Feeder getFeeder() {
        return this.feeder;
    }

    public synchronized void start() throws StageException {
        if (this.currentState == StageDriver.State.STOPPED) {
            this.setState(StageDriver.State.STARTED);
            if (this.log.isDebugEnabled()) {
                this.log.debug((Object)("Preprocessing stage " + this.stage + "..."));
            }
            this.stage.preprocess();
            if (this.log.isDebugEnabled()) {
                this.log.debug((Object)("Preprocessing for stage " + this.stage + " complete."));
            }
            this.testAndSetState(StageDriver.State.STARTED, StageDriver.State.RUNNING);
            this.startSignal.countDown();
        }
    }

    public synchronized void finish() throws StageException {
        try {
            this.testAndSetState(StageDriver.State.RUNNING, StageDriver.State.STOP_REQUESTED);
            this.threadPoolExecutor.shutdown();
            this.testAndSetState(StageDriver.State.STOP_REQUESTED, StageDriver.State.STOPPED);
            while (!this.threadPoolExecutor.isShutdown() && this.currentState != StageDriver.State.ERROR) {
                this.wait();
            }
            if (this.log.isDebugEnabled()) {
                this.log.debug((Object)("Postprocessing stage " + this.stage + "..."));
            }
            this.stage.postprocess();
            if (this.log.isDebugEnabled()) {
                this.log.debug((Object)("Postprocessing for stage " + this.stage + " complete."));
            }
        }
        catch (InterruptedException e) {
            throw new RuntimeException("Unexpectedly interrupted while awaiting thread pool shutdown.", e);
        }
        finally {
            if (this.log.isDebugEnabled()) {
                this.log.debug((Object)("Releasing resources for stage " + this.stage + "..."));
            }
            this.stage.release();
            if (this.log.isDebugEnabled()) {
                this.log.debug((Object)("Stage " + this.stage + " released."));
            }
            this.testAndSetState(StageDriver.State.STOPPED, StageDriver.State.FINISHED);
        }
    }

    public synchronized void increasePriority(double amount) {
        this.maxThreads = (int)((double)this.maxThreads + amount);
        this.coreThreads += (int)(amount / 1.5);
        this.threadPoolExecutor.setCorePoolSize(this.coreThreads);
        this.threadPoolExecutor.setMaximumPoolSize(this.maxThreads);
        if (this.executor == this.directExecutor || this.maxThreads > 0) {
            this.executor = this.threadPoolExecutor;
        }
    }

    public synchronized void decreasePriority(double amount) {
        this.maxThreads = amount / 1.5 > (double)this.maxThreads ? 0 : this.maxThreads - (int)(amount / 1.5);
        this.coreThreads = amount > (double)this.coreThreads ? 0 : this.coreThreads - (int)amount;
        this.threadPoolExecutor.setCorePoolSize(this.coreThreads);
        this.threadPoolExecutor.setMaximumPoolSize(this.maxThreads);
        if (this.maxThreads == 0) {
            this.executor = this.directExecutor;
        }
    }

    public double getPriority() {
        return this.maxThreads;
    }
}

