/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.engine.mr.steps;

import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.ArrayPrimitiveWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.dict.DictionaryGenerator;
import org.apache.kylin.dict.DictionaryInfo;
import org.apache.kylin.dict.IDictionaryBuilder;
import org.apache.kylin.engine.mr.KylinReducer;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.steps.SelfDefineSortableKey;
import org.apache.kylin.metadata.datatype.DataType;
import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UHCDictionaryReducer
extends KylinReducer<SelfDefineSortableKey, NullWritable, NullWritable, BytesWritable> {
    private static final Logger logger = LoggerFactory.getLogger(UHCDictionaryReducer.class);
    private IDictionaryBuilder builder;
    private TblColRef col;
    private MultipleOutputs mos;

    @Override
    protected void doSetup(Reducer.Context context) throws IOException {
        super.bindCurrentConfiguration(context.getConfiguration());
        Configuration conf = context.getConfiguration();
        this.mos = new MultipleOutputs((TaskInputOutputContext)context);
        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
        String cubeName = conf.get("cube.name");
        CubeInstance cube = CubeManager.getInstance((KylinConfig)config).getCube(cubeName);
        CubeDesc cubeDesc = cube.getDescriptor();
        List uhcColumns = cubeDesc.getAllUHCColumns();
        int taskId = context.getTaskAttemptID().getTaskID().getId();
        this.col = (TblColRef)uhcColumns.get(taskId);
        logger.info("column name: " + this.col.getIdentity());
        if (cube.getDescriptor().getShardByColumns().contains(this.col)) {
            this.builder = DictionaryGenerator.newDictionaryBuilder((DataType)this.col.getType());
            this.builder.init(null, 0, null);
        } else {
            String hdfsDir = conf.get("global.dict.base.dir");
            DictionaryInfo dictionaryInfo = new DictionaryInfo(this.col.getColumnDesc(), this.col.getDatatype());
            String builderClass = cubeDesc.getDictionaryBuilderClass(this.col);
            this.builder = (IDictionaryBuilder)ClassUtil.newInstance((String)builderClass);
            this.builder.init(dictionaryInfo, 0, hdfsDir);
        }
    }

    @Override
    public void doReduce(SelfDefineSortableKey skey, Iterable<NullWritable> values, Reducer.Context context) throws IOException, InterruptedException {
        Text key = skey.getText();
        String value = Bytes.toString((byte[])key.getBytes(), (int)1, (int)(key.getLength() - 1));
        this.builder.addValue(value);
    }

    @Override
    protected void doCleanup(Reducer.Context context) throws IOException, InterruptedException {
        Dictionary dict = this.builder.build();
        this.outputDict(this.col, (Dictionary<String>)dict);
    }

    private void outputDict(TblColRef col, Dictionary<String> dict) throws IOException, InterruptedException {
        String dictFileName = col.getIdentity() + "/" + col.getName() + ".rldict";
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
             DataOutputStream outputStream = new DataOutputStream((OutputStream)baos);){
            outputStream.writeUTF(dict.getClass().getName());
            dict.write((DataOutput)outputStream);
            this.mos.write("dict", (Object)NullWritable.get(), (Object)new ArrayPrimitiveWritable((Object)baos.toByteArray()), dictFileName);
        }
        this.mos.close();
    }
}

