/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.cli;

import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.flink.cdc.cli.CliExecutor;
import org.apache.flink.cdc.cli.CliFrontendOptions;
import org.apache.flink.cdc.cli.utils.ConfigurationUtils;
import org.apache.flink.cdc.cli.utils.FlinkEnvironmentUtils;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.composer.PipelineExecution;
import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CliFrontend {
    private static final Logger LOG = LoggerFactory.getLogger(CliFrontend.class);
    private static final String FLINK_HOME_ENV_VAR = "FLINK_HOME";
    private static final String FLINK_CDC_HOME_ENV_VAR = "FLINK_CDC_HOME";

    public static void main(String[] args) throws Exception {
        Options cliOptions = CliFrontendOptions.initializeOptions();
        DefaultParser parser = new DefaultParser();
        CommandLine commandLine = parser.parse(cliOptions, args);
        if (args.length == 0 || commandLine.hasOption(CliFrontendOptions.HELP)) {
            HelpFormatter formatter = new HelpFormatter();
            formatter.setLeftPadding(4);
            formatter.setWidth(80);
            formatter.printHelp(" ", cliOptions);
            return;
        }
        PipelineExecution.ExecutionInfo result = CliFrontend.createExecutor(commandLine).run();
        CliFrontend.printExecutionInfo(result);
    }

    @VisibleForTesting
    static CliExecutor createExecutor(CommandLine commandLine) throws Exception {
        List unparsedArgs = commandLine.getArgList();
        if (unparsedArgs.isEmpty()) {
            throw new IllegalArgumentException("Missing pipeline definition file path in arguments. ");
        }
        Path pipelineDefPath = Paths.get((String)unparsedArgs.get(0), new String[0]);
        LOG.info("Real Path pipelineDefPath {}", (Object)pipelineDefPath);
        Configuration globalPipelineConfig = CliFrontend.getGlobalConfig(commandLine);
        Path flinkHome = CliFrontend.getFlinkHome(commandLine);
        Configuration flinkConfig = FlinkEnvironmentUtils.loadFlinkConfiguration(flinkHome);
        SavepointRestoreSettings savepointSettings = CliFrontend.createSavepointRestoreSettings(commandLine);
        List<Path> additionalJars = Arrays.stream((Object[])Optional.ofNullable(commandLine.getOptionValues(CliFrontendOptions.JAR)).orElse(new String[0])).map(x$0 -> Paths.get(x$0, new String[0])).collect(Collectors.toList());
        return new CliExecutor(commandLine, pipelineDefPath, flinkConfig, globalPipelineConfig, commandLine.hasOption(CliFrontendOptions.USE_MINI_CLUSTER), additionalJars, savepointSettings);
    }

    private static SavepointRestoreSettings createSavepointRestoreSettings(CommandLine commandLine) {
        if (commandLine.hasOption(CliFrontendOptions.SAVEPOINT_PATH_OPTION.getOpt())) {
            String savepointPath = commandLine.getOptionValue(CliFrontendOptions.SAVEPOINT_PATH_OPTION.getOpt());
            boolean allowNonRestoredState = commandLine.hasOption(CliFrontendOptions.SAVEPOINT_ALLOW_NON_RESTORED_OPTION.getOpt());
            RestoreMode restoreMode = commandLine.hasOption(CliFrontendOptions.SAVEPOINT_CLAIM_MODE) ? (RestoreMode)org.apache.flink.configuration.ConfigurationUtils.convertValue((Object)commandLine.getOptionValue(CliFrontendOptions.SAVEPOINT_CLAIM_MODE), RestoreMode.class) : (RestoreMode)SavepointConfigOptions.RESTORE_MODE.defaultValue();
            return SavepointRestoreSettings.forPath((String)savepointPath, (boolean)allowNonRestoredState, (RestoreMode)restoreMode);
        }
        return SavepointRestoreSettings.none();
    }

    private static Path getFlinkHome(CommandLine commandLine) {
        String flinkHomeFromArgs = commandLine.getOptionValue(CliFrontendOptions.FLINK_HOME);
        if (flinkHomeFromArgs != null) {
            LOG.debug("Flink home is loaded by command-line argument: {}", (Object)flinkHomeFromArgs);
            return Paths.get(flinkHomeFromArgs, new String[0]);
        }
        String flinkHomeFromEnvVar = System.getenv(FLINK_HOME_ENV_VAR);
        if (flinkHomeFromEnvVar != null) {
            LOG.debug("Flink home is loaded by environment variable: {}", (Object)flinkHomeFromEnvVar);
            return Paths.get(flinkHomeFromEnvVar, new String[0]);
        }
        throw new IllegalArgumentException("Cannot find Flink home from either command line arguments \"--flink-home\" or the environment variable \"FLINK_HOME\". Please make sure Flink home is properly set. ");
    }

    private static Configuration getGlobalConfig(CommandLine commandLine) throws Exception {
        String globalConfig = commandLine.getOptionValue(CliFrontendOptions.GLOBAL_CONFIG);
        if (globalConfig != null) {
            Path globalConfigPath = Paths.get(globalConfig, new String[0]);
            LOG.info("Using global config in command line: {}", (Object)globalConfigPath);
            return ConfigurationUtils.loadConfigFile(globalConfigPath);
        }
        String flinkCdcHome = System.getenv(FLINK_CDC_HOME_ENV_VAR);
        if (flinkCdcHome != null) {
            Path globalConfigPath = Paths.get(flinkCdcHome, new String[0]).resolve("conf").resolve("flink-cdc.yaml");
            LOG.info("Using global config in FLINK_CDC_HOME: {}", (Object)globalConfigPath);
            return ConfigurationUtils.loadConfigFile(globalConfigPath);
        }
        LOG.warn("Cannot find global configuration in command-line or FLINK_CDC_HOME. Will use empty global configuration.");
        return new Configuration();
    }

    private static void printExecutionInfo(PipelineExecution.ExecutionInfo info) {
        System.out.println("Pipeline has been submitted to cluster.");
        System.out.printf("Job ID: %s\n", info.getId());
        System.out.printf("Job Description: %s\n", info.getDescription());
    }
}

