/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.compaction.mapreduce;

import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.math3.primes.Primes;
import org.apache.gobblin.compaction.dataset.DatasetHelper;
import org.apache.gobblin.compaction.parser.CompactionPathParser;
import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.dataset.FileSystemDataset;
import org.apache.gobblin.util.FileListUtils;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for building the MapReduce {@link Job} that compacts one {@link FileSystemDataset}.
 *
 * <p>Subclasses supply the format-specific pieces (schema, mapper, reducer, file extension);
 * this class handles the shared setup: Hadoop configuration defaults, extra jars, input/output
 * paths and the reducer count. After {@link #createJob(FileSystemDataset)} has run, the getters
 * expose what was configured (output path, input paths, the job itself).
 */
public abstract class CompactionJobConfigurator {
    private static final Logger log = LoggerFactory.getLogger(CompactionJobConfigurator.class);
    public static final String COMPACTION_JOB_CONFIGURATOR_FACTORY_CLASS_KEY = "compaction.jobConfiguratorFactory.class";
    public static final String DEFAULT_COMPACTION_JOB_CONFIGURATOR_FACTORY_CLASS = "org.apache.gobblin.compaction.mapreduce.CompactionAvroJobConfigurator$Factory";
    protected final State state;
    protected final FileSystem fs;
    protected Job configuredJob;
    protected final boolean shouldDeduplicate;
    protected Path mrOutputPath = null;
    protected boolean isJobCreated = false;
    protected Collection<Path> mapReduceInputPaths = null;
    protected Collection<String> oldFiles = null;
    protected Collection<Path> dstNewFiles = null;
    protected long fileNameRecordCount = 0L;

    /**
     * Creates a configurator backed by the given job state.
     *
     * <p>NOTE: {@link #getFileSystem(State)} is invoked from this constructor, so an overriding
     * implementation runs before the subclass' own fields are initialized.
     *
     * @param state job state; read for the file system URI and {@code compaction.should.deduplicate}
     *              (defaults to {@code true})
     * @throws IOException if the file system cannot be obtained
     */
    public CompactionJobConfigurator(State state) throws IOException {
        this.state = state;
        this.fs = this.getFileSystem(state);
        this.shouldDeduplicate = state.getPropAsBoolean("compaction.should.deduplicate", true);
    }

    /**
     * Instantiates the configurator through the factory class named by
     * {@link #COMPACTION_JOB_CONFIGURATOR_FACTORY_CLASS_KEY}, falling back to
     * {@link #DEFAULT_COMPACTION_JOB_CONFIGURATOR_FACTORY_CLASS}.
     *
     * @param state job state handed to the factory
     * @return the configurator created by the factory
     * @throws RuntimeException if the factory cannot be loaded or instantiated, or if it fails
     *                          with an {@link IOException}
     */
    public static CompactionJobConfigurator instantiateConfigurator(State state) {
        String factoryClassName = state.getProp(COMPACTION_JOB_CONFIGURATOR_FACTORY_CLASS_KEY, DEFAULT_COMPACTION_JOB_CONFIGURATOR_FACTORY_CLASS);
        try {
            // getDeclaredConstructor().newInstance() replaces the deprecated Class.newInstance(),
            // which lets checked exceptions thrown by the constructor escape undeclared.
            ConfiguratorFactory factory = Class.forName(factoryClassName)
                    .asSubclass(ConfiguratorFactory.class)
                    .getDeclaredConstructor()
                    .newInstance();
            return factory.createConfigurator(state);
        }
        catch (IOException | ReflectiveOperationException e) {
            throw new RuntimeException("Failed to instantiate a instance of job configurator:", e);
        }
    }

    /**
     * @return the file extension handled by this configurator
     */
    public abstract String getFileExtension();

    /**
     * Builds and fully configures the compaction job for {@code dataset}.
     *
     * <p>Output compression is switched on and delegation-token cancellation switched off unless
     * the corresponding keys are already present in the configuration. When the dataset has no
     * input files, or deduplication is disabled, the job is made map-only.
     *
     * @param dataset dataset to compact
     * @return the configured job (also retrievable via {@link #getConfiguredJob()})
     * @throws IOException if path setup or any subclass configuration step fails
     */
    public Job createJob(FileSystemDataset dataset) throws IOException {
        Configuration conf = HadoopUtils.getConfFromState(this.state);

        // Only apply defaults for keys the user has not set explicitly.
        boolean compressionConfigured = conf.get("mapreduce.output.fileoutputformat.compress") != null
                || conf.get("mapred.output.compress") != null;
        if (!compressionConfigured) {
            conf.setBoolean("mapreduce.output.fileoutputformat.compress", true);
        }
        if (conf.get("mapreduce.job.complete.cancel.delegation.tokens") == null) {
            conf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", false);
        }

        this.addJars(conf, this.state, this.fs);
        Job job = Job.getInstance(conf);
        job.setJobName("Gobblin MR Compaction");

        boolean emptyDirectoryFlag = this.configureInputAndOutputPaths(job, dataset);
        if (emptyDirectoryFlag) {
            this.state.setProp("mapreduce.job.input.path.empty", true);
        }

        this.configureMapper(job);
        this.configureReducer(job);
        // Applied after configureReducer so it overrides any reducer count set there.
        if (emptyDirectoryFlag || !this.shouldDeduplicate) {
            job.setNumReduceTasks(0);
        }
        this.configureSchema(job);

        this.isJobCreated = true;
        this.configuredJob = job;
        return job;
    }

    /**
     * Configures the schema(s) on the job.
     *
     * @param var1 job being configured
     * @throws IOException if the schema cannot be determined
     */
    protected abstract void configureSchema(Job var1) throws IOException;

    /**
     * Configures the mapper side of the job.
     *
     * @param var1 job being configured
     */
    protected abstract void configureMapper(Job var1);

    /**
     * Configures the reducer side of the job.
     *
     * @param var1 job being configured
     * @throws IOException if reducer configuration fails
     */
    protected abstract void configureReducer(Job var1) throws IOException;

    /**
     * Resolves the file system named by {@code source.filebased.fs.uri}
     * (default {@code file:///}).
     *
     * @param state job state providing the URI and the Hadoop configuration
     * @return the file system for that URI
     * @throws IOException if the file system cannot be created
     */
    protected FileSystem getFileSystem(State state) throws IOException {
        Configuration conf = HadoopUtils.getConfFromState(state);
        String uri = state.getProp("source.filebased.fs.uri", "file:///");
        return FileSystem.get(URI.create(uri), conf);
    }

    /**
     * Sets the reducer count from the total input size.
     *
     * <p>The count is {@code inputSize / targetFileSize + 1}, capped at
     * {@code compaction.job.max.num.reducers} (default 3600). Unless
     * {@code compaction.job.use.prime.reducers} is {@code false}, a count other than 1 is then
     * passed through {@code Primes.nextPrime}; because that happens after the cap, the final
     * value may exceed the configured maximum.
     *
     * @param job job whose reducer count is set
     * @throws IOException              if an input path's size cannot be read
     * @throws IllegalArgumentException if {@code compaction.job.target.output.file.size} is not positive
     */
    protected void setNumberOfReducers(Job job) throws IOException {
        long inputSize = 0L;
        for (Path inputPath : this.mapReduceInputPaths) {
            inputSize += this.fs.getContentSummary(inputPath).getLength();
        }

        // Default target output file size: 0x20000000 bytes (512 MiB).
        long targetFileSize = this.state.getPropAsLong("compaction.job.target.output.file.size", 0x20000000L);
        if (targetFileSize <= 0L) {
            throw new IllegalArgumentException(
                    "compaction.job.target.output.file.size must be positive but was " + targetFileSize);
        }
        int maxNumReducers = this.state.getPropAsInt("compaction.job.max.num.reducers", 3600);

        // Cap in long arithmetic before narrowing, so a huge input/target ratio is limited by
        // the configured maximum instead of failing an int range check.
        long desiredReducers = inputSize / targetFileSize + 1L;
        int numReducers = (int) Math.min(desiredReducers, (long) maxNumReducers);

        boolean usePrimeReducers = this.state.getPropAsBoolean("compaction.job.use.prime.reducers", true);
        if (usePrimeReducers && numReducers != 1) {
            numReducers = Primes.nextPrime(numReducers);
        }
        job.setNumReduceTasks(numReducers);
    }

    /**
     * Adds every file found directly under the {@code compaction.jars} directory to the job
     * classpath. Does nothing when that property is not set.
     *
     * @param conf  configuration receiving the classpath entries
     * @param state job state that may contain {@code compaction.jars}
     * @param fs    file system used to list the jar directory
     * @throws IOException if the directory cannot be listed or a file cannot be added
     */
    protected void addJars(Configuration conf, State state, FileSystem fs) throws IOException {
        if (!state.contains("compaction.jars")) {
            return;
        }
        Path jarFileDir = new Path(state.getProp("compaction.jars"));
        for (FileStatus jar : fs.listStatus(jarFileDir)) {
            DistributedCache.addFileToClassPath(jar.getPath(), conf, fs);
        }
    }

    /**
     * Computes the temporary MR output path, wipes it, and registers the input paths on the job.
     *
     * <p>The output path lives under {@code compaction.dest.dir}/&lt;dataset&gt;/.temp when
     * {@code gobblin.useDatasetLocalWorkDir} is present in the state, otherwise under
     * {@code compaction.tmp.job.dir}. If no granular input path is found, the dataset root itself
     * is used as the sole input.
     *
     * @param job     job to configure
     * @param dataset dataset being compacted
     * @return {@code true} if the dataset had no input files (dataset root was used as input)
     * @throws IOException if a file system operation fails
     */
    protected boolean configureInputAndOutputPaths(Job job, FileSystemDataset dataset) throws IOException {
        CompactionPathParser parser = new CompactionPathParser(this.state);
        CompactionPathParser.CompactionParserResult rst = parser.parse(dataset);

        // Build only the path that will be used: the tmp-dir property may legitimately be absent
        // when the dataset-local work directory is enabled.
        if (this.state.contains("gobblin.useDatasetLocalWorkDir")) {
            String mrOutputBase = this.state.getProp("compaction.dest.dir");
            this.mrOutputPath = this.concatPaths(mrOutputBase, rst.getDatasetName(), ".temp", rst.getDstSubDir(), rst.getTimeString());
        } else {
            String mrOutputBase = this.state.getProp("compaction.tmp.job.dir");
            this.mrOutputPath = this.concatPaths(mrOutputBase, rst.getDatasetName(), rst.getDstSubDir(), rst.getTimeString());
        }

        log.info("Cleaning temporary MR output directory: " + this.mrOutputPath);
        this.fs.delete(this.mrOutputPath, true);

        boolean emptyDirectoryFlag = false;
        this.mapReduceInputPaths = this.getGranularInputPaths(dataset.datasetRoot());
        if (this.mapReduceInputPaths.isEmpty()) {
            this.mapReduceInputPaths.add(dataset.datasetRoot());
            emptyDirectoryFlag = true;
        }

        this.oldFiles = new HashSet<String>();
        for (Path inputPath : this.mapReduceInputPaths) {
            this.oldFiles.add(this.fs.makeQualified(inputPath).toString());
            FileInputFormat.addInputPath(job, inputPath);
        }
        FileOutputFormat.setOutputPath(job, this.mrOutputPath);
        return emptyDirectoryFlag;
    }

    /**
     * Joins the given segments into one path, left to right.
     *
     * @param names path segments; the first one is the base
     * @return the combined path, or {@code null} when no segment is given
     */
    private Path concatPaths(String ... names) {
        if (names == null || names.length == 0) {
            return null;
        }
        Path result = new Path(names[0]);
        for (int i = 1; i < names.length; ++i) {
            result = new Path(result, new Path(names[i]));
        }
        return result;
    }

    /**
     * Collects the parent directories of all files found recursively under {@code path}.
     *
     * <p>When {@code compaction.rename.source.dir.enabled} is {@code true}, directories whose
     * path ends with {@code _COMPLETE} are left out of the result, and
     * {@link #fileNameRecordCount} is recomputed from all directories (including the
     * {@code _COMPLETE} ones) via {@link InputRecordCountHelper}.
     *
     * @param path root directory to scan
     * @return the directories to feed to the MR job; empty when no file is found
     * @throws IOException if listing files or counting records fails
     */
    protected Collection<Path> getGranularInputPaths(Path path) throws IOException {
        boolean appendDelta = this.state.getPropAsBoolean("compaction.rename.source.dir.enabled", false);
        Collection<Path> uncompacted = new HashSet<>();
        Collection<Path> total = new HashSet<>();

        for (FileStatus fileStatus : FileListUtils.listFilesRecursively(this.fs, path)) {
            Path parent = fileStatus.getPath().getParent();
            if (!appendDelta) {
                uncompacted.add(parent);
                continue;
            }
            if (!parent.toString().endsWith("_COMPLETE")) {
                uncompacted.add(parent);
            }
            total.add(parent);
        }

        if (appendDelta) {
            this.fileNameRecordCount = new InputRecordCountHelper(this.state).calculateRecordCount(total);
            log.info("{} has total input record count (based on file name) {}", path, this.fileNameRecordCount);
        }
        return uncompacted;
    }

    /**
     * Fetches the task completion events of {@code completedJob} in batches until an empty batch
     * is returned.
     *
     * <p>Best effort: if fetching fails part-way, the events gathered so far are returned and the
     * failure is logged.
     *
     * @param completedJob job to query
     * @return the events retrieved, possibly incomplete
     */
    private static List<org.apache.hadoop.mapreduce.TaskCompletionEvent> getAllTaskCompletionEvent(Job completedJob) {
        List<org.apache.hadoop.mapreduce.TaskCompletionEvent> completionEvents = new ArrayList<>();
        try {
            while (true) {
                // Each call resumes from the number of events already collected.
                org.apache.hadoop.mapreduce.TaskCompletionEvent[] batch =
                        completedJob.getTaskCompletionEvents(completionEvents.size());
                if (batch == null || batch.length == 0) {
                    break;
                }
                completionEvents.addAll(Arrays.asList(batch));
            }
        }
        catch (IOException e) {
            // Do not fail the caller, but make the truncated result visible in the logs.
            log.warn("Stopped fetching task completion events after " + completionEvents.size()
                    + " events; the list may be incomplete", e);
        }
        return completionEvents;
    }

    /**
     * @param completedJob job to query
     * @return the completion events whose status is anything other than {@code SUCCEEDED}
     */
    private static List<org.apache.hadoop.mapreduce.TaskCompletionEvent> getUnsuccessfulTaskCompletionEvent(Job completedJob) {
        return CompactionJobConfigurator.getAllTaskCompletionEvent(completedJob).stream()
                .filter(event -> event.getStatus() != org.apache.hadoop.mapreduce.TaskCompletionEvent.Status.SUCCEEDED)
                .collect(Collectors.toList());
    }

    /**
     * Tells whether {@code path} should be discarded: it contains {@code _temporary}, or it
     * contains {@code /<attemptId>/} for one of the given events.
     *
     * @param path         file path to check
     * @param failedEvents completion events of unsuccessful task attempts
     * @return {@code true} if the path matches either condition
     */
    private static boolean isFailedPath(Path path, List<org.apache.hadoop.mapreduce.TaskCompletionEvent> failedEvents) {
        String pathString = path.toString();
        if (pathString.contains("_temporary")) {
            return true;
        }
        return failedEvents.stream()
                .anyMatch(event -> pathString.contains("/" + event.getTaskAttemptId().toString() + "/"));
    }

    /**
     * Lists the files under {@code tmpPath} with an acceptable extension, deleting those that
     * belong to unsuccessful task attempts or temporary directories and returning the rest.
     *
     * @param job                 the job that produced the files
     * @param tmpPath             directory holding the job output
     * @param fs                  file system containing {@code tmpPath}
     * @param acceptableExtension extensions passed to {@link DatasetHelper#getApplicableFilePaths}
     * @return the files that were kept
     * @throws IOException if a file system operation fails
     */
    public static List<Path> getGoodFiles(Job job, Path tmpPath, FileSystem fs, List<String> acceptableExtension) throws IOException {
        List<org.apache.hadoop.mapreduce.TaskCompletionEvent> failedEvents = CompactionJobConfigurator.getUnsuccessfulTaskCompletionEvent(job);
        List<Path> allFilePaths = DatasetHelper.getApplicableFilePaths(fs, tmpPath, acceptableExtension);
        List<Path> goodPaths = new ArrayList<Path>();
        for (Path filePath : allFilePaths) {
            if (!CompactionJobConfigurator.isFailedPath(filePath, failedEvents)) {
                goodPaths.add(filePath);
                continue;
            }
            // A bad file is never returned, whether or not the delete succeeded.
            if (fs.delete(filePath, false)) {
                log.error("{} is a bad path so it was deleted", filePath);
            } else {
                log.error("{} is a bad path but it could not be deleted", filePath);
            }
        }
        return goodPaths;
    }

    /** @return the file system this configurator operates on */
    public FileSystem getFs() {
        return this.fs;
    }

    /** @return the job built by {@link #createJob(FileSystemDataset)}, or {@code null} before that call */
    public Job getConfiguredJob() {
        return this.configuredJob;
    }

    /** @return the value of {@code compaction.should.deduplicate} read at construction time */
    public boolean isShouldDeduplicate() {
        return this.shouldDeduplicate;
    }

    /** @return the temporary MR output path, or {@code null} if not configured yet */
    public Path getMrOutputPath() {
        return this.mrOutputPath;
    }

    /** @return {@code true} once {@link #createJob(FileSystemDataset)} has completed */
    public boolean isJobCreated() {
        return this.isJobCreated;
    }

    /** @return the input paths registered on the job, or {@code null} if not configured yet */
    public Collection<Path> getMapReduceInputPaths() {
        return this.mapReduceInputPaths;
    }

    /** @return the fully qualified input paths as strings, or {@code null} if not configured yet */
    public Collection<String> getOldFiles() {
        return this.oldFiles;
    }

    /** @return the value last passed to {@link #setDstNewFiles(Collection)}, or {@code null} */
    public Collection<Path> getDstNewFiles() {
        return this.dstNewFiles;
    }

    /** @param dstNewFiles value to store; not used by this class itself */
    public void setDstNewFiles(Collection<Path> dstNewFiles) {
        this.dstNewFiles = dstNewFiles;
    }

    /** @return the record count computed in {@link #getGranularInputPaths(Path)}; 0 if never computed */
    public long getFileNameRecordCount() {
        return this.fileNameRecordCount;
    }

    /**
     * Factory used by {@link #instantiateConfigurator(State)}; implementations are instantiated
     * through their no-argument constructor.
     */
    public interface ConfiguratorFactory {
        /**
         * @param var1 job state for the new configurator
         * @return a new configurator
         * @throws IOException if the configurator cannot be created
         */
        CompactionJobConfigurator createConfigurator(State var1) throws IOException;
    }

    /** File extensions known to the compaction configurators. */
    protected enum EXTENSION {
        AVRO("avro"),
        ORC("orc");

        private final String extensionString;

        /** @return the extension text associated with this constant */
        public String getExtensionString() {
            return this.extensionString;
        }

        EXTENSION(String extensionString) {
            this.extensionString = extensionString;
        }
    }
}

