/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.master.engine.command;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import lombok.Generated;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.meter.metrics.MetricsProvider;
import org.apache.dolphinscheduler.meter.metrics.SystemMetrics;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.config.MasterServerLoadProtection;
import org.apache.dolphinscheduler.server.master.engine.IWorkflowRepository;
import org.apache.dolphinscheduler.server.master.engine.WorkflowEventBusCoordinator;
import org.apache.dolphinscheduler.server.master.engine.command.ICommandFetcher;
import org.apache.dolphinscheduler.server.master.engine.exceptions.CommandDuplicateHandleException;
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowStartLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.WorkflowExecutionRunnableFactory;
import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics;
import org.apache.dolphinscheduler.service.command.CommandService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class CommandEngine
extends BaseDaemonThread
implements AutoCloseable {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(CommandEngine.class);
    @Autowired
    private ICommandFetcher commandFetcher;
    @Autowired
    private CommandService commandService;
    @Autowired
    private MasterConfig masterConfig;
    @Autowired
    private IWorkflowRepository workflowRepository;
    @Autowired
    private WorkflowExecutionRunnableFactory workflowExecutionRunnableFactory;
    @Autowired
    private MetricsProvider metricsProvider;
    @Autowired
    private WorkflowEventBusCoordinator workflowEventBusCoordinator;
    private ExecutorService commandHandleThreadPool;
    private boolean flag = false;

    protected CommandEngine() {
        super("MasterCommandLoopThread");
    }

    public synchronized void start() {
        log.info("MasterSchedulerBootstrap starting..");
        this.commandHandleThreadPool = ThreadUtils.newDaemonFixedThreadExecutor((String)"MasterCommandHandleThreadPool", (int)Runtime.getRuntime().availableProcessors());
        this.flag = true;
        super.start();
        log.info("MasterSchedulerBootstrap started...");
    }

    @Override
    public void close() throws Exception {
        log.info("MasterSchedulerBootstrap stopping...");
        this.flag = false;
        log.info("MasterSchedulerBootstrap stopped...");
    }

    public void run() {
        MasterServerLoadProtection serverLoadProtection = this.masterConfig.getServerLoadProtection();
        while (this.flag) {
            try {
                SystemMetrics systemMetrics = this.metricsProvider.getSystemMetrics();
                if (serverLoadProtection.isOverload(systemMetrics)) {
                    log.warn("The current server is overload, cannot consumes commands.");
                    MasterServerMetrics.incMasterOverload();
                    Thread.sleep(1000L);
                    continue;
                }
                List<Command> commands = this.commandFetcher.fetchCommands();
                if (CollectionUtils.isEmpty(commands)) {
                    Thread.sleep(1000L);
                    continue;
                }
                ArrayList<CompletionStage> allCompleteFutures = new ArrayList<CompletionStage>();
                for (Command command : commands) {
                    CompletionStage completableFuture = ((CompletableFuture)((CompletableFuture)this.bootstrapCommand(command).thenAccept(this::bootstrapWorkflowExecutionRunnable)).thenAccept(unused -> this.bootstrapSuccess(command))).exceptionally(throwable -> this.bootstrapError(command, (Throwable)throwable));
                    allCompleteFutures.add(completableFuture);
                }
                CompletableFuture.allOf(allCompleteFutures.toArray(new CompletableFuture[0])).join();
            }
            catch (InterruptedException interruptedException) {
                log.warn("Master schedule bootstrap interrupted, close the loop", (Throwable)interruptedException);
                Thread.currentThread().interrupt();
                break;
            }
            catch (Exception e) {
                log.error("Master schedule workflow error", (Throwable)e);
                ThreadUtils.sleep((long)1000L);
            }
        }
    }

    private CompletableFuture<IWorkflowExecutionRunnable> bootstrapCommand(Command command) {
        return CompletableFuture.supplyAsync(() -> this.workflowExecutionRunnableFactory.createWorkflowExecuteRunnable(command), this.commandHandleThreadPool);
    }

    private CompletableFuture<Void> bootstrapWorkflowExecutionRunnable(IWorkflowExecutionRunnable workflowExecutionRunnable) {
        WorkflowInstance workflowInstance = workflowExecutionRunnable.getWorkflowExecuteContext().getWorkflowInstance();
        if (workflowInstance.getState() == WorkflowExecutionStatus.SERIAL_WAIT) {
            log.info("The workflow {} state is: {} will not be trigger now", (Object)workflowInstance.getName(), (Object)workflowInstance.getState());
            return CompletableFuture.completedFuture(null);
        }
        this.workflowRepository.put(workflowExecutionRunnable);
        this.workflowEventBusCoordinator.registerWorkflowEventBus(workflowExecutionRunnable);
        workflowExecutionRunnable.getWorkflowEventBus().publish(WorkflowStartLifecycleEvent.of(workflowExecutionRunnable));
        return CompletableFuture.completedFuture(null);
    }

    private CompletableFuture<Void> bootstrapSuccess(Command command) {
        log.info("Success bootstrap command {}", (Object)JSONUtils.toPrettyJsonString((Object)command));
        MasterServerMetrics.incMasterConsumeCommand(1);
        return CompletableFuture.completedFuture(null);
    }

    private Void bootstrapError(Command command, Throwable throwable) {
        if (throwable instanceof CommandDuplicateHandleException) {
            log.warn("Handle command failed, the command: {} has been handled by other master", (Object)command, (Object)throwable);
            return null;
        }
        log.error("Failed bootstrap command {} ", (Object)JSONUtils.toPrettyJsonString((Object)command), (Object)throwable);
        this.commandService.moveToErrorCommand(command, ExceptionUtils.getStackTrace((Throwable)throwable));
        return null;
    }
}

