/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.api.java.operators;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.SingleInputOperator;
import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.operators.base.MapOperatorBase;
import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.functions.SemanticPropUtil;
import org.apache.flink.api.java.operators.Grouping;
import org.apache.flink.api.java.operators.Keys;
import org.apache.flink.api.java.operators.SingleInputUdfOperator;
import org.apache.flink.api.java.operators.SortedGrouping;
import org.apache.flink.api.java.operators.UdfOperatorUtils;
import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
import org.apache.flink.api.java.operators.translation.KeyRemovingMapper;
import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;

/**
 * Java API operator that applies a {@link ReduceFunction} to a {@link DataSet}, either over the
 * whole data set (no grouping) or per group (when constructed from a {@link Grouping}).
 *
 * <p>The operator is translated into common-API operators by
 * {@link #translateToDataFlow(Operator)}.
 *
 * @param <IN> The type of the elements being reduced. The result type is the same.
 */
public class ReduceOperator<IN>
extends SingleInputUdfOperator<IN, IN, ReduceOperator<IN>> {
    /** The user-defined reduce function. */
    private final ReduceFunction<IN> function;

    /** The grouping this reduce runs on, or {@code null} for a non-grouped reduce. */
    private final Grouping<IN> grouper;

    /** Suffix used to build the default operator name ({@code "Reduce at " + defaultName}). */
    private final String defaultName;

    /**
     * Creates a non-grouped reduce over the whole data set.
     *
     * @param input The data set to reduce.
     * @param function The reduce function to apply.
     * @param defaultName Suffix used for the default operator name when no name was set.
     */
    public ReduceOperator(DataSet<IN> input, ReduceFunction<IN> function, String defaultName) {
        super(input, input.getType());
        this.function = function;
        this.grouper = null;
        this.defaultName = defaultName;
    }

    /**
     * Creates a grouped reduce and passes the function and the grouping keys to
     * {@link UdfOperatorUtils#analyzeSingleInputUdf}.
     *
     * @param input The grouping whose underlying data set is reduced per group.
     * @param function The reduce function to apply.
     * @param defaultName Suffix used for the default operator name when no name was set.
     */
    public ReduceOperator(Grouping<IN> input, ReduceFunction<IN> function, String defaultName) {
        super(input.getDataSet(), input.getDataSet().getType());
        this.function = function;
        this.grouper = input;
        this.defaultName = defaultName;
        UdfOperatorUtils.analyzeSingleInputUdf(this, ReduceFunction.class, defaultName, function, this.grouper.keys);
    }

    /**
     * Returns the user-defined reduce function of this operator.
     *
     * @return The reduce function passed to the constructor.
     */
    protected ReduceFunction<IN> getFunction() {
        return this.function;
    }

    /**
     * Returns the semantic properties of this operator.
     *
     * <p>When the grouping uses key-selector keys, the source field indexes of the inherited
     * properties are shifted by the total number of fields of the key type (plus the fields of
     * the sort key type for a {@link SortedGrouping}).
     *
     * @return The (possibly offset) semantic properties, or {@code null} if the superclass
     *         returned {@code null}.
     */
    @Override
    public SingleInputSemanticProperties getSemanticProperties() {
        SingleInputSemanticProperties props = super.getSemanticProperties();

        // Only key-selector groupings need the source fields shifted by the key fields.
        if (props != null && this.grouper != null && this.grouper.keys instanceof Keys.SelectorFunctionKeys) {
            int offset = ((Keys.SelectorFunctionKeys<?, ?>) this.grouper.keys).getKeyType().getTotalFields();
            if (this.grouper instanceof SortedGrouping) {
                offset += ((SortedGrouping<?>) this.grouper).getSortSelectionFunctionKey().getKeyType().getTotalFields();
            }
            props = SemanticPropUtil.addSourceFieldOffset(props, this.getInputType().getTotalFields(), offset);
        }
        return props;
    }

    /**
     * Translates this operator into its common-API representation.
     *
     * <ul>
     *   <li>No grouping: a {@link ReduceOperatorBase} without key positions and parallelism 1.</li>
     *   <li>Key-selector keys: a key-extracting map, the reducer, and a key-removing map; the
     *       key-removing map is returned.</li>
     *   <li>Expression keys: a {@link ReduceOperatorBase} on the logical key positions.</li>
     * </ul>
     *
     * @param input The translated input operator.
     * @return The operator that produces the reduce result.
     * @throws UnsupportedOperationException If the grouping keys are of neither supported kind.
     */
    protected SingleInputOperator<?, IN, ?> translateToDataFlow(Operator<IN> input) {
        String name = this.getName() != null ? this.getName() : "Reduce at " + this.defaultName;

        if (this.grouper == null) {
            // Non-grouped reduce: no key positions, parallelism is fixed to 1.
            UnaryOperatorInformation<IN, IN> operatorInfo =
                    new UnaryOperatorInformation<IN, IN>(this.getInputType(), this.getInputType());
            ReduceOperatorBase<IN, ReduceFunction<IN>> po =
                    new ReduceOperatorBase<IN, ReduceFunction<IN>>(this.function, operatorInfo, new int[0], name);
            po.setInput(input);
            po.setParallelism(1);
            return po;
        }

        if (this.grouper.getKeys() instanceof Keys.SelectorFunctionKeys) {
            // Reduce with a key-selector function.
            Keys.SelectorFunctionKeys<IN, ?> selectorKeys = (Keys.SelectorFunctionKeys<IN, ?>) this.grouper.getKeys();
            MapOperatorBase<?, IN, ?> po = translateSelectorFunctionReducer(
                    selectorKeys, this.function, this.getInputType(), name, input, this.getParallelism());
            // The reducer is the direct input of the returned key-removing map.
            ((PlanUnwrappingReduceOperator<?, ?>) po.getInput()).setCustomPartitioner(this.grouper.getCustomPartitioner());
            return po;
        }

        if (this.grouper.getKeys() instanceof Keys.ExpressionKeys) {
            // Reduce on logical key positions.
            int[] logicalKeyPositions = this.grouper.getKeys().computeLogicalKeyPositions();
            UnaryOperatorInformation<IN, IN> operatorInfo =
                    new UnaryOperatorInformation<IN, IN>(this.getInputType(), this.getInputType());
            ReduceOperatorBase<IN, ReduceFunction<IN>> po =
                    new ReduceOperatorBase<IN, ReduceFunction<IN>>(this.function, operatorInfo, logicalKeyPositions, name);
            po.setCustomPartitioner(this.grouper.getCustomPartitioner());
            po.setInput(input);
            po.setParallelism(this.getParallelism());
            return po;
        }

        throw new UnsupportedOperationException("Unrecognized key type.");
    }

    /**
     * Builds the three-operator chain for a reduce with key-selector keys:
     * key-extracting map, then reducer, then key-removing map.
     *
     * @param rawKeys The key-selector keys of the grouping.
     * @param function The reduce function to apply.
     * @param inputType Type information of the reduced elements.
     * @param name Name given to the reducer operator.
     * @param input The translated input operator.
     * @param parallelism Parallelism set on the reducer and on the key-removing map.
     * @param <T> The type of the reduced elements.
     * @param <K> The type of the extracted key.
     * @return The key-removing map, whose input is the reducer.
     */
    private static <T, K> MapOperatorBase<Tuple2<K, T>, T, ?> translateSelectorFunctionReducer(Keys.SelectorFunctionKeys<T, ?> rawKeys, ReduceFunction<T> function, TypeInformation<T> inputType, String name, Operator<T> input, int parallelism) {
        // Callers only know the key type as a wildcard; bind it to K once here.
        @SuppressWarnings("unchecked")
        final Keys.SelectorFunctionKeys<T, K> keys = (Keys.SelectorFunctionKeys<T, K>) rawKeys;

        TypeInformation<Tuple2<K, T>> typeInfoWithKey = new TupleTypeInfo<Tuple2<K, T>>(keys.getKeyType(), inputType);
        KeyExtractingMapper<T, K> extractor = new KeyExtractingMapper<T, K>(keys.getKeyExtractor());
        PlanUnwrappingReduceOperator<T, K> reducer =
                new PlanUnwrappingReduceOperator<T, K>(function, keys, name, inputType, typeInfoWithKey);

        MapOperatorBase<T, Tuple2<K, T>, KeyExtractingMapper<T, K>> keyExtractingMap =
                new MapOperatorBase<T, Tuple2<K, T>, KeyExtractingMapper<T, K>>(
                        extractor, new UnaryOperatorInformation<T, Tuple2<K, T>>(inputType, typeInfoWithKey), "Key Extractor");
        // Named distinctly from the extractor so the two map nodes can be told apart in a plan.
        MapOperatorBase<Tuple2<K, T>, T, KeyRemovingMapper<T, K>> keyRemovingMap =
                new MapOperatorBase<Tuple2<K, T>, T, KeyRemovingMapper<T, K>>(
                        new KeyRemovingMapper<T, K>(), new UnaryOperatorInformation<Tuple2<K, T>, T>(typeInfoWithKey, inputType), "Key Remover");

        // Wire the chain: input -> key extractor -> reducer -> key remover.
        keyExtractingMap.setInput(input);
        reducer.setInput(keyExtractingMap);
        keyRemovingMap.setInput(reducer);

        // The extractor keeps the input's parallelism; the rest use the requested one.
        keyExtractingMap.setParallelism(input.getParallelism());
        reducer.setParallelism(parallelism);
        keyRemovingMap.setParallelism(parallelism);

        return keyRemovingMap;
    }
}

