/*
 * Decompiled with CFR 0.152.
 */
package org.apache.doris.load.loadv2.dpp;

import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import java.io.IOException;
import java.io.Serializable;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.doris.common.SparkDppException;
import org.apache.doris.load.loadv2.dpp.AggregateReduceFunction;
import org.apache.doris.load.loadv2.dpp.BigIntParser;
import org.apache.doris.load.loadv2.dpp.BucketComparator;
import org.apache.doris.load.loadv2.dpp.BucketPartitioner;
import org.apache.doris.load.loadv2.dpp.ColumnParser;
import org.apache.doris.load.loadv2.dpp.DecimalParser;
import org.apache.doris.load.loadv2.dpp.DefaultSparkRDDAggregator;
import org.apache.doris.load.loadv2.dpp.DorisRangePartitioner;
import org.apache.doris.load.loadv2.dpp.DppColumns;
import org.apache.doris.load.loadv2.dpp.DppResult;
import org.apache.doris.load.loadv2.dpp.DppUtils;
import org.apache.doris.load.loadv2.dpp.EncodeBaseAggregateTableFunction;
import org.apache.doris.load.loadv2.dpp.EncodeRollupAggregateTableFunction;
import org.apache.doris.load.loadv2.dpp.MinimumCoverageRollupTreeBuilder;
import org.apache.doris.load.loadv2.dpp.RollupTreeNode;
import org.apache.doris.load.loadv2.dpp.SparkRDDAggregator;
import org.apache.doris.load.loadv2.dpp.StringAccumulator;
import org.apache.doris.load.loadv2.etl.EtlJobConfig;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.spark.Partitioner;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.util.AccumulatorV2;
import org.apache.spark.util.LongAccumulator;
import org.apache.spark.util.SerializableConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

