/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.plugin.task.flink;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.stream.StreamTask;
import org.apache.dolphinscheduler.plugin.task.flink.FileUtils;
import org.apache.dolphinscheduler.plugin.task.flink.FlinkArgsUtils;
import org.apache.dolphinscheduler.plugin.task.flink.FlinkParameters;
import org.apache.dolphinscheduler.plugin.task.flink.FlinkStreamParameters;
import org.apache.dolphinscheduler.plugin.task.flink.FlinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlinkStreamTask
extends FlinkTask
implements StreamTask {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(FlinkStreamTask.class);
    private FlinkStreamParameters flinkParameters;
    private final TaskExecutionContext taskExecutionContext;

    public FlinkStreamTask(TaskExecutionContext taskExecutionContext) {
        super(taskExecutionContext);
        this.taskExecutionContext = taskExecutionContext;
    }

    public void init() {
        this.flinkParameters = (FlinkStreamParameters)((Object)JSONUtils.parseObject((String)this.taskExecutionContext.getTaskParams(), FlinkStreamParameters.class));
        log.info("Initialize Flink task params {}", (Object)JSONUtils.toPrettyJsonString((Object)((Object)this.flinkParameters)));
        if (this.flinkParameters == null || !this.flinkParameters.checkParameters()) {
            throw new RuntimeException("flink task params is not valid");
        }
        FileUtils.generateScriptFile((TaskExecutionContext)this.taskExecutionContext, (FlinkParameters)this.flinkParameters);
    }

    protected String getScript() {
        List args = FlinkArgsUtils.buildRunCommandLine((TaskExecutionContext)this.taskExecutionContext, (FlinkParameters)this.flinkParameters);
        return args.stream().collect(Collectors.joining(" "));
    }

    public AbstractParameters getParameters() {
        return this.flinkParameters;
    }

    public void cancelApplication() throws TaskException {
        List appIds = this.getApplicationIds();
        if (CollectionUtils.isEmpty((Collection)appIds)) {
            log.error("can not get appId, taskInstanceId:{}", (Object)this.taskExecutionContext.getTaskInstanceId());
            return;
        }
        this.taskExecutionContext.setAppIds(String.join((CharSequence)",", appIds));
        List args = FlinkArgsUtils.buildCancelCommandLine((TaskExecutionContext)this.taskExecutionContext);
        log.info("cancel application args:{}", (Object)args);
        ProcessBuilder processBuilder = new ProcessBuilder(new String[0]);
        processBuilder.command(args);
        try {
            processBuilder.start();
        }
        catch (IOException e) {
            throw new TaskException("cancel application error", (Throwable)e);
        }
    }

    public void savePoint() throws Exception {
        List appIds = this.getApplicationIds();
        if (CollectionUtils.isEmpty((Collection)appIds)) {
            log.warn("can not get appId, taskInstanceId:{}", (Object)this.taskExecutionContext.getTaskInstanceId());
            return;
        }
        this.taskExecutionContext.setAppIds(String.join((CharSequence)",", appIds));
        List args = FlinkArgsUtils.buildSavePointCommandLine((TaskExecutionContext)this.taskExecutionContext);
        log.info("savepoint args:{}", (Object)args);
        ProcessBuilder processBuilder = new ProcessBuilder(new String[0]);
        processBuilder.command(args);
        processBuilder.start();
    }
}

