/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.plugin.flink;

import java.io.File;
import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.plugin.flink.FlinkClientService;
import org.apache.inlong.manager.plugin.flink.dto.FlinkConfig;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
import org.apache.inlong.manager.plugin.flink.dto.StopWithSavepointRequest;
import org.apache.inlong.manager.plugin.util.FlinkConfiguration;
import org.apache.inlong.manager.plugin.util.FlinkServiceUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlinkService {
    private static final Logger log = LoggerFactory.getLogger(FlinkService.class);
    private static final Pattern IP_PORT_PATTERN = Pattern.compile("(\\d+\\.\\d+\\.\\d+\\.\\d+):(\\d+)");
    private final FlinkConfig flinkConfig;
    private final Integer parallelism;
    private final String savepointDirectory;
    private final Configuration configuration;
    private final FlinkClientService clientService;

    public FlinkService(String endpoint) throws Exception {
        Integer port;
        String address;
        FlinkConfiguration flinkConfiguration = new FlinkConfiguration();
        this.flinkConfig = flinkConfiguration.getFlinkConfig();
        this.parallelism = this.flinkConfig.getParallelism();
        this.savepointDirectory = this.flinkConfig.getSavepointDirectory();
        this.configuration = new Configuration();
        Integer jobManagerPort = this.flinkConfig.getJobManagerPort();
        this.configuration.setInteger(JobManagerOptions.PORT, jobManagerPort.intValue());
        if (StringUtils.isEmpty((CharSequence)endpoint)) {
            address = this.flinkConfig.getAddress();
            port = this.flinkConfig.getPort();
        } else {
            Map<String, String> ipPort = this.translateFromEndpoint(endpoint);
            if (ipPort.isEmpty()) {
                throw new BusinessException("get address:port failed from endpoint " + endpoint);
            }
            address = ipPort.get("address");
            port = Integer.valueOf(ipPort.get("port"));
        }
        this.configuration.setString(JobManagerOptions.ADDRESS, address);
        this.configuration.setInteger(RestOptions.PORT, port.intValue());
        this.clientService = (FlinkClientService)FlinkServiceUtils.getFlinkClientService(this.configuration, this.flinkConfig);
    }

    private Map<String, String> translateFromEndpoint(String endpoint) throws Exception {
        HashMap<String, String> map = new HashMap<String, String>(2);
        Matcher matcher = IP_PORT_PATTERN.matcher(endpoint);
        if (matcher.find()) {
            map.put("address", matcher.group(1));
            map.put("port", matcher.group(2));
            return map;
        }
        throw new Exception("endpoint [" + endpoint + "] was not match address:port");
    }

    public FlinkConfig getFlinkConfig() {
        return this.flinkConfig;
    }

    public JobStatus getJobStatus(String jobId) throws Exception {
        return this.clientService.getJobStatus(jobId);
    }

    public JobDetailsInfo getJobDetail(String jobId) throws Exception {
        return this.clientService.getJobDetail(jobId);
    }

    public String submit(FlinkInfo flinkInfo) throws Exception {
        try {
            SavepointRestoreSettings settings = SavepointRestoreSettings.none();
            return this.submitJobBySavepoint(flinkInfo, settings);
        }
        catch (Exception e) {
            log.error("submit job from info {} failed: ", (Object)flinkInfo, (Object)e);
            throw new Exception("submit job failed: " + e.getMessage());
        }
    }

    public String restore(FlinkInfo flinkInfo) throws Exception {
        try {
            if (StringUtils.isNotEmpty((CharSequence)flinkInfo.getSavepointPath())) {
                SavepointRestoreSettings settings = SavepointRestoreSettings.forPath((String)this.savepointDirectory, (boolean)false);
                return this.submitJobBySavepoint(flinkInfo, settings);
            }
            log.warn("skip to restore as the savepoint path was empty " + flinkInfo);
            return null;
        }
        catch (Exception e) {
            log.error("restore job from info {} failed: ", (Object)flinkInfo, (Object)e);
            throw new Exception("restore job failed: " + e.getMessage());
        }
    }

    private String submitJobBySavepoint(FlinkInfo flinkInfo, SavepointRestoreSettings settings) throws Exception {
        String localJarPath = flinkInfo.getLocalJarPath();
        File jarFile = new File(localJarPath);
        String[] programArgs = this.genProgramArgs(flinkInfo, this.flinkConfig);
        List connectorJars = flinkInfo.getConnectorJarPaths().stream().map(p -> {
            try {
                return new File((String)p).toURI().toURL();
            }
            catch (MalformedURLException e) {
                return null;
            }
        }).filter(Objects::nonNull).collect(Collectors.toList());
        PackagedProgram program = PackagedProgram.newBuilder().setConfiguration(this.configuration).setEntryPointClassName("org.apache.inlong.sort.Entrance").setJarFile(jarFile).setUserClassPaths(connectorJars).setArguments(programArgs).setSavepointRestoreSettings(settings).build();
        JobGraph jobGraph = PackagedProgramUtils.createJobGraph((PackagedProgram)program, (Configuration)this.configuration, (int)this.parallelism, (boolean)false);
        jobGraph.addJars(connectorJars);
        RestClusterClient client = this.clientService.getFlinkClient();
        CompletableFuture result = client.submitJob(jobGraph);
        return ((JobID)result.get()).toString();
    }

    public String stopJob(String jobId, StopWithSavepointRequest request) throws Exception {
        return this.clientService.stopJob(jobId, request.isDrain(), request.getTargetDirectory());
    }

    public void cancelJob(String jobId) throws Exception {
        this.clientService.cancelJob(jobId);
    }

    private String[] genProgramArgs(FlinkInfo flinkInfo, FlinkConfig flinkConfig) {
        ArrayList<String> list = new ArrayList<String>();
        list.add("-cluster-id");
        list.add(flinkInfo.getJobName());
        list.add("-job.name");
        list.add(flinkInfo.getJobName());
        list.add("-group.info.file");
        list.add(flinkInfo.getLocalConfPath());
        list.add("-checkpoint.interval");
        list.add("60000");
        list.add("-metrics.audit.proxy.hosts");
        list.add(flinkConfig.getAuditProxyHosts());
        return list.toArray(new String[0]);
    }
}

