/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.worker.runner;

import java.util.Map;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class RetryReportTaskStatusThread
implements Runnable {
    private final Logger logger = LoggerFactory.getLogger(RetryReportTaskStatusThread.class);
    @Autowired
    WorkerConfig workerConfig;
    private final TaskCallbackService taskCallbackService = (TaskCallbackService)SpringApplicationContext.getBean(TaskCallbackService.class);

    public void start() {
        Thread thread = new Thread((Runnable)this, "RetryReportTaskStatusThread");
        thread.setDaemon(true);
        thread.start();
    }

    @Override
    public void run() {
        ResponceCache responceCache = ResponceCache.get();
        while (Stopper.isRunning()) {
            ThreadUtils.sleep((long)(this.workerConfig.getRetryReportTaskStatusInterval() * 1000));
            try {
                Integer taskInstanceId;
                if (!responceCache.getAckCache().isEmpty()) {
                    Map<Integer, Command> ackCache = responceCache.getAckCache();
                    for (Map.Entry<Integer, Command> entry : ackCache.entrySet()) {
                        taskInstanceId = entry.getKey();
                        Command ackCommand = entry.getValue();
                        this.taskCallbackService.sendAck(taskInstanceId, ackCommand);
                    }
                }
                if (!responceCache.getResponseCache().isEmpty()) {
                    Map<Integer, Command> responseCache = responceCache.getResponseCache();
                    for (Map.Entry<Integer, Command> entry : responseCache.entrySet()) {
                        taskInstanceId = entry.getKey();
                        Command responseCommand = entry.getValue();
                        this.taskCallbackService.sendResult(taskInstanceId, responseCommand);
                    }
                }
                if (responceCache.getKillResponseCache().isEmpty()) continue;
                Map<Integer, Command> killResponseCache = responceCache.getKillResponseCache();
                for (Map.Entry<Integer, Command> entry : killResponseCache.entrySet()) {
                    taskInstanceId = entry.getKey();
                    Command killResponseCommand = entry.getValue();
                    this.taskCallbackService.sendResult(taskInstanceId, killResponseCommand);
                }
            }
            catch (Exception e) {
                this.logger.warn("retry report task status error", (Throwable)e);
            }
        }
    }
}

