/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.azkaban;

import azkaban.jobExecutor.AbstractJob;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.io.Closer;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigValue;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import org.apache.gobblin.azkaban.AzkabanTags;
import org.apache.gobblin.configuration.DynamicConfigGenerator;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.metrics.GobblinMetrics;
import org.apache.gobblin.metrics.RootMetricContext;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.runtime.AbstractJobLauncher;
import org.apache.gobblin.runtime.DynamicConfigGeneratorFactory;
import org.apache.gobblin.runtime.JobException;
import org.apache.gobblin.runtime.JobLauncher;
import org.apache.gobblin.runtime.JobLauncherFactory;
import org.apache.gobblin.runtime.app.ApplicationException;
import org.apache.gobblin.runtime.app.ApplicationLauncher;
import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
import org.apache.gobblin.runtime.listeners.CompositeJobListener;
import org.apache.gobblin.runtime.listeners.EmailNotificationJobListener;
import org.apache.gobblin.runtime.listeners.JobListener;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.PropertiesUtils;
import org.apache.gobblin.util.TimeRangeChecker;
import org.apache.gobblin.util.hadoop.TokenUtils;
import org.apache.gobblin.util.logs.Log4jConfigurationHelper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;

public class AzkabanJobLauncher
extends AbstractJob
implements ApplicationLauncher,
JobLauncher {
    private static final Logger LOG = Logger.getLogger(AzkabanJobLauncher.class);
    public static final String GOBBLIN_LOG_LEVEL_KEY = "gobblin.log.levelOverride";
    public static final String LOG_LEVEL_OVERRIDE_MAP = "log.levelOverride.map";
    public static final String GOBBLIN_CUSTOM_JOB_LISTENERS = "gobblin.custom.job.listeners";
    private static final String HADOOP_FS_DEFAULT_NAME = "fs.default.name";
    private static final String AZKABAN_LINK_JOBEXEC_URL = "azkaban.link.jobexec.url";
    private static final String AZKABAN_LINK_JOBEXEC_PROXY_URL = "azkaban.link.jobexec.proxyUrl";
    private static final String AZKABAN_FLOW_EXEC_ID = "azkaban.flow.execid";
    private static final String MAPREDUCE_JOB_CREDENTIALS_BINARY = "mapreduce.job.credentials.binary";
    private static final String AZKABAN_GOBBLIN_JOB_SLA_IN_SECONDS = "gobblin.azkaban.SLAInSeconds";
    private static final String DEFAULT_AZKABAN_GOBBLIN_JOB_SLA_IN_SECONDS = "-1";
    public static final String GOBBLIN_AZKABAN_INITIALIZE_HADOOP_TOKENS = "gobblin.azkaban.initializeHadoopTokens";
    public static final String DEFAULT_GOBBLIN_AZKABAN_INITIALIZE_HADOOP_TOKENS = "true";
    private final Closer closer = Closer.create();
    private final JobLauncher jobLauncher;
    private final JobListener jobListener;
    private final Properties props;
    private final ApplicationLauncher applicationLauncher;
    private final long ownAzkabanSla;

    public AzkabanJobLauncher(String jobId, Properties props) throws Exception {
        super(jobId, LOG);
        HadoopUtils.addGobblinSite();
        ArrayList tags = Lists.newArrayList();
        tags.addAll(Tag.fromMap((Map)AzkabanTags.getAzkabanTags()));
        RootMetricContext.get((List)tags);
        if (props.containsKey(GOBBLIN_LOG_LEVEL_KEY)) {
            Level logLevel = Level.toLevel((String)props.getProperty(GOBBLIN_LOG_LEVEL_KEY), (Level)Level.INFO);
            Logger.getLogger((String)"org.apache.gobblin").setLevel(logLevel);
        }
        Log4jConfigurationHelper.setLogLevel((Collection)PropertiesUtils.getPropAsList((Properties)props, (String)LOG_LEVEL_OVERRIDE_MAP, (String)""));
        this.props = new Properties();
        this.props.putAll((Map<?, ?>)props);
        this.jobListener = this.initJobListener();
        Config propsAsConfig = ConfigUtils.propertiesToConfig((Properties)props);
        DynamicConfigGenerator dynamicConfigGenerator = DynamicConfigGeneratorFactory.createDynamicConfigGenerator((Config)propsAsConfig);
        Config dynamicConfig = dynamicConfigGenerator.generateDynamicConfig(propsAsConfig);
        for (Map.Entry entry : dynamicConfig.entrySet()) {
            this.props.put(entry.getKey(), ((ConfigValue)entry.getValue()).unwrapped().toString());
        }
        Configuration conf = new Configuration();
        String fsUri = conf.get(HADOOP_FS_DEFAULT_NAME);
        if (!Strings.isNullOrEmpty((String)fsUri)) {
            if (!this.props.containsKey("fs.uri")) {
                this.props.setProperty("fs.uri", fsUri);
            }
            if (!this.props.containsKey("state.store.fs.uri")) {
                this.props.setProperty("state.store.fs.uri", fsUri);
            }
        }
        this.props.setProperty("job.tracking.url", Strings.nullToEmpty((String)conf.get(AZKABAN_LINK_JOBEXEC_URL)));
        if (Boolean.parseBoolean(this.props.getProperty(GOBBLIN_AZKABAN_INITIALIZE_HADOOP_TOKENS, DEFAULT_GOBBLIN_AZKABAN_INITIALIZE_HADOOP_TOKENS))) {
            if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
                LOG.info((Object)("Job type " + props.getProperty("type") + " provided Hadoop token in the environment variable " + "HADOOP_TOKEN_FILE_LOCATION"));
                this.props.setProperty(MAPREDUCE_JOB_CREDENTIALS_BINARY, System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
            } else {
                LOG.info((Object)("Job type " + props.getProperty("type") + " did not provide Hadoop token in the environment variable " + "HADOOP_TOKEN_FILE_LOCATION" + ". Negotiating Hadoop tokens."));
                File tokenFile = Files.createTempFile("mr-azkaban", ".token", new FileAttribute[0]).toFile();
                TokenUtils.getHadoopTokens((State)new State(props), (Optional)Optional.of((Object)tokenFile), (Credentials)new Credentials());
                System.setProperty("HADOOP_TOKEN_FILE_LOCATION", tokenFile.getAbsolutePath());
                System.setProperty(MAPREDUCE_JOB_CREDENTIALS_BINARY, tokenFile.getAbsolutePath());
                this.props.setProperty(MAPREDUCE_JOB_CREDENTIALS_BINARY, tokenFile.getAbsolutePath());
                this.props.setProperty("env.HADOOP_TOKEN_FILE_LOCATION", tokenFile.getAbsolutePath());
            }
        }
        Properties jobProps = this.props;
        AbstractJobLauncher.resolveGobblinJobTemplateIfNecessary((Properties)jobProps);
        GobblinMetrics.addCustomTagsToProperties((Properties)jobProps, (List)tags);
        if (!jobProps.containsKey("launcher.type")) {
            jobProps.setProperty("launcher.type", JobLauncherFactory.JobLauncherType.MAPREDUCE.toString());
        }
        this.ownAzkabanSla = Long.parseLong(jobProps.getProperty(AZKABAN_GOBBLIN_JOB_SLA_IN_SECONDS, DEFAULT_AZKABAN_GOBBLIN_JOB_SLA_IN_SECONDS));
        List<Object> metadataTags = Lists.newArrayList();
        if (jobProps.containsKey("flow.name")) {
            metadataTags = AzkabanJobLauncher.addAdditionalMetadataTags(jobProps);
        }
        this.jobLauncher = (JobLauncher)this.closer.register((Closeable)JobLauncherFactory.newJobLauncher((Properties)jobProps, (Properties)jobProps, null, (List)metadataTags));
        boolean isMetricReportingFailureFatal = PropertiesUtils.getPropAsBoolean((Properties)jobProps, (String)"gobblin.job.isMetricReportingFailureFatal", (String)Boolean.toString(false));
        boolean isEventReportingFailureFatal = PropertiesUtils.getPropAsBoolean((Properties)jobProps, (String)"gobblin.job.isEventReportingFailureFatal", (String)Boolean.toString(false));
        jobProps.setProperty("metrics.reporting.failure.fatal", Boolean.toString(isMetricReportingFailureFatal));
        jobProps.setProperty("event.reporting.failure.fatal", Boolean.toString(isEventReportingFailureFatal));
        this.applicationLauncher = (ApplicationLauncher)this.closer.register((Closeable)new ServiceBasedAppLauncher(jobProps, "Azkaban-" + UUID.randomUUID()));
    }

    protected JobListener initJobListener() {
        CompositeJobListener compositeJobListener = new CompositeJobListener();
        List listeners = new State(this.props).getPropAsList(GOBBLIN_CUSTOM_JOB_LISTENERS, EmailNotificationJobListener.class.getSimpleName());
        try {
            for (String listenerAlias : listeners) {
                ClassAliasResolver conditionClassAliasResolver = new ClassAliasResolver(JobListener.class);
                compositeJobListener.addJobListener((JobListener)conditionClassAliasResolver.resolveClass(listenerAlias).newInstance());
            }
        }
        catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
            throw new IllegalArgumentException(e);
        }
        return compositeJobListener;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    public void run() throws Exception {
        if (!this.isCurrentTimeInRange()) return;
        if (this.ownAzkabanSla > 0L) {
            LOG.info((Object)("Found gobblin defined SLA: " + this.ownAzkabanSla));
            ExecutorService service = Executors.newSingleThreadExecutor();
            boolean isCancelled = false;
            Future<Void> future = service.submit(new Callable<Void>(){

                @Override
                public Void call() throws Exception {
                    AzkabanJobLauncher.this.runRealJob();
                    return null;
                }
            });
            try {
                future.get(this.ownAzkabanSla, TimeUnit.SECONDS);
                return;
            }
            catch (TimeoutException e) {
                LOG.info((Object)("Cancelling job since SLA is reached: " + this.ownAzkabanSla));
                future.cancel(true);
                isCancelled = true;
                this.cancelJob(this.jobListener);
            }
            finally {
                service.shutdown();
                if (!isCancelled) return;
                this.cancel();
                throw new RuntimeException("Job failed because it reaches SLA limit: " + this.ownAzkabanSla);
            }
        } else {
            this.runRealJob();
        }
    }

    private void runRealJob() throws Exception {
        try {
            this.start();
            this.launchJob(this.jobListener);
        }
        finally {
            try {
                this.stop();
            }
            finally {
                this.close();
            }
        }
    }

    public void cancel() throws Exception {
        try {
            this.stop();
        }
        finally {
            this.close();
        }
    }

    public void start() throws ApplicationException {
        this.applicationLauncher.start();
    }

    public void stop() throws ApplicationException {
        this.applicationLauncher.stop();
    }

    public void launchJob(@Nullable JobListener jobListener) throws JobException {
        this.jobLauncher.launchJob(jobListener);
    }

    public void cancelJob(@Nullable JobListener jobListener) throws JobException {
        this.jobLauncher.cancelJob(jobListener);
    }

    public void close() throws IOException {
        this.closer.close();
    }

    private boolean isCurrentTimeInRange() {
        Splitter splitter = Splitter.on((String)",").omitEmptyStrings().trimResults();
        if (this.props.contains("azkaban.execution.days.list") && this.props.contains("azkaban.execution.time.range")) {
            List executionTimeRange = splitter.splitToList((CharSequence)this.props.getProperty("azkaban.execution.time.range"));
            List executionDays = splitter.splitToList((CharSequence)this.props.getProperty("azkaban.execution.days.list"));
            Preconditions.checkArgument((executionTimeRange.size() == 2 ? 1 : 0) != 0, (Object)"The property azkaban.execution.days.list should be a comma separated list of two entries");
            return TimeRangeChecker.isTimeInRange((List)executionDays, (String)((String)executionTimeRange.get(0)), (String)((String)executionTimeRange.get(1)), (DateTime)new DateTime(DateTimeZone.forID((String)"America/Los_Angeles")));
        }
        return true;
    }

    private static List<? extends Tag<?>> addAdditionalMetadataTags(Properties jobProps) {
        ArrayList metadataTags = Lists.newArrayList();
        String jobExecutionId = jobProps.getProperty(AZKABAN_FLOW_EXEC_ID, "");
        String jobExecutionUrl = jobProps.getProperty(AZKABAN_LINK_JOBEXEC_PROXY_URL, jobProps.getProperty(AZKABAN_LINK_JOBEXEC_URL, ""));
        metadataTags.add(new Tag("flowGroup", (Object)jobProps.getProperty("flow.group", "")));
        metadataTags.add(new Tag("flowName", (Object)jobProps.getProperty("flow.name")));
        if (jobProps.containsKey("job.currentAttempts")) {
            metadataTags.add(new Tag("currentAttempts", (Object)jobProps.getProperty("job.currentAttempts", "1")));
            metadataTags.add(new Tag("currentGeneration", (Object)jobProps.getProperty("job.currentGeneration", "1")));
            metadataTags.add(new Tag("shouldRetry", (Object)"false"));
        }
        metadataTags.add(new Tag("flowExecutionId", (Object)jobProps.getProperty("flow.executionId", jobExecutionId)));
        metadataTags.add(new Tag("jobExecutionId", (Object)jobExecutionId));
        metadataTags.add(new Tag("jobGroup", (Object)jobProps.getProperty("job.group", "")));
        metadataTags.add(new Tag("jobName", (Object)jobProps.getProperty("job.name", "")));
        metadataTags.add(new Tag("message", (Object)jobExecutionUrl));
        metadataTags.add(new Tag("user.to.proxy", (Object)jobProps.getProperty("user.to.proxy", "")));
        LOG.debug((Object)String.format("AzkabanJobLauncher.addAdditionalMetadataTags: metadataTags %s", metadataTags));
        return metadataTags;
    }
}

