/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.compaction.mapreduce.avro;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Enums;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.commons.io.FilenameUtils;
import org.apache.gobblin.compaction.dataset.Dataset;
import org.apache.gobblin.compaction.mapreduce.MRCompactorJobRunner;
import org.apache.gobblin.compaction.mapreduce.avro.AvroKeyCompactorOutputFormat;
import org.apache.gobblin.compaction.mapreduce.avro.AvroKeyDedupReducer;
import org.apache.gobblin.compaction.mapreduce.avro.AvroKeyMapper;
import org.apache.gobblin.compaction.mapreduce.avro.AvroKeyRecursiveCombineFileInputFormat;
import org.apache.gobblin.util.AvroUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Compaction job runner that deduplicates Avro records emitted as {@link AvroKey}s.
 *
 * <p>The runner locates the newest Avro schema among the job's input paths. It uses that schema as
 * the map-output value schema and the output key schema. It is also used as the input key schema
 * when {@value #COMPACTION_JOB_AVRO_SINGLE_INPUT_SCHEMA} is enabled, which is the default. When
 * deduplication is enabled, the map-output key schema is derived according to the configured
 * {@link DedupKeyOption}.
 */
public class MRCompactorAvroKeyDedupJobRunner
extends MRCompactorJobRunner {
    private static final Logger LOG = LoggerFactory.getLogger(MRCompactorAvroKeyDedupJobRunner.class);
    private static final String COMPACTION_JOB_PREFIX = "compaction.job.";
    public static final String COMPACTION_JOB_AVRO_SINGLE_INPUT_SCHEMA = "compaction.job.avro.single.input.schema";
    public static final String COMPACTION_JOB_AVRO_KEY_SCHEMA_LOC = "compaction.job.avro.key.schema.loc";
    public static final String COMPACTION_JOB_DEDUP_KEY = "compaction.job.dedup.key";
    public static final String COMPACTION_JOB_KEY_FIELD_BLACKLIST = "compaction.job.key.fieldBlacklist";
    private static final String AVRO = "avro";
    /** Suffix (case-insensitive) of a field's doc string that marks it as part of the dedup key. */
    private static final String SCHEMA_DEDUP_FIELD_ANNOTATOR = "primarykey";
    public static final DedupKeyOption DEFAULT_DEDUP_KEY_OPTION = DedupKeyOption.KEY;

    /** Whether the newest schema found in the input is also set as the job's input key schema. */
    private final boolean useSingleInputSchema;

    public MRCompactorAvroKeyDedupJobRunner(Dataset dataset, FileSystem fs) {
        super(dataset, fs);
        // Defaults to true when the property is not set.
        this.useSingleInputSchema = this.dataset.jobProps().getPropAsBoolean(COMPACTION_JOB_AVRO_SINGLE_INPUT_SCHEMA, true);
    }

    @Override
    protected void configureJob(Job job) throws IOException {
        super.configureJob(job);
        this.configureSchema(job);
    }

    /**
     * Registers the Avro input, map-output, and output schemas on {@code job}.
     *
     * @throws IOException if listing the input paths fails, or if no Avro schema can be found
     *     under the job's input paths
     */
    private void configureSchema(Job job) throws IOException {
        Schema newestSchema = getNewestSchemaFromSource(job, this.fs);
        if (newestSchema == null) {
            // Fail with a descriptive error rather than an obscure failure inside AvroJob.
            throw new IOException("Unable to find an Avro schema under input paths "
                    + Arrays.toString(FileInputFormat.getInputPaths(job)));
        }
        if (this.useSingleInputSchema) {
            AvroJob.setInputKeySchema(job, newestSchema);
        }
        Schema mapOutputKeySchema = this.shouldDeduplicate ? this.getKeySchema(job, newestSchema) : newestSchema;
        AvroJob.setMapOutputKeySchema(job, mapOutputKeySchema);
        AvroJob.setMapOutputValueSchema(job, newestSchema);
        AvroJob.setOutputKeySchema(job, newestSchema);
    }

    /**
     * Derives the schema used as the dedup (map-output) key, according to {@link #getDedupKeyOption()}.
     *
     * <ul>
     *   <li>{@link DedupKeyOption#ALL}: every comparable field of {@code topicSchema}.</li>
     *   <li>{@link DedupKeyOption#KEY}: the fields annotated as primary key (see
     *       {@link #getKeySchema(Schema)}), with uncomparable fields removed.</li>
     *   <li>otherwise: the schema stored at {@value #COMPACTION_JOB_AVRO_KEY_SCHEMA_LOC}. If the
     *       property is missing, the file cannot be parsed, or the parsed schema is not compatible
     *       with {@code topicSchema}, the KEY behaviour is used instead.</li>
     * </ul>
     *
     * @param job the job being configured (currently unused)
     * @param topicSchema the record schema of the data being compacted
     * @return the key schema
     */
    @VisibleForTesting
    Schema getKeySchema(Job job, Schema topicSchema) throws IOException {
        DedupKeyOption dedupKeyOption = this.getDedupKeyOption();
        if (dedupKeyOption == DedupKeyOption.ALL) {
            LOG.info("Using all attributes in the schema (except Map, Array and Enum fields) for compaction");
            return AvroUtils.removeUncomparableFields(topicSchema).get();
        }
        if (dedupKeyOption == DedupKeyOption.KEY) {
            LOG.info("Using key attributes in the schema for compaction");
            return getComparableKeySchema(topicSchema);
        }
        if (!this.keySchemaFileSpecified()) {
            LOG.info("Property compaction.job.avro.key.schema.loc not provided. Using key attributes in the schema for compaction");
            return getComparableKeySchema(topicSchema);
        }

        Path keySchemaFile = this.getKeySchemaFile();
        LOG.info("Using attributes specified in schema file {} for compaction", keySchemaFile);
        Schema keySchema;
        try {
            keySchema = AvroUtils.parseSchemaFromFile(keySchemaFile, this.fs);
        } catch (IOException e) {
            // Best-effort: fall back to the annotated key fields, but keep the cause in the log.
            LOG.error("Failed to parse avro schema from {}, using key attributes in the schema for compaction", keySchemaFile, e);
            keySchema = getComparableKeySchema(topicSchema);
        }
        if (!isKeySchemaValid(keySchema, topicSchema)) {
            LOG.warn(String.format("Key schema %s is not compatible with record schema %s. Using key attributes in the schema for compaction", keySchema, topicSchema));
            keySchema = getComparableKeySchema(topicSchema);
        }
        return keySchema;
    }

    /** Returns the annotated key schema of {@code topicSchema} with uncomparable fields removed. */
    private static Schema getComparableKeySchema(Schema topicSchema) {
        return AvroUtils.removeUncomparableFields(getKeySchema(topicSchema)).get();
    }

    /**
     * Returns the sub-schema of {@code topicSchema} made of fields annotated as primary key. If no
     * field is annotated, {@code topicSchema} itself is returned.
     *
     * @throws IllegalArgumentException if {@code topicSchema} is not a RECORD schema
     */
    public static Schema getKeySchema(Schema topicSchema) {
        Preconditions.checkArgument(topicSchema.getType() == Schema.Type.RECORD,
                "Expected a RECORD schema but got %s", topicSchema.getType());
        Optional<Schema> newSchema = getKeySchemaFromRecord(topicSchema);
        if (newSchema.isPresent()) {
            return newSchema.get();
        }
        LOG.warn("No field in the schema of {} is annotated as primarykey. Using all fields for deduping", topicSchema.getName());
        return topicSchema;
    }

    /**
     * Returns the key schema contributed by {@code field}.
     *
     * <p>RECORD-typed fields are searched recursively. Any other field contributes its whole schema
     * when its doc string ends (case-insensitively) with {@value #SCHEMA_DEDUP_FIELD_ANNOTATOR}.
     *
     * @return the field's key schema, or absent if the field contributes nothing
     */
    public static Optional<Schema> getKeySchema(Schema.Field field) {
        if (field.schema().getType() == Schema.Type.RECORD) {
            return getKeySchemaFromRecord(field.schema());
        }
        String doc = field.doc();
        // Locale.ROOT keeps matching stable regardless of the JVM's default locale (e.g. Turkish 'I').
        if (doc != null && doc.toLowerCase(Locale.ROOT).endsWith(SCHEMA_DEDUP_FIELD_ANNOTATOR)) {
            return Optional.of(field.schema());
        }
        return Optional.absent();
    }

    /**
     * Builds a record schema containing only the key-contributing fields of {@code record}.
     *
     * @return the reduced record schema, or absent if no field contributes to the key
     * @throws IllegalArgumentException if {@code record} is not a RECORD schema
     */
    public static Optional<Schema> getKeySchemaFromRecord(Schema record) {
        Preconditions.checkArgument(record.getType() == Schema.Type.RECORD,
                "Expected a RECORD schema but got %s", record.getType());
        List<Schema.Field> fields = Lists.newArrayList();
        for (Schema.Field field : record.getFields()) {
            Optional<Schema> newFieldSchema = getKeySchema(field);
            if (newFieldSchema.isPresent()) {
                fields.add(new Schema.Field(field.name(), newFieldSchema.get(), field.doc(), field.defaultValue()));
            }
        }
        if (fields.isEmpty()) {
            return Optional.absent();
        }
        // NOTE: the record name is also passed as the namespace argument. This is kept unchanged so
        // the key schema's full name stays the same as before.
        Schema newSchema = Schema.createRecord(record.getName(), record.getDoc(), record.getName(), false);
        newSchema.setFields(fields);
        return Optional.of(newSchema);
    }

    /**
     * Returns whether data written with {@code topicSchema} can be read with {@code keySchema}
     * according to Avro reader/writer compatibility rules.
     */
    public static boolean isKeySchemaValid(Schema keySchema, Schema topicSchema) {
        return SchemaCompatibility.checkReaderWriterCompatibility(keySchema, topicSchema).getType()
                == SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
    }

    /**
     * Returns the schema of the newest Avro file reachable from the job's input paths.
     *
     * <p>Immediate children of all input paths are visited newest-first by modification time, and
     * each one is searched with {@link #getNewestSchemaFromSource(Path, FileSystem)}.
     *
     * @return the first schema found, or {@code null} if none is found
     */
    public static Schema getNewestSchemaFromSource(Job job, FileSystem fs) throws IOException {
        Path[] sourceDirs = FileInputFormat.getInputPaths(job);
        List<FileStatus> files = new ArrayList<>();
        for (Path sourceDir : sourceDirs) {
            files.addAll(Arrays.asList(fs.listStatus(sourceDir)));
        }
        Collections.sort(files, new LastModifiedDescComparator());
        for (FileStatus file : files) {
            Schema schema = getNewestSchemaFromSource(file.getPath(), fs);
            if (schema != null) {
                return schema;
            }
        }
        return null;
    }

    /**
     * Recursively searches {@code sourceDir} and returns a schema from the newest Avro file found.
     *
     * <p>Entries are visited newest-first by modification time. For each entry, a directory is
     * searched recursively; otherwise, the schema of the first file with the {@code .avro}
     * extension is returned.
     *
     * @return the schema found, or {@code null} if no Avro file is found
     */
    public static Schema getNewestSchemaFromSource(Path sourceDir, FileSystem fs) throws IOException {
        FileStatus[] files = fs.listStatus(sourceDir);
        Arrays.sort(files, new LastModifiedDescComparator());
        for (FileStatus status : files) {
            if (status.isDirectory()) {
                Schema schema = getNewestSchemaFromSource(status.getPath(), fs);
                if (schema != null) {
                    return schema;
                }
            } else if (FilenameUtils.isExtension(status.getPath().getName(), AVRO)) {
                return AvroUtils.getSchemaFromDataFile(status.getPath(), fs);
            }
        }
        return null;
    }

    /**
     * Reads {@value #COMPACTION_JOB_DEDUP_KEY}, case-insensitively. Falls back to
     * {@link #DEFAULT_DEDUP_KEY_OPTION} when the property is absent or unrecognized.
     */
    private DedupKeyOption getDedupKeyOption() {
        if (!this.dataset.jobProps().contains(COMPACTION_JOB_DEDUP_KEY)) {
            return DEFAULT_DEDUP_KEY_OPTION;
        }
        String configured = this.dataset.jobProps().getProp(COMPACTION_JOB_DEDUP_KEY);
        Optional<DedupKeyOption> option = Enums.getIfPresent(DedupKeyOption.class, configured.toUpperCase(Locale.ROOT));
        if (!option.isPresent()) {
            LOG.warn("Unrecognized value '{}' for {}; falling back to {}", configured, COMPACTION_JOB_DEDUP_KEY, DEFAULT_DEDUP_KEY_OPTION);
            return DEFAULT_DEDUP_KEY_OPTION;
        }
        return option.get();
    }

    private boolean keySchemaFileSpecified() {
        return this.dataset.jobProps().contains(COMPACTION_JOB_AVRO_KEY_SCHEMA_LOC);
    }

    private Path getKeySchemaFile() {
        return new Path(this.dataset.jobProps().getProp(COMPACTION_JOB_AVRO_KEY_SCHEMA_LOC));
    }

    @Override
    protected void setInputFormatClass(Job job) {
        job.setInputFormatClass(AvroKeyRecursiveCombineFileInputFormat.class);
    }

    @Override
    protected void setMapperClass(Job job) {
        job.setMapperClass(AvroKeyMapper.class);
    }

    @Override
    protected void setMapOutputKeyClass(Job job) {
        job.setMapOutputKeyClass(AvroKey.class);
    }

    @Override
    protected void setMapOutputValueClass(Job job) {
        job.setMapOutputValueClass(AvroValue.class);
    }

    @Override
    protected void setOutputFormatClass(Job job) {
        job.setOutputFormatClass(AvroKeyCompactorOutputFormat.class);
    }

    @Override
    protected void setReducerClass(Job job) {
        job.setReducerClass(AvroKeyDedupReducer.class);
    }

    @Override
    protected void setOutputKeyClass(Job job) {
        job.setOutputKeyClass(AvroKey.class);
    }

    @Override
    protected void setOutputValueClass(Job job) {
        job.setOutputValueClass(NullWritable.class);
    }

    /** Only {@code .avro} files are considered by this runner. */
    @Override
    protected Collection<String> getApplicableFileExtensions() {
        return Lists.newArrayList(AVRO);
    }

    /** Orders {@link FileStatus}es by modification time, newest first. */
    public static class LastModifiedDescComparator
    implements Comparator<FileStatus>,
    Serializable {
        private static final long serialVersionUID = 1L;

        @Override
        public int compare(FileStatus fs1, FileStatus fs2) {
            // Arguments swapped relative to ascending order to sort descending.
            return Long.compare(fs2.getModificationTime(), fs1.getModificationTime());
        }
    }

    /** Strategies for choosing the fields that form the dedup key. */
    public static enum DedupKeyOption {
        /** Use all comparable fields of the record schema. */
        ALL,
        /** Use fields whose doc ends with "primarykey" (or all fields if none are annotated). */
        KEY,
        /** Use the schema stored at {@value MRCompactorAvroKeyDedupJobRunner#COMPACTION_JOB_AVRO_KEY_SCHEMA_LOC}. */
        CUSTOM;

    }
}

