/*
 * Decompiled with CFR 0.152.
 */
package com.tc.async.impl;

import com.tc.async.api.ConfigurationContext;
import com.tc.async.api.EventHandler;
import com.tc.async.api.EventHandlerException;
import com.tc.async.api.Sink;
import com.tc.async.api.Source;
import com.tc.async.api.Stage;
import com.tc.async.impl.ContextWrapper;
import com.tc.async.impl.DirectSink;
import com.tc.async.impl.StageQueue;
import com.tc.exception.TCNotRunningException;
import com.tc.exception.TCRuntimeException;
import com.tc.logging.TCLoggerProvider;
import com.tc.properties.TCPropertiesImpl;
import com.tc.util.concurrent.QueueFactory;
import com.tc.util.concurrent.ThreadUtil;
import java.util.Arrays;
import org.slf4j.Logger;

/**
 * A {@link Stage} that drains a {@link StageQueue} with one {@link WorkerThread} per queue source,
 * handing every polled context to a single {@link EventHandler}.
 *
 * <p>A stage is created in the shut-down state; {@link #start(ConfigurationContext)} initializes
 * the handler and launches the workers, and {@link #destroy()} closes the queue, joins the workers
 * and destroys the handler.
 *
 * <p>Two per-stage properties are read from {@link TCPropertiesImpl} at construction time:
 * {@code seda.<name>.sleepMs} (a sleep applied before each event is handled, default 0) and
 * {@code seda.<name>.pausable} (enables pausing through a system property, default false).
 *
 * @param <EC> the event context type handled by this stage
 */
