/*
 * Decompiled with CFR 0.152.
 */
package org.apache.commons.pipeline.driver;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.pipeline.Feeder;
import org.apache.commons.pipeline.Stage;
import org.apache.commons.pipeline.StageContext;
import org.apache.commons.pipeline.StageDriver;
import org.apache.commons.pipeline.StageException;
import org.apache.commons.pipeline.driver.AbstractStageDriver;
import org.apache.commons.pipeline.driver.FaultTolerance;

public class ThreadPoolStageDriver
extends AbstractStageDriver {
    private final Log log = LogFactory.getLog(ThreadPoolStageDriver.class);
    private long timeout;
    private final CountDownLatch startSignal;
    private final CountDownLatch doneSignal;
    private final int numThreads;
    private final BlockingQueue queue;
    private final Feeder feeder = new Feeder(){

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void feed(Object obj) {
            if (ThreadPoolStageDriver.this.log.isDebugEnabled()) {
                ThreadPoolStageDriver.this.log.debug((Object)(obj + " is being fed to stage " + ThreadPoolStageDriver.this.stage + " (" + ThreadPoolStageDriver.this.queue.remainingCapacity() + " available slots in queue)"));
            }
            try {
                ThreadPoolStageDriver.this.queue.put(obj);
            }
            catch (InterruptedException e) {
                throw new IllegalStateException("Unexpected interrupt while waiting for space to become available for object " + obj + " in queue for stage " + ThreadPoolStageDriver.this.stage, e);
            }
            ThreadPoolStageDriver threadPoolStageDriver = ThreadPoolStageDriver.this;
            synchronized (threadPoolStageDriver) {
                ThreadPoolStageDriver.this.notifyAll();
            }
        }
    };
    private Thread.UncaughtExceptionHandler workerThreadExceptionHandler = new Thread.UncaughtExceptionHandler(){

        public void uncaughtException(Thread t, Throwable e) {
            ThreadPoolStageDriver.this.setState(StageDriver.State.ERROR);
            ThreadPoolStageDriver.this.recordFatalError(e);
            ThreadPoolStageDriver.this.log.error((Object)("Uncaught exception in stage " + ThreadPoolStageDriver.this.stage), e);
        }
    };

    public ThreadPoolStageDriver(Stage stage, StageContext context, BlockingQueue queue, long timeout, FaultTolerance faultTolerance, int numThreads) {
        super(stage, context, faultTolerance);
        this.numThreads = numThreads;
        this.startSignal = new CountDownLatch(1);
        this.doneSignal = new CountDownLatch(this.numThreads);
        this.queue = queue;
        this.timeout = timeout;
    }

    public Feeder getFeeder() {
        return this.feeder;
    }

    public synchronized void start() throws StageException {
        if (this.currentState == StageDriver.State.STOPPED) {
            this.setState(StageDriver.State.STARTED);
            if (this.log.isDebugEnabled()) {
                this.log.debug((Object)("Preprocessing stage " + this.stage + "..."));
            }
            this.stage.preprocess();
            if (this.log.isDebugEnabled()) {
                this.log.debug((Object)("Preprocessing for stage " + this.stage + " complete."));
            }
            this.log.debug((Object)("Starting worker threads for stage " + this.stage + "."));
            for (int i = 0; i < this.numThreads; ++i) {
                new LatchWorkerThread(i).start();
            }
        } else {
            throw new IllegalStateException("Attempt to start driver in state " + (Object)((Object)this.currentState));
        }
        this.testAndSetState(StageDriver.State.STARTED, StageDriver.State.RUNNING);
        this.startSignal.countDown();
        this.log.debug((Object)("Worker threads for stage " + this.stage + " started."));
    }

    public synchronized void finish() throws StageException {
        if (this.currentState == StageDriver.State.STOPPED) {
            throw new IllegalStateException("The driver is not currently running.");
        }
        try {
            while (this.currentState != StageDriver.State.RUNNING && this.currentState != StageDriver.State.ERROR) {
                this.wait();
            }
            this.testAndSetState(StageDriver.State.RUNNING, StageDriver.State.STOP_REQUESTED);
            if (this.log.isDebugEnabled()) {
                this.log.debug((Object)("Waiting for worker threads to stop for stage " + this.stage + "."));
            }
            this.doneSignal.await();
            if (this.log.isDebugEnabled()) {
                this.log.debug((Object)("Worker threads for stage " + this.stage + " halted"));
            }
            this.testAndSetState(StageDriver.State.STOP_REQUESTED, StageDriver.State.FINISHED);
            if (this.currentState != StageDriver.State.ERROR) {
                if (this.log.isDebugEnabled()) {
                    this.log.debug((Object)("Postprocessing stage " + this.stage + "..."));
                }
                this.stage.postprocess();
                if (this.log.isDebugEnabled()) {
                    this.log.debug((Object)("Postprocessing for stage " + this.stage + " complete."));
                }
            }
        }
        catch (StageException e) {
            this.log.error((Object)("An error occurred during postprocessing of stage " + this.stage), (Throwable)e);
            this.recordFatalError(e);
            this.setState(StageDriver.State.ERROR);
        }
        catch (InterruptedException e) {
            throw new StageException(this.getStage(), "StageDriver unexpectedly interrupted while waiting for shutdown of worker threads.", e);
        }
        finally {
            if (this.log.isDebugEnabled()) {
                this.log.debug((Object)("Releasing resources for stage " + this.stage + "..."));
            }
            this.stage.release();
            if (this.log.isDebugEnabled()) {
                this.log.debug((Object)("Stage " + this.stage + " released."));
            }
        }
        this.testAndSetState(StageDriver.State.FINISHED, StageDriver.State.STOPPED);
    }

    public int getQueueSize() {
        return this.queue.size() + this.queue.remainingCapacity();
    }

    public long getTimeout() {
        return this.timeout;
    }

    public int getNumThreads() {
        return this.numThreads;
    }

    private class LatchWorkerThread
    extends Thread {
        final int threadID;

        LatchWorkerThread(int threadID) {
            this.setUncaughtExceptionHandler(ThreadPoolStageDriver.this.workerThreadExceptionHandler);
            this.threadID = threadID;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public final void run() {
            try {
                ThreadPoolStageDriver.this.startSignal.await();
                while (ThreadPoolStageDriver.this.currentState != StageDriver.State.ERROR) {
                    try {
                        Object obj = ThreadPoolStageDriver.this.queue.poll(ThreadPoolStageDriver.this.timeout, TimeUnit.MILLISECONDS);
                        if (obj == null) {
                            if (ThreadPoolStageDriver.this.currentState != StageDriver.State.STOP_REQUESTED) continue;
                            break;
                        }
                        try {
                            ThreadPoolStageDriver.this.stage.process(obj);
                        }
                        catch (StageException e) {
                            ThreadPoolStageDriver.this.recordProcessingException(obj, e);
                            if (ThreadPoolStageDriver.this.faultTolerance != FaultTolerance.NONE) continue;
                            throw e;
                        }
                        catch (RuntimeException e) {
                            ThreadPoolStageDriver.this.recordProcessingException(obj, e);
                            if (ThreadPoolStageDriver.this.faultTolerance != FaultTolerance.CHECKED && ThreadPoolStageDriver.this.faultTolerance != FaultTolerance.NONE) continue;
                            throw e;
                        }
                    }
                    catch (InterruptedException e) {
                        throw new RuntimeException("Worker thread " + this.threadID + " unexpectedly interrupted while waiting on data for stage " + ThreadPoolStageDriver.this.stage, e);
                    }
                }
                if (ThreadPoolStageDriver.this.log.isDebugEnabled()) {
                    ThreadPoolStageDriver.this.log.debug((Object)("Stage " + ThreadPoolStageDriver.this.stage + " (threadID: " + this.threadID + ") exited running state."));
                }
            }
            catch (StageException e) {
                ThreadPoolStageDriver.this.log.error((Object)("An error occurred in the stage " + ThreadPoolStageDriver.this.stage + " (threadID: " + this.threadID + ")"), (Throwable)e);
                ThreadPoolStageDriver.this.recordFatalError(e);
                ThreadPoolStageDriver.this.setState(StageDriver.State.ERROR);
            }
            catch (InterruptedException e) {
                ThreadPoolStageDriver.this.log.error((Object)("Stage " + ThreadPoolStageDriver.this.stage + " (threadID: " + this.threadID + ") interrupted while waiting for barrier"), (Throwable)e);
                ThreadPoolStageDriver.this.recordFatalError(e);
                ThreadPoolStageDriver.this.setState(StageDriver.State.ERROR);
            }
            finally {
                ThreadPoolStageDriver.this.doneSignal.countDown();
                ThreadPoolStageDriver e = ThreadPoolStageDriver.this;
                synchronized (e) {
                    ThreadPoolStageDriver.this.notifyAll();
                }
            }
        }
    }
}

