/*
 * Decompiled with CFR 0.152.
 */
package io.digdag.core.workflow;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import io.digdag.client.config.Config;
import io.digdag.client.config.ConfigException;
import io.digdag.client.config.ConfigFactory;
import io.digdag.commons.ThrowablesUtil;
import io.digdag.core.Limits;
import io.digdag.core.agent.AgentId;
import io.digdag.core.database.TransactionManager;
import io.digdag.core.log.LogMarkers;
import io.digdag.core.repository.ProjectStoreManager;
import io.digdag.core.repository.ResourceConflictException;
import io.digdag.core.repository.ResourceNotFoundException;
import io.digdag.core.repository.StoredProject;
import io.digdag.core.repository.StoredRevision;
import io.digdag.core.repository.WorkflowDefinition;
import io.digdag.core.session.ImmutableTask;
import io.digdag.core.session.ParameterUpdate;
import io.digdag.core.session.ResumingTask;
import io.digdag.core.session.Session;
import io.digdag.core.session.SessionAttempt;
import io.digdag.core.session.SessionControlStore;
import io.digdag.core.session.SessionMonitor;
import io.digdag.core.session.SessionStore;
import io.digdag.core.session.SessionStoreManager;
import io.digdag.core.session.StoredSession;
import io.digdag.core.session.StoredSessionAttempt;
import io.digdag.core.session.StoredSessionAttemptWithSession;
import io.digdag.core.session.StoredTask;
import io.digdag.core.session.Task;
import io.digdag.core.session.TaskAttemptSummary;
import io.digdag.core.session.TaskStateCode;
import io.digdag.core.session.TaskStateFlags;
import io.digdag.core.workflow.AttemptLimitExceededException;
import io.digdag.core.workflow.AttemptRequest;
import io.digdag.core.workflow.IllegalResumeException;
import io.digdag.core.workflow.SessionAttemptConflictException;
import io.digdag.core.workflow.SessionAttemptControl;
import io.digdag.core.workflow.TaskConfig;
import io.digdag.core.workflow.TaskControl;
import io.digdag.core.workflow.TaskLimitExceededException;
import io.digdag.core.workflow.TaskQueueDispatcher;
import io.digdag.core.workflow.TaskTree;
import io.digdag.core.workflow.Workflow;
import io.digdag.core.workflow.WorkflowCompiler;
import io.digdag.core.workflow.WorkflowSubmitter;
import io.digdag.core.workflow.WorkflowTask;
import io.digdag.core.workflow.WorkflowTaskList;
import io.digdag.metrics.DigdagTimed;
import io.digdag.spi.ImmutableTaskQueueRequest;
import io.digdag.spi.ImmutableTaskRequest;
import io.digdag.spi.TaskConflictException;
import io.digdag.spi.TaskExecutionException;
import io.digdag.spi.TaskNotFoundException;
import io.digdag.spi.TaskQueueLock;
import io.digdag.spi.TaskQueueRequest;
import io.digdag.spi.TaskRequest;
import io.digdag.spi.TaskResult;
import io.digdag.spi.metrics.DigdagMetrics;
import io.digdag.util.RetryControl;
import java.lang.invoke.LambdaMetafactory;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WorkflowExecutor {
    // Class-wide SLF4J logger.
    private static final Logger logger = LoggerFactory.getLogger(WorkflowExecutor.class);
    // Collaborators below are all assigned from the constructor's arguments.
    private final ProjectStoreManager rm;
    private final SessionStoreManager sm;
    private final TransactionManager tm;
    private final WorkflowCompiler compiler;
    private final TaskQueueDispatcher dispatcher;
    private final ConfigFactory cf;
    private final ObjectMapper archiveMapper;
    private final Config systemConfig;
    private final Limits limits;
    private final DigdagMetrics metrics;
    // Lock/condition pair used to wake the runWhile loop; noticeStatusPropagate() and
    // noticeRunWhileConditionChange() signal the condition while holding the lock.
    private final Lock propagatorLock = new ReentrantLock();
    private final Condition propagatorCondition = this.propagatorLock.newCondition();
    // Set to true by noticeStatusPropagate(); cleared by runWhile when it resets its wait interval.
    private volatile boolean propagatorNotice = false;
    // Read once in the constructor from "executor.enqueue_random_fetch" (default false) and
    // "executor.enqueue_fetch_size" (default 100); both are passed to sm.findAllReadyTaskIds.
    private final boolean enqueueRandomFetch;
    private final Integer enqueueFetchSize;
    // NOTE(review): not referenced in the portion of the file visible here.
    private static final DateTimeFormatter SESSION_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssxxx", Locale.ENGLISH);
    // NOTE(review): runWhile uses the literals 100 and 5000 (milliseconds); presumably these
    // constants were inlined by the compiler before decompilation - confirm against the original source.
    private static final int INITIAL_INTERVAL = 100;
    private static final int MAX_INTERVAL = 5000;

    /**
     * Wires the executor to its collaborators and reads the enqueue tuning options.
     *
     * @param rm project store manager
     * @param sm session store manager
     * @param tm transaction manager
     * @param dispatcher task queue dispatcher
     * @param compiler workflow compiler
     * @param cf config factory
     * @param archiveMapper object mapper handed to the task archiving step
     * @param systemConfig system configuration; "executor.enqueue_random_fetch" (default false)
     *                     and "executor.enqueue_fetch_size" (default 100) are read from it
     * @param limits configured limits
     * @param metrics metrics sink
     */
    @Inject
    public WorkflowExecutor(
            ProjectStoreManager rm,
            SessionStoreManager sm,
            TransactionManager tm,
            TaskQueueDispatcher dispatcher,
            WorkflowCompiler compiler,
            ConfigFactory cf,
            ObjectMapper archiveMapper,
            Config systemConfig,
            Limits limits,
            DigdagMetrics metrics) {
        // Stores and transaction handling.
        this.rm = rm;
        this.sm = sm;
        this.tm = tm;

        // Workflow compilation and dispatch.
        this.compiler = compiler;
        this.dispatcher = dispatcher;

        // Configuration, serialization, limits and metrics.
        this.cf = cf;
        this.archiveMapper = archiveMapper;
        this.systemConfig = systemConfig;
        this.limits = limits;
        this.metrics = metrics;

        // Enqueue options are resolved once, in the same order as before.
        this.enqueueRandomFetch = systemConfig.get("executor.enqueue_random_fetch", Boolean.class, false);
        this.enqueueFetchSize = systemConfig.get("executor.enqueue_fetch_size", Integer.class, 100);
    }

    /**
     * Compiles the given workflow definition and submits the resulting task list
     * through {@link #submitTasks}.
     *
     * @param siteId site the attempt belongs to
     * @param ar attempt request
     * @param def workflow definition whose name and config are compiled
     * @return whatever {@code submitTasks} returns for the compiled tasks
     */
    public StoredSessionAttemptWithSession submitWorkflow(int siteId, AttemptRequest ar, WorkflowDefinition def) throws ResourceNotFoundException, AttemptLimitExceededException, TaskLimitExceededException, SessionAttemptConflictException {
        WorkflowTaskList compiledTasks = this.compiler.compile(def.getName(), def.getConfig()).getTasks();
        return this.submitTasks(siteId, ar, compiledTasks);
    }

    /**
     * Runs {@code func} inside a session transaction of the site's session store,
     * handing it a freshly built {@link WorkflowSubmitter}.
     *
     * @param siteId site whose stores are used
     * @param func action to run with the submitter
     * @return the value produced by {@code func}
     * @throws ResourceNotFoundException rethrown as-is when raised inside the transaction
     * @throws AttemptLimitExceededException rethrown as-is when raised inside the transaction
     * @throws SessionAttemptConflictException rethrown as-is when raised inside the transaction
     */
    public <T> T submitTransaction(int siteId, WorkflowSubmitterAction<T> func) throws ResourceNotFoundException, AttemptLimitExceededException, SessionAttemptConflictException {
        try {
            return (T)this.sm.getSessionStore(siteId).sessionTransaction(transaction -> {
                // The stores are looked up inside the transaction callback, as before.
                WorkflowSubmitter submitter = new WorkflowSubmitter(siteId, transaction, this.rm.getProjectStore(siteId), this.sm.getSessionStore(siteId), this.tm, this.limits);
                return func.call(submitter);
            });
        }
        catch (Exception failure) {
            // Declared exception types pass through unchanged; anything else goes through propagate().
            ThrowablesUtil.propagateIfInstanceOf((Throwable)failure, ResourceNotFoundException.class);
            ThrowablesUtil.propagateIfInstanceOf((Throwable)failure, AttemptLimitExceededException.class);
            ThrowablesUtil.propagateIfInstanceOf((Throwable)failure, SessionAttemptConflictException.class);
            throw ThrowablesUtil.propagate((Throwable)failure);
        }
    }

    /**
     * Creates a session attempt for the request and stores the given tasks under it.
     *
     * <p>Steps visible here: trace-log every task, build and validate the attempt, reject a
     * resume request that targets the root task, check the active-attempt limit, store via
     * {@code putAndLockSession}, and finally wake the propagator loop.
     *
     * <p>NOTE(review): this is decompiler output. {@code resumingTasks} is declared as
     * {@code Object} yet iterated, and the store callback is the synthetic method
     * {@code lambda$submitTasks$3}, whose body is not visible in this view.
     *
     * @param siteId site whose session store is used
     * @param ar attempt request
     * @param tasks compiled workflow tasks; index 0 is treated as the root task
     * @return the value returned by {@code putAndLockSession}
     * @throws AttemptLimitExceededException when one more attempt would exceed {@code limits.maxAttempts()}
     * @throws TaskLimitExceededException unwrapped from {@code WorkflowTaskLimitExceededException}
     * @throws SessionAttemptConflictException when the store reports a {@code ResourceConflictException}
     * @throws IllegalResumeException when a resuming task has the same full name as the root task
     */
    public StoredSessionAttemptWithSession submitTasks(int siteId, AttemptRequest ar, WorkflowTaskList tasks) throws ResourceNotFoundException, AttemptLimitExceededException, TaskLimitExceededException, SessionAttemptConflictException {
        StoredSessionAttemptWithSession stored;
        Object resumingTasks;
        // Dump the compiled task list only when trace logging is on.
        if (logger.isTraceEnabled()) {
            for (WorkflowTask task : tasks) {
                logger.trace("  Step[{}]: {}", (Object)task.getIndex(), (Object)task.getName());
                logger.trace("    parent: {}", task.getParentIndex().transform(it -> Integer.toString(it)).or((Object)"(root)"));
                logger.trace("    upstreams: {}", (Object)task.getUpstreamIndexes().stream().map(it -> Integer.toString(it)).collect(Collectors.joining(", ")));
                logger.trace("    config: {}", (Object)task.getConfig());
            }
        }
        int projId = ar.getStored().getProjectId();
        Session session = Session.of(projId, ar.getWorkflowName(), ar.getSessionTime());
        SessionAttempt attempt = SessionAttempt.of(ar.getRetryAttemptName(), ar.getSessionParams(), ar.getTimeZone(), (Optional<Long>)Optional.of((Object)ar.getStored().getWorkflowDefinitionId()));
        TaskConfig.validateAttempt(attempt);
        if (ar.getResumingAttemptId().isPresent()) {
            // Resuming: collect the tasks to resume and refuse to resume the root task itself.
            WorkflowTask root = tasks.get(0);
            resumingTasks = TaskControl.buildResumingTaskMap(this.sm.getSessionStore(siteId), (Long)ar.getResumingAttemptId().get(), ar.getResumingTasks());
            Iterator iterator = resumingTasks.iterator();
            while (iterator.hasNext()) {
                ResumingTask resumingTask = (ResumingTask)iterator.next();
                if (!resumingTask.getFullName().equals(root.getFullName())) continue;
                throw new IllegalResumeException("Resuming root task is not allowed");
            }
        } else {
            resumingTasks = ImmutableList.of();
        }
        try {
            SessionStore ss = this.sm.getSessionStore(siteId);
            // Enforce the attempt limit before creating the new attempt.
            long activeAttempts = ss.getActiveAttemptCount();
            if (activeAttempts + 1L > this.limits.maxAttempts()) {
                throw new AttemptLimitExceededException("Too many attempts running. Limit: " + this.limits.maxAttempts() + ", Current: " + activeAttempts);
            }
            // NOTE(review): the callback body (lambda$submitTasks$3) is not part of this view.
            stored = ss.putAndLockSession(session, (arg_0, arg_1) -> this.lambda$submitTasks$3(siteId, projId, attempt, ar, tasks, (List)resumingTasks, arg_0, arg_1));
        }
        catch (WorkflowTaskLimitExceededException ex) {
            // Unwrap the carrier exception back to its cause.
            throw ex.getCause();
        }
        catch (ResourceConflictException sessionAlreadyExists) {
            // Reset the transaction, then look up the attempt that caused the conflict so it
            // can be attached to the exception: by retry name when given, else the last attempt.
            this.tm.reset();
            StoredSessionAttemptWithSession conflicted = ar.getRetryAttemptName().isPresent() ? this.sm.getSessionStore(siteId).getAttemptByName(session.getProjectId(), session.getWorkflowName(), session.getSessionTime(), (String)ar.getRetryAttemptName().get()) : this.sm.getSessionStore(siteId).getLastAttemptByName(session.getProjectId(), session.getWorkflowName(), session.getSessionTime());
            throw new SessionAttemptConflictException("Session already exists", sessionAlreadyExists, conflicted);
        }
        // Wake the runWhile loop so the new tasks are picked up without waiting out the interval.
        this.noticeStatusPropagate();
        return stored;
    }

    /**
     * Compiles the workflow definition and delegates to the task-list overload of
     * {@code storeTasks}.
     *
     * @param store session control store to write into
     * @param storedAttempt attempt that will own the tasks
     * @param def workflow definition whose name and config are compiled
     * @param resumingTasks tasks carried over from a resumed attempt
     * @param sessionMonitors monitors to insert for the attempt
     * @throws TaskLimitExceededException propagated from the task-list overload
     */
    public void storeTasks(SessionControlStore store, StoredSessionAttemptWithSession storedAttempt, WorkflowDefinition def, List<ResumingTask> resumingTasks, List<SessionMonitor> sessionMonitors) throws TaskLimitExceededException {
        WorkflowTaskList compiledTasks = this.compiler.compile(def.getName(), def.getConfig()).getTasks();
        this.storeTasks(store, storedAttempt, compiledTasks, resumingTasks, sessionMonitors);
    }

    /**
     * Inserts the root task of {@code tasks}, adds the remaining initial tasks inside the
     * root-task insertion callback, and then inserts the session monitors if there are any.
     *
     * @param store session control store to write into
     * @param storedAttempt attempt that will own the tasks
     * @param tasks compiled tasks; index 0 is the root
     * @param resumingTasks tasks carried over from a resumed attempt
     * @param sessionMonitors monitors to insert; nothing is inserted when empty
     * @throws TaskLimitExceededException when adding the initial tasks exceeds the task limit
     */
    public void storeTasks(SessionControlStore store, StoredSessionAttemptWithSession storedAttempt, WorkflowTaskList tasks, List<ResumingTask> resumingTasks, List<SessionMonitor> sessionMonitors) throws TaskLimitExceededException {
        WorkflowTask root = tasks.get(0);

        // A grouping-only root starts as PLANNED; any other root starts as READY.
        TaskStateCode initialRootState;
        if (root.getTaskType().isGroupingOnly()) {
            initialRootState = TaskStateCode.PLANNED;
        } else {
            initialRootState = TaskStateCode.READY;
        }

        ImmutableTask rootTask = Task.taskBuilder()
                .parentId(Optional.<Long>absent())
                .fullName(root.getFullName())
                .config(TaskConfig.validate(root.getConfig()))
                .taskType(root.getTaskType())
                .state(initialRootState)
                .stateFlags(TaskStateFlags.empty().withInitialTask())
                .build();

        try {
            store.insertRootTask(storedAttempt.getId(), rootTask, (taskStore, rootTaskId) -> {
                try {
                    TaskControl.addInitialTasksExceptingRootTask(taskStore, storedAttempt.getId(), rootTaskId, tasks, resumingTasks, this.limits);
                }
                catch (TaskLimitExceededException limitExceeded) {
                    // Carry the checked exception out of the callback; it is unwrapped below.
                    throw new WorkflowTaskLimitExceededException(limitExceeded);
                }
                return null;
            });
        }
        catch (WorkflowTaskLimitExceededException carrier) {
            throw carrier.getCause();
        }

        if (sessionMonitors.isEmpty()) {
            return;
        }
        sessionMonitors.forEach(monitor -> logger.debug("Using session monitor: {}", (Object)monitor));
        store.insertMonitors(storedAttempt.getId(), sessionMonitors);
    }

    /**
     * Requests cancellation of an attempt and, when the request changed something,
     * wakes the propagator loop.
     *
     * @param siteId site whose session store is used to look up the attempt
     * @param attemptId id of the attempt to cancel
     * @return the result of {@code requestCancelAttempt}
     * @throws ResourceNotFoundException when the attempt lookup fails
     */
    public boolean killAttemptById(int siteId, long attemptId) throws ResourceNotFoundException {
        StoredSessionAttemptWithSession target = this.sm.getSessionStore(siteId).getAttemptById(attemptId);
        if (!this.sm.requestCancelAttempt(target.getId())) {
            return false;
        }
        this.noticeStatusPropagate();
        return true;
    }

    /**
     * Raises the propagator notice flag and signals every thread waiting on the
     * propagator condition, all while holding the propagator lock.
     */
    private void noticeStatusPropagate() {
        final Lock guard = this.propagatorLock;
        guard.lock();
        try {
            this.propagatorNotice = true;
            this.propagatorCondition.signalAll();
        }
        finally {
            guard.unlock();
        }
    }

    /**
     * Signals every thread waiting on the propagator condition without touching the
     * notice flag.
     */
    public void noticeRunWhileConditionChange() {
        final Lock guard = this.propagatorLock;
        guard.lock();
        try {
            this.propagatorCondition.signalAll();
        }
        finally {
            guard.unlock();
        }
    }

    /**
     * Runs the executor loop with a condition that is always true.
     *
     * @throws InterruptedException propagated from {@link #runWhile}
     */
    public void run() throws InterruptedException {
        BooleanSupplier always = () -> true;
        this.runWhile(always);
    }

    /**
     * Runs the executor loop while the attempt's state flags are not done, then loads
     * the attempt together with its session inside a transaction.
     *
     * @param attemptId attempt to wait for
     * @return the attempt with its session
     * @throws ResourceNotFoundException when the attempt cannot be found, either while
     *         polling its state flags or when loading it at the end
     * @throws InterruptedException propagated from {@link #runWhile}
     */
    public StoredSessionAttemptWithSession runUntilDone(long attemptId) throws ResourceNotFoundException, InterruptedException {
        // BooleanSupplier cannot throw checked exceptions, so a lookup failure is wrapped
        // here and unwrapped again in the catch block below.
        BooleanSupplier attemptNotDone = () -> {
            try {
                return !this.sm.getAttemptStateFlags(attemptId).isDone();
            }
            catch (ResourceNotFoundException notFound) {
                throw ThrowablesUtil.propagate((Throwable)notFound);
            }
        };

        try {
            this.runWhile(attemptNotDone);
        }
        catch (RuntimeException wrapped) {
            ThrowablesUtil.propagateIfInstanceOf((Throwable)wrapped.getCause(), ResourceNotFoundException.class);
            throw wrapped;
        }

        return this.tm.begin(() -> this.sm.getAttemptWithSessionById(attemptId), ResourceNotFoundException.class);
    }

    /**
     * Runs the executor loop using {@code sm.isAnyNotDoneAttempts()} as the loop condition.
     *
     * @throws InterruptedException propagated from {@link #runWhile}
     */
    public void runUntilAllDone() throws InterruptedException {
        BooleanSupplier anyAttemptNotDone = this.sm::isAnyNotDoneAttempts;
        this.runWhile(anyAttemptNotDone);
    }

    /*
     * Main executor loop.
     *
     * WARNING (from the decompiler) - Removed try catching itself - possible behaviour change.
     * Unable to fully structure code
     *
     * NOTE(review): the body below is raw decompiler output and is not valid Java: locals
     * are undeclared, the loop condition is an unresolved LambdaMetafactory call, and the
     * "lbl-1000" label, the "** GOTO" line and the dangling finally blocks are leftovers.
     * It must be reconstructed from the original source or bytecode before it can compile.
     * The comments describe only what the emitted statements show.
     */
    public void runWhile(BooleanSupplier cond) throws InterruptedException {
        block12: {
            // One full propagation pass is made before the loop starts.
            queuer = new TaskQueuer();
            this.propagateBlockedChildrenToReady();
            this.retryRetryWaitingTasks();
            this.enqueueReadyTasks(queuer);
            this.propagateAllPlannedToDone();
            this.propagateSessionArchive();
            // Wait interval in milliseconds; starts at 100 and is doubled up to 5000 below.
            waitMsec = new AtomicInteger(100);
            while (true) lbl-1000:
            // 6 sources

            {
                // NOTE(review): evaluates a synthetic lambda (lambda$runWhile$9, body not visible)
                // over cond inside a transaction. As emitted, a true result leaves the block,
                // which reads as inverted relative to the method name - verify against bytecode.
                if (this.tm.begin((TransactionManager.SupplierInTransaction<Boolean, E1, E2, E3, E4>)LambdaMetafactory.metafactory(null, null, null, ()Ljava/lang/Object;, lambda$runWhile$9(java.util.function.BooleanSupplier ), ()Ljava/lang/Boolean;)((BooleanSupplier)cond)).booleanValue()) {
                    break block12;
                }
                this.metrics.increment(DigdagMetrics.Category.EXECUTOR, "loopCount");
                this.propagateBlockedChildrenToReady();
                this.retryRetryWaitingTasks();
                this.enqueueReadyTasks(queuer);
                // The archive step runs on every iteration, regardless of hasModification.
                hasModification = this.propagateAllPlannedToDone();
                this.propagateSessionArchive();
                // When something changed, loop again immediately without waiting.
                if (hasModification) continue;
                this.propagatorLock.lock();
                try {
                    // A pending notice resets the wait interval to 100 ms.
                    if (this.propagatorNotice) {
                        this.propagatorNotice = false;
                        waitMsec.set(100);
                    }
                    this.metrics.summary(DigdagMetrics.Category.EXECUTOR, "loopWaitMsec", (double)waitMsec.get());
                    // Wait for a signal on the condition, for at most waitMsec milliseconds.
                    noticed = this.propagatorCondition.await(waitMsec.get(), TimeUnit.MILLISECONDS);
                    if (noticed && this.propagatorNotice) {
                        this.propagatorNotice = false;
                        waitMsec.set(100);
                    }
                    // Double the wait interval, capped at 5000 ms.
                    waitMsec.set(Math.min(waitMsec.get() * 2, 5000));
                }
                finally {
                    this.propagatorLock.unlock();
                    continue;
                }
                break;
            }
            ** GOTO lbl-1000
            // NOTE(review): this finally has no matching try in the emitted code; it shows that
            // queuer.close() is meant to run on exit, but the original construct is not recoverable here.
            finally {
                queuer.close();
            }
        }
    }

    /**
     * Runs {@code func} and returns its result; if it throws, reports the failure and
     * returns {@code defaultValue} instead of propagating.
     *
     * <p>On failure the exception is handed to {@link #catchingNotify(Exception)}, the
     * "errorInRunWhile" executor counter is incremented, and a warning is logged.
     *
     * @param func computation to run
     * @param defaultValue value returned when {@code func} throws
     * @param errorMessage message for the warning log entry
     * @return the result of {@code func}, or {@code defaultValue} on any {@link Exception}
     */
    protected <R> R catching(Supplier<R> func, R defaultValue, String errorMessage) {
        try {
            return func.get();
        }
        catch (Exception e) {
            this.catchingNotify(e);
            this.metrics.increment(DigdagMetrics.Category.EXECUTOR, "errorInRunWhile");
            // Pass the exception to the logger so the cause and stack trace are recorded;
            // logging only the message left nothing to diagnose the failure with.
            logger.warn(errorMessage, e);
            return defaultValue;
        }
    }

    /**
     * Hook called by {@code catching} with the exception it caught, before the error is
     * counted and logged. This implementation does nothing.
     *
     * @param e the exception caught by {@code catching}
     */
    public void catchingNotify(Exception e) {
    }

    /**
     * Builds the per-parent step used by {@code propagateBlockedChildrenToReady}.
     *
     * <p>For a parent task id the function opens a transaction, tries to lock the task
     * if it is not already locked, and reports whether
     * {@code trySetChildrenBlockedToReadyOrShortCircuitPlannedOrCanceled} changed anything.
     *
     * @return function from parent task id to the optional result of the locked update
     */
    @VisibleForTesting
    protected Function<Long, Optional<Boolean>> funcPropagateBlockedChildrenToReady() {
        return parentId -> this.tm.begin(() ->
                this.sm.lockTaskIfNotLocked(
                        (long)parentId,
                        lockedStore -> lockedStore.trySetChildrenBlockedToReadyOrShortCircuitPlannedOrCanceled((long)parentId) > 0));
    }

    /**
     * Pages through parent task ids and applies {@code funcPropagateBlockedChildrenToReady}
     * to each one, until a fetch returns an empty batch.
     *
     * <p>NOTE(review): the batch query is the synthetic method
     * {@code lambda$propagateBlockedChildrenToReady$13}, whose body is not visible here, and
     * the assignment inside its argument list is a decompiler artifact. Presumably it
     * fetches parent ids greater than the last one seen - confirm against the original source.
     *
     * @return true if any per-parent step reported a change
     */
    @DigdagTimed(category="executor", appendMethodName=true)
    protected boolean propagateBlockedChildrenToReady() {
        long finalLastParentId;
        List parentIds;
        boolean anyChanged = false;
        long lastParentId = 0L;
        while (!(parentIds = this.tm.begin(() -> this.lambda$propagateBlockedChildrenToReady$13(finalLastParentId = lastParentId))).isEmpty()) {
            // Each parent is processed through catching(), so a failure for one parent counts
            // as "no change" (absent -> false) and does not abort the batch.
            // NOTE(review): the log message spells "paretId"; left untouched because it is runtime text.
            anyChanged = parentIds.stream().map(parentId -> (Boolean)this.catching(() -> this.funcPropagateBlockedChildrenToReady().apply((Long)parentId), Optional.absent(), "Failed to set children to ready. paretId:" + parentId).or((Object)false)).reduce(anyChanged, (a, b) -> a != false || b != false);
            // Advance the cursor to the last id of this batch.
            lastParentId = (Long)parentIds.get(parentIds.size() - 1);
        }
        return anyChanged;
    }

    /**
     * Builds the per-task step used by {@code propagateAllPlannedToDone}.
     *
     * <p>For a task id the function opens a transaction, tries to lock the task if it is
     * not already locked, and runs {@code setDoneFromDoneChildren} on the locked task.
     *
     * @return function from task id to the optional result of the locked update
     */
    protected Function<Long, Optional<Boolean>> funcSetDoneFromDoneChildren() {
        return taskId -> this.tm.begin(() ->
                this.sm.lockTaskIfNotLocked((long)taskId, (lockedStore, lockedRow) -> {
                    TaskControl control = new TaskControl(lockedStore, lockedRow, this.limits);
                    return this.setDoneFromDoneChildren(control);
                }));
    }

    /**
     * Pages through task ids and applies {@code funcSetDoneFromDoneChildren} to each one,
     * until a fetch returns an empty batch.
     *
     * <p>NOTE(review): the batch query is the synthetic method
     * {@code lambda$propagateAllPlannedToDone$20}, whose body is not visible here, and the
     * assignment inside its argument list is a decompiler artifact. Presumably it fetches
     * task ids greater than the last one seen - confirm against the original source.
     *
     * @return true if any per-task step reported a change
     */
    @DigdagTimed(category="executor", appendMethodName=true)
    protected boolean propagateAllPlannedToDone() {
        long finalLastTaskId;
        List taskIds;
        boolean anyChanged = false;
        long lastTaskId = 0L;
        while (!(taskIds = this.tm.begin(() -> this.lambda$propagateAllPlannedToDone$20(finalLastTaskId = lastTaskId))).isEmpty()) {
            // Each task is processed through catching(), so a failure for one task counts as
            // "no change" (absent -> false) and does not abort the batch.
            anyChanged = taskIds.stream().map(taskId -> (Boolean)this.catching(() -> this.funcSetDoneFromDoneChildren().apply((Long)taskId), Optional.absent(), "Failed to call setDoneFromDoneChildren. taskId:" + taskId).or((Object)false)).reduce(anyChanged, (a, b) -> a != false || b != false);
            // Advance the cursor to the last id of this batch.
            lastTaskId = (Long)taskIds.get(taskIds.size() - 1);
        }
        return anyChanged;
    }

    /**
     * Moves a locked PLANNED task to its next state once none of its children can
     * progress any further.
     *
     * <p>Decision order: cancel requested, delayed error, delayed group error, error
     * child (retry, or add failure-alert/error tasks), otherwise success.
     *
     * @param lockedTask control object for the locked task
     * @return true if the task's state was changed; false when the task is not PLANNED,
     *         still has a progressible child, or the state update reported no change
     */
    private boolean setDoneFromDoneChildren(TaskControl lockedTask) {
        // Only PLANNED tasks whose children have all stopped progressing are handled.
        if (lockedTask.getState() != TaskStateCode.PLANNED || lockedTask.isAnyProgressibleChild()) {
            return false;
        }

        logger.trace("setDoneFromDoneChildren {}", (Object)lockedTask.get());
        StoredTask current = lockedTask.get();

        if (current.getStateFlags().isCancelRequested()) {
            return lockedTask.setToCanceled();
        }

        if (current.getStateFlags().isDelayedError()) {
            boolean changed = lockedTask.setPlannedToError();
            if (!changed) {
                logger.warn("Unexpected state change failure from PLANNED to ERROR: {}", (Object)current);
            }
            return changed;
        }

        if (current.getStateFlags().isDelayedGroupError()) {
            boolean changed = lockedTask.setPlannedToGroupError();
            if (!changed) {
                logger.warn("Unexpected state change failure from PLANNED to GROUP_ERROR: {}", (Object)current);
            }
            return changed;
        }

        if (!lockedTask.isAnyErrorChild()) {
            // No failed child: the group succeeded.
            boolean changed = lockedTask.setPlannedToSuccess();
            if (!changed) {
                logger.warn("Unexpected state change failure from PLANNED to SUCCESS: {}", (Object)current);
            }
            return changed;
        }

        // At least one child failed: retry the group if its retry settings allow it.
        Optional<RetryControl> retry = this.checkRetry(current);
        if (retry.isPresent()) {
            RetryControl retryControl = retry.get();
            return lockedTask.setPlannedToGroupRetryWaiting(retryControl.getNextRetryStateParams(), retryControl.getNextRetryInterval());
        }

        // No retry: collect the ids of any follow-up tasks that get added.
        List<Long> followUpTaskIds = new ArrayList<Long>();
        if (!lockedTask.get().getParentId().isPresent()) {
            // A task without a parent is the root; it also gets the attempt-failure alert task.
            followUpTaskIds.add(this.addAttemptFailureAlertTask(lockedTask));
        }
        try {
            Optional<Long> errorTaskId = this.addErrorTasksIfAny(lockedTask, true, export -> {
                this.collectErrorParams((Config)export, lockedTask.get());
                return export;
            });
            if (errorTaskId.isPresent()) {
                followUpTaskIds.add(errorTaskId.get());
            }
        }
        catch (TaskLimitExceededException limitExceeded) {
            this.tm.reset();
            logger.warn("Failed to add error tasks because of task limit");
        }
        catch (ConfigException brokenConfig) {
            logger.warn("Found a broken _error task in attempt {} task {}. Skipping this task.", new Object[]{current.getAttemptId(), current.getId(), brokenConfig});
        }

        // Without follow-up tasks the group fails now; otherwise the failure is delayed
        // until those tasks are done.
        if (followUpTaskIds.isEmpty()) {
            return lockedTask.setPlannedToGroupError();
        }
        return lockedTask.setPlannedToPlannedWithDelayedGroupError();
    }

    /**
     * Prepares a {@link RetryControl} from the task's merged config and state params and
     * returns it when its evaluation says the task should be retried.
     *
     * @param task task whose retry settings are evaluated
     * @return the retry control when {@code evaluate()} is true; absent otherwise, and
     *         also absent when the retry configuration raises a {@link ConfigException}
     */
    Optional<RetryControl> checkRetry(StoredTask task) {
        RetryControl candidate;
        try {
            candidate = RetryControl.prepare(task.getConfig().getMerged(), task.getStateParams(), false);
            if (!candidate.evaluate()) {
                return Optional.<RetryControl>absent();
            }
        }
        catch (ConfigException invalidRetryConfig) {
            // An invalid retry configuration is treated as "no retry".
            logger.warn("Ignore retry parameter because of invalid retry configuration. attempt_id:{} config:{}", (Object)task.getAttemptId(), (Object)task.getConfig());
            return Optional.<RetryControl>absent();
        }
        return Optional.of(candidate);
    }

    /**
     * Fills {@code params} from the task's recursive children: applies each child's
     * stored parameter update, then sets "error" to the merge of the children's errors.
     *
     * @param params config that is updated in place
     * @param task task whose recursive children are inspected
     */
    private void collectErrorParams(Config params, StoredTask task) {
        List<Long> descendantIds = new TaskTree(this.sm.getTaskRelations(task.getAttemptId()))
                .getRecursiveChildrenIdList(task.getId());

        // Apply the parameters stored by the children.
        for (ParameterUpdate update : this.sm.getStoreParams(descendantIds)) {
            update.applyTo(params);
        }

        // Merge the children's errors into one config and expose it as "error".
        Config mergedError = this.cf.create();
        for (Config childError : this.sm.getErrors(descendantIds)) {
            mergedError.merge(childError);
        }
        params.set("error", (Object)mergedError);
    }

    /**
     * Builds the per-task step used by {@code propagateSessionArchive}.
     *
     * <p>For a task summary the function opens a transaction and locks the task's attempt
     * if it exists. An attempt whose state flags are already done is left alone; otherwise
     * its tasks are archived, passing whether the summary's state is SUCCESS.
     *
     * @return function from task summary to the optional result of the locked archive step
     */
    @VisibleForTesting
    protected Function<TaskAttemptSummary, Optional<Boolean>> funcArchiveTasks() {
        return taskSummary -> this.tm.begin(() ->
                this.sm.lockAttemptIfExists(taskSummary.getAttemptId(), (lockedStore, attemptSummary) -> {
                    boolean alreadyDone = attemptSummary.getStateFlags().isDone();
                    if (alreadyDone) {
                        return false;
                    }
                    SessionAttemptControl attemptControl = new SessionAttemptControl(lockedStore, taskSummary.getAttemptId());
                    attemptControl.archiveTasks(this.archiveMapper, taskSummary.getState() == TaskStateCode.SUCCESS);
                    return true;
                }));
    }

    /**
     * Pages through task summaries and applies {@code funcArchiveTasks} to each one,
     * until a fetch returns an empty batch.
     *
     * <p>NOTE(review): the batch query is the synthetic method
     * {@code lambda$propagateSessionArchive$28}, whose body is not visible here, and the
     * assignment inside its argument list is a decompiler artifact. Which tasks it selects
     * cannot be told from this view - confirm against the original source.
     *
     * @return true if any per-task archive step reported a change
     */
    @DigdagTimed(category="executor", appendMethodName=true)
    protected boolean propagateSessionArchive() {
        long finalLastTaskId;
        List tasks;
        boolean anyChanged = false;
        long lastTaskId = 0L;
        while (!(tasks = this.tm.begin(() -> this.lambda$propagateSessionArchive$28(finalLastTaskId = lastTaskId))).isEmpty()) {
            // Each summary is processed through catching(), so a failure for one task counts
            // as "no change" (absent -> false) and does not abort the batch.
            anyChanged = tasks.stream().map(task -> (Boolean)this.catching(() -> this.funcArchiveTasks().apply((TaskAttemptSummary)task), Optional.absent(), "Failed to call archiveTasks. taskId:" + task.getId()).or((Object)false)).reduce(anyChanged, (a, b) -> a != false || b != false);
            // Advance the cursor to the id of the last summary in this batch.
            lastTaskId = ((TaskAttemptSummary)tasks.get(tasks.size() - 1)).getId();
        }
        return anyChanged;
    }

    /**
     * Runs {@code sm.trySetRetryWaitingToReady()} inside a transaction.
     *
     * @return true if that call reported a count greater than zero
     */
    @DigdagTimed(category="executor", appendMethodName=true)
    protected boolean retryRetryWaitingTasks() {
        Boolean anyMovedToReady = this.tm.begin(() -> this.sm.trySetRetryWaitingToReady() > 0);
        return anyMovedToReady;
    }

    /**
     * Builds the per-task step used by {@code enqueueReadyTasks}.
     *
     * <p>For a task id the function opens a transaction, calls {@code enqueueTask} with
     * this executor's dispatcher, and always yields true.
     *
     * @return function from task id to true
     */
    @VisibleForTesting
    protected Function<Long, Boolean> funcEnqueueTask() {
        return readyTaskId -> this.tm.begin(() -> {
            this.enqueueTask(this.dispatcher, (long)readyTaskId);
            return true;
        });
    }

    /**
     * Fetches one batch of ready task ids and enqueues each of them.
     *
     * <p>The batch is limited by {@code enqueueFetchSize} and {@code enqueueRandomFetch}.
     * Every task goes through {@code catching}, so a failure for one task does not stop
     * the rest of the batch.
     *
     * @param queuer not used by this method
     */
    @DigdagTimed(category="executor", appendMethodName=true)
    protected void enqueueReadyTasks(TaskQueuer queuer) {
        List<Long> readyTaskIds = this.tm.begin(() -> this.sm.findAllReadyTaskIds(this.enqueueFetchSize, this.enqueueRandomFetch));
        logger.trace("readyTaskIds:{}", (Object)readyTaskIds);
        for (final long taskId : readyTaskIds) {
            this.catching(() -> this.funcEnqueueTask().apply(taskId), true, "Failed to call enqueueTask. taskId:" + taskId);
        }
    }

    /**
     * Locks a task if it is not already locked and, when it is READY, pushes it to the
     * task queue and moves it to RUNNING.
     *
     * <p>The boolean computed inside the lock callback is discarded; this method returns nothing.
     *
     * @param dispatcher dispatcher that receives the queue request
     * @param taskId id of the task to enqueue
     */
    @DigdagTimed(category="executor", appendMethodName=true)
    protected void enqueueTask(TaskQueueDispatcher dispatcher, long taskId) {
        this.sm.lockTaskIfNotLocked(taskId, (store, task) -> {
            int siteId;
            TaskControl lockedTask = new TaskControl(store, task, this.limits);
            // Skip tasks that are not READY.
            if (lockedTask.getState() != TaskStateCode.READY) {
                return false;
            }
            // Grouping-only tasks are not queued; they are handed to retryGroupingTask instead.
            if (task.getTaskType().isGroupingOnly()) {
                return this.retryGroupingTask(lockedTask);
            }
            try {
                siteId = this.sm.getSiteIdOfTask(taskId);
            }
            catch (ResourceNotFoundException ex) {
                // No site id for the task: reset the transaction, log, and give up on this task.
                this.tm.reset();
                IllegalStateException error = new IllegalStateException("Task id=" + taskId + " is ready to run but associated session attempt does not exist.", ex);
                logger.error("Database state error enqueuing task.", (Throwable)error);
                return false;
            }
            try {
                // The request uses no explicit queue name, priority 0, no data, and a unique
                // name built from the task id and its retry count.
                Optional queueName = Optional.absent();
                String encodedUnique = WorkflowExecutor.encodeUniqueQueuedTaskName(lockedTask.get());
                ImmutableTaskQueueRequest request = TaskQueueRequest.builder().priority(0).uniqueName(encodedUnique).data(Optional.absent()).build();
                logger.debug("Queuing task of attempt_id={}: id={} {}", new Object[]{task.getAttemptId(), task.getId(), task.getFullName()});
                try {
                    dispatcher.dispatch(siteId, (Optional<String>)queueName, (TaskQueueRequest)request);
                }
                catch (TaskConflictException ex) {
                    // The name is already queued: only warn, then continue to the RUNNING
                    // transition below.
                    this.tm.reset();
                    logger.warn("Task name {} is already queued in queue={} of site id={}. Skipped enqueuing", new Object[]{encodedUnique, queueName.or((Object)"<shared>"), siteId});
                }
                boolean updated = lockedTask.setReadyToRunning();
                if (!updated) {
                    logger.warn("Unexpected state change failure from READY to RUNNING: {}", (Object)task);
                }
                return updated;
            }
            catch (Exception ex) {
                // Any other failure while enqueuing marks the task itself as failed, with an
                // error config built from the exception.
                this.tm.reset();
                logger.error(LogMarkers.UNEXPECTED_SERVER_ERROR, "Enqueue error, making this task failed: {}", (Object)task, (Object)ex);
                return this.taskFailed(lockedTask, TaskExecutionException.buildExceptionErrorConfig((Throwable)ex).toConfig(this.cf));
            }
        }).or((Object)false);
    }

    /**
     * Builds the unique queue name for a task: the task id alone when the retry count is
     * zero, otherwise the id followed by ".r" and the retry count.
     *
     * @param task task to encode
     * @return the encoded unique name
     */
    private static String encodeUniqueQueuedTaskName(StoredTask task) {
        int retries = task.getRetryCount();
        String idPart = Long.toString(task.getId());
        return retries == 0 ? idPart : idPart + ".r" + Integer.toString(retries);
    }

    /**
     * Extracts the task id from an encoded unique queue name by parsing everything
     * before the first '.', or the whole string when it contains no '.'.
     *
     * @param encodedUniqueQueuedTaskName encoded name such as "42" or "42.r3"
     * @return the parsed task id
     * @throws NumberFormatException if the id part is not a valid long
     */
    private static long parseTaskIdFromEncodedQueuedTaskName(String encodedUniqueQueuedTaskName) {
        int dotAt = encodedUniqueQueuedTaskName.indexOf('.');
        String idDigits = dotAt < 0
                ? encodedUniqueQueuedTaskName
                : encodedUniqueQueuedTaskName.substring(0, dotAt);
        return Long.parseLong(idDigits);
    }

    /**
     * Turns task queue locks into task requests.
     *
     * <p>For each lock the task id is parsed from the lock's unique name and a request is
     * built for it. When no request can be built, the queue entry is deleted as
     * inconsistent. A {@link RuntimeException} for one lock resets the transaction, is
     * logged, and does not stop processing of the remaining locks.
     *
     * @param locks queue locks to resolve
     * @return the requests that could be built, in lock order
     */
    public List<TaskRequest> getTaskRequests(List<TaskQueueLock> locks) {
        ImmutableList.Builder<TaskRequest> requests = ImmutableList.builder();
        for (TaskQueueLock lock : locks) {
            try {
                long taskId = WorkflowExecutor.parseTaskIdFromEncodedQueuedTaskName(lock.getUniqueName());
                Optional<TaskRequest> found = this.getTaskRequest(taskId, lock.getLockId());
                if (!found.isPresent()) {
                    this.dispatcher.deleteInconsistentTask(lock.getLockId());
                } else {
                    requests.add(found.get());
                }
            }
            catch (RuntimeException unexpected) {
                this.tm.reset();
                logger.error(LogMarkers.UNEXPECTED_SERVER_ERROR, "Invalid association of task queue lock id: {}", (Object)lock, (Object)unexpected);
            }
        }
        return requests.build();
    }

    /**
     * Builds the {@link TaskRequest} for a queued task while holding a lock on it.
     *
     * <p>Returns absent, after logging an error, when the task, its session attempt, its
     * workflow definition revision, or its project cannot be found.
     *
     * <p>NOTE(review): the raw {@code Optional} types and casts are decompiler output; the
     * trailing {@code .or(...)} supplies the value used when the task lock yields no result.
     *
     * @param taskId id of the queued task
     * @param lockId queue lock id copied into the request
     * @return the request, or absent when associated data is missing
     */
    private Optional<TaskRequest> getTaskRequest(long taskId, String lockId) {
        return (Optional)this.sm.lockTaskIfExists(taskId, (store, task) -> {
            StoredProject project;
            StoredSessionAttemptWithSession attempt;
            try {
                attempt = this.sm.getAttemptWithSessionById(task.getAttemptId());
            }
            catch (ResourceNotFoundException ex) {
                this.tm.reset();
                IllegalStateException error = new IllegalStateException("Task id=" + taskId + " is in the task queue but associated session attempt does not exist.", ex);
                logger.error("Database state error enqueuing task.", (Throwable)error);
                return Optional.absent();
            }
            // The revision is looked up only when the attempt references a workflow definition.
            Optional rev = Optional.absent();
            if (attempt.getWorkflowDefinitionId().isPresent()) {
                try {
                    rev = Optional.of((Object)this.rm.getRevisionOfWorkflowDefinition((Long)attempt.getWorkflowDefinitionId().get()));
                }
                catch (ResourceNotFoundException ex) {
                    this.tm.reset();
                    IllegalStateException error = new IllegalStateException("Task id=" + taskId + " is in the task queue but associated workflow definition does not exist.", ex);
                    logger.error("Database state error enqueuing task.", (Throwable)error);
                    return Optional.absent();
                }
            }
            try {
                project = this.rm.getProjectByIdInternal(attempt.getSession().getProjectId());
            }
            catch (ResourceNotFoundException ex) {
                this.tm.reset();
                IllegalStateException error = new IllegalStateException("Task id=" + taskId + " is in the task queue but associated project does not exist.", ex);
                logger.error("Database state error enqueuing task.", (Throwable)error);
                return Optional.absent();
            }
            // Parameters are layered in this order: system "digdag.defaultParams", the
            // revision's default params (if any), the attempt's params, then collectParams(...).
            Config params = this.cf.fromJsonString((String)this.systemConfig.get("digdag.defaultParams", String.class, (Object)"{}"));
            if (rev.isPresent()) {
                params.merge(((StoredRevision)rev.get()).getDefaultParams());
            }
            params.merge(attempt.getParams());
            this.collectParams(params, task, attempt);
            // Work on a copy of the local config, and drop the "_check" and "_error" keys from
            // both the params and the local config before they go into the request.
            Config localConfig = task.getConfig().getLocal().deepCopy();
            params.remove("_check");
            params.remove("_error");
            localConfig.remove("_check");
            localConfig.remove("_error");
            ImmutableTaskRequest request = TaskRequest.builder().siteId(attempt.getSiteId()).projectId(attempt.getSession().getProjectId()).projectName(project.getName()).workflowName(attempt.getSession().getWorkflowName()).revision(rev.transform(it -> it.getName())).taskId(task.getId()).attemptId(attempt.getId()).sessionId(attempt.getSessionId()).retryAttemptName(attempt.getRetryAttemptName()).isCancelRequested(task.getStateFlags().isCancelRequested()).taskName(task.getFullName()).lockId(lockId).timeZone(attempt.getTimeZone()).sessionUuid(attempt.getSessionUuid()).sessionTime(attempt.getSession().getSessionTime()).createdAt(Instant.now()).localConfig(localConfig).config(params).lastStateParams(task.getStateParams()).workflowDefinitionId(attempt.getWorkflowDefinitionId()).retryCount(task.getRetryCount()).startedAt(task.getStartedAt()).build();
            return Optional.of((Object)request);
        }).or(() -> {
            // Fallback when the lock yields no result; the message reports the task as deleted.
            IllegalStateException error = new IllegalStateException("Task id=" + taskId + " is in the task queue but associated task is deleted.");
            logger.error("Database state error enqueuing task.", (Throwable)error);
            return Optional.absent();
        });
    }

    /**
     * Prepares a grouping task for retry: collects the ids of all of its recursive children
     * from the attempt's task tree, asks the task control to copy them as initial tasks for
     * retry, and then calls {@code setGroupRetryReadyToPlanned()} on the task.
     *
     * @param lockedTask control for the grouping task
     * @return always {@code true}
     */
    private boolean retryGroupingTask(TaskControl lockedTask) {
        final StoredTask groupTask = lockedTask.get();
        final List<Long> descendantIds = new TaskTree(this.sm.getTaskRelations(groupTask.getAttemptId()))
                .getRecursiveChildrenIdList(groupTask.getId());
        lockedTask.copyInitialTasksForRetry(groupTask.getFullName(), descendantIds);
        lockedTask.setGroupRetryReadyToPlanned();
        return true;
    }

    /**
     * Records the failure of a task and, when the task state actually changed, tells the
     * dispatcher that the queued task entry is finished.
     *
     * @param siteId  site id forwarded to the dispatcher
     * @param taskId  id of the task to lock and update
     * @param lockId  lock id forwarded to the dispatcher
     * @param agentId agent id forwarded to the dispatcher
     * @param error   error config handed to the state transition
     * @return {@code true} when the task state was changed; {@code false} when the task does
     *         not exist or the transition was not applied
     */
    public boolean taskFailed(int siteId, long taskId, String lockId, AgentId agentId, Config error) {
        final boolean stateChanged = (Boolean)this.sm.lockTaskIfExists(
                taskId,
                (store, task) -> this.taskFailed(new TaskControl(store, task, this.limits), error)
        ).or((Object)false);
        if (!stateChanged) {
            return false;
        }
        // Dispatcher errors are deliberately tolerated: the state change itself already succeeded.
        try {
            this.dispatcher.taskFinished(siteId, lockId, agentId);
        }
        catch (TaskNotFoundException missingEntry) {
            this.tm.reset();
            logger.warn("Ignoring missing task entry error", (Throwable)missingEntry);
        }
        catch (TaskConflictException preemptedEntry) {
            this.tm.reset();
            logger.warn("Ignoring preempted task entry error", (Throwable)preemptedEntry);
        }
        return true;
    }

    /**
     * Records the success of a task and, when the task state actually changed, tells the
     * dispatcher that the queued task entry is finished.
     *
     * @param siteId  site id forwarded to the dispatcher
     * @param taskId  id of the task to lock and update
     * @param lockId  lock id forwarded to the dispatcher
     * @param agentId agent id forwarded to the dispatcher
     * @param result  task result handed to the state transition
     * @return {@code true} when the task state was changed; {@code false} when the task does
     *         not exist or the transition was not applied
     */
    public boolean taskSucceeded(int siteId, long taskId, String lockId, AgentId agentId, TaskResult result) {
        final boolean stateChanged = (Boolean)this.sm.lockTaskIfExists(
                taskId,
                (store, task) -> this.taskSucceeded(new TaskControl(store, task, this.limits), result)
        ).or((Object)false);
        if (!stateChanged) {
            return false;
        }
        // Dispatcher errors are deliberately tolerated: the state change itself already succeeded.
        try {
            this.dispatcher.taskFinished(siteId, lockId, agentId);
        }
        catch (TaskNotFoundException missingEntry) {
            this.tm.reset();
            logger.warn("Ignoring missing task entry error", (Throwable)missingEntry);
        }
        catch (TaskConflictException preemptedEntry) {
            this.tm.reset();
            logger.warn("Ignoring preempted task entry error", (Throwable)preemptedEntry);
        }
        return true;
    }

    /**
     * Moves a task into its retry-waiting state and, when the task state actually changed,
     * tells the dispatcher that the queued task entry is finished.
     *
     * @param siteId           site id forwarded to the dispatcher
     * @param taskId           id of the task to lock and update
     * @param lockId           lock id forwarded to the dispatcher
     * @param agentId          agent id forwarded to the dispatcher
     * @param retryInterval    retry interval handed to the state transition
     * @param retryStateParams state params handed to the state transition
     * @param error            optional error that triggered the retry (used for trace logging)
     * @return {@code true} when the task state was changed; {@code false} when the task does
     *         not exist or the transition was not applied
     */
    public boolean retryTask(int siteId, long taskId, String lockId, AgentId agentId, int retryInterval, Config retryStateParams, Optional<Config> error) {
        final boolean stateChanged = (Boolean)this.sm.lockTaskIfExists(
                taskId,
                (store, task) -> this.retryTask(new TaskControl(store, task, this.limits), retryInterval, retryStateParams, error)
        ).or((Object)false);
        if (!stateChanged) {
            return false;
        }
        // Dispatcher errors are deliberately tolerated: the state change itself already succeeded.
        try {
            this.dispatcher.taskFinished(siteId, lockId, agentId);
        }
        catch (TaskNotFoundException missingEntry) {
            this.tm.reset();
            logger.warn("Ignoring missing task entry error", (Throwable)missingEntry);
        }
        catch (TaskConflictException preemptedEntry) {
            this.tm.reset();
            logger.warn("Ignoring preempted task entry error", (Throwable)preemptedEntry);
        }
        return true;
    }

    /**
     * Applies a non-retryable failure to an already locked task.
     *
     * <p>Only tasks in the {@code RUNNING} state are handled. A task with a pending cancel
     * request is moved to canceled instead. Otherwise the task's error tasks (if any) are
     * generated with the error exported as {@code "error"}; when one was added the task is set
     * to planned with a delayed error, otherwise it is set to error directly.
     *
     * @param lockedTask control for the locked task
     * @param error      error config describing the failure
     * @return {@code true} when the task state was changed
     */
    private boolean taskFailed(TaskControl lockedTask, Config error) {
        logger.trace("Task failed with error {} with no retry: {}", (Object)error, (Object)lockedTask.get());
        if (lockedTask.getState() != TaskStateCode.RUNNING) {
            logger.trace("Skipping taskFailed callback to a {} task", (Object)lockedTask.getState());
            return false;
        }
        if (lockedTask.get().getStateFlags().isCancelRequested()) {
            return lockedTask.setToCanceled();
        }

        // Stays false when generating the error tasks fails for either reason below.
        boolean errorTaskAdded = false;
        try {
            errorTaskAdded = this.addErrorTasksIfAny(lockedTask, false, export -> export.set("error", (Object)error)).isPresent();
        }
        catch (TaskLimitExceededException limitEx) {
            this.tm.reset();
            logger.warn("Failed to add error tasks because of task limit");
        }
        catch (ConfigException configEx) {
            logger.warn("Found a broken _error task in attempt {} task {}. Skipping this task.", new Object[]{lockedTask.get().getAttemptId(), lockedTask.get().getId(), configEx});
        }

        final boolean updated;
        if (!errorTaskAdded) {
            updated = lockedTask.setRunningToShortCircuitError(error);
        } else {
            logger.trace("Added an error task");
            updated = lockedTask.setRunningToPlannedWithDelayedError(error);
        }
        this.noticeStatusPropagate();
        if (updated) {
            return true;
        }
        logger.warn("Unexpected state change failure from RUNNING to RETRY, PLANNED or ERROR: {}", (Object)lockedTask.get());
        return false;
    }

    /**
     * Applies a successful result to an already locked task.
     *
     * <p>Only tasks in the {@code RUNNING} state are handled. A task with a pending cancel
     * request is moved to canceled instead. Otherwise subtasks from the result and the task's
     * check tasks are generated; if generation hits the task limit or a configuration error,
     * the task is treated as failed with a corresponding error config.
     *
     * @param lockedTask control for the locked task
     * @param result     result reported for the task
     * @return {@code true} when the task state was changed
     */
    private boolean taskSucceeded(TaskControl lockedTask, TaskResult result) {
        logger.trace("Task succeeded with result {}: {}", (Object)result, (Object)lockedTask.get());
        if (lockedTask.getState() != TaskStateCode.RUNNING) {
            logger.debug("Ignoring taskSucceeded callback to a {} task", (Object)lockedTask.getState());
            return false;
        }
        if (lockedTask.get().getStateFlags().isCancelRequested()) {
            return lockedTask.setToCanceled();
        }

        final boolean generatedTasks;
        try {
            Optional<Long> subtaskRootId = this.addSubtasksIfNotEmpty(lockedTask, result.getSubtaskConfig());
            // Check tasks are added with the generated subtask root (if any) as their upstream.
            Optional<Long> checkRootId = this.addCheckTasksIfAny(lockedTask, subtaskRootId);
            generatedTasks = subtaskRootId.isPresent() || checkRootId.isPresent();
        }
        catch (TaskLimitExceededException limitEx) {
            this.tm.reset();
            logger.warn("Failed to add subtasks because of task limit");
            return this.taskFailed(lockedTask, TaskExecutionException.buildExceptionErrorConfig((Throwable)limitEx).toConfig(this.cf));
        }
        catch (ConfigException configEx) {
            Config failure = TaskExecutionException.buildExceptionErrorConfig((Throwable)configEx).toConfig(this.cf);
            logger.error("Configuration error at task {}: {}", (Object)lockedTask.get().getFullName(), failure.get("message", String.class, (Object)""));
            return this.taskFailed(lockedTask, failure);
        }

        final boolean updated;
        if (generatedTasks) {
            updated = lockedTask.setRunningToPlannedSuccessful(result);
        } else {
            updated = lockedTask.setRunningToShortCircuitSuccess(result);
        }
        this.noticeStatusPropagate();
        if (updated) {
            return true;
        }
        logger.warn("Unexpected state change failure from RUNNING to PLANNED: {}", (Object)lockedTask.get());
        return false;
    }

    /**
     * Moves an already locked {@code RUNNING} task to its retry-waiting state.
     *
     * @param lockedTask       control for the locked task
     * @param retryInterval    retry interval passed to the state transition (logged as seconds)
     * @param retryStateParams state params passed to the state transition
     * @param error            optional error that caused the retry; only used for trace logging
     * @return {@code true} when the task state was changed; {@code false} when the task was
     *         not {@code RUNNING} or the transition was not applied
     */
    private boolean retryTask(TaskControl lockedTask, int retryInterval, Config retryStateParams, Optional<Config> error) {
        if (lockedTask.getState() != TaskStateCode.RUNNING) {
            logger.trace("Skipping retryTask callback to a {} task", (Object)lockedTask.getState());
            return false;
        }
        if (error.isPresent()) {
            logger.trace("Task failed with error {} with retrying after {} seconds: {}", new Object[]{error.get(), retryInterval, lockedTask.get()});
        }

        final boolean movedToRetry = lockedTask.setRunningToRetryWaiting(retryStateParams, retryInterval);
        this.noticeStatusPropagate();
        if (movedToRetry) {
            return true;
        }
        logger.warn("Unexpected state change failure from RUNNING to RETRY: {}", (Object)lockedTask.get());
        return false;
    }

    /**
     * Folds the parameters contributed by other tasks of the attempt into {@code params}.
     *
     * <p>Walks the id list returned by
     * {@code getRecursiveParentsUpstreamChildrenIdListFromFar} in order. For each id that is
     * also one of the task's parents, that parent's export params are merged; then the store
     * params recorded for the id are applied. Finally the task's own export config is merged.
     *
     * @param params  config that is updated in place
     * @param task    task whose effective parameters are being collected
     * @param attempt attempt whose task relations are used to build the task tree
     */
    private void collectParams(Config params, StoredTask task, StoredSessionAttempt attempt) {
        final TaskTree taskTree = new TaskTree(this.sm.getTaskRelations(attempt.getId()));
        final List<Long> parentIds = taskTree.getRecursiveParentIdListFromRoot(task.getId());
        final List<Long> upstreamIds = taskTree.getRecursiveParentsUpstreamChildrenIdListFromFar(task.getId());
        final List<Config> parentExports = this.sm.getExportParams(parentIds);
        final List<ParameterUpdate> storeUpdates = this.sm.getStoreParams(upstreamIds);

        int index = 0;
        while (index < upstreamIds.size()) {
            // storeUpdates is indexed in parallel with upstreamIds.
            final ParameterUpdate update = storeUpdates.get(index);
            final long upstreamId = upstreamIds.get(index);
            final int parentIndex = parentIds.indexOf(upstreamId);
            if (parentIndex >= 0) {
                params.merge(parentExports.get(parentIndex));
            }
            update.applyTo(params);
            ++index;
        }
        params.merge(task.getConfig().getExport());
    }

    /**
     * Compiles {@code subtaskConfig} into tasks named under the locked task with the
     * {@code "^sub"} suffix and adds them as generated subtasks.
     *
     * @param lockedTask    control for the task that receives the subtasks
     * @param subtaskConfig subtask definition; may be empty
     * @return id of the root of the added subtasks, or absent when the config is empty or
     *         compiles to no tasks
     * @throws TaskLimitExceededException declared for adding the generated subtasks
     */
    private Optional<Long> addSubtasksIfNotEmpty(TaskControl lockedTask, Config subtaskConfig) throws TaskLimitExceededException {
        if (!subtaskConfig.isEmpty()) {
            WorkflowTaskList compiled = this.compiler.compileTasks(lockedTask.get().getFullName(), "^sub", subtaskConfig);
            if (!compiled.isEmpty()) {
                logger.trace("Adding sub tasks: {}", (Object)compiled);
                long subtaskRootId = lockedTask.addGeneratedSubtasks(compiled, (List<Long>)ImmutableList.of(), true, true);
                return Optional.of((Object)subtaskRootId);
            }
        }
        return Optional.absent();
    }

    /**
     * Compiles the locked task's error config into tasks named with the {@code "^error"}
     * suffix and adds them as generated subtasks.
     *
     * <p>Before compiling, the error config's nested {@code "_export"} section is passed
     * through {@code errorBuilder} and written back.
     *
     * @param lockedTask                          control for the task that failed
     * @param isParentErrorPropagatedFromChildren not read by this method
     * @param errorBuilder                        transformation applied to the {@code "_export"} config
     * @return id of the root of the added error tasks, or absent when the error config is
     *         empty or compiles to no tasks
     * @throws TaskLimitExceededException declared for adding the generated subtasks
     */
    private Optional<Long> addErrorTasksIfAny(TaskControl lockedTask, boolean isParentErrorPropagatedFromChildren, Function<Config, Config> errorBuilder) throws TaskLimitExceededException {
        final Config errorConfig = lockedTask.get().getConfig().getErrorConfig();
        if (!errorConfig.isEmpty()) {
            errorConfig.setNested("_export", errorBuilder.apply(errorConfig.getNestedOrGetEmpty("_export")));
            WorkflowTaskList compiled = this.compiler.compileTasks(lockedTask.get().getFullName(), "^error", errorConfig);
            if (!compiled.isEmpty()) {
                logger.trace("Adding error tasks: {}", (Object)compiled);
                long errorRootId = lockedTask.addGeneratedSubtasks(compiled, (List<Long>)ImmutableList.of(), false);
                return Optional.of((Object)errorRootId);
            }
        }
        return Optional.absent();
    }

    /**
     * Adds a {@code notify} task with the command {@code "Workflow session attempt failed"}
     * under the given root task, named with the {@code "^failure-alert"} suffix. The task is
     * added through {@code addGeneratedSubtasksWithoutLimit}.
     *
     * @param rootTask control for the task that receives the alert task
     * @return the id returned by {@code addGeneratedSubtasksWithoutLimit}
     */
    private long addAttemptFailureAlertTask(TaskControl rootTask) {
        final Config alertConfig = this.cf.create();
        alertConfig.set("_type", (Object)"notify");
        alertConfig.set("_command", (Object)"Workflow session attempt failed");
        final WorkflowTaskList alertTasks = this.compiler.compileTasks(rootTask.get().getFullName(), "^failure-alert", alertConfig);
        final long alertRootId = rootTask.addGeneratedSubtasksWithoutLimit(alertTasks, (List<Long>)ImmutableList.of(), false);
        return alertRootId;
    }

    /**
     * Compiles the locked task's check config into tasks named with the {@code "^check"}
     * suffix and adds them as generated subtasks.
     *
     * @param lockedTask     control for the task that succeeded
     * @param upstreamTaskId optional id of a task the check tasks are added downstream of;
     *                       when absent, an empty upstream list is used
     * @return id of the root of the added check tasks, or absent when the check config is
     *         empty or compiles to no tasks
     * @throws TaskLimitExceededException declared for adding the generated subtasks
     */
    private Optional<Long> addCheckTasksIfAny(TaskControl lockedTask, Optional<Long> upstreamTaskId) throws TaskLimitExceededException {
        Config subtaskConfig = lockedTask.get().getConfig().getCheckConfig();
        if (subtaskConfig.isEmpty()) {
            return Optional.absent();
        }
        WorkflowTaskList tasks = this.compiler.compileTasks(lockedTask.get().getFullName(), "^check", subtaskConfig);
        if (tasks.isEmpty()) {
            return Optional.absent();
        }
        // Pass the task list as a logging argument so it fills the {} placeholder and is only
        // rendered when trace logging is enabled.
        logger.trace("Adding check tasks: {}", (Object)tasks);
        List upstreamTaskIdList = (List)upstreamTaskId.transform(id -> ImmutableList.of((Object)id)).or((Object)ImmutableList.of());
        long rootTaskId = lockedTask.addGeneratedSubtasks(tasks, upstreamTaskIdList, false);
        return Optional.of((Object)rootTaskId);
    }

    /**
     * Adds the tasks of a session monitor under the locked task.
     *
     * <p>Only the {@code "sla"} type is supported. For it, the {@code "time"} and
     * {@code "duration"} keys are removed from {@code taskConfig}, the SLA alert/fail tasks
     * are added, and the remaining config is compiled into tasks named with the
     * {@code "^" + type} suffix.
     *
     * @param lockedTask control for the task that receives the monitor tasks
     * @param type       monitor type
     * @param taskConfig monitor task config; modified in place
     * @return id of the root of the added tasks, or absent when the remaining config compiles
     *         to no tasks
     * @throws UnsupportedOperationException when {@code type} is not {@code "sla"}
     */
    public Optional<Long> addMonitorTask(TaskControl lockedTask, String type, Config taskConfig) {
        // Dereference type (rather than "sla".equals(type)) so a null type fails with a
        // NullPointerException exactly as a switch on the string would.
        if (!type.equals("sla")) {
            throw new UnsupportedOperationException("Unsupported monitor task type: " + type);
        }
        taskConfig.remove("time");
        taskConfig.remove("duration");
        this.addSlaMonitorTasks(lockedTask, type, taskConfig);

        final WorkflowTaskList monitorTasks = this.compiler.compileTasks(lockedTask.get().getFullName(), "^" + type, taskConfig);
        if (monitorTasks.isEmpty()) {
            return Optional.absent();
        }
        logger.trace("Adding {} tasks: {}", (Object)type, (Object)monitorTasks);
        final long monitorRootId = lockedTask.addGeneratedSubtasksWithoutLimit(monitorTasks, (List<Long>)ImmutableList.of(), false);
        return Optional.of((Object)monitorRootId);
    }

    /**
     * Adds the built-in SLA tasks selected by the {@code "alert"} and {@code "fail"} flags of
     * {@code taskConfig}, removing both keys from the config.
     *
     * <p>{@code "alert"} defaults to {@code true} and adds a {@code notify} task;
     * {@code "fail"} defaults to {@code false} and adds a {@code fail} task. Both tasks use the
     * command {@code "SLA violation"}. A flag that cannot be read is logged as a warning and
     * its default is used.
     *
     * @param lockedTask control for the task that receives the SLA tasks
     * @param type       monitor type, used to build the generated task name suffix
     * @param taskConfig monitor task config; modified in place
     */
    private void addSlaMonitorTasks(TaskControl lockedTask, String type, Config taskConfig) {
        boolean alertEnabled;
        try {
            alertEnabled = (Boolean)taskConfig.get("alert", Boolean.TYPE, (Object)true);
        }
        catch (ConfigException configEx) {
            logger.warn("sla configuration error: ", (Throwable)configEx);
            alertEnabled = true;
        }
        taskConfig.remove("alert");
        if (alertEnabled) {
            final Config notifyConfig = this.cf.create();
            notifyConfig.set("_type", (Object)"notify");
            notifyConfig.set("_command", (Object)"SLA violation");
            final WorkflowTaskList notifyTasks = this.compiler.compileTasks(lockedTask.get().getFullName(), "^" + type + "^alert", notifyConfig);
            logger.trace("Adding {} tasks: {}", (Object)type, (Object)notifyTasks);
            lockedTask.addGeneratedSubtasksWithoutLimit(notifyTasks, (List<Long>)ImmutableList.of(), false);
        }

        boolean failEnabled;
        try {
            failEnabled = (Boolean)taskConfig.get("fail", Boolean.TYPE, (Object)false);
        }
        catch (ConfigException configEx) {
            logger.warn("sla configuration error: ", (Throwable)configEx);
            failEnabled = false;
        }
        taskConfig.remove("fail");
        if (failEnabled) {
            final Config failConfig = this.cf.create();
            failConfig.set("_type", (Object)"fail");
            failConfig.set("_command", (Object)"SLA violation");
            final WorkflowTaskList failTasks = this.compiler.compileTasks(lockedTask.get().getFullName(), "^" + type + "^fail", failConfig);
            logger.trace("Adding {} tasks: {}", (Object)type, (Object)failTasks);
            lockedTask.addGeneratedSubtasksWithoutLimit(failTasks, (List<Long>)ImmutableList.of(), false);
        }
    }

    /**
     * Decompiler-recovered lambda body (named after {@code propagateSessionArchive}): finds
     * root tasks in any of {@code TaskStateCode.doneStates()}, passing
     * {@code finalLastTaskId} through to the store query.
     *
     * @param finalLastTaskId task id forwarded to {@code findRootTasksByStates}
     * @return the list returned by the session store manager
     */
    private /* synthetic */ List lambda$propagateSessionArchive$28(long finalLastTaskId) {
        return this.sm.findRootTasksByStates(TaskStateCode.doneStates(), finalLastTaskId);
    }

    /**
     * Decompiler-recovered lambda body (named after {@code propagateAllPlannedToDone}): finds
     * tasks in the {@code PLANNED} state, passing {@code finalLastTaskId} through to the
     * store query.
     *
     * @param finalLastTaskId task id forwarded to {@code findTasksByState}
     * @return the list returned by the session store manager
     */
    private /* synthetic */ List lambda$propagateAllPlannedToDone$20(long finalLastTaskId) {
        return this.sm.findTasksByState(TaskStateCode.PLANNED, finalLastTaskId);
    }

    /**
     * Decompiler-recovered lambda body (named after {@code propagateBlockedChildrenToReady}):
     * finds the direct parents of blocked tasks, passing {@code finalLastParentId} through to
     * the store query.
     *
     * @param finalLastParentId parent task id forwarded to {@code findDirectParentsOfBlockedTasks}
     * @return the list returned by the session store manager
     */
    private /* synthetic */ List lambda$propagateBlockedChildrenToReady$13(long finalLastParentId) {
        return this.sm.findDirectParentsOfBlockedTasks(finalLastParentId);
    }

    /**
     * Decompiler-recovered lambda body (named after {@code runWhile}): evaluates the supplied
     * condition once and returns its negation.
     *
     * @param cond condition to evaluate
     * @return {@code true} when {@code cond} yields {@code false}, and vice versa
     */
    private static /* synthetic */ Boolean lambda$runWhile$9(BooleanSupplier cond) {
        return !cond.getAsBoolean();
    }

    /**
     * Decompiler-recovered lambda body (named after {@code submitTasks}): inserts a new
     * attempt for the given stored session and stores its tasks.
     *
     * <p>The project is re-read first and the submission is rejected when it has a deletion
     * timestamp.
     *
     * @param siteId        site the project belongs to
     * @param projId        id of the project
     * @param attempt       attempt to insert
     * @param ar            attempt request; supplies workflow name, time zone, session time
     *                      and session monitors
     * @param tasks         compiled workflow tasks to store
     * @param resumingTasks tasks handed to {@code storeTasks} as resuming tasks
     * @param store         session control store used for the inserts
     * @param storedSession session the attempt is attached to
     * @return the inserted attempt combined with its session
     * @throws ResourceConflictException declared for the store operations
     * @throws ResourceNotFoundException when the project does not exist or is already deleted
     */
    private /* synthetic */ StoredSessionAttemptWithSession lambda$submitTasks$3(int siteId, int projId, SessionAttempt attempt, AttemptRequest ar, WorkflowTaskList tasks, List resumingTasks, SessionControlStore store, StoredSession storedSession) throws ResourceConflictException, ResourceNotFoundException {
        StoredProject proj = this.rm.getProjectStore(siteId).getProjectById(projId);
        if (proj.getDeletedAt().isPresent()) {
            // String.format needs %-style specifiers; "{}" is SLF4J syntax and would be emitted literally.
            throw new ResourceNotFoundException(String.format(Locale.ENGLISH, "Project id=%s name=%s is already deleted", proj.getId(), proj.getName()));
        }
        StoredSessionAttempt storedAttempt = store.insertAttempt(storedSession.getId(), projId, attempt);
        logger.info("Starting a new session project id={} workflow name={} session_time={}", new Object[]{projId, ar.getWorkflowName(), SESSION_TIME_FORMATTER.withZone(ar.getTimeZone()).format(ar.getSessionTime())});
        StoredSessionAttemptWithSession storedAttemptWithSession = StoredSessionAttemptWithSession.of(siteId, storedSession, storedAttempt);
        try {
            this.storeTasks(store, storedAttemptWithSession, tasks, (List<ResumingTask>)resumingTasks, ar.getSessionMonitors());
        }
        catch (TaskLimitExceededException ex) {
            // Rethrown unchecked because this method's signature does not declare TaskLimitExceededException.
            throw new WorkflowTaskLimitExceededException(ex);
        }
        return storedAttemptWithSession;
    }

    /**
     * Holds the executor used for queuing tasks and shuts it down on {@link #close()}.
     */
    private class TaskQueuer
    implements AutoCloseable {
        /** Futures keyed by a {@code Long} id; not referenced elsewhere in the visible part of this class. */
        private final Map<Long, Future<Void>> waiting = new ConcurrentHashMap<>();

        /** Cached thread pool of daemon threads named {@code task-queuer-%d}. */
        private final ExecutorService executor = Executors.newCachedThreadPool(
                new ThreadFactoryBuilder()
                        .setDaemon(true)
                        .setNameFormat("task-queuer-%d")
                        .build());

        /** Requests shutdown of the executor; does not wait for running work to finish. */
        @Override
        public void close() {
            executor.shutdown();
        }
    }

    /**
     * Unchecked wrapper that carries a {@link TaskLimitExceededException} through code that
     * cannot declare the checked exception.
     */
    private static class WorkflowTaskLimitExceededException
    extends RuntimeException {
        /**
         * @param cause the task-limit failure being wrapped
         */
        public WorkflowTaskLimitExceededException(TaskLimitExceededException cause) {
            super(cause);
        }

        /**
         * @return the wrapped cause, narrowed to {@link TaskLimitExceededException}
         */
        @Override
        public TaskLimitExceededException getCause() {
            final Throwable wrapped = super.getCause();
            return (TaskLimitExceededException)wrapped;
        }
    }

    /**
     * Callback that performs work with a {@link WorkflowSubmitter} and returns a result.
     *
     * @param <T> type of the value produced by the action
     */
    public interface WorkflowSubmitterAction<T> {
        /**
         * Runs the action.
         *
         * @param var1 submitter made available to the action
         * @return the action's result
         * @throws ResourceNotFoundException       declared for the action's use of the submitter
         * @throws AttemptLimitExceededException   declared for the action's use of the submitter
         * @throws SessionAttemptConflictException declared for the action's use of the submitter
         */
        T call(WorkflowSubmitter var1) throws ResourceNotFoundException, AttemptLimitExceededException, SessionAttemptConflictException;
    }
}

