/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.core.spark;

import com.beust.jcommander.IUsageFormatter;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.UnixStyleUsageFormatter;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.linkis.engineconnplugin.seatunnel.util.SeatunnelUtils;
import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.core.starter.Starter;
import org.apache.seatunnel.core.starter.command.CommandArgs;
import org.apache.seatunnel.core.starter.enums.EngineType;
import org.apache.seatunnel.core.starter.enums.PluginType;
import org.apache.seatunnel.core.starter.spark.SeaTunnelSpark;
import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs;
import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
import org.apache.seatunnel.core.starter.utils.CompressionUtils;
import org.apache.seatunnel.core.starter.utils.ConfigBuilder;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigValue;

public class SparkV2Starter
implements Starter {
    public static final Log logger = LogFactory.getLog((String)SparkV2Starter.class.getName());
    private static final int USAGE_EXIT_CODE = 234;
    private static final int PLUGIN_LIB_DIR_DEPTH = 3;
    protected String[] args;
    protected SparkCommandArgs commandArgs;
    protected String appName;
    protected List<Path> jars = new ArrayList<Path>();
    protected List<Path> files = new ArrayList<Path>();
    protected Map<String, String> sparkConf;

    private SparkV2Starter(String[] args, SparkCommandArgs commandArgs) {
        this.args = args;
        this.commandArgs = commandArgs;
    }

    public static int main(String[] args) {
        int exitCode = 0;
        logger.info((Object)"starter start");
        try {
            SparkV2Starter starter = SparkV2Starter.getInstance(args);
            List<String> command = starter.buildCommands();
            String commandVal = String.join((CharSequence)" ", command);
            logger.info((Object)("sparkV2starter commandVal:" + commandVal));
            exitCode = SeatunnelUtils.executeLine(commandVal);
        }
        catch (Exception e) {
            exitCode = 1;
            logger.error((Object)("\n\nsparkV2Starter error:\n" + e));
        }
        return exitCode;
    }

    static SparkV2Starter getInstance(String[] args) {
        SparkCommandArgs commandArgs = (SparkCommandArgs)CommandLineUtils.parse((String[])args, (CommandArgs)new SparkCommandArgs(), (String)EngineType.SPARK2.getStarterShellName(), (boolean)true);
        DeployMode deployMode = commandArgs.getDeployMode();
        switch (deployMode) {
            case CLUSTER: {
                return new ClusterModeSparkStarter(args, commandArgs);
            }
            case CLIENT: {
                return new ClientModeSparkStarter(args, commandArgs);
            }
        }
        throw new IllegalArgumentException("Deploy mode " + deployMode + " not supported");
    }

    private static SparkCommandArgs parseCommandArgs(String[] args) {
        SparkCommandArgs commandArgs = new SparkCommandArgs();
        JCommander commander = JCommander.newBuilder().programName("start-seatunnel-spark.sh").addObject((Object)commandArgs).args(args).build();
        if (commandArgs.isHelp()) {
            commander.setUsageFormatter((IUsageFormatter)new UnixStyleUsageFormatter(commander));
            commander.usage();
            System.exit(234);
        }
        return commandArgs;
    }

    public List<String> buildCommands() throws IOException {
        this.setSparkConf();
        logger.info((Object)"setSparkConf start");
        logger.info((Object)this.commandArgs.getDeployMode().toString());
        Common.setDeployMode(this.commandArgs.getDeployMode());
        Common.setStarter(true);
        this.jars.addAll(Common.getPluginsJarDependencies());
        this.jars.addAll(Common.getLibJars());
        this.jars.addAll(this.getConnectorJarDependencies());
        this.jars.addAll(new ArrayList<Path>(Common.getThirdPartyJars(this.sparkConf.getOrDefault(EnvCommonOptions.JARS.key(), ""))));
        this.appName = this.sparkConf.getOrDefault("spark.app.name", "SeaTunnel");
        logger.info((Object)"buildFinal end");
        return this.buildFinal();
    }

    private void setSparkConf() throws FileNotFoundException {
        this.commandArgs.getVariables().stream().filter(Objects::nonNull).map(variable -> variable.split("=", 2)).filter(pair -> ((String[])pair).length == 2).forEach(pair -> System.setProperty(pair[0], pair[1]));
        this.sparkConf = SparkV2Starter.getSparkConf(this.commandArgs.getConfigFile());
        String driverJavaOpts = this.sparkConf.getOrDefault("spark.driver.extraJavaOptions", "");
        String executorJavaOpts = this.sparkConf.getOrDefault("spark.executor.extraJavaOptions", "");
        if (!this.commandArgs.getVariables().isEmpty()) {
            String properties = this.commandArgs.getVariables().stream().map(v -> "-D" + v).collect(Collectors.joining(" "));
            driverJavaOpts = driverJavaOpts + " " + properties;
            executorJavaOpts = executorJavaOpts + " " + properties;
            this.sparkConf.put("spark.driver.extraJavaOptions", driverJavaOpts.trim());
            this.sparkConf.put("spark.executor.extraJavaOptions", executorJavaOpts.trim());
        }
    }

    static Map<String, String> getSparkConf(String configFile) throws FileNotFoundException {
        File file = new File(configFile);
        if (!file.exists()) {
            throw new FileNotFoundException("config file '" + file + "' does not exists!");
        }
        Config appConfig = ConfigFactory.parseFile((File)file).resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true)).resolveWith(ConfigFactory.systemProperties(), ConfigResolveOptions.defaults().setAllowUnresolved(true));
        return appConfig.getConfig("env").entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> ((ConfigValue)e.getValue()).unwrapped().toString()));
    }

    private List<Path> getPluginsJarDependencies() throws IOException {
        Path pluginRootDir = Common.pluginRootDir();
        if (!Files.exists(pluginRootDir, new LinkOption[0]) || !Files.isDirectory(pluginRootDir, new LinkOption[0])) {
            return Collections.emptyList();
        }
        try (Stream<Path> stream = Files.walk(pluginRootDir, 3, FileVisitOption.FOLLOW_LINKS);){
            List<Path> list = stream.filter(it -> pluginRootDir.relativize((Path)it).getNameCount() == 3).filter(it -> it.getParent().endsWith("lib")).filter(it -> it.getFileName().toString().endsWith("jar")).collect(Collectors.toList());
            return list;
        }
    }

    private List<Path> getConnectorJarDependencies() {
        Path pluginRootDir = Common.connectorJarDir("seatunnel");
        if (!Files.exists(pluginRootDir, new LinkOption[0]) || !Files.isDirectory(pluginRootDir, new LinkOption[0])) {
            return Collections.emptyList();
        }
        Config config = ConfigBuilder.of((String)this.commandArgs.getConfigFile());
        HashSet pluginJars = new HashSet();
        SeaTunnelSourcePluginDiscovery seaTunnelSourcePluginDiscovery = new SeaTunnelSourcePluginDiscovery();
        SeaTunnelSinkPluginDiscovery seaTunnelSinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery();
        pluginJars.addAll(seaTunnelSourcePluginDiscovery.getPluginJarPaths(this.getPluginIdentifiers(config, PluginType.SOURCE)));
        pluginJars.addAll(seaTunnelSinkPluginDiscovery.getPluginJarPaths(this.getPluginIdentifiers(config, PluginType.SINK)));
        List<Path> connectPaths = pluginJars.stream().map(url -> new File(url.getPath()).toPath()).collect(Collectors.toList());
        logger.info((Object)("getConnector jar paths:" + connectPaths.toString()));
        return connectPaths;
    }

    private List<PluginIdentifier> getPluginIdentifiers(Config config, PluginType ... pluginTypes) {
        return Arrays.stream(pluginTypes).flatMap(pluginType -> {
            List configList = config.getConfigList(pluginType.getType());
            return configList.stream().map(pluginConfig -> PluginIdentifier.of((String)"seatunnel", (String)pluginType.getType(), (String)pluginConfig.getString("plugin_name")));
        }).collect(Collectors.toList());
    }

    private List<Path> listJars(Path dir) throws IOException {
        try (Stream<Path> stream = Files.list(dir);){
            List<Path> list = stream.filter(it -> !Files.isDirectory(it, new LinkOption[0])).filter(it -> it.getFileName().endsWith("jar")).collect(Collectors.toList());
            return list;
        }
    }

    protected List<String> buildFinal() {
        ArrayList<String> commands = new ArrayList<String>();
        commands.add(System.getenv("SPARK_HOME") + "/bin/spark-submit");
        this.appendOption(commands, "--class", SeaTunnelSpark.class.getName());
        this.appendOption(commands, "--name", this.appName);
        this.appendOption(commands, "--master", this.commandArgs.getMaster());
        this.appendOption(commands, "--deploy-mode", this.commandArgs.getDeployMode().getDeployMode());
        this.appendJars(commands, this.jars);
        this.appendFiles(commands, this.files);
        this.appendSparkConf(commands, this.sparkConf);
        this.appendAppJar(commands);
        this.appendArgs(commands, this.args);
        if (this.commandArgs.isCheckConfig()) {
            commands.add("--check");
        }
        logger.info((Object)("build command:" + commands));
        return commands;
    }

    protected void appendOption(List<String> commands, String option, String value) {
        commands.add(option);
        commands.add("\"" + value.replace("\"", "\\\"") + "\"");
    }

    protected void appendJars(List<String> commands, List<Path> paths) {
        this.appendPaths(commands, "--jars", paths);
    }

    protected void appendFiles(List<String> commands, List<Path> paths) {
        this.appendPaths(commands, "--files", paths);
    }

    protected void appendPaths(List<String> commands, String option, List<Path> paths) {
        if (!paths.isEmpty()) {
            String values = paths.stream().map(Path::toString).collect(Collectors.joining(","));
            this.appendOption(commands, option, values);
        }
    }

    protected void appendSparkConf(List<String> commands, Map<String, String> sparkConf) {
        for (Map.Entry<String, String> entry : sparkConf.entrySet()) {
            String key = entry.getKey();
            String value = entry.getValue();
            this.appendOption(commands, "--conf", key + "=" + value);
        }
    }

    protected void appendArgs(List<String> commands, String[] args) {
        commands.addAll(Arrays.asList(args));
    }

    protected void appendAppJar(List<String> commands) {
        String appJarPath = Common.appStarterDir().resolve(EngineType.SPARK2.getStarterJarName()).toString();
        logger.info((Object)("spark appJarPath:" + appJarPath));
        commands.add(appJarPath);
    }

    private static class ClusterModeSparkStarter
    extends SparkV2Starter {
        private ClusterModeSparkStarter(String[] args, SparkCommandArgs commandArgs) {
            super(args, commandArgs);
        }

        @Override
        public List<String> buildCommands() throws IOException {
            Common.setDeployMode(this.commandArgs.getDeployMode());
            Common.setStarter(true);
            Path pluginTarball = Common.pluginTarball();
            CompressionUtils.tarGzip((Path)Common.pluginRootDir(), (Path)pluginTarball);
            this.files.add(pluginTarball);
            this.files.add(Paths.get(this.commandArgs.getConfigFile(), new String[0]));
            return super.buildCommands();
        }
    }

    private static class ClientModeSparkStarter
    extends SparkV2Starter {
        private ClientModeSparkStarter(String[] args, SparkCommandArgs commandArgs) {
            super(args, commandArgs);
        }

        @Override
        protected void appendSparkConf(List<String> commands, Map<String, String> sparkConf) {
            for (ClientModeSparkConfigs config : ClientModeSparkConfigs.values()) {
                String driverJavaOptions = (String)this.sparkConf.get(config.propertyName);
                if (!StringUtils.isNotBlank((CharSequence)driverJavaOptions)) continue;
                this.appendOption(commands, config.optionName, driverJavaOptions);
            }
            for (Map.Entry entry : sparkConf.entrySet()) {
                String key = (String)entry.getKey();
                String value = (String)entry.getValue();
                if (ClientModeSparkConfigs.PROPERTY_NAME_MAP.containsKey(key)) continue;
                this.appendOption(commands, "--conf", key + "=" + value);
            }
        }

        private static enum ClientModeSparkConfigs {
            DriverMemory("--driver-memory", "spark.driver.memory"),
            DriverJavaOptions("--driver-java-options", "spark.driver.extraJavaOptions"),
            DriverLibraryPath(" --driver-library-path", "spark.driver.extraLibraryPath"),
            DriverClassPath("--driver-class-path", "spark.driver.extraClassPath");

            private final String optionName;
            private final String propertyName;
            private static final Map<String, ClientModeSparkConfigs> PROPERTY_NAME_MAP;

            private ClientModeSparkConfigs(String optionName, String propertyName) {
                this.optionName = optionName;
                this.propertyName = propertyName;
            }

            static {
                PROPERTY_NAME_MAP = new HashMap<String, ClientModeSparkConfigs>();
                for (ClientModeSparkConfigs config : ClientModeSparkConfigs.values()) {
                    PROPERTY_NAME_MAP.put(config.propertyName, config);
                }
            }
        }
    }
}

