/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.api.java.operators;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.KeyFunctions;
import org.apache.flink.api.java.operators.SingleInputOperator;
import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

@Public
public class DistinctOperator<T>
extends SingleInputOperator<T, T, DistinctOperator<T>> {
    private final Keys<T> keys;
    private final String distinctLocationName;

    public DistinctOperator(DataSet<T> input, Keys<T> keys, String distinctLocationName) {
        super(input, input.getType());
        this.distinctLocationName = distinctLocationName;
        if (keys == null) {
            keys = new Keys.ExpressionKeys(input.getType());
        }
        this.keys = keys;
    }

    protected GroupReduceOperatorBase<?, T, ?> translateToDataFlow(Operator<T> input) {
        String name;
        DistinctFunction function = new DistinctFunction();
        String string = name = this.getName() != null ? this.getName() : "Distinct at " + this.distinctLocationName;
        if (this.keys instanceof Keys.ExpressionKeys) {
            int[] logicalKeyPositions = this.keys.computeLogicalKeyPositions();
            UnaryOperatorInformation operatorInfo = new UnaryOperatorInformation(this.getInputType(), this.getResultType());
            GroupReduceOperatorBase po = new GroupReduceOperatorBase(function, operatorInfo, logicalKeyPositions, name);
            po.setCombinable(true);
            po.setInput(input);
            po.setParallelism(this.getParallelism());
            if (this.getType().isTupleType()) {
                SingleInputSemanticProperties sProps = new SingleInputSemanticProperties();
                for (int field : this.keys.computeLogicalKeyPositions()) {
                    sProps.addForwardedField(field, field);
                }
                po.setSemanticProperties(sProps);
            }
            return po;
        }
        if (this.keys instanceof Keys.SelectorFunctionKeys) {
            Keys.SelectorFunctionKeys selectorKeys = (Keys.SelectorFunctionKeys)this.keys;
            PlanUnwrappingReduceGroupOperator po = DistinctOperator.translateSelectorFunctionDistinct(selectorKeys, function, this.getResultType(), name, input);
            po.setParallelism(this.getParallelism());
            return po;
        }
        throw new UnsupportedOperationException("Unrecognized key type.");
    }

    private static <IN, OUT, K> PlanUnwrappingReduceGroupOperator<IN, OUT, K> translateSelectorFunctionDistinct(Keys.SelectorFunctionKeys<IN, ?> rawKeys, GroupReduceFunction<IN, OUT> function, TypeInformation<OUT> outputType, String name, Operator<IN> input) {
        Keys.SelectorFunctionKeys<IN, ?> keys = rawKeys;
        TypeInformation<Tuple2<?, IN>> typeInfoWithKey = KeyFunctions.createTypeWithKey(keys);
        Operator<Tuple2<?, IN>> keyedInput = KeyFunctions.appendKeyExtractor(input, keys);
        PlanUnwrappingReduceGroupOperator reducer = new PlanUnwrappingReduceGroupOperator(function, keys, name, outputType, typeInfoWithKey, true);
        reducer.setInput(keyedInput);
        return reducer;
    }

    @Internal
    public static final class DistinctFunction<T>
    implements GroupReduceFunction<T, T>,
    GroupCombineFunction<T, T> {
        private static final long serialVersionUID = 1L;

        public void reduce(Iterable<T> values, Collector<T> out) {
            out.collect(values.iterator().next());
        }

        public void combine(Iterable<T> values, Collector<T> out) {
            out.collect(values.iterator().next());
        }
    }
}

