/*
 * Decompiled with CFR 0.152.
 */
package org.apache.commons.pipeline.driver.control;

import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.pipeline.Feeder;
import org.apache.commons.pipeline.Stage;
import org.apache.commons.pipeline.StageContext;
import org.apache.commons.pipeline.StageDriver;
import org.apache.commons.pipeline.StageException;
import org.apache.commons.pipeline.driver.FaultTolerance;
import org.apache.commons.pipeline.driver.control.AbstractPrioritizableStageDriver;
import org.apache.commons.pipeline.util.BlockingQueueFactory;

/**
 * Stage driver that processes objects with a resizable pool of worker threads
 * which all poll a single shared queue.
 *
 * <p>The pool size doubles as the driver's priority: {@link #increasePriority(double)}
 * adds workers and {@link #decreasePriority(double)} retires them. The last
 * remaining worker is never removed outright; it is marked
 * {@link Runnability#STOPPABLE} so that it exits once a poll of the queue times
 * out, after which the feeder processes objects directly on the feeding thread.
 */
public class BalancedPoolStageDriver extends AbstractPrioritizableStageDriver {
    private final Log log = LogFactory.getLog(BalancedPoolStageDriver.class);

    /** Released by {@link #start()}; workers block on it before entering their loop. */
    private final CountDownLatch startSignal;

    /** Id handed to the next worker created; only touched inside {@link #addWorkers(int)}. */
    private int nextWorkerId = 0;

    /** Workers currently registered with this driver, oldest first. */
    private final Queue<BalancedWorker> workers = new ConcurrentLinkedQueue<BalancedWorker>();

    private final int initialThreads;

    /** Poll/wait timeout, expressed in {@link #timeoutTimeUnit}. */
    private final long timeout;
    private final TimeUnit timeoutTimeUnit;

    private final SwitchingFeeder feeder;

    /**
     * Creates a driver for the given stage.
     *
     * @param stage           the stage to run
     * @param context         the context the stage runs in
     * @param queueFactory    factory for the queue shared by the feeder and the workers
     * @param initialThreads  number of worker threads created by {@link #start()}
     * @param faultTolerance  how processing failures are handled
     * @param timeout         how long a worker waits on the queue before re-checking its state
     * @param timeoutTimeUnit unit of {@code timeout}; {@code null} is treated as milliseconds
     */
    public BalancedPoolStageDriver(Stage stage, StageContext context, BlockingQueueFactory queueFactory, int initialThreads, FaultTolerance faultTolerance, long timeout, TimeUnit timeoutTimeUnit) {
        super(stage, context, faultTolerance);
        this.feeder = new SwitchingFeeder(queueFactory.createQueue());
        this.startSignal = new CountDownLatch(1);
        this.initialThreads = initialThreads;
        this.timeout = timeout;
        // The unit used to be ignored (milliseconds were hard-coded), so fall back to
        // milliseconds for callers that passed null.
        this.timeoutTimeUnit = timeoutTimeUnit != null ? timeoutTimeUnit : TimeUnit.MILLISECONDS;
    }

    /**
     * Returns the feeder used to hand objects to this driver.
     *
     * @return the feeder; it either enqueues objects or processes them on the calling thread
     */
    public Feeder getFeeder() {
        return this.feeder;
    }

    /**
     * Preprocesses the stage, creates the initial worker threads and releases
     * them to begin polling the queue.
     *
     * @throws StageException        if stage preprocessing fails
     * @throws IllegalStateException if the driver is not in the STOPPED state
     */
    public void start() throws StageException {
        if (this.currentState != StageDriver.State.STOPPED) {
            throw new IllegalStateException("Attempt to start driver in state " + this.currentState);
        }

        this.setState(StageDriver.State.STARTED);
        if (this.log.isDebugEnabled()) {
            this.log.debug("Preprocessing stage " + this.stage + "...");
        }
        this.stage.preprocess();
        if (this.log.isDebugEnabled()) {
            this.log.debug("Preprocessing for stage " + this.stage + " complete.");
        }

        if (this.log.isDebugEnabled()) {
            this.log.debug("Starting worker threads for stage " + this.stage + ".");
        }
        this.addWorkers(this.initialThreads);
        this.testAndSetState(StageDriver.State.STARTED, StageDriver.State.RUNNING);
        // Workers are parked on the latch until the driver has reached RUNNING.
        this.startSignal.countDown();
        if (this.log.isDebugEnabled()) {
            this.log.debug("Worker threads for stage " + this.stage + " started.");
        }
    }

