/*
 * Decompiled with CFR 0.152.
 */
package org.apache.commons.pipeline.driver;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.pipeline.Feeder;
import org.apache.commons.pipeline.Stage;
import org.apache.commons.pipeline.StageContext;
import org.apache.commons.pipeline.StageDriver;
import org.apache.commons.pipeline.StageException;
import org.apache.commons.pipeline.driver.AbstractStageDriver;
import org.apache.commons.pipeline.driver.FaultTolerance;

public class DedicatedThreadStageDriver
extends AbstractStageDriver {
    private final Log log = LogFactory.getLog(DedicatedThreadStageDriver.class);
    private long timeout;
    private Thread workerThread;
    private BlockingQueue queue;
    private final Feeder feeder = new Feeder(){

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void feed(Object obj) {
            if (DedicatedThreadStageDriver.this.log.isDebugEnabled()) {
                DedicatedThreadStageDriver.this.log.debug((Object)(obj + " is being fed to stage " + DedicatedThreadStageDriver.this.stage + " (" + DedicatedThreadStageDriver.this.queue.remainingCapacity() + " available slots in queue)"));
            }
            try {
                DedicatedThreadStageDriver.this.queue.put(obj);
            }
            catch (InterruptedException e) {
                throw new IllegalStateException("Unexpected interrupt while waiting for space to become available for object " + obj + " in queue for stage " + DedicatedThreadStageDriver.this.stage, e);
            }
            DedicatedThreadStageDriver dedicatedThreadStageDriver = DedicatedThreadStageDriver.this;
            synchronized (dedicatedThreadStageDriver) {
                DedicatedThreadStageDriver.this.notifyAll();
            }
        }
    };
    private Thread.UncaughtExceptionHandler workerThreadExceptionHandler = new Thread.UncaughtExceptionHandler(){

        public void uncaughtException(Thread t, Throwable e) {
            DedicatedThreadStageDriver.this.setState(StageDriver.State.ERROR);
            DedicatedThreadStageDriver.this.recordFatalError(e);
            DedicatedThreadStageDriver.this.log.error((Object)("Uncaught exception in stage " + DedicatedThreadStageDriver.this.stage), e);
        }
    };

    public DedicatedThreadStageDriver(Stage stage, StageContext context, BlockingQueue queue, long timeout, FaultTolerance faultTolerance) {
        super(stage, context, faultTolerance);
        this.queue = queue;
        this.timeout = timeout;
    }

    public Feeder getFeeder() {
        return this.feeder;
    }

    public synchronized void start() throws StageException {
        if (this.currentState == StageDriver.State.STOPPED) {
            this.log.debug((Object)("Starting worker thread for stage " + this.stage + "."));
            this.workerThread = new WorkerThread(this.stage);
            this.workerThread.start();
            this.log.debug((Object)("Worker thread for stage " + this.stage + " started."));
            try {
                while (this.currentState != StageDriver.State.RUNNING && this.currentState != StageDriver.State.ERROR) {
                    this.wait();
                }
            }
            catch (InterruptedException e) {
                throw new StageException(this.getStage(), "Worker thread unexpectedly interrupted while waiting for thread startup.", e);
            }
        } else {
            throw new IllegalStateException("Attempt to start driver in state " + (Object)((Object)this.currentState));
        }
    }

    public synchronized void finish() throws StageException {
        if (this.currentState == StageDriver.State.STOPPED) {
            throw new IllegalStateException("The driver is not currently running.");
        }
        try {
            while (this.currentState != StageDriver.State.RUNNING && this.currentState != StageDriver.State.ERROR) {
                this.wait();
            }
            this.testAndSetState(StageDriver.State.RUNNING, StageDriver.State.STOP_REQUESTED);
            while (this.currentState != StageDriver.State.FINISHED && this.currentState != StageDriver.State.ERROR) {
                this.wait();
            }
            this.log.debug((Object)("Waiting for worker thread stop for stage " + this.stage + "."));
            this.workerThread.join();
            this.log.debug((Object)("Worker thread for stage " + this.stage + " halted"));
        }
        catch (InterruptedException e) {
            throw new StageException(this.getStage(), "Worker thread unexpectedly interrupted while waiting for graceful shutdown.", e);
        }
        this.setState(StageDriver.State.STOPPED);
    }

    public int getQueueSize() {
        return this.queue.size() + this.queue.remainingCapacity();
    }

    public long getTimeout() {
        return this.timeout;
    }

    private class WorkerThread
    extends Thread {
        private Stage stage;

        public WorkerThread(Stage stage) {
            this.setUncaughtExceptionHandler(DedicatedThreadStageDriver.this.workerThreadExceptionHandler);
            this.stage = stage;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public final void run() {
            DedicatedThreadStageDriver.this.setState(StageDriver.State.STARTED);
            try {
                if (DedicatedThreadStageDriver.this.log.isDebugEnabled()) {
                    DedicatedThreadStageDriver.this.log.debug((Object)("Preprocessing stage " + this.stage + "..."));
                }
                this.stage.preprocess();
                if (DedicatedThreadStageDriver.this.log.isDebugEnabled()) {
                    DedicatedThreadStageDriver.this.log.debug((Object)("Preprocessing for stage " + this.stage + " complete."));
                }
                DedicatedThreadStageDriver.this.testAndSetState(StageDriver.State.STARTED, StageDriver.State.RUNNING);
                while (DedicatedThreadStageDriver.this.currentState != StageDriver.State.ERROR) {
                    try {
                        Object obj = DedicatedThreadStageDriver.this.queue.poll(DedicatedThreadStageDriver.this.timeout, TimeUnit.MILLISECONDS);
                        if (obj == null) {
                            if (DedicatedThreadStageDriver.this.currentState != StageDriver.State.STOP_REQUESTED) continue;
                            break;
                        }
                        try {
                            this.stage.process(obj);
                        }
                        catch (StageException e) {
                            DedicatedThreadStageDriver.this.recordProcessingException(obj, e);
                            if (DedicatedThreadStageDriver.this.faultTolerance != FaultTolerance.NONE) continue;
                            throw e;
                        }
                        catch (RuntimeException e) {
                            DedicatedThreadStageDriver.this.recordProcessingException(obj, e);
                            if (DedicatedThreadStageDriver.this.faultTolerance != FaultTolerance.CHECKED && DedicatedThreadStageDriver.this.faultTolerance != FaultTolerance.NONE) continue;
                            throw e;
                        }
                    }
                    catch (InterruptedException e) {
                        throw new RuntimeException("Worker thread unexpectedly interrupted while waiting on data for stage " + this.stage, e);
                    }
                }
                if (DedicatedThreadStageDriver.this.log.isDebugEnabled()) {
                    DedicatedThreadStageDriver.this.log.debug((Object)("Stage " + this.stage + " exited running state."));
                }
                if (DedicatedThreadStageDriver.this.log.isDebugEnabled()) {
                    DedicatedThreadStageDriver.this.log.debug((Object)("Postprocessing stage " + this.stage + "..."));
                }
                this.stage.postprocess();
                if (DedicatedThreadStageDriver.this.log.isDebugEnabled()) {
                    DedicatedThreadStageDriver.this.log.debug((Object)("Postprocessing for stage " + this.stage + " complete."));
                }
            }
            catch (StageException e) {
                DedicatedThreadStageDriver.this.log.error((Object)("An error occurred in the stage " + this.stage), (Throwable)e);
                DedicatedThreadStageDriver.this.recordFatalError(e);
                DedicatedThreadStageDriver.this.setState(StageDriver.State.ERROR);
            }
            finally {
                if (DedicatedThreadStageDriver.this.log.isDebugEnabled()) {
                    DedicatedThreadStageDriver.this.log.debug((Object)("Releasing resources for stage " + this.stage + "..."));
                }
                this.stage.release();
                if (DedicatedThreadStageDriver.this.log.isDebugEnabled()) {
                    DedicatedThreadStageDriver.this.log.debug((Object)("Stage " + this.stage + " released."));
                }
            }
            DedicatedThreadStageDriver.this.testAndSetState(StageDriver.State.STOP_REQUESTED, StageDriver.State.FINISHED);
        }
    }
}

