/*
 * Decompiled with CFR 0.152.
 */
package io.digdag.core.agent;

import com.google.common.base.Optional;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.digdag.core.ErrorReporter;
import io.digdag.core.agent.AgentConfig;
import io.digdag.core.agent.AgentId;
import io.digdag.core.agent.OperatorManager;
import io.digdag.core.agent.TaskServerApi;
import io.digdag.core.database.TransactionManager;
import io.digdag.core.log.LogMarkers;
import io.digdag.spi.TaskRequest;
import io.digdag.spi.metrics.DigdagMetrics;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Agent that repeatedly locks tasks through a {@link TaskServerApi} and runs each of them with an
 * {@link OperatorManager} on an internal thread pool.
 *
 * <p>{@link #run()} is the acquisition loop; it blocks until the agent is stopped.
 * {@link #shutdown(Optional)} stops that loop, shuts the pool down and waits for submitted tasks.
 *
 * <p>Locking protocol: {@code addActiveTaskLock} is held by {@link #run()} while it computes how
 * many tasks to request and while it submits them, and by {@link #shutdown(Optional)} while it
 * shuts the executor down. Worker threads never take the lock; they only decrement
 * {@code activeTaskCount} when a task finishes.
 */
public class MultiThreadAgent implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(MultiThreadAgent.class);

    /** Upper bound on the number of tasks requested from the task server per loop iteration. */
    private static final int MAX_ACQUIRE_PER_ITERATION = 10;

    /** Time in milliseconds the loop waits on the lock when no pool thread is guaranteed to be free. */
    private static final long NO_FREE_THREAD_WAIT_MILLIS = 500L;

    /** Pause in milliseconds after an unexpected failure in the acquisition loop. */
    private static final long ERROR_BACKOFF_MILLIS = 1000L;

    private final AgentConfig config;
    private final AgentId agentId;
    private final TaskServerApi taskServer;
    private final OperatorManager runner;
    private final TransactionManager transactionManager;
    private final ErrorReporter errorReporter;
    private final Object addActiveTaskLock = new Object();
    private final BlockingQueue<Runnable> executorQueue;
    private final ThreadPoolExecutor executor;
    /** Number of tasks that were submitted to the pool and have not finished yet. */
    private final AtomicInteger activeTaskCount = new AtomicInteger(0);
    private final DigdagMetrics metrics;
    /** Set by {@link #shutdown(Optional)}; checked at the top of every loop iteration in {@link #run()}. */
    private volatile boolean stop = false;

    /**
     * Creates the agent and its thread pool. Worker threads are non-daemon and are named
     * {@code task-thread-%d}.
     *
     * <p>When {@code config.getMaxThreads()} is positive the pool has exactly that many threads
     * and an unbounded queue. Otherwise the pool grows on demand up to
     * {@link Integer#MAX_VALUE} threads, hands tasks over through a {@link SynchronousQueue}
     * and lets idle threads expire after 60 seconds.
     */
    public MultiThreadAgent(AgentConfig config, AgentId agentId, TaskServerApi taskServer, OperatorManager runner, TransactionManager transactionManager, ErrorReporter errorReporter, DigdagMetrics metrics) {
        this.agentId = agentId;
        this.config = config;
        this.taskServer = taskServer;
        this.runner = runner;
        this.transactionManager = transactionManager;
        this.errorReporter = errorReporter;
        this.metrics = metrics;
        ThreadFactory threadFactory = new ThreadFactoryBuilder()
                .setDaemon(false)
                .setNameFormat("task-thread-%d")
                .build();
        int maxThreads = config.getMaxThreads();
        if (maxThreads > 0) {
            this.executorQueue = new LinkedBlockingQueue<Runnable>();
            this.executor = new ThreadPoolExecutor(maxThreads, maxThreads, 0L, TimeUnit.SECONDS, this.executorQueue, threadFactory);
        } else {
            this.executorQueue = new SynchronousQueue<Runnable>();
            this.executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, this.executorQueue, threadFactory);
        }
    }

    /**
     * Stops the acquisition loop, shuts the thread pool down and waits for submitted tasks.
     *
     * @param maximumCompletionWait maximum time to wait for the pool to terminate. Only the
     *     whole-second part ({@link Duration#getSeconds()}) is used, so a sub-second remainder
     *     is ignored. When absent, this method waits until the pool has terminated.
     * @throws InterruptedException if the calling thread is interrupted while waiting for the
     *     pool to terminate
     */
    public void shutdown(Optional<Duration> maximumCompletionWait) throws InterruptedException {
        int maximumActiveTasks;
        stop = true;
        taskServer.interruptLocalWait();
        // run() acquires and submits tasks inside this same lock, so the executor cannot be
        // shut down in the middle of that step.
        synchronized (addActiveTaskLock) {
            executor.shutdown();
            maximumActiveTasks = activeTaskCount.get();
            // Wake run() if it is currently waiting for a free thread.
            addActiveTaskLock.notifyAll();
        }
        if (maximumActiveTasks > 0) {
            logger.info("Waiting for completion of {} running tasks...", maximumActiveTasks);
        }
        if (maximumCompletionWait.isPresent()) {
            long seconds = maximumCompletionWait.get().getSeconds();
            if (!executor.awaitTermination(seconds, TimeUnit.SECONDS)) {
                logger.warn("Some tasks didn't finish within maximum wait time ({} seconds)", seconds);
            }
        } else {
            while (!executor.awaitTermination(24L, TimeUnit.HOURS)) {
                // No maximum wait: keep waiting until the pool has terminated.
            }
        }
    }

    /**
     * Acquisition loop. Until the agent is stopped or the executor is shut down, each iteration
     * either locks up to {@code min(free threads, 10)} tasks and submits them to the pool, or,
     * when no thread is guaranteed to be free, waits on the lock for up to 500 ms.
     *
     * <p>Any {@link Throwable} raised by an iteration is logged, reported and counted, and the
     * loop continues after a one second pause.
     */
    @Override
    public void run() {
        while (!stop) {
            try {
                synchronized (addActiveTaskLock) {
                    if (executor.isShutdown()) {
                        break;
                    }
                    // The count is only incremented inside this synchronized block; worker
                    // threads can only decrement it. The value read here is therefore an upper
                    // bound on the number of busy threads for the rest of the block.
                    int maximumActiveTasks = activeTaskCount.get();
                    int guaranteedAvailableThreads = executor.getMaximumPoolSize() - maximumActiveTasks;
                    int maxAcquire = Math.min(guaranteedAvailableThreads, MAX_ACQUIRE_PER_ITERATION);
                    if (maxAcquire > 0) {
                        metrics.summary(DigdagMetrics.Category.AGENT, "mtag_NumMaxAcquire", (double) maxAcquire);
                        transactionManager.begin(() -> {
                            // NOTE(review): the trailing 1000L is presumably a maximum wait in
                            // milliseconds - confirm against TaskServerApi.lockSharedAgentTasks.
                            List<TaskRequest> reqs = taskServer.lockSharedAgentTasks(maxAcquire, agentId, config.getLockRetentionTime(), 1000L);
                            for (TaskRequest req : reqs) {
                                executor.submit(() -> runTask(req));
                                // Counted only after submit() returned, so a rejected submission
                                // does not inflate the count.
                                activeTaskCount.incrementAndGet();
                            }
                            return null;
                        });
                    } else {
                        metrics.increment(DigdagMetrics.Category.AGENT, "mtag_RunWaitCounter");
                        // Releases the lock while waiting; shutdown() ends the wait early via notifyAll().
                        addActiveTaskLock.wait(NO_FREE_THREAD_WAIT_MILLIS);
                    }
                }
            }
            catch (Throwable t) {
                logger.error(LogMarkers.UNEXPECTED_SERVER_ERROR, "Uncaught exception during acquiring tasks from a server. Ignoring. Agent thread will be retried.", t);
                errorReporter.reportUncaughtError(t);
                metrics.increment(DigdagMetrics.Category.AGENT, "uncaughtErrors");
                try {
                    Thread.sleep(ERROR_BACKOFF_MILLIS);
                }
                catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Runs one task on a pool thread. Anything thrown by the operator manager is logged,
     * reported and counted instead of being propagated, and the active-task count is always
     * decremented when the task ends.
     */
    private void runTask(TaskRequest req) {
        try {
            runner.run(req);
        }
        catch (Throwable t) {
            logger.error(LogMarkers.UNEXPECTED_SERVER_ERROR, "Uncaught exception. Task queue will detect this failure and this task will be retried later.", t);
            errorReporter.reportUncaughtError(t);
            metrics.increment(DigdagMetrics.Category.AGENT, "uncaughtErrors");
        }
        finally {
            activeTaskCount.decrementAndGet();
        }
    }
}

