/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza.translation;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.beam.runners.core.construction.TransformInputs;
import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.runners.samza.runtime.OpMessage;
import org.apache.beam.runners.samza.util.HashIdGenerator;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.serializers.Serde;
import org.apache.samza.system.EndOfStreamMessage;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.WatermarkMessage;
import org.apache.samza.system.descriptors.GenericInputDescriptor;
import org.apache.samza.system.descriptors.GenericSystemDescriptor;
import org.apache.samza.system.descriptors.InputDescriptor;
import org.apache.samza.system.descriptors.OutputDescriptor;
import org.apache.samza.system.inmemory.InMemorySystemFactory;
import org.apache.samza.table.Table;
import org.apache.samza.table.descriptors.TableDescriptor;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TranslationContext {
    private static final Logger LOG = LoggerFactory.getLogger(TranslationContext.class);
    private final StreamApplicationDescriptor appDescriptor;
    private final Map<PValue, MessageStream<?>> messsageStreams = new HashMap();
    private final Map<PCollectionView<?>, MessageStream<?>> viewStreams = new HashMap();
    private final Map<PValue, String> idMap;
    private final Map<String, MessageStream> registeredInputStreams = new HashMap<String, MessageStream>();
    private final Map<String, Table> registeredTables = new HashMap<String, Table>();
    private final SamzaPipelineOptions options;
    private final HashIdGenerator idGenerator = new HashIdGenerator();
    private AppliedPTransform<?, ?, ?> currentTransform;

    public TranslationContext(StreamApplicationDescriptor appDescriptor, Map<PValue, String> idMap, SamzaPipelineOptions options) {
        this.appDescriptor = appDescriptor;
        this.idMap = idMap;
        this.options = options;
    }

    public <OutT> void registerInputMessageStream(PValue pvalue, InputDescriptor<KV<?, OpMessage<OutT>>, ?> inputDescriptor) {
        this.registerInputMessageStreams(pvalue, Collections.singletonList(inputDescriptor));
    }

    public <OutT> void registerInputMessageStreams(PValue pvalue, List<? extends InputDescriptor<KV<?, OpMessage<OutT>>, ?>> inputDescriptors) {
        this.registerInputMessageStreams(pvalue, inputDescriptors, this::registerMessageStream);
    }

    protected <KeyT, OutT> void registerInputMessageStreams(KeyT key, List<? extends InputDescriptor<KV<?, OpMessage<OutT>>, ?>> inputDescriptors, BiConsumer<KeyT, MessageStream<OpMessage<OutT>>> registerFunction) {
        HashSet<Object> streamsToMerge = new HashSet<Object>();
        for (InputDescriptor<KV<?, OpMessage<OutT>>, ?> inputDescriptor : inputDescriptors) {
            String streamId = inputDescriptor.getStreamId();
            if (this.registeredInputStreams.containsKey(streamId)) {
                MessageStream messageStream = this.registeredInputStreams.get(streamId);
                LOG.info(String.format("Stream id %s has already been mapped to %s stream. Mapping %s to the same message stream.", streamId, messageStream, key));
                streamsToMerge.add(messageStream);
                continue;
            }
            MessageStream typedStream = TranslationContext.getValueStream(this.appDescriptor.getInputStream(inputDescriptor));
            this.registeredInputStreams.put(streamId, typedStream);
            streamsToMerge.add(typedStream);
        }
        registerFunction.accept(key, MessageStream.mergeAll(streamsToMerge));
    }

    public <OutT> void registerMessageStream(PValue pvalue, MessageStream<OpMessage<OutT>> stream) {
        if (this.messsageStreams.containsKey(pvalue)) {
            throw new IllegalArgumentException("Stream already registered for pvalue: " + pvalue);
        }
        this.messsageStreams.put(pvalue, stream);
    }

    public MessageStream<OpMessage<String>> getDummyStream() {
        InputDescriptor<OpMessage<String>, ?> dummyInput = TranslationContext.createDummyStreamDescriptor(UUID.randomUUID().toString());
        return this.appDescriptor.getInputStream(dummyInput);
    }

    public <OutT> MessageStream<OpMessage<OutT>> getMessageStream(PValue pvalue) {
        MessageStream<?> stream = this.messsageStreams.get(pvalue);
        if (stream == null) {
            throw new IllegalArgumentException("No stream registered for pvalue: " + pvalue);
        }
        return stream;
    }

    public <ElemT, ViewT> void registerViewStream(PCollectionView<ViewT> view, MessageStream<OpMessage<Iterable<ElemT>>> stream) {
        if (this.viewStreams.containsKey(view)) {
            throw new IllegalArgumentException("Stream already registered for view: " + view);
        }
        this.viewStreams.put(view, stream);
    }

    public <InT> MessageStream<OpMessage<InT>> getViewStream(PCollectionView<?> view) {
        MessageStream<?> stream = this.viewStreams.get(view);
        if (stream == null) {
            throw new IllegalArgumentException("No stream registered for view: " + view);
        }
        return stream;
    }

    public <ViewT> String getViewId(PCollectionView<ViewT> view) {
        return this.getIdForPValue((PValue)view);
    }

    public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) {
        this.currentTransform = currentTransform;
    }

    public void clearCurrentTransform() {
        this.currentTransform = null;
    }

    public AppliedPTransform<?, ?, ?> getCurrentTransform() {
        return this.currentTransform;
    }

    public <InT extends PValue> InT getInput(PTransform<InT, ?> transform) {
        return (InT)((PValue)Iterables.getOnlyElement((Iterable)TransformInputs.nonAdditionalInputs(this.currentTransform)));
    }

    public <OutT extends PValue> OutT getOutput(PTransform<?, OutT> transform) {
        return (OutT)((PValue)Iterables.getOnlyElement(this.currentTransform.getOutputs().values()));
    }

    public <OutT> TupleTag<OutT> getOutputTag(PTransform<?, ? extends PCollection<OutT>> transform) {
        return (TupleTag)Iterables.getOnlyElement(this.currentTransform.getOutputs().keySet());
    }

    public SamzaPipelineOptions getPipelineOptions() {
        return this.options;
    }

    public <OutT> OutputStream<OutT> getOutputStream(OutputDescriptor<OutT, ?> outputDescriptor) {
        return this.appDescriptor.getOutputStream(outputDescriptor);
    }

    public <K, V> Table<KV<K, V>> getTable(TableDescriptor<K, V, ?> tableDesc) {
        return this.registeredTables.computeIfAbsent(tableDesc.getTableId(), id -> this.appDescriptor.getTable(tableDesc));
    }

    private static <T> MessageStream<T> getValueStream(MessageStream<KV<?, T>> input) {
        return input.map(KV::getValue);
    }

    public String getIdForPValue(PValue pvalue) {
        String id = this.idMap.get(pvalue);
        if (id == null) {
            throw new IllegalArgumentException("No id mapping for value: " + pvalue);
        }
        return id;
    }

    public String getTransformFullName() {
        return this.currentTransform.getFullName();
    }

    public String getTransformId() {
        return this.idGenerator.getId(this.getTransformFullName());
    }

    private static InputDescriptor<OpMessage<String>, ?> createDummyStreamDescriptor(String id) {
        GenericSystemDescriptor dummySystem = new GenericSystemDescriptor(id, InMemorySystemFactory.class.getName());
        GenericInputDescriptor dummyInput = dummySystem.getInputDescriptor(id, (Serde)new NoOpSerde());
        dummyInput.withOffsetDefault(SystemStreamMetadata.OffsetType.OLDEST);
        MapConfig config = new MapConfig(new Map[]{dummyInput.toConfig(), dummySystem.toConfig()});
        InMemorySystemFactory factory = new InMemorySystemFactory();
        StreamSpec dummyStreamSpec = new StreamSpec(id, id, id, 1);
        factory.getAdmin(id, (Config)config).createStream(dummyStreamSpec);
        SystemProducer producer = factory.getProducer(id, (Config)config, null);
        SystemStream sysStream = new SystemStream(id, id);
        Consumer<Object> sendFn = msg -> producer.send(id, new OutgoingMessageEnvelope(sysStream, (Object)0, null, msg));
        WindowedValue windowedValue = WindowedValue.timestampedValueInGlobalWindow((Object)"dummy", (Instant)new Instant());
        sendFn.accept(OpMessage.ofElement(windowedValue));
        sendFn.accept(new WatermarkMessage(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
        sendFn.accept(new EndOfStreamMessage(null));
        return dummyInput;
    }
}

