/*
 * Decompiled with CFR 0.152.
 */
package org.apache.gobblin.compaction.mapreduce.avro;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import java.io.IOException;
import java.util.Comparator;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.gobblin.compaction.mapreduce.avro.AvroDeltaFieldNameProvider;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class AvroKeyDedupReducer
extends Reducer<AvroKey<GenericRecord>, AvroValue<GenericRecord>, AvroKey<GenericRecord>, NullWritable> {
    public static final String DELTA_SCHEMA_PROVIDER = "org.apache.gobblin.compaction." + AvroKeyDedupReducer.class.getSimpleName() + ".deltaFieldsProvider";
    private AvroKey<GenericRecord> outKey;
    private Optional<AvroValueDeltaSchemaComparator> deltaComparatorOptional;
    private AvroDeltaFieldNameProvider deltaFieldNamesProvider;

    protected void setup(Reducer.Context context) throws IOException, InterruptedException {
        this.outKey = new AvroKey();
        this.deltaComparatorOptional = Optional.absent();
        Configuration conf = context.getConfiguration();
        String deltaSchemaProviderClassName = conf.get(DELTA_SCHEMA_PROVIDER);
        if (deltaSchemaProviderClassName != null) {
            this.deltaFieldNamesProvider = (AvroDeltaFieldNameProvider)GobblinConstructorUtils.invokeConstructor(AvroDeltaFieldNameProvider.class, (String)deltaSchemaProviderClassName, (Object[])new Object[]{conf});
            this.deltaComparatorOptional = Optional.of((Object)new AvroValueDeltaSchemaComparator(this.deltaFieldNamesProvider));
        }
    }

    protected void reduce(AvroKey<GenericRecord> key, Iterable<AvroValue<GenericRecord>> values, Reducer.Context context) throws IOException, InterruptedException {
        int numVals = 0;
        AvroValue<GenericRecord> valueToRetain = null;
        for (AvroValue<GenericRecord> value : values) {
            if (valueToRetain == null) {
                valueToRetain = value;
            } else if (this.deltaComparatorOptional.isPresent()) {
                valueToRetain = ((AvroValueDeltaSchemaComparator)this.deltaComparatorOptional.get()).compare(valueToRetain, value) >= 0 ? valueToRetain : value;
            }
            ++numVals;
        }
        this.outKey.datum(valueToRetain.datum());
        if (numVals > 1) {
            context.getCounter((Enum)EVENT_COUNTER.MORE_THAN_1).increment(1L);
            context.getCounter((Enum)EVENT_COUNTER.DEDUPED).increment((long)(numVals - 1));
        }
        context.getCounter((Enum)EVENT_COUNTER.RECORD_COUNT).increment(1L);
        context.write(this.outKey, (Object)NullWritable.get());
    }

    @VisibleForTesting
    protected AvroKey<GenericRecord> getOutKey() {
        return this.outKey;
    }

    @VisibleForTesting
    protected static class AvroValueDeltaSchemaComparator
    implements Comparator<AvroValue<GenericRecord>> {
        private final AvroDeltaFieldNameProvider deltaSchemaProvider;

        public AvroValueDeltaSchemaComparator(AvroDeltaFieldNameProvider provider) {
            this.deltaSchemaProvider = provider;
        }

        @Override
        public int compare(AvroValue<GenericRecord> o1, AvroValue<GenericRecord> o2) {
            GenericRecord record1 = (GenericRecord)o1.datum();
            GenericRecord record2 = (GenericRecord)o2.datum();
            for (String deltaFieldName : this.deltaSchemaProvider.getDeltaFieldNames(record1)) {
                if (record1.get(deltaFieldName).equals(record2.get(deltaFieldName))) continue;
                return ((Comparable)record1.get(deltaFieldName)).compareTo(record2.get(deltaFieldName));
            }
            return 0;
        }
    }

    public static enum EVENT_COUNTER {
        MORE_THAN_1,
        DEDUPED,
        RECORD_COUNT;

    }
}

