/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza.translation;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.core.construction.graph.PipelineNode;
import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
import org.apache.beam.runners.samza.runtime.OpAdapter;
import org.apache.beam.runners.samza.runtime.OpMessage;
import org.apache.beam.runners.samza.translation.PortableTranslationContext;
import org.apache.beam.runners.samza.translation.TransformTranslator;
import org.apache.beam.runners.samza.translation.TranslationContext;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.functions.MapFunction;

/**
 * Translates a {@link Flatten.PCollections} transform by merging the message streams of all of
 * its inputs into a single message stream that is registered for the transform's output.
 *
 * @param <T> element type of the flattened collections
 */
class FlattenPCollectionsTranslator<T>
implements TransformTranslator<Flatten.PCollections<T>> {
    FlattenPCollectionsTranslator() {
    }

    /**
     * Registers the merged input streams of {@code transform} as the stream of its output.
     *
     * @param transform the flatten transform being translated
     * @param node the hierarchy node whose inputs are flattened
     * @param ctx the context used to look up input streams and register the output stream
     * @throws IllegalArgumentException if any input of {@code node} is not a {@link PCollection}
     */
    @Override
    public void translate(Flatten.PCollections<T> transform, TransformHierarchy.Node node, TranslationContext ctx) {
        FlattenPCollectionsTranslator.doTranslate(transform, node, ctx);
    }

    /**
     * Collects the message stream of every input of {@code node} and registers their merge as the
     * stream of the transform's output. When the node has no inputs, a stream derived from the
     * context's dummy stream through an operator that emits nothing is registered instead.
     */
    private static <T> void doTranslate(Flatten.PCollections<T> transform, TransformHierarchy.Node node, TranslationContext ctx) {
        PCollection<T> output = ctx.getOutput(transform);
        List<MessageStream<OpMessage<T>>> inputStreams = new ArrayList<>();
        for (Map.Entry<?, ? extends PValue> taggedPValue : node.getInputs().entrySet()) {
            if (!(taggedPValue.getValue() instanceof PCollection)) {
                throw new IllegalArgumentException(String.format("Got non-PCollection input for flatten. Tag: %s. Input: %s. Type: %s", taggedPValue.getKey(), taggedPValue.getValue(), taggedPValue.getValue().getClass()));
            }
            // The instanceof check above guarantees the raw class; the element type is not
            // checkable at runtime, hence the narrowly scoped suppression.
            @SuppressWarnings("unchecked")
            PCollection<T> input = (PCollection<T>) taggedPValue.getValue();
            inputStreams.add(ctx.getMessageStream(input));
        }
        if (inputStreams.isEmpty()) {
            // Nothing to merge: the operator ignores every incoming element and never emits.
            MessageStream<OpMessage<T>> noOpStream = ctx.getDummyStream().flatMap(OpAdapter.adapt((inputElement, emitter) -> {}));
            ctx.registerMessageStream(output, noOpStream);
            return;
        }
        ctx.registerMessageStream(output, FlattenPCollectionsTranslator.mergeInputStreams(inputStreams));
    }

    /**
     * Registers the merged input streams of the portable {@code transform} under its output id.
     *
     * @param transform the portable flatten transform node being translated
     * @param pipeline the pipeline containing {@code transform}; not read by this translator
     * @param ctx the context used to look up input streams and register the output stream
     * @throws IllegalStateException if {@code transform} has no input streams
     */
    @Override
    public void translatePortable(PipelineNode.PTransformNode transform, QueryablePipeline pipeline, PortableTranslationContext ctx) {
        FlattenPCollectionsTranslator.doTranslatePortable(transform, pipeline, ctx);
    }

    /**
     * Looks up all input streams of {@code transform}, requires at least one to exist, and
     * registers their merge under the transform's output id.
     */
    private static <T> void doTranslatePortable(PipelineNode.PTransformNode transform, QueryablePipeline pipeline, PortableTranslationContext ctx) {
        List<MessageStream<OpMessage<T>>> inputStreams = ctx.getAllInputMessageStreams(transform);
        String outputId = ctx.getOutputId(transform);
        Preconditions.checkState(!inputStreams.isEmpty(), "no input streams defined for Flatten: %s", transform.getId());
        ctx.registerMessageStream(outputId, FlattenPCollectionsTranslator.mergeInputStreams(inputStreams));
    }

    /**
     * Merges {@code inputStreams} into one stream.
     *
     * <p>A single input stream is returned as-is. Otherwise the streams are gathered into a set;
     * when a stream is already present in the set, an identity-mapped copy of it is added
     * instead, so that a stream listed several times is not collapsed into one set entry.
     *
     * @param inputStreams the streams to merge; callers in this class pass a non-empty list
     * @return the only input stream, or the result of {@code MessageStream.mergeAll}
     */
    private static <T> MessageStream<OpMessage<T>> mergeInputStreams(List<MessageStream<OpMessage<T>>> inputStreams) {
        if (inputStreams.size() == 1) {
            return Iterables.getOnlyElement(inputStreams);
        }
        HashSet<MessageStream<OpMessage<T>>> streamsToMerge = new HashSet<>();
        for (MessageStream<OpMessage<T>> stream : inputStreams) {
            if (!streamsToMerge.add(stream)) {
                // Same stream seen again: add an identity-mapped copy so it is merged once more.
                streamsToMerge.add(stream.map((MapFunction<OpMessage<T>, OpMessage<T>> & Serializable) m -> m));
            }
        }
        return MessageStream.mergeAll(streamsToMerge);
    }
}

