/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.master.runner;

import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Generated;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
import org.apache.dolphinscheduler.server.master.runner.GlobalTaskDispatchWaitingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class GlobalTaskDispatchWaitingQueueLooper
extends BaseDaemonThread
implements AutoCloseable {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(GlobalTaskDispatchWaitingQueueLooper.class);
    @Autowired
    private GlobalTaskDispatchWaitingQueue globalTaskDispatchWaitingQueue;
    @Autowired
    private ITaskExecutorClient taskExecutorClient;
    private final AtomicBoolean RUNNING_FLAG = new AtomicBoolean(false);

    public GlobalTaskDispatchWaitingQueueLooper() {
        super("GlobalTaskDispatchWaitingQueueLooper");
    }

    public synchronized void start() {
        if (!this.RUNNING_FLAG.compareAndSet(false, true)) {
            log.error("The GlobalTaskDispatchWaitingQueueLooper already started, will not start again");
            return;
        }
        log.info("GlobalTaskDispatchWaitingQueueLooper starting...");
        super.start();
        log.info("GlobalTaskDispatchWaitingQueueLooper started...");
    }

    public void run() {
        while (this.RUNNING_FLAG.get()) {
            this.doDispatch();
        }
    }

    void doDispatch() {
        ITaskExecutionRunnable taskExecutionRunnable = this.globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable();
        TaskInstance taskInstance = taskExecutionRunnable.getTaskInstance();
        try {
            TaskExecutionStatus status = taskInstance.getState();
            if (status != TaskExecutionStatus.SUBMITTED_SUCCESS && status != TaskExecutionStatus.DELAY_EXECUTION) {
                log.warn("The TaskInstance {} state is : {}, will not dispatch", (Object)taskInstance.getName(), (Object)status);
                return;
            }
            this.taskExecutorClient.dispatch(taskExecutionRunnable);
        }
        catch (Exception e) {
            long waitingTimeMills = Math.min((long)taskExecutionRunnable.getTaskExecutionContext().increaseDispatchFailTimes() * 1000L, 60000L);
            this.globalTaskDispatchWaitingQueue.dispatchTaskExecuteRunnableWithDelay(taskExecutionRunnable, waitingTimeMills);
            log.error("Dispatch Task: {} failed will retry after: {}/ms", new Object[]{taskInstance.getName(), waitingTimeMills, e});
        }
    }

    @Override
    public void close() throws Exception {
        if (this.RUNNING_FLAG.compareAndSet(true, false)) {
            log.info("GlobalTaskDispatchWaitingQueueLooper stopping...");
            log.info("GlobalTaskDispatchWaitingQueueLooper stopped...");
        } else {
            log.error("GlobalTaskDispatchWaitingQueueLooper is not started");
        }
    }
}

