/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza.translation;

import java.io.Serializable;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.KeyedWorkItemCoder;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.core.construction.graph.PipelineNode;
import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
import org.apache.beam.runners.samza.runtime.DoFnOp;
import org.apache.beam.runners.samza.runtime.GroupByKeyOp;
import org.apache.beam.runners.samza.runtime.KvToKeyedWorkItemOp;
import org.apache.beam.runners.samza.runtime.OpAdapter;
import org.apache.beam.runners.samza.runtime.OpMessage;
import org.apache.beam.runners.samza.transforms.GroupWithoutRepartition;
import org.apache.beam.runners.samza.translation.PortableTranslationContext;
import org.apache.beam.runners.samza.translation.TransformTranslator;
import org.apache.beam.runners.samza.translation.TranslationContext;
import org.apache.beam.runners.samza.util.SamzaCoders;
import org.apache.beam.runners.samza.util.SamzaPipelineTranslatorUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.AppliedCombineFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.functions.FilterFunction;
import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.serializers.KVSerde;

class GroupByKeyTranslator<K, InputT, OutputT>
implements TransformTranslator<PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>>> {
    GroupByKeyTranslator() {
    }

    @Override
    public void translate(PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> transform, TransformHierarchy.Node node, TranslationContext ctx) {
        GroupByKeyTranslator.doTranslate(transform, node, ctx);
    }

    private static <K, InputT, OutputT> void doTranslate(PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> transform, TransformHierarchy.Node node, TranslationContext ctx) {
        PCollection<KV<K, InputT>> input = ctx.getInput(transform);
        PCollection<KV<K, OutputT>> output = ctx.getOutput(transform);
        TupleTag<KV<K, OutputT>> outputTag = ctx.getOutputTag(transform);
        WindowingStrategy windowingStrategy = input.getWindowingStrategy();
        MessageStream inputStream = ctx.getMessageStream((PValue)input);
        KvCoder kvInputCoder = (KvCoder)input.getCoder();
        Coder<WindowedValue<KV<K, InputT>>> elementCoder = SamzaCoders.of(input);
        SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> reduceFn = GroupByKeyTranslator.getSystemReduceFn(transform, input.getPipeline(), kvInputCoder);
        MessageStream<OpMessage<KV<K, OutputT>>> outputStream = GroupByKeyTranslator.doTranslateGBK(inputStream, GroupByKeyTranslator.needRepartition(node, ctx), reduceFn, windowingStrategy, kvInputCoder, elementCoder, ctx.getCurrentTopologicalId(), node.getFullName(), outputTag, input.isBounded());
        ctx.registerMessageStream((PValue)output, outputStream);
    }

    @Override
    public void translatePortable(PipelineNode.PTransformNode transform, QueryablePipeline pipeline, PortableTranslationContext ctx) {
        GroupByKeyTranslator.doTranslatePortable(transform, pipeline, ctx);
    }

    private static <K, InputT, OutputT> void doTranslatePortable(PipelineNode.PTransformNode transform, QueryablePipeline pipeline, PortableTranslationContext ctx) {
        MessageStream inputStream = ctx.getOneInputMessageStream(transform);
        boolean needRepartition = ctx.getSamzaPipelineOptions().getMaxSourceParallelism() > 1;
        WindowingStrategy<?, BoundedWindow> windowingStrategy = ctx.getPortableWindowStrategy(transform, pipeline);
        Coder windowCoder = windowingStrategy.getWindowFn().windowCoder();
        String inputId = ctx.getInputId(transform);
        WindowedValue.WindowedValueCoder windowedInputCoder = ctx.instantiateCoder(inputId, pipeline.getComponents());
        KvCoder kvInputCoder = (KvCoder)windowedInputCoder.getValueCoder();
        WindowedValue.FullWindowedValueCoder elementCoder = WindowedValue.FullWindowedValueCoder.of((Coder)kvInputCoder, (Coder)windowCoder);
        int topologyId = ctx.getCurrentTopologicalId();
        String nodeFullname = transform.getTransform().getUniqueName();
        TupleTag outputTag = new TupleTag((String)Iterables.getOnlyElement(transform.getTransform().getOutputsMap().keySet()));
        SystemReduceFn reduceFn = SystemReduceFn.buffering((Coder)kvInputCoder.getValueCoder());
        RunnerApi.PCollection input = pipeline.getComponents().getPcollectionsOrThrow(inputId);
        PCollection.IsBounded isBounded = SamzaPipelineTranslatorUtils.isBounded(input);
        MessageStream<OpMessage<KV<K, OutputT>>> outputStream = GroupByKeyTranslator.doTranslateGBK(inputStream, needRepartition, reduceFn, windowingStrategy, kvInputCoder, elementCoder, topologyId, nodeFullname, outputTag, isBounded);
        ctx.registerMessageStream(ctx.getOutputId(transform), outputStream);
    }

    private static <K, InputT, OutputT> MessageStream<OpMessage<KV<K, OutputT>>> doTranslateGBK(MessageStream<OpMessage<KV<K, InputT>>> inputStream, boolean needRepartition, SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> reduceFn, WindowingStrategy<?, BoundedWindow> windowingStrategy, KvCoder<K, InputT> kvInputCoder, Coder<WindowedValue<KV<K, InputT>>> elementCoder, int topologyId, String nodeFullname, TupleTag<KV<K, OutputT>> outputTag, PCollection.IsBounded isBounded) {
        MessageStream filteredInputStream = inputStream.filter((FilterFunction & Serializable)msg -> msg.getType() == OpMessage.Type.ELEMENT);
        MessageStream partitionedInputStream = !needRepartition ? filteredInputStream : filteredInputStream.partitionBy((MapFunction & Serializable)msg -> ((KV)msg.getElement().getValue()).getKey(), (MapFunction & Serializable)msg -> msg.getElement(), KVSerde.of(SamzaCoders.toSerde(kvInputCoder.getKeyCoder()), SamzaCoders.toSerde(elementCoder)), "gbk-" + topologyId).map((MapFunction & Serializable)kv -> OpMessage.ofElement((WindowedValue)kv.getValue()));
        KeyedWorkItemCoder keyedWorkItemCoder = KeyedWorkItemCoder.of((Coder)kvInputCoder.getKeyCoder(), (Coder)kvInputCoder.getValueCoder(), (Coder)windowingStrategy.getWindowFn().windowCoder());
        MessageStream outputStream = partitionedInputStream.flatMap(OpAdapter.adapt(new KvToKeyedWorkItemOp())).flatMap(OpAdapter.adapt(new GroupByKeyOp<K, InputT, OutputT>(outputTag, keyedWorkItemCoder, reduceFn, windowingStrategy, new DoFnOp.SingleOutputManagerFactory(), nodeFullname, outputTag.getId(), isBounded)));
        return outputStream;
    }

    private static <K, InputT, OutputT> SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> getSystemReduceFn(PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> transform, Pipeline pipeline, KvCoder<K, InputT> kvInputCoder) {
        if (transform instanceof GroupByKey) {
            return SystemReduceFn.buffering((Coder)kvInputCoder.getValueCoder());
        }
        if (transform instanceof Combine.PerKey) {
            CombineFnBase.GlobalCombineFn combineFn = ((Combine.PerKey)transform).getFn();
            return SystemReduceFn.combining((Coder)kvInputCoder.getKeyCoder(), (AppliedCombineFn)AppliedCombineFn.withInputCoder((CombineFnBase.GlobalCombineFn)combineFn, (CoderRegistry)pipeline.getCoderRegistry(), kvInputCoder));
        }
        throw new RuntimeException("Transform " + transform + " cannot be translated as GroupByKey.");
    }

    private static boolean needRepartition(TransformHierarchy.Node node, TranslationContext ctx) {
        if (ctx.getPipelineOptions().getMaxSourceParallelism() == 1) {
            return false;
        }
        if (node == null) {
            return true;
        }
        if (node.getTransform() instanceof GroupWithoutRepartition) {
            return false;
        }
        return GroupByKeyTranslator.needRepartition(node.getEnclosingNode(), ctx);
    }
}

