/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.plugin.flink;

import java.io.File;
import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.plugin.flink.FlinkClientService;
import org.apache.inlong.manager.plugin.flink.dto.FlinkConfig;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
import org.apache.inlong.manager.plugin.flink.dto.StopWithSavepointRequest;
import org.apache.inlong.manager.plugin.util.FlinkUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlinkService {
    private static final Logger log = LoggerFactory.getLogger(FlinkService.class);
    private static final Pattern IP_PORT_PATTERN = Pattern.compile("(\\d+\\.\\d+\\.\\d+\\.\\d+):(\\d+)");
    private final FlinkConfig flinkConfig;
    private final Integer parallelism;
    private final String savepointDirectory;
    private final Map<String, Configuration> configurations = new HashMap<String, Configuration>();
    private final Map<Configuration, FlinkClientService> flinkClientServices = new HashMap<Configuration, FlinkClientService>();

    public FlinkService() throws Exception {
        this.flinkConfig = FlinkUtils.getFlinkConfigFromFile();
        this.parallelism = this.flinkConfig.getParallelism();
        this.savepointDirectory = this.flinkConfig.getSavepointDirectory();
    }

    public static FlinkService getInstance() {
        return FlinkServiceHolder.INSTANCE;
    }

    private Map<String, String> translateFromEndpoint(String endpoint) {
        HashMap<String, String> map = new HashMap<String, String>(2);
        Matcher matcher = IP_PORT_PATTERN.matcher(endpoint);
        if (matcher.find()) {
            map.put("address", matcher.group(1));
            map.put("port", matcher.group(2));
        }
        return map;
    }

    public FlinkConfig getFlinkConfig() {
        return this.flinkConfig;
    }

    public JobStatus getJobStatus(String endpoint, String jobId) throws Exception {
        Configuration configuration = this.getFlinkConfiguration(endpoint);
        return this.getFlinkClientService(configuration).getJobStatus(jobId);
    }

    public JobStatus getJobStatus(FlinkInfo flinkInfo) throws Exception {
        Configuration configuration = this.getFlinkConfiguration(flinkInfo.getEndpoint());
        return this.getFlinkClientService(configuration).getJobStatus(flinkInfo.getJobId());
    }

    private FlinkClientService getFlinkClientService(Configuration configuration) {
        return this.flinkClientServices.computeIfAbsent(configuration, k -> (FlinkClientService)FlinkUtils.getFlinkClientService(configuration, this.flinkConfig));
    }

    private Configuration getFlinkConfiguration(String endpoint) {
        return this.configurations.computeIfAbsent(endpoint, k -> {
            Integer port;
            String address;
            if (StringUtils.isEmpty((CharSequence)endpoint)) {
                address = this.flinkConfig.getAddress();
                port = this.flinkConfig.getPort();
            } else {
                Map<String, String> ipPort = this.translateFromEndpoint(endpoint);
                if (ipPort.isEmpty()) {
                    throw new BusinessException("get address:port failed from endpoint " + endpoint);
                }
                address = ipPort.get("address");
                port = Integer.valueOf(ipPort.get("port"));
            }
            Configuration configuration = new Configuration();
            configuration.setInteger(JobManagerOptions.PORT, this.flinkConfig.getJobManagerPort().intValue());
            configuration.setString(JobManagerOptions.ADDRESS, address);
            configuration.setInteger(RestOptions.PORT, port.intValue());
            return configuration;
        });
    }

    public JobDetailsInfo getJobDetail(FlinkInfo flinkInfo) throws Exception {
        Configuration configuration = this.getFlinkConfiguration(flinkInfo.getEndpoint());
        return this.getFlinkClientService(configuration).getJobDetail(flinkInfo.getJobId());
    }

    public String submit(FlinkInfo flinkInfo) throws Exception {
        try {
            SavepointRestoreSettings settings = SavepointRestoreSettings.none();
            return this.submitJobBySavepoint(flinkInfo, settings);
        }
        catch (Exception e) {
            log.error("submit job from info {} failed: ", (Object)flinkInfo, (Object)e);
            throw new Exception("submit job failed: " + e.getMessage());
        }
    }

    public String restore(FlinkInfo flinkInfo) throws Exception {
        try {
            if (StringUtils.isNotEmpty((CharSequence)flinkInfo.getSavepointPath())) {
                SavepointRestoreSettings settings = SavepointRestoreSettings.forPath((String)this.savepointDirectory, (boolean)false);
                return this.submitJobBySavepoint(flinkInfo, settings);
            }
            log.warn("skip to restore as the savepoint path was empty " + flinkInfo);
            return null;
        }
        catch (Exception e) {
            log.error("restore job from info {} failed: ", (Object)flinkInfo, (Object)e);
            throw new Exception("restore job failed: " + e.getMessage());
        }
    }

    private String submitJobBySavepoint(FlinkInfo flinkInfo, SavepointRestoreSettings settings) throws Exception {
        String localJarPath = flinkInfo.getLocalJarPath();
        File jarFile = new File(localJarPath);
        String[] programArgs = this.genProgramArgs(flinkInfo, this.flinkConfig);
        List connectorJars = flinkInfo.getConnectorJarPaths().stream().map(p -> {
            try {
                return new File((String)p).toURI().toURL();
            }
            catch (MalformedURLException e) {
                return null;
            }
        }).filter(Objects::nonNull).collect(Collectors.toList());
        Configuration configuration = this.getFlinkConfiguration(flinkInfo.getEndpoint());
        PackagedProgram program = PackagedProgram.newBuilder().setConfiguration(configuration).setEntryPointClassName("org.apache.inlong.sort.Entrance").setJarFile(jarFile).setUserClassPaths(connectorJars).setArguments(programArgs).setSavepointRestoreSettings(settings).build();
        JobGraph jobGraph = PackagedProgramUtils.createJobGraph((PackagedProgram)program, (Configuration)configuration, (int)this.parallelism, (boolean)false);
        jobGraph.addJars(connectorJars);
        RestClusterClient client = this.getFlinkClientService(configuration).getFlinkClient();
        CompletableFuture result = client.submitJob(jobGraph);
        return ((JobID)result.get()).toString();
    }

    public String stopJob(FlinkInfo flinkInfo, StopWithSavepointRequest request) throws Exception {
        Configuration configuration = this.getFlinkConfiguration(flinkInfo.getEndpoint());
        return this.getFlinkClientService(configuration).stopJob(flinkInfo.getJobId(), request.isDrain(), request.getTargetDirectory());
    }

    public void cancelJob(FlinkInfo flinkInfo) throws Exception {
        Configuration configuration = this.getFlinkConfiguration(flinkInfo.getEndpoint());
        this.getFlinkClientService(configuration).cancelJob(flinkInfo.getJobId());
    }

    private String[] genProgramArgs(FlinkInfo flinkInfo, FlinkConfig flinkConfig) {
        ArrayList<String> list = new ArrayList<String>();
        list.add("-cluster-id");
        list.add(flinkInfo.getJobName());
        list.add("-job.name");
        list.add(flinkInfo.getJobName());
        list.add("-group.info.file");
        list.add(flinkInfo.getLocalConfPath());
        list.add("-checkpoint.interval");
        list.add("60000");
        if ("batch".equalsIgnoreCase(flinkInfo.getRuntimeExecutionMode())) {
            list.add("-runtime.execution.mode");
            list.add(flinkInfo.getRuntimeExecutionMode());
            list.add("-source.boundary.type");
            list.add(flinkInfo.getBoundaryType());
            list.add("-source.lower.boundary");
            list.add(flinkInfo.getLowerBoundary());
            list.add("-source.upper.boundary");
            list.add(flinkInfo.getUpperBoundary());
        }
        return list.toArray(new String[0]);
    }

    private static class FlinkServiceHolder {
        private static final FlinkService INSTANCE;

        private FlinkServiceHolder() {
        }

        static {
            try {
                INSTANCE = new FlinkService();
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }
}

