/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.master.engine.task.dispatcher;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Generated;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.eventbus.AbstractDelayEvent;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
import org.apache.dolphinscheduler.server.master.engine.task.dispatcher.TaskDispatchableEventBus;
import org.apache.dolphinscheduler.server.master.engine.task.dispatcher.event.TaskDispatchableEvent;
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
import org.apache.dolphinscheduler.task.executor.log.TaskExecutorMDCUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WorkerGroupDispatcher
extends BaseDaemonThread {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(WorkerGroupDispatcher.class);
    private final ITaskExecutorClient taskExecutorClient;
    private final TaskDispatchableEventBus<TaskDispatchableEvent<ITaskExecutionRunnable>, ITaskExecutionRunnable> workerGroupEventBus;
    private final Set<Integer> waitingDispatchTaskIds;
    private final AtomicBoolean runningFlag = new AtomicBoolean(false);

    public WorkerGroupDispatcher(String workerGroupName, ITaskExecutorClient taskExecutorClient) {
        super("WorkerGroupTaskDispatcher-" + workerGroupName);
        this.taskExecutorClient = taskExecutorClient;
        this.workerGroupEventBus = new TaskDispatchableEventBus();
        this.waitingDispatchTaskIds = ConcurrentHashMap.newKeySet();
        log.info("Initialize WorkerGroupDispatcher: {}", (Object)this.getName());
    }

    public synchronized void start() {
        if (this.runningFlag.compareAndSet(false, true)) {
            log.info("The {} starting...", (Object)this.getName());
            super.start();
            log.info("The {}  started", (Object)this.getName());
        } else {
            log.error("The {} status is {}, will not start again", (Object)this.getName(), (Object)this.runningFlag.get());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void run() {
        while (this.runningFlag.get()) {
            AbstractDelayEvent taskEntry = this.workerGroupEventBus.take();
            ITaskExecutionRunnable taskExecutionRunnable = (ITaskExecutionRunnable)taskEntry.getData();
            try {
                TaskExecutorMDCUtils.MDCAutoClosable ignore = TaskExecutorMDCUtils.logWithMDC((int)taskExecutionRunnable.getId());
                Throwable throwable = null;
                try {
                    LogUtils.setWorkflowInstanceIdMDC((Integer)taskExecutionRunnable.getTaskInstance().getWorkflowInstanceId());
                    this.doDispatchTask(taskExecutionRunnable);
                }
                catch (Throwable throwable2) {
                    throwable = throwable2;
                    throw throwable2;
                }
                finally {
                    if (ignore == null) continue;
                    if (throwable != null) {
                        try {
                            ignore.close();
                        }
                        catch (Throwable throwable3) {
                            throwable.addSuppressed(throwable3);
                        }
                        continue;
                    }
                    ignore.close();
                }
            }
            finally {
                LogUtils.removeWorkflowInstanceIdMDC();
            }
        }
    }

    private void doDispatchTask(ITaskExecutionRunnable taskExecutionRunnable) {
        try {
            if (!this.waitingDispatchTaskIds.remove(taskExecutionRunnable.getId())) {
                log.info("The task: {} doesn't exist in waitingDispatchTaskIds(it might be paused or killed), will skip dispatch", (Object)taskExecutionRunnable.getId());
                return;
            }
            this.taskExecutorClient.dispatch(taskExecutionRunnable);
        }
        catch (Exception e) {
            long waitingTimeMills = Math.min((long)taskExecutionRunnable.getTaskExecutionContext().increaseDispatchFailTimes() * 1000L, 60000L);
            this.dispatchTask(taskExecutionRunnable, waitingTimeMills);
            log.error("Dispatch Task: {} failed will retry after: {}/ms", new Object[]{taskExecutionRunnable.getId(), waitingTimeMills, e});
        }
    }

    public void dispatchTask(ITaskExecutionRunnable taskExecutionRunnable, long delayTimeMills) {
        this.waitingDispatchTaskIds.add(taskExecutionRunnable.getId());
        this.workerGroupEventBus.add(new TaskDispatchableEvent<ITaskExecutionRunnable>(delayTimeMills, taskExecutionRunnable));
    }

    public boolean removeTask(ITaskExecutionRunnable taskExecutionRunnable) {
        return this.waitingDispatchTaskIds.remove(taskExecutionRunnable.getId());
    }

    public boolean existTask(ITaskExecutionRunnable taskExecutionRunnable) {
        return this.waitingDispatchTaskIds.contains(taskExecutionRunnable.getId());
    }

    public synchronized void close() {
        if (this.runningFlag.compareAndSet(true, false)) {
            log.info("WorkerGroupDispatcher {} closed", (Object)this.getName());
        } else {
            log.warn("The WorkerGroupDispatcher: {} doesn't started", (Object)this.getName());
        }
    }

    int queueSize() {
        return this.workerGroupEventBus.size();
    }
}