public final class SparkDpp
implements Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(SparkDpp.class);
    // Text marker that stands for a NULL value, both in source lines and in column default values.
    private static final String NULL_FLAG = "\\N";
    private static final String DPP_RESULT_FILE = "dpp_result.json";
    private static final String BITMAP_TYPE = "bitmap";

    // Set by the four-argument constructor.
    private SparkSession spark;
    private EtlJobConfig etlJobConfig;

    // Named counters; created and registered in init().
    private LongAccumulator abnormalRowAcc;
    private LongAccumulator scannedRowsAcc;
    private LongAccumulator fileNumberAcc;
    private LongAccumulator fileSizeAcc;

    // "<partitionId>_<bucketId>" -> reducer ordinal; filled by fillTupleWithPartitionColumn.
    private Map<String, Integer> bucketKeyMap = new HashMap<>();
    // Sample of rejected rows; registered with the SparkContext in init().
    private StringAccumulator invalidRows = new StringAccumulator();
    // Hadoop configuration wrapped so closures shipped to tasks can rebuild it.
    private SerializableConfiguration serializableHadoopConf;
    private DppResult dppResult = new DppResult();
    Map<Long, Set<String>> tableToBitmapDictColumns = new HashMap<>();
    Map<Long, Set<String>> tableToBinaryBitmapColumns = new HashMap<>();

    /** Creates an instance whose fields keep their declared defaults. */
    public SparkDpp() {
    }

    /**
     * Creates an instance bound to a Spark session and job configuration.
     *
     * @param spark                      session used for reading data and creating accumulators
     * @param etlJobConfig               job configuration (output path, strict mode, ...)
     * @param tableToBitmapDictColumns   per-table bitmap-dict columns; null keeps the empty default map
     * @param tableToBinaryBitmapColumns per-table binary bitmap columns; null keeps the empty default map
     */
    public SparkDpp(SparkSession spark, EtlJobConfig etlJobConfig, Map<Long, Set<String>> tableToBitmapDictColumns, Map<Long, Set<String>> tableToBinaryBitmapColumns) {
        this.spark = spark;
        this.etlJobConfig = etlJobConfig;
        this.tableToBitmapDictColumns = tableToBitmapDictColumns == null
                ? this.tableToBitmapDictColumns
                : tableToBitmapDictColumns;
        this.tableToBinaryBitmapColumns = tableToBinaryBitmapColumns == null
                ? this.tableToBinaryBitmapColumns
                : tableToBinaryBitmapColumns;
    }

    /**
     * Creates the named long accumulators, registers the invalid-row accumulator, and captures a
     * serializable copy of the session's Hadoop configuration. Must be called before any load step.
     */
    public void init() {
        abnormalRowAcc = spark.sparkContext().longAccumulator("abnormalRowAcc");
        scannedRowsAcc = spark.sparkContext().longAccumulator("scannedRowsAcc");
        fileNumberAcc = spark.sparkContext().longAccumulator("fileNumberAcc");
        fileSizeAcc = spark.sparkContext().longAccumulator("fileSizeAcc");
        spark.sparkContext().register(invalidRows, "InvalidRowsAccumulator");
        // Wrapped so task-side closures (e.g. the parquet writer) can rebuild a Configuration.
        serializableHadoopConf = new SerializableConfiguration(spark.sparkContext().hadoopConfiguration());
    }

    /**
     * Builds the RDD for one node of the rollup tree from its parent's RDD.
     *
     * <p>Fills {@code sparkRDDAggregators} with one aggregator per value (non-key) column of the node's
     * index, in column order. AGGREGATE/UNIQUE indexes get {@link SparkRDDAggregator#buildAggregator}
     * aggregators and a reduceByKey step; any other index type gets {@link DefaultSparkRDDAggregator}s
     * and no reduce step. Non-base indexes are first re-encoded from the parent's column layout.
     *
     * @param currentPairRDD      the parent's RDD (the source RDD for the base index)
     * @param curNode             tree node to build
     * @param sparkRDDAggregators output array, sized to the node's value-column count
     * @return the node's RDD
     * @throws SparkDppException if an aggregator cannot be built or the column mapping is incomplete
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private JavaPairRDD<List<Object>, Object[]> processRDDAggregate(JavaPairRDD<List<Object>, Object[]> currentPairRDD, RollupTreeNode curNode, SparkRDDAggregator[] sparkRDDAggregators) throws SparkDppException {
        EtlJobConfig.EtlIndex index = curNode.indexMeta;
        boolean aggregating = StringUtils.equalsIgnoreCase(index.indexType, "AGGREGATE")
                || StringUtils.equalsIgnoreCase(index.indexType, "UNIQUE");

        int slot = 0;
        for (EtlJobConfig.EtlColumn column : index.columns) {
            if (column.isKey) {
                continue;
            }
            if (aggregating) {
                sparkRDDAggregators[slot] = SparkRDDAggregator.buildAggregator(column);
            } else {
                sparkRDDAggregators[slot] = new DefaultSparkRDDAggregator();
            }
            slot++;
        }

        if (index.isBaseIndex) {
            if (!aggregating) {
                return currentPairRDD;
            }
            JavaPairRDD encodedBase = currentPairRDD.mapToPair((PairFunction) new EncodeBaseAggregateTableFunction(sparkRDDAggregators));
            return encodedBase.reduceByKey((Function2) new AggregateReduceFunction(sparkRDDAggregators));
        }

        // Rollups read their parent's output, so rows are re-encoded using the child->parent column positions.
        Pair<Integer[], Integer[]> positionsInParent = this.getColumnIndexInParentRollup(
                curNode.keyColumnNames, curNode.valueColumnNames,
                curNode.parent.keyColumnNames, curNode.parent.valueColumnNames);
        JavaPairRDD encodedRollup = currentPairRDD.mapToPair((PairFunction) new EncodeRollupAggregateTableFunction(positionsInParent));
        if (!aggregating) {
            return encodedRollup;
        }
        return encodedRollup.reduceByKey((Function2) new AggregateReduceFunction(sparkRDDAggregators));
    }

    /**
     * Repartitions {@code resultRDD} with {@link BucketPartitioner}, sorts each partition with
     * {@link BucketComparator}, and writes each run of consecutive rows sharing a bucket key to one
     * Parquet file.
     *
     * <p>The first key column of every row is the bucket key {@code "<partitionId>_<bucketId>"}. It is
     * stripped before encoding. Value columns are passed through the matching aggregator's
     * {@code finalize}. Each file is written to {@code <dstPath>.<taskAttemptId>} and renamed to
     * {@code dstPath} once its bucket is complete. If an exception escapes mid-bucket, the open writer
     * is closed but its temporary file is not renamed.
     *
     * @param resultRDD           rows keyed by {@code [bucketKey, keyColumns...]} with value columns
     * @param pathPattern         {@link String#format} pattern taking tableId, partitionId, indexId, bucketId, schemaHash
     * @param tableId             table id substituted into {@code pathPattern}
     * @param indexMeta           index whose columns, id and schema hash describe the output files
     * @param sparkRDDAggregators one aggregator per value column, used to finalize values
     */
    private void writeRepartitionAndSortedRDDToParquet(JavaPairRDD<List<Object>, Object[]> resultRDD, String pathPattern, long tableId, EtlJobConfig.EtlIndex indexMeta, SparkRDDAggregator[] sparkRDDAggregators) {
        StructType dstSchema = DppUtils.createDstTableSchema(indexMeta.columns, false, true);
        ExpressionEncoder<Row> encoder = RowEncoder.apply(dstSchema);
        @SuppressWarnings({"unchecked", "rawtypes"})
        JavaPairRDD<List<Object>, Object[]> sortedRDD = resultRDD.repartitionAndSortWithinPartitions(
                new BucketPartitioner(this.bucketKeyMap), (Comparator) new BucketComparator());
        sortedRDD.foreachPartition(rows -> {
            Configuration conf = new Configuration(this.serializableHadoopConf.value());
            FileSystem fs = FileSystem.get(URI.create(this.etlJobConfig.outputPath), conf);
            // Writer settings do not depend on the bucket, so configure them once per partition.
            conf.setBoolean("spark.sql.parquet.writeLegacyFormat", false);
            conf.setBoolean("spark.sql.parquet.int64AsTimestampMillis", false);
            conf.setBoolean("spark.sql.parquet.int96AsTimestamp", true);
            conf.setBoolean("spark.sql.parquet.binaryAsString", false);
            conf.set("spark.sql.parquet.outputTimestampType", "INT96");
            ParquetWriteSupport.setSchema(dstSchema, conf);

            long taskAttemptId = TaskContext.get().taskAttemptId();
            String lastBucketKey = null;
            ParquetWriter<InternalRow> parquetWriter = null;
            String dstPath = "";
            String tmpPath = "";
            try {
                while (rows.hasNext()) {
                    Tuple2<List<Object>, Object[]> pair = rows.next();
                    List<Object> keyColumns = pair._1();
                    Object[] valueColumns = pair._2();
                    if (keyColumns.size() + valueColumns.length <= 1) {
                        LOG.warn("invalid row:" + pair);
                        continue;
                    }
                    String curBucketKey = keyColumns.get(0).toString();
                    List<Object> columnObjects = new ArrayList<Object>();
                    for (int i = 1; i < keyColumns.size(); ++i) {
                        columnObjects.add(keyColumns.get(i));
                    }
                    for (int i = 0; i < valueColumns.length; ++i) {
                        columnObjects.add(sparkRDDAggregators[i].finalize(valueColumns[i]));
                    }
                    Row rowWithoutBucketKey = RowFactory.create(columnObjects.toArray());

                    if (!curBucketKey.equals(lastBucketKey)) {
                        if (parquetWriter != null) {
                            // Detach first so neither a later bucket switch nor the finally block
                            // can close or rename this writer's file a second time.
                            ParquetWriter<InternalRow> finished = parquetWriter;
                            parquetWriter = null;
                            lastBucketKey = null;
                            commitParquetFile(finished, fs, tmpPath, dstPath);
                        }
                        String[] bucketKey = curBucketKey.split("_");
                        if (bucketKey.length != 2) {
                            LOG.warn("invalid bucket key:" + curBucketKey);
                            continue;
                        }
                        long partitionId = Long.parseLong(bucketKey[0]);
                        int bucketId = Integer.parseInt(bucketKey[1]);
                        dstPath = String.format(pathPattern, tableId, partitionId, indexMeta.indexId, bucketId, indexMeta.schemaHash);
                        tmpPath = dstPath + "." + taskAttemptId;
                        parquetWriter = new ParquetWriter<InternalRow>(new Path(tmpPath), new ParquetWriteSupport(),
                                CompressionCodecName.SNAPPY, 0x10000000, 16384, 0x100000, true, false,
                                ParquetProperties.WriterVersion.PARQUET_1_0, conf);
                        LOG.info("[HdfsOperate]>> initialize writer succeed! path:" + tmpPath);
                        lastBucketKey = curBucketKey;
                    }
                    parquetWriter.write(encoder.toRow(rowWithoutBucketKey));
                }
                if (parquetWriter != null) {
                    ParquetWriter<InternalRow> finished = parquetWriter;
                    parquetWriter = null;
                    commitParquetFile(finished, fs, tmpPath, dstPath);
                }
            } finally {
                if (parquetWriter != null) {
                    // Reached only when an exception escaped mid-bucket: release the writer but leave
                    // the temporary file un-renamed.
                    try {
                        parquetWriter.close();
                    } catch (IOException closeError) {
                        LOG.warn("failed to close parquet writer for tmpPath:" + tmpPath + ". exception:" + closeError);
                    }
                }
            }
        });
    }

    /**
     * Closes {@code writer} and renames its temporary file to its final location.
     *
     * @throws IOException if closing the writer fails or the rename throws
     */
    private static void commitParquetFile(ParquetWriter<InternalRow> writer, FileSystem fs, String tmpPath, String dstPath) throws IOException {
        writer.close();
        boolean renamed;
        try {
            renamed = fs.rename(new Path(tmpPath), new Path(dstPath));
        } catch (IOException ioe) {
            LOG.warn("rename from tmpPath" + tmpPath + " to dstPath:" + dstPath + " failed. exception:" + ioe);
            throw ioe;
        }
        if (!renamed) {
            // FileSystem.rename reports some failures (e.g. destination exists) by returning false.
            LOG.warn("rename from tmpPath" + tmpPath + " to dstPath:" + dstPath + " returned false");
        }
    }

    /**
     * Walks the rollup tree breadth-first, building and writing each index from its parent's RDD.
     *
     * <p>RDDs are tracked per tree level. When the walk moves to a new level, the RDDs held as parents
     * are unpersisted and the level just finished becomes the parent level. A node's RDD is persisted
     * only when it has more than one child.
     *
     * @param rootNode  root of the rollup tree (the base index)
     * @param rootRDD   source RDD the root node is built from
     * @param tableId   table id used in the output path
     * @param baseIndex base index; its id keys {@code rootRDD}
     * @throws SparkDppException if building a node's RDD fails
     */
    private void processRollupTree(RollupTreeNode rootNode, JavaPairRDD<List<Object>, Object[]> rootRDD, long tableId, EtlJobConfig.EtlIndex baseIndex) throws SparkDppException {
        String pathPattern = this.etlJobConfig.outputPath + "/" + this.etlJobConfig.outputFilePattern;
        Map<Long, JavaPairRDD<List<Object>, Object[]>> parentLevelRDDs = new HashMap<>();
        parentLevelRDDs.put(baseIndex.indexId, rootRDD);
        Map<Long, JavaPairRDD<List<Object>, Object[]>> currentLevelRDDs = new HashMap<>();
        int levelInProgress = 0;

        LinkedList<RollupTreeNode> pending = new LinkedList<>();
        pending.offer(rootNode);
        while (!pending.isEmpty()) {
            RollupTreeNode node = pending.poll();
            LOG.info("start to process index:" + node.indexId);
            if (node.children != null) {
                for (RollupTreeNode child : node.children) {
                    pending.offer(child);
                }
            }

            if (node.level != levelInProgress) {
                for (JavaPairRDD<List<Object>, Object[]> released : parentLevelRDDs.values()) {
                    released.unpersist();
                }
                levelInProgress = node.level;
                parentLevelRDDs.clear();
                parentLevelRDDs = currentLevelRDDs;
                currentLevelRDDs = new HashMap<>();
            }

            long parentIndexId = node.parent != null ? node.parent.indexId : baseIndex.indexId;
            JavaPairRDD<List<Object>, Object[]> parentRDD = parentLevelRDDs.get(parentIndexId);
            SparkRDDAggregator[] aggregators = new SparkRDDAggregator[node.valueColumnNames.size()];
            JavaPairRDD<List<Object>, Object[]> nodeRDD = this.processRDDAggregate(parentRDD, node, aggregators);
            currentLevelRDDs.put(node.indexId, nodeRDD);
            // Several children will read this RDD, so keep it instead of recomputing it per child.
            if (node.children != null && node.children.size() > 1) {
                nodeRDD.persist(StorageLevel.MEMORY_AND_DISK());
            }
            this.writeRepartitionAndSortedRDDToParquet(nodeRDD, pathPattern, tableId, node.indexMeta, aggregators);
        }
    }

    /**
     * Maps each child-rollup column to its position in the parent rollup's column lists.
     *
     * <p>Names are compared case-insensitively, and the first matching parent position wins.
     *
     * @return pair of (key positions, value positions), in child column order
     * @throws SparkDppException if any child key or value column has no match in the parent
     */
    private Pair<Integer[], Integer[]> getColumnIndexInParentRollup(List<String> childRollupKeyColumns, List<String> childRollupValueColumns, List<String> parentRollupKeyColumns, List<String> parentRollupValueColumns) throws SparkDppException {
        List<Integer> keyPositions = new ArrayList<>();
        for (String childKey : childRollupKeyColumns) {
            for (int pos = 0; pos < parentRollupKeyColumns.size(); ++pos) {
                if (StringUtils.equalsIgnoreCase(childKey, parentRollupKeyColumns.get(pos))) {
                    keyPositions.add(pos);
                    break;
                }
            }
        }
        List<Integer> valuePositions = new ArrayList<>();
        for (String childValue : childRollupValueColumns) {
            for (int pos = 0; pos < parentRollupValueColumns.size(); ++pos) {
                if (StringUtils.equalsIgnoreCase(childValue, parentRollupValueColumns.get(pos))) {
                    valuePositions.add(pos);
                    break;
                }
            }
        }
        boolean keysComplete = keyPositions.size() == childRollupKeyColumns.size();
        boolean valuesComplete = valuePositions.size() == childRollupValueColumns.size();
        if (!keysComplete || !valuesComplete) {
            throw new SparkDppException(String.format("column map index from child to parent has error, key size src: %s, dst: %s; value size src: %s, dst: %s", childRollupKeyColumns.size(), keyPositions.size(), childRollupValueColumns.size(), valuePositions.size()));
        }
        return Pair.of(keyPositions.toArray(new Integer[0]), valuePositions.toArray(new Integer[0]));
    }

    /**
     * Checks one source value against the limits of its destination column.
     *
     * <p>Rules:
     * <ul>
     *   <li>Decimal values must lie within the parser's [min, max] range.</li>
     *   <li>CHAR/VARCHAR values must fit in {@code etlColumn.stringLength} UTF-8 bytes.</li>
     *   <li>STRING/TEXT values must fit in 0x100000 (1 MiB) UTF-8 bytes.</li>
     * </ul>
     * Null values and all other column types are accepted.
     *
     * @param srcValue     value taken from {@code row}; may be null
     * @param etlColumn    destination column definition
     * @param columnParser parser for the column; must be a {@link DecimalParser} for decimal columns
     * @param row          the whole row, used only in log messages
     * @return {@code true} if the value is acceptable; {@code false} after logging a warning otherwise
     */
    public boolean validateData(Object srcValue, EtlJobConfig.EtlColumn etlColumn, ColumnParser columnParser, Row row) {
        if (srcValue == null) {
            return true;
        }
        final int maxStringBytes = 0x100000;
        // Locale.ROOT: the default-locale toUpperCase() maps 'i' to a dotted capital in e.g. Turkish,
        // which would make lower-case type names such as "string" miss their case label.
        switch (etlColumn.columnType.toUpperCase(java.util.Locale.ROOT)) {
            case "DECIMALV2":
            case "DECIMAL32":
            case "DECIMAL64":
            case "DECIMAL128": {
                DecimalParser decimalParser = (DecimalParser) columnParser;
                BigDecimal srcBigDecimal = (BigDecimal) srcValue;
                if (decimalParser.getMaxValue().compareTo(srcBigDecimal) >= 0
                        && decimalParser.getMinValue().compareTo(srcBigDecimal) <= 0) {
                    return true;
                }
                LOG.warn(String.format("decimal value is not valid for defination, column=%s, value=%s,precision=%s,scale=%s", etlColumn.columnName, srcValue, srcBigDecimal.precision(), srcBigDecimal.scale()));
                return false;
            }
            case "CHAR":
            case "VARCHAR": {
                int strSize = srcValue.toString().getBytes(StandardCharsets.UTF_8).length;
                if (strSize <= etlColumn.stringLength) {
                    return true;
                }
                LOG.warn(String.format("the length of input is too long than schema. column_name:%s,input_str[%s],schema length:%s,actual length:%s", etlColumn.columnName, row.toString(), etlColumn.stringLength, strSize));
                return false;
            }
            case "STRING":
            case "TEXT": {
                int strDataSize = srcValue.toString().getBytes(StandardCharsets.UTF_8).length;
                if (strDataSize <= maxStringBytes) {
                    return true;
                }
                LOG.warn(String.format("The string type is limited to a maximum of %s bytes. column_name:%s,input_str[%s],actual length:%s", maxStringBytes, etlColumn.columnName, row.toString(), strDataSize));
                return false;
            }
            default:
                return true;
        }
    }

    /**
     * Converts each row of {@code dataframe} into a {@code ([bucketKey, keyColumns...], values[])} pair.
     *
     * <p>Every key/partition and value column is checked with {@link #validateData}. A row that fails
     * validation, or whose range partition is not in {@code validPartitionIds}, is dropped and counted
     * in {@code abnormalRowAcc}. For kept rows, the bucket id is the distribution-column hash modulo the
     * partition's bucket count, and the bucket key is {@code "<partitionId>_<bucketId>"}.
     *
     * <p>Also (re)fills {@code bucketKeyMap}, assigning consecutive reducer ordinals to every
     * partition/bucket pair.
     *
     * @param validPartitionIds ids of partitions allowed to receive rows; null allows all partitions
     * @return the keyed pair RDD
     * @throws SparkDppException if a column parser cannot be created
     */
    private JavaPairRDD<List<Object>, Object[]> fillTupleWithPartitionColumn(Dataset<Row> dataframe, EtlJobConfig.EtlPartitionInfo partitionInfo, List<Integer> partitionKeyIndex, List<DorisRangePartitioner.PartitionRangeKey> partitionRangeKeys, List<String> keyAndPartitionColumnNames, List<String> valueColumnNames, StructType dstTableSchema, EtlJobConfig.EtlIndex baseIndex, List<Long> validPartitionIds) throws SparkDppException {
        List<String> distributeColumns = partitionInfo.distributionColumnRefs;
        DorisRangePartitioner partitioner = new DorisRangePartitioner(partitionInfo, partitionKeyIndex, partitionRangeKeys);
        // Positions within partitionInfo.partitions that may receive rows.
        HashSet<Integer> validPartitionIndex = new HashSet<Integer>();
        for (int i = 0; i < partitionInfo.partitions.size(); ++i) {
            if (validPartitionIds == null || validPartitionIds.contains(partitionInfo.partitions.get(i).partitionId)) {
                validPartitionIndex.add(i);
            }
        }
        HashMap<String, ColumnParser> parsers = Maps.newHashMap();
        for (EtlJobConfig.EtlColumn column : baseIndex.columns) {
            parsers.put(column.columnName, ColumnParser.create(column));
        }
        JavaPairRDD<List<Object>, Object[]> resultPairRDD = dataframe.toJavaRDD().flatMapToPair(
                (PairFlatMapFunction<Row, List<Object>, Object[]>) row -> {
            List<Tuple2<List<Object>, Object[]>> result = new ArrayList<Tuple2<List<Object>, Object[]>>();
            List<Object> keyAndPartitionColumns = new ArrayList<Object>();
            List<Object> keyColumns = new ArrayList<Object>();
            List<Object> valueColumns = new ArrayList<Object>(valueColumnNames.size());
            for (String columnName : keyAndPartitionColumnNames) {
                Object columnObject = row.get(row.fieldIndex(columnName));
                if (!this.validateData(columnObject, baseIndex.getColumn(columnName), parsers.get(columnName), row)) {
                    this.abnormalRowAcc.add(1L);
                    return result.iterator();
                }
                keyAndPartitionColumns.add(columnObject);
                if (baseIndex.getColumn(columnName).isKey) {
                    keyColumns.add(columnObject);
                }
            }
            for (String columnName : valueColumnNames) {
                Object columnObject = row.get(row.fieldIndex(columnName));
                if (!this.validateData(columnObject, baseIndex.getColumn(columnName), parsers.get(columnName), row)) {
                    this.abnormalRowAcc.add(1L);
                    return result.iterator();
                }
                valueColumns.add(columnObject);
            }
            DppColumns key = new DppColumns(keyAndPartitionColumns);
            int pid = partitioner.getPartition(key);
            if (!validPartitionIndex.contains(pid)) {
                LOG.warn("invalid partition for row:" + row + ", pid:" + pid);
                this.abnormalRowAcc.add(1L);
                LOG.info("abnormalRowAcc:" + this.abnormalRowAcc);
                if (this.abnormalRowAcc.value() < 5L) {
                    LOG.info("add row to invalidRows:" + row.toString());
                    this.invalidRows.add(row.toString());
                    LOG.info("invalid rows contents:" + this.invalidRows.value());
                }
                return result.iterator();
            }
            EtlJobConfig.EtlPartition partition = partitionInfo.partitions.get(pid);
            long hashValue = DppUtils.getHashValue(row, distributeColumns, dstTableSchema);
            // NOTE(review): bucketId is non-negative only if getHashValue never returns a negative
            // value (the previous all-ones mask did not change that) — confirm against DppUtils.
            int bucketId = (int) (hashValue % (long) partition.bucketNum);
            String bucketKey = partition.partitionId + "_" + bucketId;
            List<Object> tuple = new ArrayList<Object>();
            tuple.add(bucketKey);
            tuple.addAll(keyColumns);
            result.add(new Tuple2<List<Object>, Object[]>(tuple, valueColumns.toArray()));
            return result.iterator();
        });
        int reduceNum = 0;
        for (EtlJobConfig.EtlPartition partition : partitionInfo.partitions) {
            for (int bucket = 0; bucket < partition.bucketNum; ++bucket) {
                this.bucketKeyMap.put(partition.partitionId + "_" + bucket, reduceNum);
                ++reduceNum;
            }
        }
        LOG.info("print bucket key map:" + this.bucketKeyMap);
        return resultPairRDD;
    }

    /**
     * Reshapes the source dataframe into the destination table schema.
     *
     * <p>For every destination column:
     * <ul>
     *   <li>If it is absent from the source and not produced by a column mapping, it is filled with its
     *       default value (or NULL when the default is {@code \N} or the column is nullable).</li>
     *   <li>The column is then cast to the destination type. DATE and DATETIME get the Spark date and
     *       timestamp types; BOOLEAN is normalised to "1"/"0".</li>
     *   <li>Non-key columns are negated for negative file groups.</li>
     * </ul>
     * Column mapping expressions, except those mentioning {@code hll_hash}, are applied last.
     *
     * @throws SparkDppException if a non-nullable column without a default has no source data
     */
    private Dataset<Row> convertSrcDataframeToDstDataframe(EtlJobConfig.EtlIndex baseIndex, Dataset<Row> srcDataframe, StructType dstTableSchema, EtlJobConfig.EtlFileGroup fileGroup) throws SparkDppException {
        Dataset<Row> dataframe = srcDataframe;
        Set<String> srcColumnNames = new HashSet<String>();
        for (StructField field : dataframe.schema().fields()) {
            srcColumnNames.add(field.name());
        }
        Map<String, EtlJobConfig.EtlColumnMapping> columnMappings = fileGroup.columnMappings;
        // Never null: an absent mapping config behaves like an empty one (previously an NPE below).
        Set<String> mappingColumns = columnMappings == null ? new HashSet<String>() : columnMappings.keySet();
        for (StructField dstField : dstTableSchema.fields()) {
            String name = dstField.name();
            EtlJobConfig.EtlColumn column = baseIndex.getColumn(name);
            if (!srcColumnNames.contains(name)) {
                if (mappingColumns.contains(name)) {
                    // Produced (and cast) by its mapping expression after this loop.
                    continue;
                }
                if (column.defaultValue != null) {
                    dataframe = column.defaultValue.equals(NULL_FLAG)
                            ? dataframe.withColumn(name, functions.lit(null))
                            : dataframe.withColumn(name, functions.lit(column.defaultValue));
                } else if (column.isAllowNull) {
                    dataframe = dataframe.withColumn(name, functions.lit(null));
                } else {
                    throw new SparkDppException("Reason: no data for column:" + name);
                }
            }
            if (column.columnType.equalsIgnoreCase("DATE")) {
                dataframe = dataframe.withColumn(name, dataframe.col(name).cast(DataTypes.DateType));
            } else if (column.columnType.equalsIgnoreCase("DATETIME")) {
                dataframe = dataframe.withColumn(name, dataframe.col(name).cast(DataTypes.TimestampType));
            } else if (column.columnType.equalsIgnoreCase("BOOLEAN")) {
                dataframe = dataframe.withColumn(name, functions.when(functions.lower(dataframe.col(name)).equalTo("true"), "1")
                        .when(dataframe.col(name).equalTo("1"), "1")
                        .otherwise("0"));
            } else if (!column.columnType.equalsIgnoreCase(BITMAP_TYPE) && !dstField.dataType().equals(DataTypes.StringType)) {
                dataframe = dataframe.withColumn(name, dataframe.col(name).cast(dstField.dataType()));
            } else if (column.columnType.equalsIgnoreCase(BITMAP_TYPE) && dstField.dataType().equals(DataTypes.BinaryType)) {
                dataframe = dataframe.withColumn(name, dataframe.col(name).cast(DataTypes.BinaryType));
            }
            if (fileGroup.isNegative && !column.isKey) {
                dataframe = dataframe.withColumn(name, functions.expr("-1 *" + name));
            }
        }
        for (String mappingColumn : mappingColumns) {
            String mappingDescription = columnMappings.get(mappingColumn).toDescription();
            if (mappingDescription.toLowerCase().contains("hll_hash")) {
                continue;
            }
            dataframe = dataframe.withColumn(mappingColumn, functions.expr(mappingDescription).cast(dstTableSchema.apply(mappingColumn).dataType()));
        }
        return dataframe;
    }

    /**
     * Loads one source file (or path) into a dataframe.
     *
     * <p>Parquet and ORC inputs are read as-is, with columns parsed from the path appended as literals.
     * Any other format is read as text lines split on the file group's column separator:
     * <ul>
     *   <li>Lines with the wrong column count are rejected.</li>
     *   <li>{@code \N} becomes NULL for nullable destination columns; for non-nullable columns it
     *       rejects the line.</li>
     *   <li>In strict mode, non-CHAR/VARCHAR, unmapped columns must parse with their column parser.</li>
     * </ul>
     * Rejected lines are counted in {@code abnormalRowAcc}, and a few are sampled into
     * {@code invalidRows}. The optional {@code where} filter is applied to text input only.
     *
     * @param columns destination columns used for the strict-mode type lookup
     * @return dataframe of source rows (all columns string-typed for text input)
     * @throws SparkDppException if a column parser cannot be created
     */
    private Dataset<Row> loadDataFromPath(SparkSession spark, EtlJobConfig.EtlFileGroup fileGroup, String fileUrl, EtlJobConfig.EtlIndex baseIndex, List<EtlJobConfig.EtlColumn> columns) throws SparkDppException {
        List<String> columnValueFromPath = DppUtils.parseColumnsFromPath(fileUrl, fileGroup.columnsFromPath);
        List<String> dataSrcColumns = fileGroup.fileFieldNames;
        if (dataSrcColumns == null) {
            dataSrcColumns = new ArrayList<String>();
            for (EtlJobConfig.EtlColumn column : baseIndex.columns) {
                dataSrcColumns.add(column.columnName);
            }
        }
        Map<String, Integer> dstColumnNameToIndex = new HashMap<String, Integer>();
        for (int i = 0; i < baseIndex.columns.size(); ++i) {
            dstColumnNameToIndex.put(baseIndex.columns.get(i).columnName, i);
        }
        List<String> srcColumnsWithColumnsFromPath = new ArrayList<String>(dataSrcColumns);
        if (fileGroup.columnsFromPath != null) {
            srcColumnsWithColumnsFromPath.addAll(fileGroup.columnsFromPath);
        }

        boolean isParquet = fileGroup.fileFormat.equalsIgnoreCase("parquet");
        if (isParquet || fileGroup.fileFormat.equalsIgnoreCase("orc")) {
            Dataset<Row> dataFrame = isParquet ? spark.read().parquet(fileUrl) : spark.read().orc(fileUrl);
            if (!CollectionUtils.isEmpty(columnValueFromPath)) {
                for (int k = 0; k < columnValueFromPath.size(); ++k) {
                    dataFrame = dataFrame.withColumn(fileGroup.columnsFromPath.get(k), functions.lit(columnValueFromPath.get(k)));
                }
            }
            return dataFrame;
        }

        StructType srcSchema = this.createScrSchema(srcColumnsWithColumnsFromPath);
        JavaRDD<String> sourceDataRdd = spark.read().textFile(fileUrl).toJavaRDD();
        int columnSize = dataSrcColumns.size();
        List<ColumnParser> parsers = new ArrayList<ColumnParser>();
        for (EtlJobConfig.EtlColumn column : baseIndex.columns) {
            parsers.add(ColumnParser.create(column));
        }
        // NOTE(review): only the first UTF-8 byte of the separator is used; multi-byte separators are unsupported.
        char separator = (char) fileGroup.columnSeparator.getBytes(StandardCharsets.UTF_8)[0];
        JavaRDD<Row> rowRDD = sourceDataRdd.flatMap((FlatMapFunction<String, Row>) record -> {
            this.scannedRowsAcc.add(1L);
            Object[] attributes = this.splitLine(record, separator);
            List<Row> result = new ArrayList<Row>();
            boolean validRow = true;
            if (attributes.length != columnSize) {
                LOG.warn("invalid src schema, data columns:" + attributes.length + ", file group columns:" + columnSize + ", row:" + record);
                validRow = false;
            } else {
                for (int i = 0; i < attributes.length; ++i) {
                    StructField field = srcSchema.apply(i);
                    String srcColumnName = field.name();
                    if (((String) attributes[i]).equals(NULL_FLAG) && dstColumnNameToIndex.containsKey(srcColumnName)) {
                        if (baseIndex.columns.get(dstColumnNameToIndex.get(srcColumnName).intValue()).isAllowNull) {
                            attributes[i] = null;
                        } else {
                            LOG.warn("column name:" + srcColumnName + ", attribute: " + i + " can not be null. row:" + record);
                            validRow = false;
                            break;
                        }
                    }
                    boolean isStrictMode = this.etlJobConfig.properties.strictMode;
                    if (!isStrictMode || !dstColumnNameToIndex.containsKey(srcColumnName)) {
                        continue;
                    }
                    // NOTE(review): positions come from baseIndex.columns but index `columns`;
                    // assumes both lists share the same order — verify against callers.
                    int index = dstColumnNameToIndex.get(srcColumnName);
                    String type = columns.get(index).columnType;
                    // columnMappings may be null when the file group defines no mappings.
                    boolean isMapped = fileGroup.columnMappings != null && fileGroup.columnMappings.containsKey(field.name());
                    if (type.equalsIgnoreCase("CHAR") || type.equalsIgnoreCase("VARCHAR") || isMapped) {
                        continue;
                    }
                    if (!parsers.get(index).parse((String) attributes[i])) {
                        validRow = false;
                        LOG.warn("invalid row:" + record + ", attribute " + i + ": " + (String) attributes[i] + " parsed failed");
                        break;
                    }
                }
            }
            if (validRow) {
                Row row;
                if (fileGroup.columnsFromPath == null) {
                    row = RowFactory.create(attributes);
                } else {
                    List<Object> columnAttributes = new ArrayList<Object>(Arrays.asList(attributes));
                    columnAttributes.addAll(columnValueFromPath);
                    row = RowFactory.create(columnAttributes.toArray());
                }
                result.add(row);
            } else {
                this.abnormalRowAcc.add(1L);
                if (this.abnormalRowAcc.value() <= 5L) {
                    this.invalidRows.add(record);
                }
            }
            return result.iterator();
        });
        Dataset<Row> dataframe = spark.createDataFrame(rowRDD, srcSchema);
        if (!Strings.isNullOrEmpty(fileGroup.where)) {
            dataframe = dataframe.where(fileGroup.where);
        }
        return dataframe;
    }

    /**
     * Builds the source schema for a text file group: one nullable {@code StringType} field per
     * source column, in the given order.
     *
     * @param srcColumns source column names
     * @return a struct type with one string field per name
     */
    private StructType createScrSchema(List<String> srcColumns) {
        List<StructField> schemaFields = new ArrayList<StructField>(srcColumns.size());
        for (String columnName : srcColumns) {
            schemaFields.add(DataTypes.createStructField(columnName, DataTypes.StringType, true));
        }
        return DataTypes.createStructType(schemaFields);
    }

    /**
     * Splits {@code line} on every occurrence of {@code sep}, without regex semantics.
     *
     * <p>Empty fields are kept, including leading and trailing ones, so {@code "a,,b,"} yields
     * {@code ["a", "", "b", ""]}. A null or empty line yields an empty array.
     *
     * @param line the raw text line
     * @param sep the single-character column separator
     * @return the fields of the line
     */
    private String[] splitLine(String line, char sep) {
        if (line == null || line.isEmpty()) {
            return new String[0];
        }
        List<String> pieces = new ArrayList<String>();
        int fieldStart = 0;
        for (int pos = 0; pos < line.length(); pos++) {
            if (line.charAt(pos) != sep) {
                continue;
            }
            pieces.add(line.substring(fieldStart, pos));
            fieldStart = pos + 1;
        }
        // The text after the last separator (possibly empty) is always the final field.
        pieces.add(line.substring(fieldStart));
        return pieces.toArray(new String[0]);
    }

    /**
     * Converts one partition boundary value from the job config into the Java class used for that
     * partition column.
     *
     * <p>Only {@link Double} source values are accepted (presumably because numeric JSON values are
     * deserialized as doubles; confirm against the config loader). Float/Double partition columns
     * yield {@code null}. DATE targets expect the packed integer layout decoded by
     * {@link #convertToJavaDate(int)}. DATETIME targets expect a 14-digit yyyyMMddHHmmss number.
     * Any other target class receives {@code srcValue.toString()}.
     *
     * @param srcValue boundary value as read from the job config
     * @param dstClass Java class of the partition column
     * @return the converted value, or {@code null} for Float/Double partition columns
     * @throws SparkDppException if {@code srcValue} is not a {@link Double}
     */
    private Object convertPartitionKey(Object srcValue, Class dstClass) throws SparkDppException {
        if (dstClass.equals(Float.class) || dstClass.equals(Double.class)) {
            return null;
        }
        if (!(srcValue instanceof Double)) {
            LOG.warn("unsupport partition key:" + srcValue);
            throw new SparkDppException("unsupport partition key:" + srcValue);
        }
        Double doubleValue = (Double) srcValue;
        if (dstClass.equals(Short.class)) {
            return doubleValue.shortValue();
        }
        if (dstClass.equals(Integer.class)) {
            return doubleValue.intValue();
        }
        if (dstClass.equals(Long.class)) {
            return doubleValue.longValue();
        }
        if (dstClass.equals(BigInteger.class)) {
            // Double.toString() yields "123.0" or "1.0E20", both rejected by new BigInteger(String).
            // BigDecimal parses either form. Digits beyond a double's precision are already lost here.
            return BigDecimal.valueOf(doubleValue).toBigInteger();
        }
        if (dstClass.equals(java.sql.Date.class) || dstClass.equals(Date.class)) {
            return this.convertToJavaDate(doubleValue.intValue());
        }
        if (dstClass.equals(Timestamp.class)) {
            return this.convertToJavaDatetime(doubleValue.longValue());
        }
        // Other destination classes receive the value's string form.
        return srcValue.toString();
    }

    /**
     * Decodes a DATETIME packed as the decimal number yyyyMMddHHmmss into a {@link Timestamp}.
     *
     * <p>{@link Timestamp#valueOf(String)} interprets the text in the JVM's default time zone.
     *
     * @param src the packed datetime; its decimal form must have exactly 14 characters
     * @return the corresponding timestamp
     * @throws RuntimeException if the decimal form is not 14 characters long
     */
    private Timestamp convertToJavaDatetime(long src) {
        String digits = String.valueOf(src);
        if (digits.length() != 14) {
            throw new RuntimeException("invalid input date format for SparkDpp");
        }
        String formatted = String.format("%s-%s-%s %s:%s:%s",
                digits.substring(0, 4), digits.substring(4, 6), digits.substring(6, 8),
                digits.substring(8, 10), digits.substring(10, 12), digits.substring(12, 14));
        return Timestamp.valueOf(formatted);
    }

    /**
     * Decodes a bit-packed DATE into a {@link java.sql.Date}.
     *
     * <p>Layout: bits 0-4 hold the day, bits 5-8 the month, and the remaining high bits the year.
     *
     * @param originDate the packed date value
     * @return the decoded date
     */
    private java.sql.Date convertToJavaDate(int originDate) {
        final int day = originDate & 0x1F;
        final int month = (originDate >> 5) & 0xF;
        final int year = originDate >> 9;
        return java.sql.Date.valueOf(String.format("%04d-%02d-%02d", year, month, day));
    }

    /**
     * Builds one range key per configured partition.
     *
     * <p>Each boundary value is converted with {@link #convertPartitionKey(Object, Class)} using the
     * partition-key class at the same position. The max partition gets only start keys; every other
     * partition also gets end keys.
     *
     * @param partitionInfo partition definitions from the job config
     * @param partitionKeySchema Java class of each partition column, in key order
     * @return the range keys, in the same order as {@code partitionInfo.partitions}
     * @throws SparkDppException if a boundary value cannot be converted
     */
    private List<DorisRangePartitioner.PartitionRangeKey> createPartitionRangeKeys(EtlJobConfig.EtlPartitionInfo partitionInfo, List<Class> partitionKeySchema) throws SparkDppException {
        List<DorisRangePartitioner.PartitionRangeKey> rangeKeys = new ArrayList<DorisRangePartitioner.PartitionRangeKey>();
        for (EtlJobConfig.EtlPartition partition : partitionInfo.partitions) {
            DorisRangePartitioner.PartitionRangeKey rangeKey = new DorisRangePartitioner.PartitionRangeKey();
            rangeKey.startKeys = new DppColumns(this.convertPartitionKeyList(partition.startKeys, partitionKeySchema));
            if (partition.isMaxPartition) {
                rangeKey.isMaxPartition = true;
            } else {
                rangeKey.isMaxPartition = false;
                rangeKey.endKeys = new DppColumns(this.convertPartitionKeyList(partition.endKeys, partitionKeySchema));
            }
            rangeKeys.add(rangeKey);
        }
        return rangeKeys;
    }

    /** Converts each raw boundary value using the partition-key class at the same position. */
    private List<Object> convertPartitionKeyList(List<?> rawKeys, List<Class> partitionKeySchema) throws SparkDppException {
        List<Object> converted = new ArrayList<Object>(rawKeys.size());
        for (int pos = 0; pos < rawKeys.size(); ++pos) {
            converted.add(this.convertPartitionKey(rawKeys.get(pos), partitionKeySchema.get(pos)));
        }
        return converted;
    }

    /**
     * Loads every path of a FILE-type file group and unions the results into one dataframe.
     *
     * <p>For each path (which may be a glob), the matched non-directory files are counted into
     * {@code fileNumberAcc} and their sizes into {@code fileSizeAcc}. The path is then read with
     * {@code loadDataFromPath} and converted to the destination schema.
     *
     * @param spark the Spark session
     * @param baseIndex base index of the target table
     * @param filePaths paths (or globs) of this file group
     * @param fileGroup the file group config; its column separator must be non-null
     * @param dstTableSchema schema of the destination table
     * @return the union of all paths' converted data, or {@code null} when {@code filePaths} is empty
     * @throws SparkDppException if a glob lists nothing or the column separator is null
     * @throws IOException on file-system access failure
     * @throws URISyntaxException if a path is not a valid URI
     */
    private Dataset<Row> loadDataFromFilePaths(SparkSession spark, EtlJobConfig.EtlIndex baseIndex, List<String> filePaths, EtlJobConfig.EtlFileGroup fileGroup, StructType dstTableSchema) throws SparkDppException, IOException, URISyntaxException {
        Dataset<Row> fileGroupDataframe = null;
        for (String filePath : filePaths) {
            try {
                URI uri = new URI(filePath);
                FileSystem fs = FileSystem.get((URI)uri, (Configuration)this.serializableHadoopConf.value());
                FileStatus[] fileStatuses = fs.globStatus(new Path(filePath));
                if (fileStatuses == null) {
                    throw new SparkDppException("fs list status failed: " + filePath);
                }
                // Directories matched by the glob are excluded from the file statistics.
                for (FileStatus fileStatus : fileStatuses) {
                    if (fileStatus.isDirectory()) {
                        continue;
                    }
                    this.fileNumberAcc.add(1L);
                    this.fileSizeAcc.add(fileStatus.getLen());
                }
            } catch (Exception e) {
                LOG.warn("parse path failed:" + filePath, e);
                throw e;
            }
            if (fileGroup.columnSeparator == null) {
                LOG.warn("invalid null column separator!");
                throw new SparkDppException("Reason: invalid null column separator!");
            }
            Dataset<Row> dataframe = this.loadDataFromPath(spark, fileGroup, filePath, baseIndex, baseIndex.columns);
            dataframe = this.convertSrcDataframeToDstDataframe(baseIndex, dataframe, dstTableSchema, fileGroup);
            // Dataset.union returns a new Dataset. The result must be reassigned, otherwise every
            // path after the first is silently dropped.
            fileGroupDataframe = fileGroupDataframe == null ? dataframe : fileGroupDataframe.union(dataframe);
        }
        return fileGroupDataframe;
    }

    /**
     * Reads the base-index columns from a Hive table via Spark SQL.
     *
     * <p>The file group's optional {@code where} clause is appended verbatim. Rows are then validated
     * with {@code checkDataFromHiveWithStrictMode} and converted to the destination schema.
     *
     * @param spark the Spark session used to run the query
     * @param hiveDbTableName table name placed after {@code from}
     * @param baseIndex base index whose columns are selected
     * @param fileGroup file group config (where clause, column mappings)
     * @param dstTableSchema schema of the destination table
     * @param dictBitmapColumnSet dict-encoded bitmap columns, excluded from validation
     * @param binaryBitmapColumnsSet binary bitmap columns, excluded from validation
     * @return the validated and converted dataframe
     * @throws SparkDppException propagated from validation or conversion
     */
    private Dataset<Row> loadDataFromHiveTable(SparkSession spark, String hiveDbTableName, EtlJobConfig.EtlIndex baseIndex, EtlJobConfig.EtlFileGroup fileGroup, StructType dstTableSchema, Set<String> dictBitmapColumnSet, Set<String> binaryBitmapColumnsSet) throws SparkDppException {
        StringBuilder query = new StringBuilder("select ");
        for (EtlJobConfig.EtlColumn column : baseIndex.columns) {
            query.append(column.columnName).append(',');
        }
        // Drop the trailing comma (or the space after "select" when there are no columns).
        query.setLength(query.length() - 1);
        query.append(" from ").append(hiveDbTableName);
        String whereClause = fileGroup.where;
        if (!Strings.isNullOrEmpty(whereClause)) {
            query.append(" where ").append(whereClause);
        }
        Dataset<Row> hiveData = spark.sql(query.toString());
        hiveData = this.checkDataFromHiveWithStrictMode(hiveData, baseIndex, fileGroup.columnMappings.keySet(), this.etlJobConfig.properties.strictMode, dstTableSchema, dictBitmapColumnSet, binaryBitmapColumnsSet);
        return this.convertSrcDataframeToDstDataframe(baseIndex, hiveData, dstTableSchema, fileGroup);
    }

    /**
     * Validates rows read from a Hive source against the base-index column definitions.
     *
     * <p>Columns checked:
     * <ul>
     *   <li>BITMAP columns whose lower-cased name is in neither bitmap set, validated with
     *       {@link BigIntParser};</li>
     *   <li>every other column except CHAR/VARCHAR columns and columns in {@code mappingColKeys},
     *       validated with {@code ColumnParser.create(column)}.</li>
     * </ul>
     *
     * <p>Per row:
     * <ul>
     *   <li>A null in a non-nullable checked column makes the row invalid.</li>
     *   <li>A value that fails to parse makes the row invalid in strict mode.</li>
     *   <li>Outside strict mode, an unparseable value is replaced with null if the column is
     *       nullable; otherwise the row is invalid.</li>
     * </ul>
     * Invalid rows are dropped and counted in {@code abnormalRowAcc}. Their string form is added to
     * {@code invalidRows} while the accumulator value is at most 5.
     *
     * @param dataframe rows to check; its schema is reused for the returned dataframe
     * @param baseIndex base index whose columns drive the checks
     * @param mappingColKeys names of columns that have column mappings; these are not checked
     * @param isStrictMode whether a parse failure always invalidates the row
     * @param dstTableSchema not referenced in this method body
     * @param dictBitmapColumnSet queried with lower-cased column names; matching BITMAP columns are skipped
     * @param binaryBitmapColumnsSet queried with lower-cased column names; matching BITMAP columns are skipped
     * @return a new dataframe containing only the rows that passed validation
     * @throws SparkDppException declared; presumably from {@code ColumnParser.create} (not visible here)
     */
    private Dataset<Row> checkDataFromHiveWithStrictMode(Dataset<Row> dataframe, EtlJobConfig.EtlIndex baseIndex, Set<String> mappingColKeys, final boolean isStrictMode, StructType dstTableSchema, Set<String> dictBitmapColumnSet, Set<String> binaryBitmapColumnsSet) throws SparkDppException {
        // Parallel lists: columnParserArrayList.get(i) validates columnNameNeedCheckArrayList.get(i).
        ArrayList<EtlJobConfig.EtlColumn> columnNameNeedCheckArrayList = new ArrayList<EtlJobConfig.EtlColumn>();
        ArrayList<ColumnParser> columnParserArrayList = new ArrayList<ColumnParser>();
        for (EtlJobConfig.EtlColumn column : baseIndex.columns) {
            if (StringUtils.equalsIgnoreCase((CharSequence)column.columnType, (CharSequence)BITMAP_TYPE)) {
                // Dict-encoded and binary bitmap columns are not checked here; other BITMAP columns must parse as BIGINT.
                if (dictBitmapColumnSet.contains(column.columnName.toLowerCase()) || binaryBitmapColumnsSet.contains(column.columnName.toLowerCase())) continue;
                columnNameNeedCheckArrayList.add(column);
                columnParserArrayList.add(new BigIntParser());
                continue;
            }
            if (StringUtils.equalsIgnoreCase((CharSequence)column.columnType, (CharSequence)"varchar") || StringUtils.equalsIgnoreCase((CharSequence)column.columnType, (CharSequence)"char") || mappingColKeys.contains(column.columnName)) continue;
            columnNameNeedCheckArrayList.add(column);
            columnParserArrayList.add(ColumnParser.create(column));
        }
        // Arrays captured by the Spark closure below.
        final ColumnParser[] columnParserArray = columnParserArrayList.toArray(new ColumnParser[0]);
        final EtlJobConfig.EtlColumn[] columnNameArray = columnNameNeedCheckArrayList.toArray(new EtlJobConfig.EtlColumn[0]);
        StructType srcSchema = dataframe.schema();
        // Anonymous inner class: it references SparkDpp.this, so the enclosing instance is captured in the closure.
        JavaRDD result = dataframe.toJavaRDD().flatMap((FlatMapFunction)new FlatMapFunction<Row, Row>(){

            public Iterator<Row> call(Row row) throws Exception {
                // Emits zero rows (invalid) or one row (valid, possibly with some fields nulled out).
                ArrayList<Row> result = new ArrayList<Row>();
                HashSet<Integer> columnIndexNeedToRepalceNull = new HashSet<Integer>();
                boolean validRow = true;
                for (int i = 0; i < columnNameArray.length; ++i) {
                    EtlJobConfig.EtlColumn column = columnNameArray[i];
                    int fieldIndex = row.fieldIndex(column.columnName);
                    Object value = row.get(fieldIndex);
                    // NOTE(review): "column:" + i logs the position in columnNameArray, not the row field index or name.
                    if (value == null && !column.isAllowNull) {
                        validRow = false;
                        LOG.warn("column:" + i + " can not be null. row:" + row.toString());
                        break;
                    }
                    if (value == null || columnParserArray[i].parse(value.toString())) continue;
                    // The value is non-null and failed to parse.
                    if (isStrictMode) {
                        validRow = false;
                        LOG.warn(String.format("row parsed failed in strict mode, column name %s, src row %s", column.columnName, row.toString()));
                        // NOTE(review): continues scanning the remaining columns even though the row is already invalid.
                        continue;
                    }
                    if (!column.isAllowNull) {
                        validRow = false;
                        LOG.warn("column:" + i + " can not be null. row:" + row.toString());
                        break;
                    }
                    // Non-strict mode and nullable column: the unparseable value becomes null.
                    columnIndexNeedToRepalceNull.add(fieldIndex);
                }
                if (!validRow) {
                    SparkDpp.this.abnormalRowAcc.add(1L);
                    if (SparkDpp.this.abnormalRowAcc.value() <= 5L) {
                        SparkDpp.this.invalidRows.add(row.toString());
                    }
                } else if (columnIndexNeedToRepalceNull.size() != 0) {
                    // Copy the row, nulling out the fields that failed to parse.
                    Object[] newRow = new Object[row.size()];
                    for (int i = 0; i < row.size(); ++i) {
                        newRow[i] = columnIndexNeedToRepalceNull.contains(i) ? null : row.get(i);
                    }
                    result.add(RowFactory.create((Object[])newRow));
                } else {
                    result.add(row);
                }
                return result.iterator();
            }
        });
        return this.spark.createDataFrame(result, srcSchema);
    }

    /**
     * Runs the ETL for every table in the job config.
     *
     * <p>For each table this:
     * <ol>
     *   <li>locates the base index;</li>
     *   <li>splits its columns into key/partition columns and value columns (a non-key partition
     *       column appears in both lists);</li>
     *   <li>resolves partition key positions and classes, and builds the range keys;</li>
     *   <li>loads every file group (FILE or HIVE source) and unions the resulting pair RDDs;</li>
     *   <li>hands the union to {@code processRollupTree}.</li>
     * </ol>
     * Success or failure is recorded in {@code dppResult}. In all cases the Spark session is stopped
     * and the accumulator totals are copied into {@code dppResult}.
     *
     * @throws Exception any failure during processing, rethrown after recording it in {@code dppResult}
     */
    private void process() throws Exception {
        try {
            for (Map.Entry<Long, EtlJobConfig.EtlTable> entry : this.etlJobConfig.tables.entrySet()) {
                Long tableId = entry.getKey();
                EtlJobConfig.EtlTable etlTable = entry.getValue();
                Set dictBitmapColumnSet = this.tableToBitmapDictColumns.getOrDefault(tableId, new HashSet());
                Set binaryBitmapColumnSet = this.tableToBinaryBitmapColumns.getOrDefault(tableId, new HashSet());

                EtlJobConfig.EtlIndex baseIndex = null;
                for (EtlJobConfig.EtlIndex indexMeta : etlTable.indexes) {
                    if (indexMeta.isBaseIndex) {
                        baseIndex = indexMeta;
                        break;
                    }
                }
                if (baseIndex == null) {
                    // Fail with a descriptive reason instead of an NPE with a null failedReason.
                    throw new SparkDppException("no base index found for table:" + tableId);
                }

                ArrayList<String> keyAndPartitionColumnNames = new ArrayList<String>();
                ArrayList<String> valueColumnNames = new ArrayList<String>();
                for (EtlJobConfig.EtlColumn etlColumn : baseIndex.columns) {
                    if (etlColumn.isKey) {
                        keyAndPartitionColumnNames.add(etlColumn.columnName);
                        continue;
                    }
                    // A non-key partition column is used for partitioning and also kept as a value.
                    if (etlTable.partitionInfo.partitionColumnRefs.contains(etlColumn.columnName)) {
                        keyAndPartitionColumnNames.add(etlColumn.columnName);
                    }
                    valueColumnNames.add(etlColumn.columnName);
                }

                EtlJobConfig.EtlPartitionInfo partitionInfo = etlTable.partitionInfo;
                ArrayList<Integer> partitionKeyIndex = new ArrayList<Integer>();
                ArrayList<Class> partitionKeySchema = new ArrayList<Class>();
                for (String key : partitionInfo.partitionColumnRefs) {
                    for (EtlJobConfig.EtlColumn column : baseIndex.columns) {
                        if (column.columnName.equals(key)) {
                            partitionKeyIndex.add(keyAndPartitionColumnNames.indexOf(key));
                            partitionKeySchema.add(DppUtils.getClassFromColumn(column));
                            break;
                        }
                    }
                }
                List<DorisRangePartitioner.PartitionRangeKey> partitionRangeKeys = this.createPartitionRangeKeys(partitionInfo, partitionKeySchema);
                StructType dstTableSchema = DppUtils.createDstTableSchema(baseIndex.columns, false, false);
                dstTableSchema = DppUtils.replaceBinaryColsInSchema(binaryBitmapColumnSet, dstTableSchema);
                MinimumCoverageRollupTreeBuilder rollupTreeParser = new MinimumCoverageRollupTreeBuilder();
                RollupTreeNode rootNode = rollupTreeParser.build(etlTable);
                LOG.info("Start to process rollup tree:" + rootNode);

                JavaPairRDD<List<Object>, Object[]> tablePairRDD = null;
                for (EtlJobConfig.EtlFileGroup fileGroup : etlTable.fileGroups) {
                    List<String> filePaths = fileGroup.filePaths;
                    Dataset<Row> fileGroupDataframe = null;
                    EtlJobConfig.SourceType sourceType = fileGroup.sourceType;
                    if (sourceType == EtlJobConfig.SourceType.FILE) {
                        fileGroupDataframe = this.loadDataFromFilePaths(this.spark, baseIndex, filePaths, fileGroup, dstTableSchema);
                    } else if (sourceType == EtlJobConfig.SourceType.HIVE) {
                        fileGroupDataframe = this.loadDataFromHiveTable(this.spark, fileGroup.dppHiveDbTableName, baseIndex, fileGroup, dstTableSchema, dictBitmapColumnSet, binaryBitmapColumnSet);
                    } else {
                        throw new RuntimeException("Unknown source type: " + sourceType.name());
                    }
                    if (fileGroupDataframe == null) {
                        LOG.info("no data for file file group:" + fileGroup);
                        continue;
                    }
                    JavaPairRDD<List<Object>, Object[]> ret = this.fillTupleWithPartitionColumn(fileGroupDataframe, partitionInfo, partitionKeyIndex, partitionRangeKeys, keyAndPartitionColumnNames, valueColumnNames, dstTableSchema, baseIndex, fileGroup.partitions);
                    // JavaPairRDD.union returns a new RDD. It must be reassigned, otherwise every file
                    // group after the first is silently dropped.
                    tablePairRDD = tablePairRDD == null ? ret : tablePairRDD.union(ret);
                }
                // NOTE(review): tablePairRDD is still null here when no file group produced data;
                // confirm processRollupTree tolerates that.
                this.processRollupTree(rootNode, tablePairRDD, tableId, baseIndex);
            }
            LOG.info("invalid rows contents:" + this.invalidRows.value());
            this.dppResult.isSuccess = true;
            this.dppResult.failedReason = "";
        }
        catch (Exception exception) {
            LOG.warn("spark dpp failed for exception:" + exception);
            this.dppResult.isSuccess = false;
            this.dppResult.failedReason = exception.getMessage();
            throw exception;
        }
        finally {
            this.spark.stop();
            this.dppResult.normalRows = this.scannedRowsAcc.value() - this.abnormalRowAcc.value();
            this.dppResult.scannedRows = this.scannedRowsAcc.value();
            this.dppResult.fileNumber = this.fileNumberAcc.value();
            this.dppResult.fileSize = this.fileSizeAcc.value();
            this.dppResult.abnormalRows = this.abnormalRowAcc.value();
            this.dppResult.partialAbnormalRows = this.invalidRows.value();
        }
    }

    /**
     * Writes {@code dppResult} as a single line of JSON to {@code <outputPath>/DPP_RESULT_FILE}.
     *
     * <p>The stream is closed even if writing fails. The JSON is encoded as UTF-8 rather than in the
     * platform default charset, so non-ASCII failure reasons survive intact.
     *
     * @param dppResult the result to serialize
     * @throws Exception if the output URI is invalid or the file cannot be written
     */
    private void writeDppResult(DppResult dppResult) throws Exception {
        String outputPath = this.etlJobConfig.getOutputPath();
        String resultFilePath = outputPath + "/" + DPP_RESULT_FILE;
        URI uri = new URI(outputPath);
        FileSystem fs = FileSystem.get((URI)uri, (Configuration)this.serializableHadoopConf.value());
        Path filePath = new Path(resultFilePath);
        String json = new Gson().toJson(dppResult);
        try (FSDataOutputStream outputStream = fs.create(filePath)) {
            outputStream.write(json.getBytes(StandardCharsets.UTF_8));
            outputStream.write('\n');
        }
    }

    /**
     * Runs the DPP job, then writes the {@link DppResult} whether or not processing succeeded.
     *
     * <p>If processing fails and writing the result also fails, the write failure is attached to the
     * processing failure as a suppressed exception instead of replacing it.
     *
     * @throws Exception the failure from {@link #process()}, or, if processing succeeded, the failure
     *     from {@link #writeDppResult(DppResult)}
     */
    public void doDpp() throws Exception {
        Exception processFailure = null;
        try {
            this.process();
        } catch (Exception e) {
            processFailure = e;
            throw e;
        } finally {
            try {
                this.writeDppResult(this.dppResult);
            } catch (Exception writeFailure) {
                if (processFailure == null) {
                    throw writeFailure;
                }
                // Keep the root cause visible; a failed result write must not mask it.
                processFailure.addSuppressed(writeFailure);
            }
        }
    }
}

