/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.runtime.job_catalog;

import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.typesafe.config.Config;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.gobblin.metrics.GobblinMetrics;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment;
import org.apache.gobblin.runtime.api.JobCatalog;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.JobSpecNotFoundException;
import org.apache.gobblin.runtime.job_catalog.FSPathAlterationListenerAdaptor;
import org.apache.gobblin.runtime.job_catalog.JobCatalogBase;
import org.apache.gobblin.util.PathUtils;
import org.apache.gobblin.util.PullFileLoader;
import org.apache.gobblin.util.filesystem.PathAlterationListener;
import org.apache.gobblin.util.filesystem.PathAlterationObserver;
import org.apache.gobblin.util.filesystem.PathAlterationObserverScheduler;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ImmutableFSJobCatalog
extends JobCatalogBase
implements JobCatalog {
    /** File system of the job configuration directory, obtained from {@link ConfigAccessor} in the constructor. */
    protected final FileSystem fs;
    /** System-level configuration handed to the constructor; passed to the loader on every pull-file load. */
    protected final Config sysConfig;
    protected static final Logger LOGGER = LoggerFactory.getLogger(ImmutableFSJobCatalog.class);
    /** Loader rooted at {@link #jobConfDirPath} that reads job configuration files. */
    protected final PullFileLoader loader;
    /** Root directory holding the job configuration files. */
    protected final Path jobConfDirPath;
    /** Converts a loaded job {@link Config} into a {@link JobSpec}. */
    private final JobSpecConverter converter;
    /** Scheduler that watches {@link #jobConfDirPath}; {@code null} when the configured polling interval is -1. */
    protected final PathAlterationObserverScheduler pathAlterationDetector;
    /** Config path removed from a job's configuration before the {@link JobSpec} is built. */
    public static final String FS_CATALOG_KEY_PREFIX = "gobblin.fsJobCatalog";
    /** Optional job config key supplying the JobSpec version; the converter falls back to "1" when absent. */
    public static final String VERSION_KEY_IN_JOBSPEC = "gobblin.fsJobCatalog.version";
    /** Optional job config key supplying the JobSpec description; the converter generates one when absent. */
    public static final String DESCRIPTION_KEY_IN_JOBSPEC = "gobblin.fsJobCatalog.description";

    /**
     * Creates a catalog from the system configuration alone, supplying no
     * {@link PathAlterationObserver}.
     *
     * @param sysConfig system configuration; must define the job configuration directory
     * @throws IOException declared by the delegated constructor
     */
    public ImmutableFSJobCatalog(Config sysConfig) throws IOException {
        // The typed null makes it obvious which two-argument constructor is targeted.
        this(sysConfig, (PathAlterationObserver) null);
    }

    /**
     * Creates a catalog from a Gobblin instance environment, supplying no
     * {@link PathAlterationObserver}.
     *
     * <p>The system config, metric context and instrumentation flag are all taken from
     * {@code env}.
     *
     * @param env instance environment providing configuration and metrics settings
     * @throws IOException declared by the delegated constructor
     */
    public ImmutableFSJobCatalog(GobblinInstanceEnvironment env) throws IOException {
        // Let inference produce Optional<MetricContext>; casting an Optional<Object> to
        // Optional<MetricContext> (as the decompiled form did) is not a legal cast.
        this(env.getSysConfig().getConfig(), null, Optional.of(env.getMetricContext()), env.isInstrumentationEnabled());
    }

    /**
     * Creates a catalog from a Gobblin instance environment using the given observer.
     *
     * <p>The system config, metric context and instrumentation flag are all taken from
     * {@code env}.
     *
     * @param env instance environment providing configuration and metrics settings
     * @param observer observer to register for the job configuration directory; may be {@code null}
     * @throws IOException declared by the delegated constructor
     */
    public ImmutableFSJobCatalog(GobblinInstanceEnvironment env, PathAlterationObserver observer) throws IOException {
        // Let inference produce Optional<MetricContext>; casting an Optional<Object> to
        // Optional<MetricContext> (as the decompiled form did) is not a legal cast.
        this(env.getSysConfig().getConfig(), observer, Optional.of(env.getMetricContext()), env.isInstrumentationEnabled());
    }

    /**
     * Creates a catalog from the system configuration using the given observer, with no parent
     * metric context.
     *
     * <p>Instrumentation is enabled according to {@code GobblinMetrics.isEnabled(sysConfig)}.
     *
     * @param sysConfig system configuration; must define the job configuration directory
     * @param observer observer to register for the job configuration directory; may be {@code null}
     * @throws IOException declared by the delegated constructor
     */
    public ImmutableFSJobCatalog(Config sysConfig, PathAlterationObserver observer) throws IOException {
        // An explicit type witness replaces the illegal Optional<Object> -> Optional<MetricContext> cast.
        this(sysConfig, observer, Optional.<MetricContext>absent(), GobblinMetrics.isEnabled(sysConfig));
    }

    /**
     * Primary constructor: resolves the job configuration directory from {@code sysConfig}, builds
     * the pull-file loader and the JobSpec converter, and sets up directory monitoring unless it
     * is disabled.
     *
     * <p>Monitoring is disabled (and {@link #pathAlterationDetector} left {@code null}) when the
     * configured polling interval equals -1.
     *
     * @param sysConfig system configuration; must define the job configuration directory
     * @param observer observer to register for the job configuration directory; may be {@code null}
     * @param parentMetricContext optional parent metric context forwarded to the base class
     * @param instrumentationEnabled whether instrumentation is enabled, forwarded to the base class
     * @throws IOException if the file system for the job configuration directory cannot be obtained
     */
    public ImmutableFSJobCatalog(Config sysConfig, PathAlterationObserver observer, Optional<MetricContext> parentMetricContext, boolean instrumentationEnabled) throws IOException {
        // Type inference yields Optional<Logger> / Optional<Config>; no casts are needed (or legal).
        super(Optional.of(LOGGER), parentMetricContext, instrumentationEnabled, Optional.of(sysConfig));
        this.sysConfig = sysConfig;
        ConfigAccessor cfgAccessor = new ConfigAccessor(this.sysConfig);
        this.jobConfDirPath = cfgAccessor.getJobConfDirPath();
        this.fs = cfgAccessor.getJobConfDirFileSystem();
        this.loader = new PullFileLoader(this.jobConfDirPath, this.jobConfDirPath.getFileSystem(new Configuration()), cfgAccessor.getJobConfigurationFileExtensions(), PullFileLoader.DEFAULT_HOCON_PULL_FILE_EXTENSIONS);
        // getInjectedExtension() is a protected hook; subclasses may supply an extension to strip.
        this.converter = new JobSpecConverter(this.jobConfDirPath, this.getInjectedExtension());
        long pollingInterval = cfgAccessor.getPollingInterval();
        if (pollingInterval == -1L) {
            // -1 means "do not monitor the directory".
            this.pathAlterationDetector = null;
        } else {
            this.pathAlterationDetector = new PathAlterationObserverScheduler(pollingInterval);
            Optional<PathAlterationObserver> observerOptional = Optional.fromNullable(observer);
            this.pathAlterationDetector.addPathAlterationObserver(this.getListener(), observerOptional, this.jobConfDirPath);
        }
    }

    /**
     * Builds the listener that is registered with the path alteration scheduler.
     *
     * @return a new {@link FSPathAlterationListenerAdaptor} wired to this catalog's directory,
     *         loader, system config, listeners and converter
     */
    protected PathAlterationListener getListener() {
        PathAlterationListener adaptor = new FSPathAlterationListenerAdaptor(
                this.jobConfDirPath,
                this.loader,
                this.sysConfig,
                this.listeners,
                this.converter);
        return adaptor;
    }

    /**
     * Starts the base catalog and then, if directory monitoring is configured, the path
     * alteration scheduler.
     *
     * @throws IOException if the base class start-up or the scheduler start fails with one
     */
    @Override
    protected void startUp() throws IOException {
        super.startUp();
        if (this.pathAlterationDetector == null) {
            // Monitoring disabled: nothing else to start.
            return;
        }
        this.pathAlterationDetector.start();
    }

    /**
     * Stops the path alteration scheduler (when one exists) and then shuts down the base catalog.
     *
     * @throws IOException if the base class shutdown fails with one
     * @throws RuntimeException if the thread is interrupted while stopping the scheduler; the
     *         interrupt flag is restored before the exception is thrown
     */
    @Override
    protected void shutDown() throws IOException {
        try {
            if (this.pathAlterationDetector != null) {
                this.pathAlterationDetector.stop();
            }
        }
        catch (InterruptedException exc) {
            // Restore the interrupt status so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Failed to stop " + ImmutableFSJobCatalog.class.getName(), exc);
        }
        super.shutDown();
    }

    /**
     * Loads every pull file under the loader's root directory and exposes the results as
     * {@link JobSpec}s.
     *
     * <p>The configs are copied into a new list, which is then wrapped with
     * {@code Lists.transform} using this catalog's converter.
     *
     * @return the job specs for all pull files found recursively under the root directory
     */
    public synchronized List<JobSpec> getJobs() {
        Path rootDir = this.loader.getRootDirectory();
        boolean withGlobalConf = this.shouldLoadGlobalConf();
        List<Config> loadedConfigs = Lists.newArrayList(this.loader.loadPullFilesRecursively(rootDir, this.sysConfig, withGlobalConf));
        return Lists.transform(loadedConfigs, this.converter);
    }

    /**
     * Returns an iterator over the job specs of all job files found recursively under the
     * loader's root directory.
     *
     * <p>The file list is fetched up front; each file is loaded and converted by the function
     * passed to {@code Iterators.transform}. An element is {@code null} when the underlying path
     * is {@code null} or when loading the file fails with an {@link IOException} (which is
     * logged).
     *
     * @return iterator of job specs, possibly containing {@code null} elements
     */
    @Override
    public synchronized Iterator<JobSpec> getJobSpecIterator() {
        List<Path> pullFiles = this.loader.fetchJobFilesRecursively(this.loader.getRootDirectory());
        Function<Path, JobSpec> loadAndConvert = new Function<Path, JobSpec>(){

            @Nullable
            public JobSpec apply(@Nullable Path jobFile) {
                if (jobFile == null) {
                    return null;
                }
                Config jobConfig;
                try {
                    jobConfig = ImmutableFSJobCatalog.this.loader.loadPullFile(jobFile, ImmutableFSJobCatalog.this.sysConfig, ImmutableFSJobCatalog.this.shouldLoadGlobalConf());
                }
                catch (IOException e) {
                    // A single unreadable file must not abort iteration; report it and yield null.
                    ImmutableFSJobCatalog.this.log.error("Cannot load job from {} due to {}", (Object)jobFile, (Object)ExceptionUtils.getFullStackTrace((Throwable)e));
                    return null;
                }
                return ImmutableFSJobCatalog.this.converter.apply(jobConfig);
            }
        };
        return Iterators.transform(pullFiles.iterator(), loadAndConvert);
    }

    /**
     * Loads the single job configuration identified by {@code uri} and converts it to a
     * {@link JobSpec}.
     *
     * @param uri job URI, resolved against the job configuration directory via
     *            {@link #getPathForURI(Path, URI)}
     * @return the job spec built from the loaded configuration
     * @throws JobSpecNotFoundException if the resolved file does not exist
     * @throws RuntimeException if any other I/O error occurs while loading; the original
     *         {@link IOException} is attached as the cause
     */
    @Override
    public synchronized JobSpec getJobSpec(URI uri) throws JobSpecNotFoundException {
        try {
            Path targetJobSpecFullPath = this.getPathForURI(this.jobConfDirPath, uri);
            return this.converter.apply(this.loader.loadPullFile(targetJobSpecFullPath, this.sysConfig, this.shouldLoadGlobalConf()));
        }
        catch (FileNotFoundException e) {
            throw new JobSpecNotFoundException(uri);
        }
        catch (IOException e) {
            // Keep the cause so the underlying stack trace is not lost.
            throw new RuntimeException("IO exception thrown on loading single job configuration file:" + e.getMessage(), e);
        }
    }

    /**
     * Flag passed to the pull-file loader on every load call made by this catalog.
     *
     * @return always {@code true} in this class
     */
    public boolean shouldLoadGlobalConf() {
        return true;
    }

    /**
     * Maps a job URI to the path of its configuration file.
     *
     * @param jobConfDirPath root directory of the job configuration files
     * @param uri job URI to resolve
     * @return result of {@code PathUtils.mergePaths} applied to the root directory and a
     *         {@link Path} built from {@code uri}
     */
    protected Path getPathForURI(Path jobConfDirPath, URI uri) {
        Path jobRelativePath = new Path(uri);
        return PathUtils.mergePaths(jobConfDirPath, jobRelativePath);
    }

    /**
     * Extension handed to the {@link JobSpecConverter} as the extension to strip from job URIs.
     *
     * @return an absent {@link Optional} in this class
     */
    protected Optional<String> getInjectedExtension() {
        return Optional.<String>absent();
    }

    /**
     * Guava {@link Function} that turns a loaded job {@link Config} into a {@link JobSpec}.
     *
     * <p>The job URI is the location of the job file relative to the job configuration directory,
     * optionally with a configured extension stripped.
     */
    public static class JobSpecConverter
    implements Function<Config, JobSpec> {
        private final Path jobConfigDirPath;
        private final Optional<String> extensionToStrip;

        /**
         * @param jobConfigDirPath directory against which job file paths are relativized
         * @param extensionToStrip extension to remove from computed URIs, if present
         */
        public JobSpecConverter(Path jobConfigDirPath, Optional<String> extensionToStrip) {
            this.jobConfigDirPath = jobConfigDirPath;
            this.extensionToStrip = extensionToStrip;
        }

        /**
         * Computes the job URI for a job file.
         *
         * @param filePath path of the job file
         * @return the path relativized against the job configuration directory, with the
         *         configured extension removed when one is present
         */
        public URI computeURI(Path filePath) {
            Path relativePath = PathUtils.relativizePath(filePath, this.jobConfigDirPath);
            URI relativeUri = relativePath.toUri();
            if (!this.extensionToStrip.isPresent()) {
                return relativeUri;
            }
            String[] extensions = new String[]{this.extensionToStrip.get()};
            return PathUtils.removeExtension(new Path(relativeUri), extensions).toUri();
        }

        /**
         * Builds a {@link JobSpec} from a raw job configuration.
         *
         * <p>Reads the job file location from {@code job.config.path}. Version and description
         * come from the catalog-specific keys when set, otherwise they default to {@code "1"} and
         * a generated description. The catalog-specific prefix is removed from the config stored
         * in the spec, and {@code job.template} (when set) is attached as the template URI.
         *
         * @param rawConfig configuration loaded from a job file
         * @return the built job spec
         * @throws RuntimeException if {@code job.template} is not a valid URI
         */
        @Nullable
        public JobSpec apply(Config rawConfig) {
            Path jobConfigPath = new Path(rawConfig.getString("job.config.path"));
            URI jobConfigURI = this.computeURI(jobConfigPath);

            String version;
            if (rawConfig.hasPath(ImmutableFSJobCatalog.VERSION_KEY_IN_JOBSPEC)) {
                version = rawConfig.getString(ImmutableFSJobCatalog.VERSION_KEY_IN_JOBSPEC);
            } else {
                version = "1";
            }

            String description;
            if (rawConfig.hasPath(ImmutableFSJobCatalog.DESCRIPTION_KEY_IN_JOBSPEC)) {
                description = rawConfig.getString(ImmutableFSJobCatalog.DESCRIPTION_KEY_IN_JOBSPEC);
            } else {
                description = "Gobblin job " + jobConfigURI;
            }

            // Catalog-internal keys are not part of the job's own configuration.
            Config filteredConfig = rawConfig.withoutPath(ImmutableFSJobCatalog.FS_CATALOG_KEY_PREFIX);
            JobSpec.Builder builder = JobSpec.builder(jobConfigURI)
                    .withConfig(filteredConfig)
                    .withDescription(description)
                    .withVersion(version);

            if (!rawConfig.hasPath("job.template")) {
                return builder.build();
            }
            try {
                builder.withTemplate(new URI(rawConfig.getString("job.template")));
            }
            catch (URISyntaxException e) {
                throw new RuntimeException("Bad job template URI " + e, e);
            }
            return builder.build();
        }
    }

    /**
     * Reads the file-system job catalog settings from a {@link Config} once, at construction, and
     * exposes them through getters.
     *
     * <p>Keys read: {@code jobconf.monitor.interval}, {@code jobconf.fullyQualifiedPath},
     * {@code jobconf.dir} and {@code jobconf.extensions}.
     */
    public static class ConfigAccessor {
        private final Config cfg;
        private final long pollingInterval;
        private final String jobConfDir;
        private final Path jobConfDirPath;
        private final FileSystem jobConfDirFileSystem;
        private final Set<String> jobConfigurationFileExtensions;

        /**
         * @param cfg configuration to read from
         * @throws IllegalArgumentException if neither {@code jobconf.fullyQualifiedPath} nor
         *         {@code jobconf.dir} is set
         * @throws RuntimeException if the file system of the job configuration directory cannot
         *         be obtained
         */
        public ConfigAccessor(Config cfg) {
            this.cfg = cfg;
            // Falls back to 30000 when no interval is configured.
            this.pollingInterval = this.cfg.hasPath("jobconf.monitor.interval") ? this.cfg.getLong("jobconf.monitor.interval") : 30000L;
            if (this.cfg.hasPath("jobconf.fullyQualifiedPath")) {
                // The fully qualified path wins over jobconf.dir when both are set.
                this.jobConfDir = this.cfg.getString("jobconf.fullyQualifiedPath");
            } else if (this.cfg.hasPath("jobconf.dir")) {
                File localJobConfigDir = new File(this.cfg.getString("jobconf.dir"));
                this.jobConfDir = "file://" + localJobConfigDir.getAbsolutePath();
            } else {
                throw new IllegalArgumentException("Expected jobconf.fullyQualifiedPath or jobconf.dir properties.");
            }
            this.jobConfDirPath = new Path(this.jobConfDir);
            try {
                this.jobConfDirFileSystem = this.jobConfDirPath.getFileSystem(new Configuration());
            }
            catch (IOException e) {
                throw new RuntimeException("Unable to detect job config directory file system: " + e, e);
            }
            // Comma-separated list; entries are trimmed and empty entries are dropped.
            this.jobConfigurationFileExtensions = ImmutableSet.copyOf(Splitter.on(",").omitEmptyStrings().trimResults().split(this.getJobConfigurationFileExtensionsString()));
        }

        /**
         * Returns the configured extension list in lower case, or {@code "pull,job"} when
         * {@code jobconf.extensions} is not set.
         */
        private String getJobConfigurationFileExtensionsString() {
            if (this.cfg.hasPath("jobconf.extensions")) {
                // Locale.ROOT keeps the result independent of the JVM's default locale.
                return this.cfg.getString("jobconf.extensions").toLowerCase(java.util.Locale.ROOT);
            }
            return "pull,job";
        }

        /** @return the configuration this accessor was built from */
        public Config getCfg() {
            return this.cfg;
        }

        /** @return the value of {@code jobconf.monitor.interval}, or 30000 when it is not set */
        public long getPollingInterval() {
            return this.pollingInterval;
        }

        /** @return the job configuration directory as a string */
        public String getJobConfDir() {
            return this.jobConfDir;
        }

        /** @return the job configuration directory as a {@link Path} */
        public Path getJobConfDirPath() {
            return this.jobConfDirPath;
        }

        /** @return the file system obtained for the job configuration directory */
        public FileSystem getJobConfDirFileSystem() {
            return this.jobConfDirFileSystem;
        }

        /** @return immutable set of job configuration file extensions */
        public Set<String> getJobConfigurationFileExtensions() {
            return this.jobConfigurationFileExtensions;
        }
    }
}