public class StageImpl<EC>
implements Stage<EC> {
    /** How long a worker blocks on its source before re-checking the shutdown flag. */
    private static final long POLL_TIME_MS = 3000L;
    private final String name;
    private final EventHandler<EC> handler;
    private final StageQueue<EC> stageQueue;
    private final Sink<EC> sink;
    /** One slot per queue source; a slot stays {@code null} until {@link #startThreads()} fills it. */
    private final WorkerThread<EC>[] threads;
    private final ThreadGroup group;
    private final Logger logger;
    private final int sleepMs;
    private final boolean pausable;
    private volatile boolean paused;
    private volatile boolean shutdown = true;

    /**
     * Creates the stage and its queue. No threads are started until
     * {@link #start(ConfigurationContext)} is called.
     *
     * @param loggerProvider source of the stage logger; also passed to the queue factory
     * @param name stage name, used in the logger name, thread names and property keys
     * @param handler handler that receives every event of this stage
     * @param queueCount number of queue sources, and therefore of worker threads
     * @param group thread group the worker threads are created in
     * @param queueFactory factory handed to the stage queue factory
     * @param queueSize queue size handed to the stage queue factory
     * @param canBeDirect when {@code true} the exposed sink is a {@link DirectSink} built around
     *        the handler and the stage queue; otherwise the stage queue itself is the sink
     */
    public StageImpl(TCLoggerProvider loggerProvider, String name, EventHandler<EC> handler, int queueCount, ThreadGroup group, QueueFactory<ContextWrapper<EC>> queueFactory, int queueSize, boolean canBeDirect) {
        this.logger = loggerProvider.getLogger(Stage.class.getName() + ": " + name);
        this.name = name;
        this.handler = handler;
        // Generic arrays cannot be created directly; the raw array only ever holds WorkerThread<EC>.
        @SuppressWarnings("unchecked")
        WorkerThread<EC>[] workers = new WorkerThread[queueCount];
        this.threads = workers;
        this.stageQueue = StageQueue.StageQueueFactory.factory(queueCount, queueFactory, loggerProvider, name, queueSize);
        this.sink = !canBeDirect ? this.stageQueue : new DirectSink<EC>(this.handler, this.stageQueue::isEmpty, this.stageQueue);
        this.group = group;
        this.sleepMs = TCPropertiesImpl.getProperties().getInt("seda." + name + ".sleepMs", 0);
        if (this.sleepMs > 0) {
            this.logger.warn("Sleep of " + this.sleepMs + "ms enabled for stage " + name);
        }
        this.pausable = TCPropertiesImpl.getProperties().getBoolean("seda." + name + ".pausable", false);
        if (this.pausable) {
            this.logger.warn("Stage pausing is enabled for stage " + name);
        }
    }

    /**
     * Shuts the stage down: closes the queue, waits for every worker thread to exit and then
     * destroys the handler. Does nothing if the stage is already shut down (or was never started).
     */
    @Override
    public void destroy() {
        synchronized (this) {
            if (this.shutdown) {
                return;
            }
            this.shutdown = true;
        }
        this.stageQueue.close();
        this.stopThreads();
        this.handler.destroy();
    }

    /**
     * Starts the stage: initializes the handler with {@code context} and launches the worker
     * threads. Does nothing if the stage is already running.
     *
     * @param context configuration context passed to the handler
     */
    @Override
    public void start(ConfigurationContext context) {
        synchronized (this) {
            if (!this.shutdown) {
                return;
            }
            this.shutdown = false;
        }
        this.handler.initializeContext(context);
        this.startThreads();
    }

    @Override
    public Sink<EC> getSink() {
        return this.sink;
    }

    /**
     * Pauses event handling; workers stop before handling their next event until
     * {@link #unpause()} is called.
     *
     * @return the stage queue size at the time of the call
     */
    @Override
    public int pause() {
        this.paused = true;
        return this.stageQueue.size();
    }

    @Override
    public void unpause() {
        this.paused = false;
    }

    /**
     * Clears the stage queue and waits for every started worker to become idle. The wait on a
     * worker is abandoned if the calling thread is interrupted; the caller's interrupt status
     * (including one that was pending on entry) is restored before returning.
     */
    @Override
    public void clear() {
        // Clear a pending interrupt so the waits below are not aborted immediately; re-set at the end.
        boolean interrupted = Thread.interrupted();
        this.stageQueue.clear();
        for (WorkerThread<EC> wt : this.threads) {
            if (wt == null) {
                continue;
            }
            try {
                wt.waitForIdle();
            }
            catch (InterruptedException ie) {
                interrupted = true;
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /** Creates and starts one worker per queue source. */
    private synchronized void startThreads() {
        for (int i = 0; i < this.threads.length; ++i) {
            String threadName = "WorkerThread(" + this.name + ", " + i;
            // With several sources the source name is appended to tell the workers apart.
            threadName = this.threads.length > 1 ? threadName + ", " + this.stageQueue.getSource(i).getSourceName() + ")" : threadName + ")";
            this.threads[i] = new WorkerThread<EC>(threadName, this.stageQueue.getSource(i), this.handler, this.group, this.logger, this.sleepMs, this.pausable, this.name);
            this.threads[i].start();
        }
    }

    /**
     * Joins every started worker thread.
     *
     * @throws RuntimeException wrapping the {@link InterruptedException} if the calling thread is
     *         interrupted while joining; the interrupt status is restored first
     */
    private synchronized void stopThreads() {
        for (WorkerThread<EC> thread : this.threads) {
            if (thread == null) {
                // start() failed before this worker was created (e.g. handler initialization threw).
                continue;
            }
            try {
                thread.join();
            }
            catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(ie);
            }
        }
    }

    @Override
    public String getName() {
        return this.name;
    }

    @Override
    public String toString() {
        return "StageImpl(" + this.name + ")";
    }

    /** Blocks, ignoring interrupts, until every started worker has reported idle. */
    void waitForIdle() {
        Arrays.stream(this.threads)
            .filter(t -> t != null)
            .forEach(t -> t.waitForIdleUninterruptibly());
    }

    /**
     * Tells whether the root cause of {@code e} is a {@link TCNotRunningException}.
     *
     * @param e the throwable whose cause chain is walked; may be {@code null}
     * @return {@code true} if the last throwable in the cause chain is a TCNotRunningException
     */
    private static boolean isTCNotRunningException(Throwable e) {
        Throwable rootCause = null;
        while (e != null) {
            rootCause = e;
            e = e.getCause();
        }
        return rootCause instanceof TCNotRunningException;
    }

    /**
     * Daemon thread that polls one queue source and runs each polled context with the handler
     * until the stage is shut down and the source is drained.
     */
    private class WorkerThread<EC>
    extends Thread {
        private final Source<ContextWrapper<EC>> source;
        private final EventHandler<EC> handler;
        private final Logger tcLogger;
        private final int sleepMs;
        private final boolean pausable;
        /** Set to {@code true} only while holding {@link #idleLock}; cleared by the worker itself. */
        private volatile boolean idle;
        private final Object idleLock;
        private final String stageName;

        public WorkerThread(String name, Source<ContextWrapper<EC>> source, EventHandler<EC> handler, ThreadGroup group, Logger logger, int sleepMs, boolean pausable, String stageName) {
            super(group, name);
            this.idle = false;
            this.idleLock = new Object();
            this.tcLogger = logger;
            this.setDaemon(true);
            this.source = source;
            this.handler = handler;
            this.sleepMs = sleepMs;
            this.pausable = pausable;
            this.stageName = stageName;
        }

        /**
         * Applies the configured per-event sleep, then blocks in one-second steps while the stage
         * is paused, either through {@link StageImpl#pause()} or, for pausable stages, through a
         * system property named after the stage whose value is "paused".
         */
        private void handleStageDebugPauses() {
            if (this.sleepMs > 0) {
                ThreadUtil.reallySleep(this.sleepMs);
            }
            while (StageImpl.this.paused || (this.pausable && "paused".equalsIgnoreCase(System.getProperty(this.stageName)))) {
                if (!StageImpl.this.paused) {
                    this.tcLogger.info("Stage paused, sleeping for 1s");
                }
                ThreadUtil.reallySleep(1000L);
            }
        }

        public boolean isIdle() {
            return this.idle;
        }

        /**
         * Worker loop. Runs until the stage is shut down and the source is empty. Failures during
         * shutdown are ignored; otherwise they are rethrown as {@link TCRuntimeException}, except
         * for exceptions rooted in a {@link TCNotRunningException}, which are logged and skipped.
         */
        @Override
        public void run() {
            try {
                while (!StageImpl.this.shutdown || !this.source.isEmpty()) {
                    ContextWrapper<EC> ctxt = null;
                    try {
                        this.setToIdle();
                        ctxt = this.source.poll(POLL_TIME_MS);
                        if (ctxt == null) continue;
                        this.idle = false;
                        this.handleStageDebugPauses();
                        ctxt.runWithHandler(this.handler);
                    }
                    catch (InterruptedException ie) {
                        if (StageImpl.this.shutdown) continue;
                        throw new TCRuntimeException(ie);
                    }
                    catch (EventHandlerException ie) {
                        if (StageImpl.this.shutdown) continue;
                        throw new TCRuntimeException(ie);
                    }
                    catch (Exception e) {
                        if (StageImpl.isTCNotRunningException(e)) {
                            if (StageImpl.this.shutdown) continue;
                            this.tcLogger.info("Ignoring " + TCNotRunningException.class.getSimpleName() + " while handling context: " + ctxt);
                            continue;
                        }
                        throw new TCRuntimeException("Uncaught exception in stage", e);
                    }
                }
            }
            finally {
                // A terminated worker will never process another event; release anyone waiting
                // for it to become idle instead of leaving them blocked forever.
                this.markIdle();
            }
        }

        /** Reports idle when the worker was busy and its source has run empty. */
        private void setToIdle() {
            if (!this.idle && this.source.isEmpty()) {
                this.markIdle();
            }
        }

        /**
         * Publishes the idle state and wakes all waiters. The flag is set under the same monitor
         * the waiters test it under, so a wakeup cannot slip in between a waiter's check and wait.
         */
        private void markIdle() {
            synchronized (this.idleLock) {
                this.idle = true;
                this.idleLock.notifyAll();
            }
        }

        /** Waits for idle, retrying on interrupt and restoring the interrupt status afterwards. */
        private void waitForIdleUninterruptibly() {
            boolean interrupted = false;
            boolean localIdle = false;
            while (!localIdle) {
                try {
                    this.waitForIdle();
                    localIdle = true;
                }
                catch (InterruptedException ie) {
                    interrupted = true;
                }
            }
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Blocks until this worker reports idle.
         *
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        private void waitForIdle() throws InterruptedException {
            synchronized (this.idleLock) {
                // Loop guards against spurious wakeups and against the worker going busy again.
                while (!this.idle) {
                    this.idleLock.wait();
                }
            }
        }
    }
}

