/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.runtime.embedded;

import com.codahale.metrics.MetricFilter;
import com.github.rholder.retry.RetryListener;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.escape.Escaper;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.linkedin.data.template.DataTemplate;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigMergeable;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javassist.bytecode.ClassFile;
import javax.annotation.Nullable;
import org.apache.avro.SchemaBuilder;
import org.apache.commons.lang3.ClassUtils;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.instrumented.extractor.InstrumentedExtractorBase;
import org.apache.gobblin.metastore.FsStateStore;
import org.apache.gobblin.metrics.GobblinMetrics;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.runtime.JobLauncherFactory;
import org.apache.gobblin.runtime.Task;
import org.apache.gobblin.runtime.api.Configurable;
import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment;
import org.apache.gobblin.runtime.api.GobblinInstancePluginFactory;
import org.apache.gobblin.runtime.api.JobExecutionDriver;
import org.apache.gobblin.runtime.api.JobExecutionResult;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.JobTemplate;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.cli.CliObjectOption;
import org.apache.gobblin.runtime.cli.CliObjectSupport;
import org.apache.gobblin.runtime.cli.ConstructorAndPublicMethodsGobblinCliFactory;
import org.apache.gobblin.runtime.cli.NotOnCli;
import org.apache.gobblin.runtime.instance.SimpleGobblinInstanceEnvironment;
import org.apache.gobblin.runtime.instance.StandardGobblinInstanceDriver;
import org.apache.gobblin.runtime.job_catalog.ImmutableFSJobCatalog;
import org.apache.gobblin.runtime.job_catalog.PackagedTemplatesJobCatalogDecorator;
import org.apache.gobblin.runtime.job_catalog.StaticJobCatalog;
import org.apache.gobblin.runtime.job_spec.JobSpecResolver;
import org.apache.gobblin.runtime.job_spec.ResolvedJobSpec;
import org.apache.gobblin.runtime.plugins.GobblinInstancePluginUtils;
import org.apache.gobblin.runtime.plugins.metrics.GobblinMetricsPlugin;
import org.apache.gobblin.runtime.std.DefaultConfigurableImpl;
import org.apache.gobblin.runtime.std.DefaultJobLifecycleListenerImpl;
import org.apache.gobblin.state.ConstructState;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.PathUtils;
import org.apache.gobblin.util.PullFileLoader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ClassUtil;
import org.joda.time.Period;
import org.joda.time.ReadableInstant;
import org.reflections.Reflections;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EmbeddedGobblin {
    private static final Logger log = LoggerFactory.getLogger(EmbeddedGobblin.class);
    private static final Splitter KEY_VALUE_SPLITTER = Splitter.on((String)":").limit(2);
    private final JobSpec.Builder specBuilder;
    private final Map<String, String> userConfigMap;
    private final Map<String, String> builtConfigMap;
    private final Config defaultSysConfig;
    private final Map<String, String> sysConfigOverrides;
    private final Map<String, Integer> distributedJars;
    private Runnable distributeJarsFunction;
    private JobTemplate template;
    private Logger useLog = log;
    private FullTimeout launchTimeout = new FullTimeout(100L, TimeUnit.SECONDS);
    private FullTimeout jobTimeout = new FullTimeout(100L, TimeUnit.DAYS);
    private FullTimeout shutdownTimeout = new FullTimeout(100L, TimeUnit.SECONDS);
    private boolean dumpJStackOnTimeout = false;
    private List<GobblinInstancePluginFactory> plugins = Lists.newArrayList();
    private Optional<Path> jobFile = Optional.absent();

    public EmbeddedGobblin() {
        this("EmbeddedGobblin");
    }

    @CliObjectSupport(argumentNames={"jobName"})
    public EmbeddedGobblin(String name) {
        HadoopUtils.addGobblinSite();
        this.specBuilder = new JobSpec.Builder(name);
        this.userConfigMap = Maps.newHashMap();
        this.builtConfigMap = Maps.newHashMap();
        this.sysConfigOverrides = Maps.newHashMap();
        this.defaultSysConfig = this.getDefaultSysConfig();
        this.distributedJars = Maps.newHashMap();
        this.loadCoreGobblinJarsToDistributedJars();
        this.distributeJarsFunction = new Runnable(){

            @Override
            public void run() {
            }
        };
    }

    public EmbeddedGobblin mrMode() throws IOException {
        this.sysConfigOverrides.put("launcher.type", JobLauncherFactory.JobLauncherType.MAPREDUCE.name());
        this.builtConfigMap.put("fs.uri", FileSystem.get((Configuration)new Configuration()).getUri().toString());
        this.builtConfigMap.put("mr.job.root.dir", "/tmp/EmbeddedGobblin_" + System.currentTimeMillis());
        this.distributeJarsFunction = new Runnable(){

            @Override
            public void run() {
                EmbeddedGobblin.this.sysConfigOverrides.put("job.jars", Joiner.on((String)",").join(EmbeddedGobblin.this.getPrioritizedDistributedJars()));
            }
        };
        return this;
    }

    public EmbeddedGobblin distributeJar(String jarPath) {
        return this.distributeJarWithPriority(jarPath, 0);
    }

    public EmbeddedGobblin distributeJarByClassWithPriority(Class<?> klazz, int priority) {
        String jar = ClassUtil.findContainingJar(klazz);
        if (jar == null) {
            log.warn(String.format("Could not find jar for class %s. This is normal in test runs.", klazz));
            return this;
        }
        return this.distributeJarWithPriority(jar, priority);
    }

    public synchronized EmbeddedGobblin distributeJarWithPriority(String jarPath, int priority) {
        if (this.distributedJars.containsKey(jarPath)) {
            this.distributedJars.put(jarPath, Math.min(priority, this.distributedJars.get(jarPath)));
        } else {
            this.distributedJars.put(jarPath, priority);
        }
        return this;
    }

    public EmbeddedGobblin setTemplate(JobTemplate template) {
        this.template = template;
        return this;
    }

    public EmbeddedGobblin setTemplate(String templateURI) throws URISyntaxException, SpecNotFoundException, JobTemplate.TemplateException {
        return this.setTemplate(new PackagedTemplatesJobCatalogDecorator().getTemplate(new URI(templateURI)));
    }

    public EmbeddedGobblin usePlugin(GobblinInstancePluginFactory pluginFactory) {
        this.plugins.add(pluginFactory);
        return this;
    }

    public EmbeddedGobblin usePlugin(String pluginAlias) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
        return this.usePlugin(GobblinInstancePluginUtils.instantiatePluginByAlias(pluginAlias));
    }

    public EmbeddedGobblin sysConfig(String key, String value) {
        this.sysConfigOverrides.put(key, value);
        return this;
    }

    public EmbeddedGobblin sysConfig(String keyValue) {
        List split = KEY_VALUE_SPLITTER.splitToList((CharSequence)keyValue);
        if (split.size() != 2) {
            throw new RuntimeException("Cannot parse " + keyValue + ". Expected <key>:<value>.");
        }
        return this.sysConfig((String)split.get(0), (String)split.get(1));
    }

    public EmbeddedGobblin jobFile(String pathStr) {
        this.jobFile = Optional.of((Object)new Path(pathStr));
        return this;
    }

    @CliObjectOption(description="Authenticate using kerberos. Format: \"<login-user>:<keytab-file>\".")
    public EmbeddedGobblin kerberosAuthentication(String credentials) {
        List split = Splitter.on((String)":").splitToList((CharSequence)credentials);
        if (split.size() != 2) {
            throw new RuntimeException("Cannot parse " + credentials + ". Expected <login-user>:<keytab-file>");
        }
        try {
            this.usePlugin("hadoopLoginFromKeytab");
        }
        catch (ReflectiveOperationException roe) {
            throw new RuntimeException(String.format("Could not instantiate %s. Make sure gobblin-runtime-hadoop is in your classpath.", "hadoopLoginFromKeytab"), roe);
        }
        this.sysConfig("gobblin.instance.hadoop.loginUser", (String)split.get(0));
        this.sysConfig("gobblin.instance.hadoop.loginUserKeytabFile", (String)split.get(1));
        return this;
    }

    public EmbeddedGobblin setConfiguration(String key, String value) {
        this.userConfigMap.put(key, value);
        return this;
    }

    public EmbeddedGobblin setConfiguration(String keyValue) {
        List split = KEY_VALUE_SPLITTER.splitToList((CharSequence)keyValue);
        if (split.size() != 2) {
            throw new RuntimeException("Cannot parse " + keyValue + ". Expected <key>:<value>.");
        }
        return this.setConfiguration((String)split.get(0), (String)split.get(1));
    }

    public EmbeddedGobblin setJobTimeout(long timeout, TimeUnit timeUnit) {
        this.jobTimeout = new FullTimeout(timeout, timeUnit);
        return this;
    }

    public EmbeddedGobblin setJobTimeout(String timeout) {
        return this.setJobTimeout(Period.parse((String)timeout).getSeconds(), TimeUnit.SECONDS);
    }

    public EmbeddedGobblin setLaunchTimeout(long timeout, TimeUnit timeUnit) {
        this.launchTimeout = new FullTimeout(timeout, timeUnit);
        return this;
    }

    public EmbeddedGobblin setLaunchTimeout(String timeout) {
        return this.setLaunchTimeout(Period.parse((String)timeout).getSeconds(), TimeUnit.SECONDS);
    }

    public EmbeddedGobblin setShutdownTimeout(long timeout, TimeUnit timeUnit) {
        this.shutdownTimeout = new FullTimeout(timeout, timeUnit);
        return this;
    }

    public EmbeddedGobblin setShutdownTimeout(String timeout) {
        return this.setShutdownTimeout(Period.parse((String)timeout).getSeconds(), TimeUnit.SECONDS);
    }

    public EmbeddedGobblin setDumpJStackOnTimeout(boolean dumpJStackOnTimeout) {
        this.dumpJStackOnTimeout = dumpJStackOnTimeout;
        return this;
    }

    public EmbeddedGobblin useStateStore(String rootDir) {
        this.setConfiguration("state.store.enabled", "true");
        this.setConfiguration("state.store.dir", rootDir);
        return this;
    }

    public EmbeddedGobblin enableMetrics() {
        this.usePlugin(new GobblinMetricsPlugin.Factory());
        this.sysConfig("metrics.enabled", Boolean.toString(true));
        return this;
    }

    protected Config getDefaultSysConfig() {
        return ConfigFactory.parseResources((String)"embedded/embedded.conf");
    }

    @NotOnCli
    public JobExecutionResult run() throws InterruptedException, TimeoutException, ExecutionException {
        JobExecutionDriver jobDriver = this.runAsync();
        return jobDriver.get(this.jobTimeout.getTimeout(), this.jobTimeout.getTimeUnit());
    }

    @NotOnCli
    public JobExecutionDriver runAsync() throws TimeoutException, InterruptedException {
        ResolvedJobSpec resolvedJobSpec;
        JobSpec jobSpec;
        this.distributeJarsFunction.run();
        log.debug("BuiltConfigMap: {}", this.builtConfigMap);
        log.debug("DefaultSysConfig: {}", (Object)this.defaultSysConfig);
        Config sysProps = ConfigFactory.parseMap(this.builtConfigMap).withFallback((ConfigMergeable)this.defaultSysConfig);
        log.debug("Merged SysProps:{}", (Object)sysProps);
        Config userConfig = ConfigFactory.parseMap(this.userConfigMap);
        log.debug("UserConfig: {}", (Object)userConfig);
        if (this.jobFile.isPresent()) {
            try {
                Path jobFilePath = (Path)this.jobFile.get();
                PullFileLoader loader = new PullFileLoader(jobFilePath.getParent(), jobFilePath.getFileSystem(new Configuration()), (Collection)PullFileLoader.DEFAULT_JAVA_PROPS_PULL_FILE_EXTENSIONS, (Collection)PullFileLoader.DEFAULT_HOCON_PULL_FILE_EXTENSIONS);
                Config jobConfig = userConfig.withFallback((ConfigMergeable)loader.loadPullFile(jobFilePath, sysProps, false));
                log.debug("JobConfig: {}", (Object)jobConfig);
                ImmutableFSJobCatalog.JobSpecConverter converter = new ImmutableFSJobCatalog.JobSpecConverter(jobFilePath.getParent(), (Optional<String>)Optional.absent());
                jobSpec = converter.apply(jobConfig);
            }
            catch (IOException ioe) {
                throw new RuntimeException("Failed to run embedded Gobblin.", ioe);
            }
        } else {
            Config finalConfig = userConfig.withFallback((ConfigMergeable)sysProps);
            if (this.template != null) {
                this.specBuilder.withTemplate(this.template);
            }
            jobSpec = this.specBuilder.withConfig(finalConfig).build();
        }
        try {
            JobSpecResolver resolver = JobSpecResolver.builder(sysProps).build();
            resolvedJobSpec = resolver.resolveJobSpec(jobSpec);
        }
        catch (IOException | JobTemplate.TemplateException | SpecNotFoundException exc) {
            throw new RuntimeException("Failed to resolved template.", exc);
        }
        StaticJobCatalog jobCatalog = new StaticJobCatalog((Optional<Logger>)Optional.of((Object)this.useLog), (Collection<JobSpec>)Lists.newArrayList((Object[])new JobSpec[]{resolvedJobSpec}));
        SimpleGobblinInstanceEnvironment instanceEnvironment = new SimpleGobblinInstanceEnvironment("EmbeddedGobblinInstance", this.useLog, this.getSysConfig());
        StandardGobblinInstanceDriver.Builder builder = new StandardGobblinInstanceDriver.Builder((Optional<GobblinInstanceEnvironment>)Optional.of((Object)instanceEnvironment)).withLog(this.useLog).withJobCatalog(jobCatalog).withImmediateJobScheduler();
        for (GobblinInstancePluginFactory plugin : this.plugins) {
            builder.addPlugin(plugin);
        }
        final StandardGobblinInstanceDriver driver = builder.build();
        EmbeddedJobLifecycleListener listener = new EmbeddedJobLifecycleListener(this.useLog);
        driver.registerJobLifecycleListener(listener);
        driver.startAsync();
        boolean started = listener.awaitStarted(this.launchTimeout.getTimeout(), this.launchTimeout.getTimeUnit());
        if (!started) {
            this.dumpJStackOnTimeout("Launch");
            log.warn("Timeout waiting for job to start. Aborting.");
            driver.stopAsync();
            driver.awaitTerminated(this.shutdownTimeout.getTimeout(), this.shutdownTimeout.getTimeUnit());
            throw new TimeoutException("Timeout waiting for job to start.");
        }
        JobExecutionDriver jobDriver = listener.getJobDriver();
        Futures.addCallback((ListenableFuture)jobDriver, (FutureCallback)new FutureCallback<JobExecutionResult>(){

            public void onSuccess(@Nullable JobExecutionResult result) {
                this.stopGobblinInstanceDriver();
            }

            public void onFailure(Throwable t) {
                this.stopGobblinInstanceDriver();
            }

            private void stopGobblinInstanceDriver() {
                try {
                    driver.stopAsync();
                    driver.awaitTerminated(EmbeddedGobblin.this.shutdownTimeout.getTimeout(), EmbeddedGobblin.this.shutdownTimeout.getTimeUnit());
                }
                catch (TimeoutException te) {
                    EmbeddedGobblin.this.dumpJStackOnTimeout("stop gobblin instance driver");
                    log.error("Failed to shutdown Gobblin instance driver.");
                }
            }
        });
        return listener.getJobDriver();
    }

    private void dumpJStackOnTimeout(String loc) {
        if (this.dumpJStackOnTimeout) {
            log.info("=== Dump jstack ({}) ===", (Object)loc);
            ThreadMXBean bean = ManagementFactory.getThreadMXBean();
            ThreadInfo[] infos = bean.dumpAllThreads(true, true);
            Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
            HashMap<Long, Thread> threadMap = new HashMap<Long, Thread>();
            for (Thread t : threadSet) {
                threadMap.put(t.getId(), t);
            }
            for (ThreadInfo info : infos) {
                Thread thread = (Thread)threadMap.get(info.getThreadId());
                log.info("({}) {}", (Object)(thread == null ? "Unknown" : (thread.isDaemon() ? "Daemon" : "Non-Daemon")), (Object)info.toString());
            }
        } else {
            log.info("Dump jstack ({}) is disabled.", (Object)loc);
        }
    }

    @VisibleForTesting
    public Configurable getSysConfig() {
        return DefaultConfigurableImpl.createFromConfig(ConfigFactory.parseMap(this.sysConfigOverrides).withFallback((ConfigMergeable)this.defaultSysConfig));
    }

    private void loadCoreGobblinJarsToDistributedJars() {
        this.distributeJarByClassWithPriority(State.class, 0);
        this.distributeJarByClassWithPriority(ConstructState.class, 0);
        this.distributeJarByClassWithPriority(InstrumentedExtractorBase.class, 0);
        this.distributeJarByClassWithPriority(MetricContext.class, 0);
        this.distributeJarByClassWithPriority(GobblinMetrics.class, 0);
        this.distributeJarByClassWithPriority(FsStateStore.class, 0);
        this.distributeJarByClassWithPriority(Task.class, 0);
        this.distributeJarByClassWithPriority(PathUtils.class, 0);
        this.distributeJarByClassWithPriority(ReadableInstant.class, 0);
        this.distributeJarByClassWithPriority(Escaper.class, -10);
        this.distributeJarByClassWithPriority(MetricFilter.class, 0);
        this.distributeJarByClassWithPriority(DataTemplate.class, 0);
        this.distributeJarByClassWithPriority(ClassUtils.class, 0);
        this.distributeJarByClassWithPriority(SchemaBuilder.class, 0);
        this.distributeJarByClassWithPriority(RetryListener.class, 0);
        this.distributeJarByClassWithPriority(ConfigFactory.class, 0);
        this.distributeJarByClassWithPriority(Reflections.class, 0);
        this.distributeJarByClassWithPriority(ClassFile.class, 0);
    }

    @VisibleForTesting
    protected List<String> getPrioritizedDistributedJars() {
        ArrayList jarsWithPriority = Lists.newArrayList(this.distributedJars.entrySet());
        Collections.sort(jarsWithPriority, new Comparator<Map.Entry<String, Integer>>(){

            @Override
            public int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) {
                return Integer.compare(o1.getValue(), o2.getValue());
            }
        });
        return Lists.transform((List)jarsWithPriority, (Function)new Function<Map.Entry<String, Integer>, String>(){

            public String apply(Map.Entry<String, Integer> input) {
                return input.getKey();
            }
        });
    }

    public Optional<Path> getJobFile() {
        return this.jobFile;
    }

    private static class EmbeddedJobLifecycleListener
    extends DefaultJobLifecycleListenerImpl {
        private final Lock lock = new ReentrantLock();
        private final Condition runningStateCondition = this.lock.newCondition();
        private volatile boolean running = false;
        private JobExecutionDriver jobDriver;

        public EmbeddedJobLifecycleListener(Logger log) {
            super(log);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public boolean awaitStarted(long timeout, TimeUnit timeUnit) throws InterruptedException {
            this.lock.lock();
            try {
                long startTime = System.currentTimeMillis();
                long totalTimeMillis = timeUnit.toMillis(timeout);
                while (!this.running) {
                    boolean bl;
                    long millisLeft = totalTimeMillis - (System.currentTimeMillis() - startTime);
                    if (millisLeft < 0L) {
                        bl = false;
                        return bl;
                    }
                    bl = this.runningStateCondition.await(millisLeft, TimeUnit.MILLISECONDS);
                }
            }
            finally {
                this.lock.unlock();
            }
            return true;
        }

        @Override
        public void onJobLaunch(JobExecutionDriver jobDriver) {
            if (this.jobDriver != null) {
                throw new IllegalStateException("OnJobLaunch called when a job was already running.");
            }
            super.onJobLaunch(jobDriver);
            this.lock.lock();
            try {
                this.running = true;
                this.jobDriver = jobDriver;
                this.runningStateCondition.signal();
            }
            finally {
                this.lock.unlock();
            }
        }

        private JobExecutionDriver getJobDriver() {
            return this.jobDriver;
        }
    }

    private static class FullTimeout {
        private final long timeout;
        private final TimeUnit timeUnit;

        public FullTimeout(long timeout, TimeUnit timeUnit) {
            this.timeout = timeout;
            this.timeUnit = timeUnit;
        }

        public long getTimeout() {
            return this.timeout;
        }

        public TimeUnit getTimeUnit() {
            return this.timeUnit;
        }

        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof FullTimeout)) {
                return false;
            }
            FullTimeout other = (FullTimeout)o;
            if (!other.canEqual(this)) {
                return false;
            }
            if (this.getTimeout() != other.getTimeout()) {
                return false;
            }
            TimeUnit this$timeUnit = this.getTimeUnit();
            TimeUnit other$timeUnit = other.getTimeUnit();
            return !(this$timeUnit == null ? other$timeUnit != null : !((Object)((Object)this$timeUnit)).equals((Object)other$timeUnit));
        }

        protected boolean canEqual(Object other) {
            return other instanceof FullTimeout;
        }

        public int hashCode() {
            int PRIME = 59;
            int result = 1;
            long $timeout = this.getTimeout();
            result = result * 59 + (int)($timeout >>> 32 ^ $timeout);
            TimeUnit $timeUnit = this.getTimeUnit();
            result = result * 59 + ($timeUnit == null ? 43 : ((Object)((Object)$timeUnit)).hashCode());
            return result;
        }

        public String toString() {
            return "EmbeddedGobblin.FullTimeout(timeout=" + this.getTimeout() + ", timeUnit=" + (Object)((Object)this.getTimeUnit()) + ")";
        }
    }

    public static class CliFactory
    extends ConstructorAndPublicMethodsGobblinCliFactory {
        public CliFactory() {
            super(EmbeddedGobblin.class);
        }

        public String getUsageString() {
            return "-jobName <jobName> [OPTIONS]";
        }
    }
}

