/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.translators;

import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.graph.TransformationTranslator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.transformations.GlobalCommitterTransform;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
import org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.GlobalCommitterOperator;
import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory;

@Internal
public class GlobalCommitterTransformationTranslator<CommT>
implements TransformationTranslator<Void, GlobalCommitterTransform<CommT>> {
    @Override
    public Collection<Integer> translateForBatch(GlobalCommitterTransform<CommT> transformation, TransformationTranslator.Context context) {
        return this.translateInternal(transformation, true);
    }

    @Override
    public Collection<Integer> translateForStreaming(GlobalCommitterTransform<CommT> transformation, TransformationTranslator.Context context) {
        return this.translateInternal(transformation, false);
    }

    private Collection<Integer> translateInternal(GlobalCommitterTransform<CommT> globalCommitterTransform, boolean batch) {
        DataStream<CommittableMessage<CommT>> inputStream = globalCommitterTransform.getInputStream();
        boolean checkpointingEnabled = inputStream.getExecutionEnvironment().getCheckpointConfig().isCheckpointingEnabled();
        boolean commitOnInput = batch || !checkpointingEnabled || GlobalCommitterTransformationTranslator.hasUpstreamCommitter(inputStream);
        DataStream<CommittableMessage<CommT>> global = inputStream.global();
        PhysicalTransformation transformation = (PhysicalTransformation)global.transform("Global Committer", Types.VOID, new GlobalCommitterOperator(globalCommitterTransform.getCommitterFactory(), globalCommitterTransform.getCommittableSerializer(), commitOnInput)).getTransformation();
        transformation.setChainingStrategy(ChainingStrategy.ALWAYS);
        transformation.setParallelism(1);
        transformation.setMaxParallelism(1);
        GlobalCommitterTransformationTranslator.copySafely(transformation::setName, globalCommitterTransform::getName);
        GlobalCommitterTransformationTranslator.copySafely(transformation::setUid, globalCommitterTransform::getUid);
        GlobalCommitterTransformationTranslator.copySafely(transformation::setUidHash, globalCommitterTransform::getUserProvidedNodeHash);
        return Arrays.asList(global.getId(), transformation.getId());
    }

    private static <T> void copySafely(Consumer<T> consumer, Supplier<T> provider) {
        T value = provider.get();
        if (value != null) {
            consumer.accept(value);
        }
    }

    private static boolean hasUpstreamCommitter(DataStream<?> ds) {
        Transformation<?> dsTransformation = ds.getTransformation();
        HashSet<Integer> seenIds = new HashSet<Integer>();
        ArrayDeque pendingsTransformations = new ArrayDeque(Collections.singleton(dsTransformation));
        while (!pendingsTransformations.isEmpty()) {
            Transformation transformation = (Transformation)pendingsTransformations.poll();
            if (transformation instanceof OneInputTransformation) {
                StreamOperatorFactory operatorFactory = ((OneInputTransformation)transformation).getOperatorFactory();
                if (operatorFactory instanceof CommitterOperatorFactory) {
                    return true;
                }
                if (operatorFactory instanceof SinkWriterOperatorFactory) continue;
            }
            for (Transformation<?> input : transformation.getInputs()) {
                if (!seenIds.add(input.getId())) continue;
                pendingsTransformations.add(input);
            }
        }
        return false;
    }
}

