/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.rest.service;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.exception.ErrorCode;
import org.apache.kylin.common.exception.ExceptionReason;
import org.apache.kylin.common.exception.ExceptionResolve;
import org.apache.kylin.common.exception.JobErrorCode;
import org.apache.kylin.common.exception.JobExceptionReason;
import org.apache.kylin.common.exception.JobExceptionResolve;
import org.apache.kylin.common.exception.KylinException;
import org.apache.kylin.common.exception.code.ErrorCodeProducer;
import org.apache.kylin.common.exception.code.ErrorCodeServer;
import org.apache.kylin.common.logging.SetLogCategory;
import org.apache.kylin.common.mail.MailNotificationType;
import org.apache.kylin.common.metrics.MetricsCategory;
import org.apache.kylin.common.metrics.MetricsGroup;
import org.apache.kylin.common.metrics.MetricsName;
import org.apache.kylin.common.scheduler.EventBusFactory;
import org.apache.kylin.common.scheduler.JobDiscardNotifier;
import org.apache.kylin.common.scheduler.SchedulerEventNotifier;
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.engine.spark.job.NSparkExecutable;
import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting;
import org.apache.kylin.guava30.shaded.common.base.Preconditions;
import org.apache.kylin.guava30.shaded.common.collect.Lists;
import org.apache.kylin.guava30.shaded.common.collect.Maps;
import org.apache.kylin.guava30.shaded.common.collect.Sets;
import org.apache.kylin.job.common.ShellExecutable;
import org.apache.kylin.job.constant.JobActionEnum;
import org.apache.kylin.job.constant.JobStatusEnum;
import org.apache.kylin.job.constant.JobStatusUtil;
import org.apache.kylin.job.dao.ExecutablePO;
import org.apache.kylin.job.dao.JobInfoDao;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ChainedExecutable;
import org.apache.kylin.job.execution.ChainedStageExecutable;
import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.JobSchedulerModeEnum;
import org.apache.kylin.job.execution.JobTypeEnum;
import org.apache.kylin.job.execution.Output;
import org.apache.kylin.job.execution.StageExecutable;
import org.apache.kylin.job.rest.JobFilter;
import org.apache.kylin.job.rest.JobMapperFilter;
import org.apache.kylin.job.util.JobContextUtil;
import org.apache.kylin.job.util.JobInfoUtil;
import org.apache.kylin.metadata.cube.model.NDataSegment;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.model.FusionModel;
import org.apache.kylin.metadata.model.FusionModelManager;
import org.apache.kylin.metadata.model.NDataModel;
import org.apache.kylin.metadata.model.NDataModelManager;
import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.metadata.model.Segments;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
import org.apache.kylin.metadata.project.NProjectManager;
import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.rest.aspect.Transaction;
import org.apache.kylin.rest.request.JobUpdateRequest;
import org.apache.kylin.rest.request.SparkJobUpdateRequest;
import org.apache.kylin.rest.response.ExecutableResponse;
import org.apache.kylin.rest.response.ExecutableStepResponse;
import org.apache.kylin.rest.service.BasicService;
import org.apache.kylin.rest.service.JobSupporter;
import org.apache.kylin.rest.service.ModelService;
import org.apache.kylin.rest.service.ProjectService;
import org.apache.kylin.rest.service.TableExtService;
import org.apache.kylin.rest.util.AclEvaluate;
import org.apache.kylin.rest.util.JobDriverUIUtil;
import org.apache.kylin.rest.util.JobFilterUtil;
import org.apache.kylin.rest.util.SparkHistoryUIUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service(value="jobInfoService")
public class JobInfoService
extends BasicService
implements JobSupporter {
    // Logger obtained under the fixed name "build" rather than this class's name.
    private static final Logger logger = LoggerFactory.getLogger((String)"build");
    // Message prefix logged when an ExecutablePO cannot be turned into an executable.
    private static final String PARSE_ERROR_MSG = "Error parsing the executablePO: ";
    // Classpath resource read as a JSON map when resolving an exception to an error code.
    public static final String EXCEPTION_CODE_PATH = "exception_to_code.json";
    // Error code used when no more specific code can be resolved.
    public static final String EXCEPTION_CODE_DEFAULT = "KE-030001000";
    // NOTE(review): population and usage of this map are not visible in this part of the file.
    private static final Map<String, String> JOB_TYPE_MAP = Maps.newHashMap();
    @Autowired
    private ProjectService projectService;
    @Autowired
    private JobInfoDao jobInfoDao;
    // Injected through setAclEvaluate(AclEvaluate) instead of field injection.
    private AclEvaluate aclEvaluate;
    @Autowired
    private ModelService modelService;
    @Autowired
    private TableExtService tableExtService;

    /**
     * Setter-injection point for the ACL evaluator.
     *
     * @param aclEvaluate evaluator used by this service for project permission checks
     * @return this service, to allow chained calls
     */
    @Autowired
    public JobInfoService setAclEvaluate(AclEvaluate aclEvaluate) {
        this.aclEvaluate = aclEvaluate;
        return this;
    }

    /**
     * Validates every status name in the list; a null or empty list is accepted as-is.
     *
     * @param jobStatuses status names to validate, may be null
     * @throws KylinException if any name is not a known job status
     */
    public void checkJobStatus(List<String> jobStatuses) {
        if (!CollectionUtils.isEmpty(jobStatuses)) {
            for (String status : jobStatuses) {
                this.checkJobStatus(status);
            }
        }
    }

    /**
     * Validates a single status name.
     *
     * @param jobStatus status name to look up via {@code JobStatusEnum.getByName}
     * @throws KylinException with {@code JOB_STATUS_ILLEGAL} when the lookup yields null
     */
    public void checkJobStatus(String jobStatus) {
        JobStatusEnum resolved = JobStatusEnum.getByName(jobStatus);
        if (resolved == null) {
            throw new KylinException(ErrorCodeServer.JOB_STATUS_ILLEGAL, new Object[0]);
        }
    }

    /**
     * Converts status names to their enum constants, preserving order.
     *
     * @param strValues exact enum constant names
     * @return the matching {@link JobStatusEnum} values in input order
     * @throws IllegalArgumentException if a name does not match any constant
     */
    public List<JobStatusEnum> parseJobStatus(List<String> strValues) {
        return strValues.stream().map(JobStatusEnum::valueOf).collect(Collectors.toList());
    }

    /**
     * Verifies that the given action is permitted for a job in the given status.
     *
     * @param jobStatus status name; must be a valid job status
     * @param action action name; must be a valid {@link JobActionEnum} constant
     * @throws KylinException with {@code JOB_ACTION_ILLEGAL} when the status rejects the action
     */
    public void checkJobStatusAndAction(String jobStatus, String action) {
        this.checkJobStatus(jobStatus);
        JobActionEnum.validateValue(action);
        JobStatusEnum statusEnum = JobStatusEnum.valueOf(jobStatus);
        JobActionEnum actionEnum = JobActionEnum.valueOf(action);
        if (statusEnum.checkAction(actionEnum)) {
            return;
        }
        throw new KylinException(ErrorCodeServer.JOB_ACTION_ILLEGAL, new Object[]{jobStatus, statusEnum.getValidActions()});
    }

    /**
     * Validates the request's action against both the statuses supplied in the request and the
     * current status of every job it references.
     *
     * <p>When the request carries a status list, the current job statuses are appended to that
     * same list instance, so the request is modified in place.
     *
     * @param jobUpdateRequest request holding job ids, optional statuses and the action
     */
    public void checkJobStatusAndAction(JobUpdateRequest jobUpdateRequest) {
        List<String> jobIds = jobUpdateRequest.getJobIds();
        List<String> statuses = jobUpdateRequest.getStatuses();
        if (statuses == null) {
            statuses = Lists.newArrayList();
        }
        for (String jobId : jobIds) {
            statuses.add(this.getJobInstance(jobId).getStatus().toString());
        }
        this.checkJobStatusAndAction(statuses, jobUpdateRequest.getAction());
    }

    /**
     * Applies the status/action compatibility check to each status; no-op for null or empty input.
     *
     * @param jobStatuses status names to check, may be null
     * @param action action name to validate against each status
     */
    private void checkJobStatusAndAction(List<String> jobStatuses, String action) {
        if (!CollectionUtils.isEmpty(jobStatuses)) {
            jobStatuses.forEach(status -> this.checkJobStatusAndAction(status, action));
        }
    }

    /**
     * Loads a job by id and converts it to its REST response form.
     *
     * @param jobId job uuid, must not be null
     * @return the response built from the stored job
     * @throws KylinException with {@code JOB_NOT_EXIST} when no job is stored under the id
     */
    public ExecutableResponse getJobInstance(String jobId) {
        Preconditions.checkNotNull(jobId);
        ExecutablePO po = this.jobInfoDao.getExecutablePOByUuid(jobId);
        if (po != null) {
            ExecutableManager manager = (ExecutableManager)this.getManager(ExecutableManager.class, po.getProject());
            return this.convert(manager.fromPO(po), po);
        }
        throw new KylinException(ErrorCodeServer.JOB_NOT_EXIST, new Object[]{jobId});
    }

    /**
     * Searches the readable projects for the one that owns the given job.
     *
     * @param jobId job id, must not be null
     * @return the name of the first readable project whose manager knows the job, or null if none
     */
    public String getProjectByJobId(String jobId) {
        Preconditions.checkNotNull(jobId);
        for (ProjectInstance candidate : this.getReadableProjects()) {
            ExecutableManager manager = (ExecutableManager)this.getManager(ExecutableManager.class, candidate.getName());
            if (manager.getJob(jobId) != null) {
                return candidate.getName();
            }
        }
        return null;
    }

    /**
     * Delegates to the project service with a null first argument and {@code false} as the second.
     *
     * @return the projects returned by {@code ProjectService.getReadableProjects(null, false)}
     */
    @VisibleForTesting
    public List<ProjectInstance> getReadableProjects() {
        List<ProjectInstance> readable = this.projectService.getReadableProjects(null, false);
        return readable;
    }

    /**
     * Lists jobs matching the filter, passing -1 for both offset and limit.
     *
     * @param jobFilter filter criteria
     * @return matching jobs as responses
     */
    public List<ExecutableResponse> listJobs(JobFilter jobFilter) {
        final int noOffset = -1;
        final int noLimit = -1;
        return this.listJobs(jobFilter, noOffset, noLimit);
    }

    /**
     * Lists jobs matching the filter within the given page window.
     *
     * <p>When the filter names a project, the caller's operation permission on it is checked first.
     * The result is additionally sorted in memory when the filter asks for a duration-based order.
     *
     * @param jobFilter filter criteria
     * @param offset paging offset forwarded to the mapper filter
     * @param limit paging limit forwarded to the mapper filter
     * @return matching jobs as responses
     */
    public List<ExecutableResponse> listJobs(JobFilter jobFilter, int offset, int limit) {
        String filterProject = jobFilter.getProject();
        if (StringUtils.isNotEmpty(filterProject)) {
            this.aclEvaluate.checkProjectOperationPermission(filterProject);
        }
        JobMapperFilter mapperFilter = JobFilterUtil.getJobMapperFilter(jobFilter, offset, limit, this.modelService, this.tableExtService, this.projectService);
        List<ExecutableResponse> responses = this.jobInfoDao.getJobInfoListByFilter(mapperFilter).stream()
                .map(JobInfoUtil::deserializeExecutablePO)
                .map(po -> {
                    ExecutableManager manager = (ExecutableManager)this.getManager(ExecutableManager.class, po.getProject());
                    return this.convert(manager.fromPO(po), (ExecutablePO)po);
                })
                .collect(Collectors.toList());
        this.sortByDurationIfNeed(responses, jobFilter.getSortBy(), mapperFilter.getOrderType());
        return responses;
    }

    /**
     * Sorts the responses in place by duration when the requested sort field is duration-based.
     *
     * <p>Only {@code "duration"} and {@code "total_duration"} (case-insensitive) trigger a sort;
     * any other field leaves the list untouched. The order is ascending when {@code orderType}
     * is {@code "ASC"} (case-insensitive) and descending otherwise.
     *
     * @param list responses to sort in place
     * @param orderByField requested sort field, may be null
     * @param orderType requested order, may be null (treated as descending)
     */
    public void sortByDurationIfNeed(List<ExecutableResponse> list, String orderByField, String orderType) {
        Comparator<ExecutableResponse> comparator;
        if ("duration".equalsIgnoreCase(orderByField)) {
            comparator = Comparator.comparingLong(ExecutableResponse::getDuration);
        } else if ("total_duration".equalsIgnoreCase(orderByField)) {
            comparator = Comparator.comparingLong(ExecutableResponse::getTotalDuration);
        } else {
            return;
        }
        // Compare the values directly: narrowing a subtraction result to int can truncate
        // large differences and flip the sign, which yields an inconsistent ordering.
        if (!"ASC".equalsIgnoreCase(orderType)) {
            comparator = comparator.reversed();
        }
        list.sort(comparator);
    }

    /**
     * Counts the jobs matching the filter.
     *
     * <p>When the filter names a project, the caller's operation permission on it is checked first.
     *
     * @param jobFilter filter criteria
     * @return number of matching jobs as reported by the DAO
     */
    public long countJobs(JobFilter jobFilter) {
        String filterProject = jobFilter.getProject();
        if (StringUtils.isNotEmpty(filterProject)) {
            this.aclEvaluate.checkProjectOperationPermission(filterProject);
        }
        // Offset and limit are both passed as 0 for counting.
        JobMapperFilter mapperFilter = JobFilterUtil.getJobMapperFilter(jobFilter, 0, 0, this.modelService, this.tableExtService, this.projectService);
        return this.jobInfoDao.countByFilter(mapperFilter);
    }

    /**
     * Builds the per-step detail view of a job.
     *
     * <p>After checking the caller's operation permission on the project, the job is loaded by id
     * and each of its tasks is converted into a step response. Tasks that are
     * {@code ChainedStageExecutable}s additionally get their stages attached, both as a flat list
     * and grouped per segment. If the job itself is DISCARDED, every step and stage in the result
     * is reported as DISCARDED.
     *
     * @param project project used for the permission check and to obtain the executable manager
     * @param jobId id of the job to describe
     * @return one response per task; an empty list when the stored job cannot be parsed
     * @throws KylinException with {@code JOB_NOT_EXIST} when no job is stored under the id
     */
    public List<ExecutableStepResponse> getJobDetail(String project, String jobId) {
        Map waiteTimeMap;
        this.aclEvaluate.checkProjectOperationPermission(project);
        ExecutablePO executablePO = this.jobInfoDao.getExecutablePOByUuid(jobId);
        if (executablePO == null) {
            throw new KylinException((ErrorCodeProducer)ErrorCodeServer.JOB_NOT_EXIST, new Object[]{jobId});
        }
        AbstractExecutable executable = null;
        ExecutableManager executableManager = (ExecutableManager)this.getManager(ExecutableManager.class, project);
        try {
            executable = executableManager.fromPO(executablePO);
        }
        catch (Exception e) {
            // A job that cannot be parsed is logged and reported as having no steps.
            logger.error(PARSE_ERROR_MSG, (Throwable)e);
            return Collections.emptyList();
        }
        Output output = executable.getOutput(executablePO);
        try (SetLogCategory ignored = new SetLogCategory("build");){
            // The "waiteTime" extra is a JSON object; "{}" is used when the key is absent.
            waiteTimeMap = JsonUtil.readValueAsMap((String)output.getExtra().getOrDefault("waiteTime", "{}"));
        }
        catch (IOException e) {
            // Unreadable wait-time JSON falls back to an empty map.
            logger.error(e.getMessage(), (Throwable)e);
            waiteTimeMap = Maps.newHashMap();
        }
        String targetSubject = executable.getTargetSubject();
        ArrayList<ExecutableStepResponse> executableStepList = new ArrayList<ExecutableStepResponse>();
        // NOTE(review): the job is cast to ChainedExecutable without an instanceof check.
        List tasks = ((ChainedExecutable)executable).getTasks();
        for (AbstractExecutable task : tasks) {
            ExecutableStepResponse executableStepResponse2 = this.parseToExecutableStep(task, executablePO, waiteTimeMap, output.getState());
            if (task.getStatusInMem() == ExecutableState.ERROR) {
                // Copy the job-level failure details onto the failed step.
                executableStepResponse2.setFailedStepId(output.getFailedStepId());
                executableStepResponse2.setFailedSegmentId(output.getFailedSegmentId());
                executableStepResponse2.setFailedStack(output.getFailedStack());
                executableStepResponse2.setFailedStepName(task.getName());
                this.setExceptionResolveAndCodeAndReason(output, executableStepResponse2);
            }
            // In DAG mode, an ERROR task whose id is not a prefix of the failed step id is shown as STOPPED.
            if (executable.getJobSchedulerMode() == JobSchedulerModeEnum.DAG && task.getStatus() == ExecutableState.ERROR && !StringUtils.startsWith((CharSequence)output.getFailedStepId(), (CharSequence)task.getId())) {
                executableStepResponse2.setStatus(JobStatusEnum.STOPPED);
            }
            if (task instanceof ChainedStageExecutable) {
                // Stages are keyed by segment id; a null map is treated as empty.
                Map stagesMap = Optional.ofNullable(((ChainedStageExecutable)task).getStagesMap()).orElse(Maps.newHashMap());
                HashMap stringSubStageMap = Maps.newHashMap();
                ArrayList subStages = Lists.newArrayList();
                for (Map.Entry entry : stagesMap.entrySet()) {
                    String segmentId = (String)entry.getKey();
                    ExecutableStepResponse.SubStages segmentSubStages = new ExecutableStepResponse.SubStages();
                    List stageExecutables = Optional.ofNullable(entry.getValue()).orElse(Lists.newArrayList());
                    ArrayList stageResponses = Lists.newArrayList();
                    for (StageExecutable stage : stageExecutables) {
                        ExecutableStepResponse stageResponse = this.parseStageToExecutableStep(task, stage, executableManager.getOutput(stage.getId(), segmentId));
                        // Same DAG-mode rule as for tasks, applied to the stage's status for this segment.
                        if (executable.getJobSchedulerMode() == JobSchedulerModeEnum.DAG && stage.getStatus(segmentId) == ExecutableState.ERROR && !StringUtils.startsWith((CharSequence)output.getFailedStepId(), (CharSequence)stage.getId())) {
                            stageResponse.setStatus(JobStatusEnum.STOPPED);
                        }
                        // NOTE(review): setStage is defined outside this view; presumably it merges the stage into the flat list.
                        this.setStage(subStages, stageResponse);
                        stageResponses.add(stageResponse);
                        if (!StringUtils.equals((CharSequence)output.getFailedStepId(), (CharSequence)stage.getId())) continue;
                        // The failed step is this stage, so report the stage's name instead of the task's.
                        executableStepResponse2.setFailedStepName(stage.getName());
                    }
                    // An entry keyed by the task's own id is not exposed as a per-segment group.
                    if (StringUtils.equals((CharSequence)task.getId(), (CharSequence)segmentId)) continue;
                    this.setSegmentSubStageParams(project, targetSubject, task, segmentId, segmentSubStages, stageExecutables, stageResponses, waiteTimeMap, output.getState(), executablePO);
                    stringSubStageMap.put(segmentId, segmentSubStages);
                }
                if (MapUtils.isNotEmpty((Map)stringSubStageMap)) {
                    executableStepResponse2.setSegmentSubStages(stringSubStageMap);
                }
                if (CollectionUtils.isNotEmpty((Collection)subStages)) {
                    executableStepResponse2.setSubStages(subStages);
                    // With zero or one segment group, the task duration is the sum of its stage durations.
                    if (MapUtils.isEmpty((Map)stringSubStageMap) || stringSubStageMap.size() == 1) {
                        long taskDuration = subStages.stream().map(ExecutableStepResponse::getDuration).mapToLong(Long::valueOf).sum();
                        executableStepResponse2.setDuration(taskDuration);
                    }
                }
            }
            executableStepList.add(executableStepResponse2);
        }
        if (executable.getStatusInMem() == ExecutableState.DISCARDED) {
            // A discarded job overrides the status of every step, sub-stage and per-segment stage.
            executableStepList.forEach(executableStepResponse -> {
                executableStepResponse.setStatus(JobStatusEnum.DISCARDED);
                Optional.ofNullable(executableStepResponse.getSubStages()).orElse(Lists.newArrayList()).forEach(subtask -> subtask.setStatus(JobStatusEnum.DISCARDED));
                Map<String, ExecutableStepResponse.SubStages> subStageMap = Optional.ofNullable(executableStepResponse.getSegmentSubStages()).orElse(Maps.newHashMap());
                for (Map.Entry<String, ExecutableStepResponse.SubStages> entry : subStageMap.entrySet()) {
                    entry.getValue().getStage().forEach(stage -> stage.setStatus(JobStatusEnum.DISCARDED));
                }
            });
        }
        return executableStepList;
    }

    /**
     * Deletes the selected jobs of one project after checking operation permission on it.
     *
     * @param project project owning the jobs
     * @param jobIds ids of the jobs to drop
     * @param filterStatuses job statuses used to narrow the selection
     */
    public void batchDropJob(String project, List<String> jobIds, List<String> filterStatuses) {
        this.aclEvaluate.checkProjectOperationPermission(project);
        this.batchDropJob0(project, this.getJobsByStatus(project, jobIds, filterStatuses));
    }

    /**
     * Deletes the selected jobs across all projects.
     *
     * <p>Jobs are grouped by their project; for each group the caller's operation permission on
     * that project is checked before the group is dropped.
     *
     * @param jobIds ids of the jobs to drop
     * @param filterStatuses job statuses used to narrow the selection
     */
    @PreAuthorize(value="hasRole('ROLE_ADMIN') or hasPermission(#ae, 'ADMINISTRATION')")
    public void batchDropGlobalJob(List<String> jobIds, List<String> filterStatuses) {
        Map<String, List<AbstractExecutable>> jobsByProject = new HashMap<>();
        for (AbstractExecutable job : this.getJobsByStatus(null, jobIds, filterStatuses)) {
            jobsByProject.computeIfAbsent(job.getProject(), key -> Lists.newArrayList()).add(job);
        }
        for (Map.Entry<String, List<AbstractExecutable>> group : jobsByProject.entrySet()) {
            String project = group.getKey();
            this.aclEvaluate.checkProjectOperationPermission(project);
            this.batchDropJob0(project, group.getValue());
        }
    }

    /**
     * Verifies that every job may be deleted, then deletes them all inside
     * {@code JobContextUtil.withTxAndRetry}.
     *
     * @param project project whose executable manager performs the deletion
     * @param jobs jobs to delete; each is checked before anything is removed
     */
    private void batchDropJob0(String project, List<AbstractExecutable> jobs) {
        ExecutableManager manager = (ExecutableManager)this.getManager(ExecutableManager.class, project);
        List<String> deletableIds = Lists.newArrayList();
        for (AbstractExecutable job : jobs) {
            manager.checkJobCanBeDeleted(job);
            deletableIds.add(job.getJobId());
        }
        JobContextUtil.withTxAndRetry(() -> {
            manager.deleteJobByIdList(deletableIds);
            return true;
        });
    }

    /**
     * Applies an action to every selected job.
     *
     * <p>With a null project, operation permission is checked on the project of each selected
     * job; otherwise it is checked once on the given project. All permission checks run before
     * any job is updated.
     *
     * @param jobIds ids of the jobs to update
     * @param project project to restrict the selection to, or null for all projects
     * @param action action name to apply
     * @param filterStatuses job statuses used to narrow the selection
     * @throws IOException propagated from the per-job status update
     */
    public void batchUpdateJobStatus(List<String> jobIds, String project, String action, List<String> filterStatuses) throws IOException {
        List<ExecutablePO> selected = this.jobInfoDao.getExecutablePoByStatus(project, jobIds, JobStatusUtil.mapJobStatusToScheduleState(filterStatuses));
        if (project != null) {
            this.aclEvaluate.checkProjectOperationPermission(project);
        } else {
            for (ExecutablePO po : selected) {
                this.aclEvaluate.checkProjectOperationPermission(po.getProject());
            }
        }
        for (ExecutablePO po : selected) {
            this.updateJobStatus(po.getId(), po, po.getProject(), action);
        }
    }

    /**
     * Applies an action to a single job and returns its refreshed state.
     *
     * <p>A DISCARD action is not executed here: the given job is returned unchanged.
     *
     * @param project project owning the job, must not be null
     * @param job job to act on, must not be null
     * @param action exact {@link JobActionEnum} constant name, must not be blank
     * @return the unchanged job for DISCARD, otherwise the job reloaded after the update
     * @throws IOException propagated from the status update
     */
    @Transactional
    public ExecutableResponse manageJob(String project, ExecutableResponse job, String action) throws IOException {
        Preconditions.checkNotNull(project);
        Preconditions.checkNotNull(job);
        Preconditions.checkArgument(StringUtils.isNotBlank(action));
        JobActionEnum requested = JobActionEnum.valueOf(action);
        if (requested == JobActionEnum.DISCARD) {
            return job;
        }
        String jobId = job.getId();
        ExecutablePO po = this.jobInfoDao.getExecutablePOByUuid(jobId);
        this.updateJobStatus(jobId, po, project, action);
        return this.getJobInstance(jobId);
    }

    /**
     * Validates an action name after upper-casing it with {@link Locale#ROOT}.
     *
     * @param action action name in any case
     */
    private void jobActionValidate(String action) {
        String normalized = action.toUpperCase(Locale.ROOT);
        JobActionEnum.validateValue(normalized);
    }

    /**
     * Test hook exposing the private action validation.
     *
     * @param action action name to validate
     */
    @VisibleForTesting
    public void jobActionValidateToTest(String action) {
        jobActionValidate(action);
    }

    /**
     * Executes one action (resume, restart, discard or pause) on a job.
     *
     * @param jobId id of the job to act on
     * @param executablePO stored form of the job, converted to an executable before validation
     * @param project project owning the job
     * @param action action name, matched case-insensitively against {@link JobActionEnum}
     * @throws IOException declared for callers
     * @throws IllegalStateException if the action is valid but not one of the four handled here
     */
    @VisibleForTesting
    public void updateJobStatus(String jobId, ExecutablePO executablePO, String project, String action) throws IOException {
        ExecutableManager manager = (ExecutableManager)this.getManager(ExecutableManager.class, project);
        AbstractExecutable job = manager.fromPO(executablePO);
        this.jobActionValidate(action);
        JobActionEnum requested = JobActionEnum.valueOf(action.toUpperCase(Locale.ROOT));
        switch (requested) {
            case RESUME:
                manager.resumeJob(jobId);
                MetricsGroup.hostTagCounterInc(MetricsName.JOB_RESUMED, MetricsCategory.PROJECT, project);
                break;
            case RESTART:
                // Running applications are killed before the job is restarted.
                this.killExistApplication(job);
                manager.restartJob(jobId);
                break;
            case DISCARD:
                this.discardJob(project, jobId, job);
                JobTypeEnum discardedType = manager.getJob(jobId).getJobType();
                String discardedTypeName = "";
                if (discardedType != null) {
                    discardedTypeName = discardedType.name();
                }
                EventBusFactory.getInstance().postAsync((SchedulerEventNotifier)new JobDiscardNotifier(project, discardedTypeName));
                break;
            case PAUSE:
                // The job is paused first, then its running applications are killed.
                manager.pauseJob(jobId);
                this.killExistApplication(job);
                break;
            default:
                throw new IllegalStateException("This job can not do this action: " + action);
        }
    }

    /**
     * Records failure details for a job; does nothing when the failed step id is blank.
     *
     * @param project project owning the job
     * @param jobId id of the failed job
     * @param failedStepId id of the failed step; blank means nothing to record
     * @param failedSegmentId id of the failed segment
     * @param failedStack stack trace text of the failure
     * @param failedReason reason text of the failure
     */
    public void updateJobError(String project, String jobId, String failedStepId, String failedSegmentId, String failedStack, String failedReason) {
        if (!StringUtils.isBlank(failedStepId)) {
            JobContextUtil.withTxAndRetry(() -> {
                ExecutableManager manager = (ExecutableManager)this.getManager(ExecutableManager.class, project);
                manager.updateJobError(jobId, failedStepId, failedSegmentId, failedStack, failedReason);
                return true;
            });
        }
    }

    /**
     * Updates the status of a stage for a segment.
     *
     * @param project project owning the task
     * @param taskId id passed to the manager as the stage/task identifier
     * @param segmentId segment the stage belongs to
     * @param status exact {@link ExecutableState} name; blank is forwarded as null
     * @param updateInfo extra key/value info forwarded to the manager
     * @param errMsg error message forwarded to the manager
     */
    public void updateStageStatus(String project, String taskId, String segmentId, String status, Map<String, String> updateInfo, String errMsg) {
        ExecutableState targetState = this.convertToExecutableState(status);
        ((ExecutableManager)this.getManager(ExecutableManager.class, project)).updateStageStatus(taskId, segmentId, targetState, updateInfo, errMsg);
    }

    /**
     * Parses a state name.
     *
     * @param status exact {@link ExecutableState} constant name, or blank
     * @return the matching state, or null when the input is blank
     * @throws IllegalArgumentException if a non-blank name matches no constant
     */
    public ExecutableState convertToExecutableState(String status) {
        return StringUtils.isBlank(status) ? null : ExecutableState.valueOf(status);
    }

    /**
     * Stores the Spark/YARN information reported for a task into that task's output.
     *
     * <p>Requests whose job id contains {@code "ASYNC-QUERY-"} are ignored.
     *
     * @param request report carrying project, job id, task id and the YARN details
     */
    public void updateSparkJobInfo(SparkJobUpdateRequest request) {
        boolean asyncQuery = request.getJobId().contains("ASYNC-QUERY-");
        if (asyncQuery) {
            return;
        }
        ExecutableManager manager = (ExecutableManager)this.getManager(ExecutableManager.class, request.getProject());
        Map<String, String> yarnInfo = Maps.newHashMap();
        yarnInfo.put("yarn_application_id", request.getYarnAppId());
        yarnInfo.put("yarn_application_tracking_url", request.getYarnAppUrl());
        yarnInfo.put("queue_name", request.getQueueName());
        yarnInfo.put("cores", request.getCores());
        yarnInfo.put("memory", request.getMemory());
        JobContextUtil.withTxAndRetry(() -> {
            manager.updateJobOutput(request.getTaskId(), null, yarnInfo, null, null);
            return true;
        });
    }

    /**
     * Stores the reported YARN wait and run times into a task's output.
     *
     * <p>Nothing is stored when the job id contains {@code "ASYNC-QUERY-"}.
     *
     * @param project project owning the job
     * @param jobId id of the job, used only for the async-query check
     * @param taskId id of the task whose output is updated
     * @param waitTime value stored under {@code yarn_job_wait_time}
     * @param buildTime value stored under {@code yarn_job_run_time}
     */
    public void updateSparkTimeInfo(String project, String jobId, String taskId, String waitTime, String buildTime) {
        ExecutableManager manager = (ExecutableManager)this.getManager(ExecutableManager.class, project);
        if (jobId.contains("ASYNC-QUERY-")) {
            return;
        }
        Map<String, String> timeInfo = Maps.newHashMap();
        timeInfo.put("yarn_job_wait_time", waitTime);
        timeInfo.put("yarn_job_run_time", buildTime);
        JobContextUtil.withTxAndRetry(() -> {
            manager.updateJobOutput(taskId, null, timeInfo, null, null);
            return true;
        });
    }

    /**
     * Returns the verbose output of a job, using the job id as the step id as well.
     *
     * @param project project owning the job
     * @param jobId id of the job
     * @return the verbose output message
     */
    public String getJobOutput(String project, String jobId) {
        final String stepId = jobId;
        return this.getJobOutput(project, jobId, stepId);
    }

    /**
     * Returns the verbose output of one step of a job after checking operation permission.
     *
     * @param project project owning the job
     * @param jobId id of the job
     * @param stepId id of the step whose output is read
     * @return the verbose message of the output fetched via {@code getOutputFromHDFSByJobId}
     */
    public String getJobOutput(String project, String jobId, String stepId) {
        this.aclEvaluate.checkProjectOperationPermission(project);
        ExecutableManager manager = (ExecutableManager)this.getManager(ExecutableManager.class, project);
        Output stepOutput = manager.getOutputFromHDFSByJobId(jobId, stepId);
        return stepOutput.getVerboseMsg();
    }

    /**
     * Looks up a job by id and kills the applications of its running Spark tasks.
     *
     * @param project project owning the job
     * @param jobId id of the job
     */
    public void killExistApplication(String project, String jobId) {
        ExecutableManager manager = (ExecutableManager)this.getManager(ExecutableManager.class, project);
        this.killExistApplication(manager.getJob(jobId));
    }

    /**
     * For a chained job, asks every RUNNING {@code NSparkExecutable} task to kill its orphan
     * application if one exists. Jobs that are not chained (including null) are ignored.
     *
     * @param job job whose running Spark tasks should be cleaned up
     */
    public void killExistApplication(AbstractExecutable job) {
        if (!(job instanceof ChainedExecutable)) {
            return;
        }
        for (AbstractExecutable task : ((ChainedExecutable)job).getTasks()) {
            if (task.getStatusInMem() == ExecutableState.RUNNING && task instanceof NSparkExecutable) {
                ((NSparkExecutable)task).killOrphanApplicationIfExists(task.getId());
            }
        }
    }

    /**
     * Discards a job unless it is already finished or discarded.
     *
     * <p>A SUCCEED job cannot be discarded; an already DISCARDED job is left alone. Otherwise the
     * job's running applications are killed, the job is discarded transactionally and, when mail
     * is enabled, the user is notified.
     *
     * @param project project owning the job
     * @param jobId id reported in the error when the job has already succeeded
     * @param job job to discard
     * @throws KylinException with {@code JOB_UPDATE_STATUS_FAILED} when the job is SUCCEED
     */
    private void discardJob(String project, String jobId, AbstractExecutable job) {
        if (job.getStatusInMem() == ExecutableState.SUCCEED) {
            throw new KylinException(ErrorCodeServer.JOB_UPDATE_STATUS_FAILED, new Object[]{"DISCARD", jobId, job.getStatusInMem()});
        }
        if (job.getStatusInMem() == ExecutableState.DISCARDED) {
            return;
        }
        this.killExistApplication(job);
        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
            ExecutableManager manager = (ExecutableManager)this.getManager(ExecutableManager.class, project);
            manager.discardJob(job.getId());
            return true;
        }, project);
        boolean mailEnabled = this.getConfig().isMailEnabled();
        if (mailEnabled) {
            job.notifyUser(MailNotificationType.JOB_DISCARDED);
        }
    }

    /**
     * Loads the stored jobs matching the ids and statuses and converts each to an executable
     * using the manager of the job's own project.
     *
     * @param project project to restrict the lookup to, may be null
     * @param jobIds ids of the jobs to load
     * @param filterStatuses job statuses, mapped to scheduler states before the lookup
     * @return the matching jobs as executables, in DAO order
     */
    private List<AbstractExecutable> getJobsByStatus(String project, List<String> jobIds, List<String> filterStatuses) {
        List<ExecutablePO> stored = this.jobInfoDao.getExecutablePoByStatus(project, jobIds, JobStatusUtil.mapJobStatusToScheduleState(filterStatuses));
        return stored.stream()
                .map(po -> {
                    ExecutableManager manager = (ExecutableManager)this.getManager(ExecutableManager.class, po.getProject());
                    return manager.fromPO(po);
                })
                .collect(Collectors.toList());
    }

    /**
     * Builds the REST response for a job and fills in its scheduler state and job status from
     * the executable's in-memory status.
     *
     * @param executable runtime form of the job
     * @param executablePO stored form of the job
     * @return the populated response
     */
    @VisibleForTesting
    public ExecutableResponse convert(AbstractExecutable executable, ExecutablePO executablePO) {
        ExecutableResponse response = ExecutableResponse.create(executable, executablePO);
        response.setSchedulerState(executable.getStatusInMem());
        response.setStatus(executable.getStatusInMem().toJobStatus());
        return response;
    }

    /**
     * Converts one task of a job into its step response.
     *
     * <p>When the task has no output, only id, name and sequence id are filled in. Otherwise the
     * response also carries state, extra info (with tracking URLs), timings, wait time, the shell
     * command for shell tasks, the short error message and the step's neighbours.
     *
     * @param task task to convert
     * @param po stored form of the owning job
     * @param waiteTimeMap recorded wait times (as decimal strings) keyed by task id
     * @param jobState state of the owning job; a PAUSED job keeps the recorded wait time as-is
     * @return the step response
     */
    @VisibleForTesting
    public ExecutableStepResponse parseToExecutableStep(AbstractExecutable task, ExecutablePO po, Map<String, String> waiteTimeMap, ExecutableState jobState) {
        ExecutableStepResponse step = new ExecutableStepResponse();
        step.setId(task.getId());
        step.setName(task.getName());
        step.setSequenceID(task.getStepId());
        ExecutableManager manager = (ExecutableManager)this.getManager(ExecutableManager.class, task.getProject());
        Output taskOutput = manager.getOutput(task.getId(), po);
        if (taskOutput == null) {
            try (SetLogCategory ignored = new SetLogCategory("build")) {
                logger.warn("Cannot found output for task: id={}", task.getId());
            }
            return step;
        }
        step.setSchedulerState(taskOutput.getState());
        step.setStatus(taskOutput.getState().toJobStatus());
        // Copy only fully populated extra entries.
        for (Map.Entry<String, String> extra : taskOutput.getExtra().entrySet()) {
            if (extra.getKey() != null && extra.getValue() != null) {
                step.putInfo(extra.getKey(), extra.getValue());
            }
        }
        if (KylinConfig.getInstanceFromEnv().isHistoryServerEnable() && step.getInfo().containsKey("yarn_application_id")) {
            String historyUrl = SparkHistoryUIUtil.getHistoryTrackerUrl(step.getInfo().get("yarn_application_id"));
            step.putInfo("spark_history_application_tracking_url", historyUrl);
        }
        if (step.getInfo().containsKey("yarn_application_tracking_url")) {
            step.putInfo("proxy_application_tracking_url", JobDriverUIUtil.getProxyUrl(task.getProject(), task.getId()));
            ProjectInstance owningProject = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv()).getProject(task.getProject());
            // When the project proxies the Spark UI, the direct tracking URL is replaced by the proxy URL.
            if (owningProject != null && owningProject.getConfig().isProxyJobSparkUIEnabled()) {
                step.putInfo("yarn_application_tracking_url", JobDriverUIUtil.getProxyUrl(task.getProject(), task.getId()));
            }
        }
        step.setExecStartTime(AbstractExecutable.getStartTime(taskOutput));
        step.setExecEndTime(AbstractExecutable.getEndTime(taskOutput));
        step.setCreateTime(AbstractExecutable.getCreateTime(taskOutput));
        step.setDuration(AbstractExecutable.getDuration(taskOutput));
        long totalWait = Long.parseLong(waiteTimeMap.getOrDefault(task.getId(), "0"));
        if (jobState != ExecutableState.PAUSED) {
            long currentWait = task.getWaitTime(po);
            // The task's own wait time is added unless it equals the recorded value.
            if (currentWait != totalWait) {
                totalWait += currentWait;
            }
        }
        step.setWaitTime(totalWait);
        if (task instanceof ShellExecutable) {
            step.setExecCmd(((ShellExecutable)task).getCmd());
        }
        step.setShortErrMsg(taskOutput.getShortErrMsg());
        step.setPreviousStep(task.getPreviousStep());
        step.setNextSteps(task.getNextSteps());
        return step;
    }

    /**
     * Fills the failure resolve, code and reason of a step response from the job output.
     *
     * <p>For the default exception code the reason is the generic building-error reason,
     * followed by {@code ": "} and the output's failed reason when that is not blank. If anything
     * goes wrong while resolving, all three fields fall back to the generic building-error values.
     *
     * @param output job output carrying the failure information
     * @param executableStepResponse step response to populate
     */
    public void setExceptionResolveAndCodeAndReason(Output output, ExecutableStepResponse executableStepResponse) {
        try (SetLogCategory ignored = new SetLogCategory("build")) {
            String code = this.getExceptionCode(output);
            executableStepResponse.setFailedResolve(ExceptionResolve.getResolve(code));
            executableStepResponse.setFailedCode(ErrorCode.getLocalizedString(code));
            if (!StringUtils.equals(code, EXCEPTION_CODE_DEFAULT)) {
                executableStepResponse.setFailedReason(ExceptionReason.getReason(code));
            } else {
                String failedReason;
                if (StringUtils.isBlank(output.getFailedReason())) {
                    failedReason = JobExceptionReason.JOB_BUILDING_ERROR.toExceptionReason().getReason();
                } else {
                    failedReason = JobExceptionReason.JOB_BUILDING_ERROR.toExceptionReason().getReason() + ": " + output.getFailedReason();
                }
                executableStepResponse.setFailedReason(failedReason);
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            executableStepResponse.setFailedResolve(JobExceptionResolve.JOB_BUILDING_ERROR.toExceptionResolve().getResolve());
            executableStepResponse.setFailedCode(JobErrorCode.JOB_BUILDING_ERROR.toErrorCode().getLocalizedString());
            executableStepResponse.setFailedReason(JobExceptionReason.JOB_BUILDING_ERROR.toExceptionReason().getReason());
        }
    }

    /**
     * Resolves the exception code for a failed output.
     *
     * <p>The text to match is the output's failed reason or, when that is blank, the first line
     * of its failed stack. If both are blank the default code is returned. Otherwise the JSON
     * mapping at {@code EXCEPTION_CODE_PATH} is read from the class loader and the value of the
     * first entry whose key contains the text, or is contained in it, is returned.
     *
     * <p>The mapping stream is closed explicitly here rather than relying on the JSON reader to
     * do so, so it is released on the error path as well.
     *
     * @param output output whose failed reason / failed stack is inspected
     * @return the matched code, or {@code EXCEPTION_CODE_DEFAULT} when nothing matches, the
     *         mapping resource is missing, or any exception occurs (which is logged)
     */
    public String getExceptionCode(Output output) {
        try (SetLogCategory ignored = new SetLogCategory("build")) {
            String failureText = output.getFailedReason();
            if (StringUtils.isBlank(failureText)) {
                String failedStack = output.getFailedStack();
                if (StringUtils.isBlank(failedStack)) {
                    return EXCEPTION_CODE_DEFAULT;
                }
                // Only the first line of the stack trace is used for matching.
                failureText = failedStack.split("\n")[0];
            }

            try (InputStream exceptionCodeStream = this.getClass().getClassLoader().getResourceAsStream(EXCEPTION_CODE_PATH)) {
                if (exceptionCodeStream == null) {
                    // Report the missing resource explicitly instead of surfacing a bare NPE.
                    logger.error("Exception code mapping resource not found: {}", EXCEPTION_CODE_PATH);
                    return EXCEPTION_CODE_DEFAULT;
                }
                Map<?, ?> exceptionCodes = JsonUtil.readValue(exceptionCodeStream, Map.class);
                for (Map.Entry<?, ?> entry : exceptionCodes.entrySet()) {
                    String key = String.valueOf(entry.getKey());
                    // Match in both directions: the key inside the text, or the text inside the key.
                    if (StringUtils.contains(failureText, key) || StringUtils.contains(key, failureText)) {
                        return String.valueOf(entry.getValue());
                    }
                }
            }
            return EXCEPTION_CODE_DEFAULT;
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            return EXCEPTION_CODE_DEFAULT;
        }
    }

    /**
     * Builds the step response describing one stage of a task.
     *
     * <p>Id, name and sequence id are taken from the stage. When no output is available a
     * warning is logged and the partially filled response is returned. Otherwise the output's
     * extra entries (skipping null keys or values), state, timestamps and stage duration are
     * copied, and the index counters are derived from the task's {@code indexCount} parameter
     * and the output's {@code indexSuccessCount} extra entry.
     *
     * @param task task owning the stage; supplies the {@code indexCount} parameter and the parent
     * @param stageExecutable stage being described
     * @param stageOutput output of the stage, may be {@code null}
     * @return the populated step response
     */
    private ExecutableStepResponse parseStageToExecutableStep(AbstractExecutable task, StageExecutable stageExecutable, Output stageOutput) {
        ExecutableStepResponse stageResponse = new ExecutableStepResponse();
        stageResponse.setId(stageExecutable.getId());
        stageResponse.setName(stageExecutable.getName());
        stageResponse.setSequenceID(stageExecutable.getStepId());

        if (stageOutput == null) {
            try (SetLogCategory ignored = new SetLogCategory("build")) {
                logger.warn("Cannot found output for task: id={}", stageExecutable.getId());
            }
            return stageResponse;
        }

        // Copy only fully populated extra entries.
        stageOutput.getExtra().forEach((key, value) -> {
            if (key != null && value != null) {
                stageResponse.putInfo(key, value);
            }
        });

        stageResponse.setSchedulerState(stageOutput.getState());
        stageResponse.setStatus(stageOutput.getState().toJobStatus());
        stageResponse.setExecStartTime(AbstractExecutable.getStartTime(stageOutput));
        stageResponse.setExecEndTime(AbstractExecutable.getEndTime(stageOutput));
        stageResponse.setCreateTime(AbstractExecutable.getCreateTime(stageOutput));
        stageResponse.setDuration(AbstractExecutable.getStageDuration(stageOutput, (AbstractExecutable) task.getParent()));

        long totalIndexCount = Long.parseLong(Optional.ofNullable(task.getParam("indexCount")).orElse("0"));
        stageResponse.setIndexCount(totalIndexCount);

        // A finished stage counts every index as successful; otherwise use the reported progress.
        long succeededIndexCount = stageResponse.getStatus() == JobStatusEnum.FINISHED
                ? totalIndexCount
                : Long.parseLong(stageOutput.getExtra().getOrDefault("indexSuccessCount", "0"));
        stageResponse.setSuccessIndexCount(succeededIndexCount);
        return stageResponse;
    }

    /**
     * Merges a stage response into the list of aggregated stage responses.
     *
     * <p>If no entry with the same id exists, a copy of selected fields of {@code newResponse}
     * is appended. Otherwise the existing entry is updated: its status is recombined unless it
     * is already ERROR, STOPPED or DISCARDED, its start/end times are widened, and the index
     * counters are summed.
     *
     * @param responses aggregated responses, modified in place
     * @param newResponse response to merge in
     */
    private void setStage(List<ExecutableStepResponse> responses, ExecutableStepResponse newResponse) {
        ExecutableStepResponse existing = null;
        for (ExecutableStepResponse candidate : responses) {
            if (candidate.getId().equals(newResponse.getId())) {
                existing = candidate;
                break;
            }
        }

        if (existing == null) {
            // First occurrence of this stage id: register a copy of the relevant fields.
            ExecutableStepResponse copy = new ExecutableStepResponse();
            copy.setId(newResponse.getId());
            copy.setName(newResponse.getName());
            copy.setSequenceID(newResponse.getSequenceID());
            copy.setExecStartTime(newResponse.getExecStartTime());
            copy.setExecEndTime(newResponse.getExecEndTime());
            copy.setDuration(newResponse.getDuration());
            copy.setWaitTime(newResponse.getWaitTime());
            copy.setIndexCount(newResponse.getIndexCount());
            copy.setSuccessIndexCount(newResponse.getSuccessIndexCount());
            copy.setStatus(newResponse.getStatus());
            copy.setCmdType(newResponse.getCmdType());
            responses.add(copy);
            return;
        }

        Set<JobStatusEnum> interruptedStatuses = Sets.newHashSet(JobStatusEnum.ERROR, JobStatusEnum.STOPPED, JobStatusEnum.DISCARDED);
        Set<JobStatusEnum> completedStatuses = Sets.newHashSet(JobStatusEnum.FINISHED, JobStatusEnum.SKIP, JobStatusEnum.WARNING);

        JobStatusEnum currentStatus = existing.getStatus();
        JobStatusEnum incomingStatus = newResponse.getStatus();
        if (currentStatus != incomingStatus && !interruptedStatuses.contains(currentStatus)) {
            JobStatusEnum combined;
            if (interruptedStatuses.contains(incomingStatus)) {
                combined = incomingStatus;
            } else if (completedStatuses.contains(incomingStatus) && completedStatuses.contains(currentStatus)) {
                combined = JobStatusEnum.FINISHED;
            } else {
                combined = JobStatusEnum.RUNNING;
            }
            existing.setStatus(combined);
        }

        // An incoming start time of 0 is ignored.
        long incomingStart = newResponse.getExecStartTime();
        if (incomingStart != 0L) {
            existing.setExecStartTime(Math.min(incomingStart, existing.getExecStartTime()));
        }
        existing.setExecEndTime(Math.max(newResponse.getExecEndTime(), existing.getExecEndTime()));

        existing.setSuccessIndexCount(existing.getSuccessIndexCount() + newResponse.getSuccessIndexCount());
        existing.setIndexCount(existing.getIndexCount() + newResponse.getIndexCount());
    }

    /**
     * Populates the per-segment sub-stage summary from its stage responses.
     *
     * <p>Sets the stage list, wait time (only when there are stage responses), earliest start
     * time of non-pending stages, latest end time (only when the segment is not considered
     * running), summed duration, segment name and range (when the segment is found), the step
     * ratio, and the {@code warning_code} info entry of the first stage in WARNING status.
     *
     * @param project project passed to the segment lookup
     * @param targetSubject first argument of the segment lookup
     * @param task task the stages belong to
     * @param segmentId id of the segment being summarised
     * @param segmentSubStages summary object to populate
     * @param stageExecutables stages passed to the success-stage calculation
     * @param stageResponses responses of the segment's stages
     * @param waiteTimeMap previously recorded wait times keyed by id, as decimal strings
     * @param jobState state of the job; PAUSED keeps the recorded wait time unchanged
     * @param executablePO persisted job object passed through to the task and the calculation
     */
    private void setSegmentSubStageParams(String project, String targetSubject, AbstractExecutable task, String segmentId, ExecutableStepResponse.SubStages segmentSubStages, List<StageExecutable> stageExecutables, List<ExecutableStepResponse> stageResponses, Map<String, String> waiteTimeMap, ExecutableState jobState, ExecutablePO executablePO) {
        segmentSubStages.setStage(stageResponses);

        if (CollectionUtils.isNotEmpty(stageResponses)) {
            long taskStart = task.getOutput(executablePO).getStartTime();
            long firstStageStart = stageResponses.get(0).getExecStartTime();
            // Task started but its first stage has not: measure the wait up to now.
            if (taskStart != 0L && firstStageStart == 0L) {
                firstStageStart = System.currentTimeMillis();
            }
            long recordedWait = Long.parseLong(waiteTimeMap.getOrDefault(segmentId, "0"));
            long totalWait = jobState == ExecutableState.PAUSED
                    ? recordedWait
                    : firstStageStart - taskStart + recordedWait;
            segmentSubStages.setWaitTime(totalWait);
        }

        long earliestStart = stageResponses.stream()
                .filter(stage -> stage.getStatus() != JobStatusEnum.PENDING)
                .mapToLong(ExecutableStepResponse::getExecStartTime)
                .min()
                .orElse(0L);
        segmentSubStages.setExecStartTime(earliestStart);

        Set<JobStatusEnum> statuses = new HashSet<>();
        for (ExecutableStepResponse stage : stageResponses) {
            statuses.add(stage.getStatus());
        }
        boolean stillRunning = statuses.contains(JobStatusEnum.RUNNING)
                || (task.getStatusInMem() == ExecutableState.RUNNING && statuses.contains(JobStatusEnum.PENDING));
        if (!stillRunning) {
            long latestEnd = stageResponses.stream()
                    .mapToLong(ExecutableStepResponse::getExecEndTime)
                    .max()
                    .orElse(0L);
            segmentSubStages.setExecEndTime(latestEnd);
        }

        long totalDuration = stageResponses.stream().mapToLong(ExecutableStepResponse::getDuration).sum();
        segmentSubStages.setDuration(totalDuration);

        NDataSegment segment = null;
        for (NDataSegment candidate : this.modelService.getSegmentsByRange(targetSubject, project, "", "")) {
            if (StringUtils.equals(candidate.getId(), segmentId)) {
                segment = candidate;
                break;
            }
        }
        if (segment != null) {
            SegmentRange segRange = segment.getSegRange();
            segmentSubStages.setName(segment.getName());
            segmentSubStages.setStartTime(Long.parseLong(segRange.getStart().toString()));
            segmentSubStages.setEndTime(Long.parseLong(segRange.getEnd().toString()));
        }

        // Use a divisor of at least 1 so an empty stage list does not divide by zero.
        int stepCount = Math.max(stageResponses.size(), 1);
        float successStages = (float) ExecutableResponse.calculateSuccessStage(task, segmentId, stageExecutables, true, executablePO);
        segmentSubStages.setStepRatio(successStages / (float) stepCount);

        for (ExecutableStepResponse stage : stageResponses) {
            if (stage.getStatus() == JobStatusEnum.WARNING) {
                segmentSubStages.getInfo().put("warning_code", stage.getInfo().getOrDefault("warning_code", null));
                break;
            }
        }
    }

    /**
     * Returns the output of a job step as a stream.
     *
     * <p>After the project operation permission check, the output is fetched through
     * {@code getOutputFromHDFSByJobId} with a limit of {@code Integer.MAX_VALUE}. The output's
     * verbose message stream is returned when present; otherwise a UTF-8 stream over its verbose
     * message (empty string when that is {@code null}).
     *
     * @param project project the job belongs to
     * @param jobId id of the job
     * @param stepId id of the step
     * @return stream over the step output, never {@code null}
     */
    public InputStream getAllJobOutput(String project, String jobId, String stepId) {
        this.aclEvaluate.checkProjectOperationPermission(project);
        Output output = this.getManager(ExecutableManager.class, project)
                .getOutputFromHDFSByJobId(jobId, stepId, Integer.MAX_VALUE);

        InputStream verboseStream = output.getVerboseMsgStream();
        String verboseMsg = output.getVerboseMsg();
        InputStream textStream = IOUtils.toInputStream(verboseMsg != null ? verboseMsg : "", "UTF-8");
        return verboseStream != null ? verboseStream : textStream;
    }

    /**
     * Attaches old-style parameters and step details to each job response.
     *
     * <p>For every response the target model's alias is looked up ({@code null} when the model
     * is not found), the job's steps are loaded via {@code getJobDetail}, each step receives an
     * {@code OldParams} carrying its wait time, and the job receives an {@code OldParams} plus
     * the step list.
     *
     * @param executableResponseList responses to enrich, modified in place
     * @return the same list instance
     */
    public List<ExecutableResponse> addOldParams(List<ExecutableResponse> executableResponseList) {
        for (ExecutableResponse job : executableResponseList) {
            ExecutableResponse.OldParams jobOldParams = new ExecutableResponse.OldParams();

            NDataModel targetModel = this.getManager(NDataModelManager.class, job.getProject())
                    .getDataModelDesc(job.getTargetModel());
            String modelAlias = targetModel == null ? null : targetModel.getAlias();

            List<ExecutableStepResponse> steps = this.getJobDetail(job.getProject(), job.getId());
            for (ExecutableStepResponse step : steps) {
                ExecutableStepResponse.OldParams stepOldParams = new ExecutableStepResponse.OldParams();
                stepOldParams.setExecWaitTime(step.getWaitTime());
                step.setOldParams(stepOldParams);
            }

            jobOldParams.setProjectName(job.getProject());
            jobOldParams.setRelatedCube(modelAlias);
            jobOldParams.setDisplayCubeName(modelAlias);
            jobOldParams.setUuid(job.getId());
            jobOldParams.setType(JOB_TYPE_MAP.get(job.getJobName()));
            jobOldParams.setName(job.getJobName());
            jobOldParams.setExecInterruptTime(0L);
            jobOldParams.setMrWaiting(job.getWaitTime());

            job.setOldParams(jobOldParams);
            job.setSteps(steps);
        }
        return executableResponseList;
    }

    /**
     * Collects the ids of the fusion models among the models that use the given table.
     *
     * <p>A model is accepted when its model description exists and reports
     * {@code isFusionModel()} and a fusion model is registered under the same id. Every other
     * model is skipped with a warning; accepted models no longer trigger that warning.
     *
     * @param project project to look the models up in
     * @param tableDesc table whose dependent models are examined
     * @return ids of the fusion models using the table; empty when there are none
     */
    public List<String> getFusionModelsByTableDesc(String project, TableDesc tableDesc) {
        List<String> fusionModelIds = Lists.newArrayList();
        for (NDataModel tableRelatedModel : this.getManager(NDataflowManager.class, project).getModelsUsingTable(tableDesc)) {
            String modelId = tableRelatedModel.getId();
            NDataModel model = this.getManager(NDataModelManager.class, project).getDataModelDesc(modelId);
            FusionModelManager fusionModelManager = FusionModelManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
            FusionModel fusionModel = fusionModelManager.getFusionModel(modelId);

            if (Objects.nonNull(model) && model.isFusionModel() && Objects.nonNull(fusionModel)) {
                fusionModelIds.add(modelId);
                continue;
            }

            // Only rejected models are reported, under the "build" log category.
            try (SetLogCategory ignored = new SetLogCategory("build")) {
                logger.warn("model is not fusion model or fusion model is null, {}", modelId);
            }
        }
        return fusionModelIds;
    }

    /**
     * Returns the ids of jobs on the batch models of the given fusion models, restricted to
     * jobs whose state is not SUCCEED, ERROR, DISCARDED or SUICIDAL.
     *
     * <p>Fusion model ids with no registered fusion model are ignored. When no batch model is
     * found, an empty list is returned without querying jobs.
     *
     * @param project project to query
     * @param fusionModelIds ids of fusion models
     * @return ids of the matching jobs
     */
    public List<String> getBatchModelJobIdsOfFusionModel(String project, List<String> fusionModelIds) {
        List<String> batchModelIds = Lists.newArrayList();
        for (String fusionModelId : fusionModelIds) {
            FusionModel fusionModel = FusionModelManager.getInstance(KylinConfig.getInstanceFromEnv(), project)
                    .getFusionModel(fusionModelId);
            if (fusionModel != null) {
                batchModelIds.add(fusionModel.getBatchModel().getUuid());
            }
        }
        if (batchModelIds.isEmpty()) {
            return Lists.newArrayList();
        }

        // Keep every state that is not in the excluded set, in declaration order.
        List<ExecutableState> excludedStates = Lists.newArrayList(ExecutableState.SUCCEED, ExecutableState.ERROR, ExecutableState.DISCARDED, ExecutableState.SUICIDAL);
        List<ExecutableState> wantedStates = Lists.newArrayList();
        for (ExecutableState state : ExecutableState.values()) {
            if (!excludedStates.contains(state)) {
                wantedStates.add(state);
            }
        }

        JobMapperFilter filter = new JobMapperFilter();
        filter.setProject(project);
        filter.setModelIds(batchModelIds);
        filter.setStatuses(wantedStates);

        return this.jobInfoDao.getJobInfoListByFilter(filter).stream()
                .map(jobInfo -> jobInfo.getJobId())
                .collect(Collectors.toList());
    }

    /**
     * Looks up the fusion models using the given table, collects the job ids of their batch
     * models via {@code getBatchModelJobIdsOfFusionModel}, and hands those ids to
     * {@code JobContextUtil.remoteDiscardJob}.
     *
     * @param project project the table belongs to
     * @param tableDesc table whose related batch jobs are targeted
     */
    @Transaction(project=0)
    public void stopBatchJob(String project, TableDesc tableDesc) {
        List<String> batchJobIds = this.getBatchModelJobIdsOfFusionModel(project,
                this.getFusionModelsByTableDesc(project, tableDesc));
        JobContextUtil.remoteDiscardJob(project, batchJobIds);
    }

    /**
     * Reads the {@code yarn_application_tracking_url} extra entry from a step's output.
     *
     * @param project project the step belongs to
     * @param stepId id of the step
     * @return the stored tracking URL, or {@code null} when it is absent or the lookup throws
     *         (the exception is logged as a warning)
     */
    public String getOriginTrackUrlByProjectAndStepId(String project, String stepId) {
        try {
            Output stepOutput = this.getManager(ExecutableManager.class, project).getOutput(stepId);
            return stepOutput.getExtra().get("yarn_application_tracking_url");
        } catch (Exception e) {
            logger.warn("get trackUrl failed", e);
            return null;
        }
    }

    /**
     * Runs {@code ExecutableManager.checkSuicideJobOfModel} for the given dataflow inside
     * {@code EnhancedUnitOfWork.doInTransactionWithCheckAndRetry} for the project.
     *
     * @param project project owning the dataflow
     * @param dataflowId id of the dataflow to check
     */
    public void checkSuicideJobOfModel(String project, String dataflowId) {
        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
            // The manager is obtained inside the callback, as in every retry of the unit of work.
            this.getManager(ExecutableManager.class, project).checkSuicideJobOfModel(project, dataflowId);
            return true;
        }, project);
    }

    /**
     * Calls {@code ExecutableManager.discardJob} for each given job id, in list order, inside
     * {@code EnhancedUnitOfWork.doInTransactionWithCheckAndRetry} for the project.
     *
     * @param project project owning the jobs
     * @param jobIdList ids of the jobs to discard
     */
    public void discardJobs(String project, List<String> jobIdList) {
        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
            ExecutableManager manager = this.getManager(ExecutableManager.class, project);
            for (String jobId : jobIdList) {
                manager.discardJob(jobId);
            }
            return true;
        }, project);
    }

    // Registers the display label for each known job name in JOB_TYPE_MAP.
    static {
        String[][] jobTypeLabels = {
            {"INDEX_REFRESH", "Refresh Data"},
            {"INDEX_MERGE", "Merge Data"},
            {"INDEX_BUILD", "Build Index"},
            {"INC_BUILD", "Load Data"},
            {"TABLE_SAMPLING", "Sample Table"},
        };
        for (String[] jobTypeLabel : jobTypeLabels) {
            JOB_TYPE_MAP.put(jobTypeLabel[0], jobTypeLabel[1]);
        }
    }
}