    /**
     * Requests shutdown, waits for every registered worker to exit, runs stage
     * postprocessing (unless the driver is in the ERROR state) and releases the
     * stage's resources.
     *
     * @throws StageException        if the calling thread is interrupted while waiting
     * @throws IllegalStateException if the driver is in the STOPPED state
     */
    public void finish() throws StageException {
        if (this.currentState == StageDriver.State.STOPPED) {
            throw new IllegalStateException("The driver is not currently running.");
        }
        try {
            // finish() may be called while start() is still in progress; wait for the
            // driver to reach RUNNING (or ERROR) first. Object.wait requires the monitor.
            long waitMillis = this.stateWaitMillis();
            synchronized (this) {
                while (this.currentState != StageDriver.State.RUNNING && this.currentState != StageDriver.State.ERROR) {
                    this.wait(waitMillis);
                }
            }

            this.testAndSetState(StageDriver.State.RUNNING, StageDriver.State.STOP_REQUESTED);
            if (this.log.isDebugEnabled()) {
                this.log.debug("Waiting for worker threads to stop for stage " + this.stage + ".");
            }
            while (!this.workers.isEmpty()) {
                BalancedWorker worker = this.workers.remove();
                worker.awaitCompletion();
            }
            if (this.log.isDebugEnabled()) {
                this.log.debug("Worker threads for stage " + this.stage + " halted");
            }

            this.testAndSetState(StageDriver.State.STOP_REQUESTED, StageDriver.State.FINISHED);
            if (this.currentState != StageDriver.State.ERROR) {
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Postprocessing stage " + this.stage + "...");
                }
                this.stage.postprocess();
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Postprocessing for stage " + this.stage + " complete.");
                }
            }
        }
        catch (StageException e) {
            this.log.error("An error occurred during postprocessing of stage " + this.stage, e);
            this.recordFatalError(e);
            this.setState(StageDriver.State.ERROR);
        }
        catch (InterruptedException e) {
            // Keep the interrupt visible to the caller after translating the exception.
            Thread.currentThread().interrupt();
            throw new StageException(this.getStage(), "StageDriver unexpectedly interrupted while waiting for shutdown of worker threads.", e);
        }
        finally {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Releasing resources for stage " + this.stage + "...");
            }
            this.stage.release();
            if (this.log.isDebugEnabled()) {
                this.log.debug("Stage " + this.stage + " released.");
            }
        }
        this.testAndSetState(StageDriver.State.FINISHED, StageDriver.State.STOPPED);
    }

    /**
     * Converts the configured timeout to milliseconds for {@link Object#wait(long)}.
     * A positive timeout shorter than one millisecond is rounded up to 1, because
     * {@code wait(0)} would block until notified instead of timing out.
     */
    private long stateWaitMillis() {
        long millis = this.timeoutTimeUnit.toMillis(this.timeout);
        return (this.timeout > 0L && millis == 0L) ? 1L : millis;
    }

    /**
     * Puts the driver into the ERROR state, records the failure, releases the
     * stage and wakes any thread waiting on this driver's monitor.
     *
     * @param t the failure that halted processing
     * @return an unchecked exception, carrying {@code t} as its cause, for the caller to throw
     */
    private RuntimeException fatalError(Throwable t) {
        try {
            this.setState(StageDriver.State.ERROR);
            this.recordFatalError(t);
            this.stage.release();
            // notifyAll must be called while holding the monitor.
            synchronized (this) {
                this.notifyAll();
            }
        }
        catch (Exception e) {
            this.recordFatalError(e);
        }
        return new RuntimeException("Fatal error halted processing of stage: " + this.stage, t);
    }

    /**
     * Creates, registers and starts {@code count} new worker threads.
     * Non-positive counts are a no-op.
     */
    private synchronized void addWorkers(int count) {
        while (count-- > 0) {
            BalancedWorker worker = new BalancedWorker(this.nextWorkerId, this.feeder.queue);
            Thread workerThread = new Thread(worker);
            this.workers.add(worker);
            ++this.nextWorkerId;
            workerThread.start();
        }
    }

    /**
     * Retires up to {@code count} workers. While more than one worker is
     * registered, the oldest is told to stop and this method blocks until it has
     * exited. The last worker stays registered and is only marked STOPPABLE.
     *
     * @throws InterruptedException if interrupted while waiting for a worker to exit
     */
    private synchronized void removeWorkers(int count) throws InterruptedException {
        while (count-- > 0 && !this.workers.isEmpty()) {
            BalancedWorker worker;
            if (this.workers.size() > 1) {
                worker = this.workers.remove();
                worker.deactivate(false);
                worker.awaitCompletion();
                continue;
            }
            // Last worker: let it exit on its own once a poll finds the queue empty.
            worker = this.workers.peek();
            worker.deactivate(true);
        }
    }

    /**
     * Adds worker threads.
     *
     * @param amount number of workers to add; the fractional part is discarded
     */
    public void increasePriority(double amount) {
        this.addWorkers((int)amount);
    }

    /**
     * Retires worker threads.
     *
     * @param amount number of workers to retire; the fractional part is discarded
     * @throws Error if the calling thread is interrupted while waiting for a worker to exit
     */
    public void decreasePriority(double amount) {
        try {
            this.removeWorkers((int)amount);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new Error("Assertion failure: interrupted while awaiting worker thread stop for pool size reduction.", e);
        }
    }

    /**
     * Returns the current priority, which is the number of registered workers.
     */
    public double getPriority() {
        return this.getWorkerCount();
    }

    /**
     * Returns the number of workers currently registered with this driver. A
     * worker marked STOPPABLE stays registered, so it is still counted.
     */
    public synchronized int getWorkerCount() {
        return this.workers.size();
    }

    /**
     * Runnable executed by each pool thread: waits for the start signal, then
     * repeatedly polls the shared queue and processes what it receives.
     */
    private class BalancedWorker
    implements Runnable {
        private volatile Runnability runnability = Runnability.RUNNABLE;
        private final int workerId;
        private final BlockingQueue queue;

        /** Counted down when {@link #run()} exits, for whatever reason. */
        private final CountDownLatch doneSignal;

        public BalancedWorker(int workerId, BlockingQueue queue) {
            this.workerId = workerId;
            this.queue = queue;
            this.doneSignal = new CountDownLatch(1);
        }

        /**
         * Worker loop. Exits when the worker is marked NOT_RUNNABLE, when the
         * driver enters the ERROR state, or when a poll times out while the
         * driver is in STOP_REQUESTED or the worker is STOPPABLE.
         *
         * <p>A StageException rethrown under the fault tolerance policy puts the
         * driver into ERROR. A rethrown RuntimeException propagates out of this
         * method. In every case the done latch is released.
         */
        public void run() {
            try {
                BalancedPoolStageDriver.this.startSignal.await();
                while (this.runnability != Runnability.NOT_RUNNABLE && BalancedPoolStageDriver.this.currentState != StageDriver.State.ERROR) {
                    try {
                        // Use the configured unit rather than assuming milliseconds.
                        Object obj = this.queue.poll(BalancedPoolStageDriver.this.timeout, BalancedPoolStageDriver.this.timeoutTimeUnit);
                        if (obj == null) {
                            // Poll timed out: stop if shutdown was requested or this worker was
                            // asked to retire once idle; otherwise poll again.
                            if (BalancedPoolStageDriver.this.currentState == StageDriver.State.STOP_REQUESTED) {
                                return;
                            }
                            if (this.runnability == Runnability.STOPPABLE) {
                                return;
                            }
                            continue;
                        }
                        try {
                            if (BalancedPoolStageDriver.this.log.isDebugEnabled()) {
                                BalancedPoolStageDriver.this.log.debug(BalancedPoolStageDriver.this.stage + ": processing asynchronously: " + obj);
                            }
                            BalancedPoolStageDriver.this.process(obj);
                        }
                        catch (StageException e) {
                            BalancedPoolStageDriver.this.recordProcessingException(obj, e);
                            if (BalancedPoolStageDriver.this.faultTolerance == FaultTolerance.NONE) {
                                throw e;
                            }
                        }
                        catch (RuntimeException e) {
                            BalancedPoolStageDriver.this.recordProcessingException(obj, e);
                            if (BalancedPoolStageDriver.this.faultTolerance == FaultTolerance.CHECKED || BalancedPoolStageDriver.this.faultTolerance == FaultTolerance.NONE) {
                                throw e;
                            }
                        }
                    }
                    catch (InterruptedException e) {
                        throw new RuntimeException("Worker thread " + this.workerId + " unexpectedly interrupted while waiting on data for stage " + BalancedPoolStageDriver.this.stage, e);
                    }
                }
            }
            catch (StageException e) {
                BalancedPoolStageDriver.this.log.error("An error occurred in the stage " + BalancedPoolStageDriver.this.stage + " (workerID: " + this.workerId + ")", e);
                BalancedPoolStageDriver.this.recordFatalError(e);
                BalancedPoolStageDriver.this.setState(StageDriver.State.ERROR);
            }
            catch (InterruptedException e) {
                BalancedPoolStageDriver.this.log.error("Stage " + BalancedPoolStageDriver.this.stage + " (workerId: " + this.workerId + ") interrupted while waiting for barrier", e);
                BalancedPoolStageDriver.this.recordFatalError(e);
                BalancedPoolStageDriver.this.setState(StageDriver.State.ERROR);
            }
            finally {
                this.doneSignal.countDown();
            }
        }

        /**
         * Asks this worker to stop.
         *
         * @param waitForQueue if {@code true} the worker keeps processing until a
         *                     poll times out; if {@code false} it stops at its next loop check
         */
        public void deactivate(boolean waitForQueue) {
            this.runnability = waitForQueue ? Runnability.STOPPABLE : Runnability.NOT_RUNNABLE;
        }

        /**
         * Blocks until {@link #run()} has exited.
         *
         * @throws InterruptedException if interrupted while waiting
         */
        public void awaitCompletion() throws InterruptedException {
            this.doneSignal.await();
        }
    }

    /**
     * Feeder that enqueues objects for the worker pool when workers are
     * available to drain the queue, and otherwise processes them on the
     * feeding thread.
     */
    private class SwitchingFeeder
    implements Feeder {
        private final BlockingQueue queue;

        public SwitchingFeeder(BlockingQueue queue) {
            this.queue = queue;
        }

        /**
         * Hands {@code obj} to the stage, either through the queue or directly.
         *
         * @param obj the object to process
         * @throws Error            if interrupted while enqueueing
         * @throws RuntimeException if direct processing fails and fault tolerance is NONE
         */
        public void feed(Object obj) {
            synchronized (BalancedPoolStageDriver.this) {
                if (this.shouldEnqueue()) {
                    try {
                        if (BalancedPoolStageDriver.this.log.isDebugEnabled()) {
                            BalancedPoolStageDriver.this.log.debug(BalancedPoolStageDriver.this.stage + ": Queueing object: " + obj);
                        }
                        this.queue.put(obj);
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new Error("Assertion failure: thread interrupted while attempting to enqueue data object.", e);
                    }
                    return;
                }
            }

            // No worker is guaranteed to drain the queue: process on the caller's thread,
            // outside the driver monitor.
            try {
                if (BalancedPoolStageDriver.this.log.isDebugEnabled()) {
                    BalancedPoolStageDriver.this.log.debug(BalancedPoolStageDriver.this.stage + ":Processing object directly: " + obj);
                }
                BalancedPoolStageDriver.this.process(obj);
            }
            catch (StageException e) {
                BalancedPoolStageDriver.this.recordProcessingException(obj, e);
                if (BalancedPoolStageDriver.this.faultTolerance == FaultTolerance.NONE) {
                    throw BalancedPoolStageDriver.this.fatalError(e);
                }
            }
        }

        /**
         * Decides whether an object should go onto the queue. Must be called
         * while holding the driver monitor. Returns {@code true} when the driver
         * is not in RUNNING or STOP_REQUESTED, when more than one worker is
         * registered, or when the single registered worker is still RUNNABLE.
         */
        private boolean shouldEnqueue() {
            if (!BalancedPoolStageDriver.this.isInState(new StageDriver.State[]{StageDriver.State.RUNNING, StageDriver.State.STOP_REQUESTED})) {
                return true;
            }
            int workerCount = BalancedPoolStageDriver.this.workers.size();
            if (workerCount > 1) {
                return true;
            }
            // finish() removes workers without holding the driver monitor, so the head
            // may be gone by the time it is inspected.
            BalancedWorker head = BalancedPoolStageDriver.this.workers.peek();
            return workerCount == 1 && head != null && head.runnability == Runnability.RUNNABLE;
        }
    }

    /**
     * Lifecycle flag of a single worker.
     */
    public static enum Runnability {
        /** Keep polling the queue. */
        RUNNABLE,
        /** Keep processing, but exit as soon as a poll times out with no data. */
        STOPPABLE,
        /** Exit at the next loop check. */
        NOT_RUNNABLE;

    }
}

