/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza.translation;

import java.io.Serializable;
import org.apache.beam.runners.samza.runtime.OpMessage;
import org.apache.beam.runners.samza.translation.SamzaPublishView;
import org.apache.beam.runners.samza.translation.TransformTranslator;
import org.apache.beam.runners.samza.translation.TranslationContext;
import org.apache.beam.runners.samza.util.SamzaCoders;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.functions.FilterFunction;
import org.apache.samza.operators.functions.MapFunction;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;

class SamzaPublishViewTranslator<@UnknownKeyFor ElemT, @UnknownKeyFor ViewT>
implements TransformTranslator<SamzaPublishView<ElemT, ViewT>> {
    SamzaPublishViewTranslator() {
    }

    @Override
    public void translate(@UnknownKeyFor @NonNull @Initialized SamzaPublishView<ElemT, ViewT> transform, // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized TransformHierarchy. @UnknownKeyFor @NonNull @Initialized Node node, @UnknownKeyFor @NonNull @Initialized TranslationContext ctx) {
        SamzaPublishViewTranslator.doTranslate(transform, node, ctx);
    }

    private static <ElemT, ViewT> void doTranslate(@UnknownKeyFor @NonNull @Initialized SamzaPublishView<ElemT, ViewT> transform, // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized TransformHierarchy. @UnknownKeyFor @NonNull @Initialized Node node, @UnknownKeyFor @NonNull @Initialized TranslationContext ctx) {
        PCollection input = (PCollection)ctx.getInput(transform);
        MessageStream inputStream = ctx.getMessageStream((PValue)input);
        Coder elementCoder = SamzaCoders.of(input);
        MessageStream elementStream = inputStream.filter((FilterFunction & Serializable)msg -> msg.getType() == OpMessage.Type.ELEMENT).map(OpMessage::getElement);
        MessageStream broadcastStream = ctx.getPipelineOptions().getMaxSourceParallelism() == 1 ? elementStream : elementStream.broadcast(SamzaCoders.toSerde(elementCoder), "view-" + ctx.getTransformId());
        String viewId = ctx.getViewId(transform.getView());
        MessageStream outputStream = broadcastStream.map((MapFunction & Serializable)element -> OpMessage.ofSideInput(viewId, element));
        ctx.registerViewStream(transform.getView(), outputStream);
    }
}

