/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.translators;

import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.SupportsConcurrentExecutionAttempts;
import org.apache.flink.api.common.operators.SlotSharingGroup;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SupportsCommitter;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
import org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology;
import org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology;
import org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology;
import org.apache.flink.streaming.api.datastream.CustomSinkOperatorUidHashes;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.TransformationTranslator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
import org.apache.flink.streaming.api.transformations.SinkTransformation;
import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
import org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.util.Preconditions;

@Internal
public class SinkTransformationTranslator<Input, Output>
implements TransformationTranslator<Output, SinkTransformation<Input, Output>> {
    /**
     * Translates the given sink transformation for batch execution.
     *
     * <p>Delegates to {@code translateInternal} with the batch flag set.
     *
     * @param transformation the sink transformation to expand
     * @param context the translation context handed to the expander
     * @return whatever {@code translateInternal} returns for this transformation
     */
    @Override
    public Collection<Integer> translateForBatch(SinkTransformation<Input, Output> transformation, TransformationTranslator.Context context) {
        final boolean batchMode = true;
        return translateInternal(transformation, context, batchMode);
    }

    /**
     * Translates the given sink transformation for streaming execution.
     *
     * <p>Delegates to {@code translateInternal} with the batch flag cleared.
     *
     * @param transformation the sink transformation to expand
     * @param context the translation context handed to the expander
     * @return whatever {@code translateInternal} returns for this transformation
     */
    @Override
    public Collection<Integer> translateForStreaming(SinkTransformation<Input, Output> transformation, TransformationTranslator.Context context) {
        final boolean batchMode = false;
        return translateInternal(transformation, context, batchMode);
    }

    /**
     * Expands the sink transformation through a {@code SinkExpander} and reports no node ids of
     * its own.
     *
     * @param transformation the sink transformation to expand
     * @param context the translation context used while expanding
     * @param batch {@code true} when translating for batch execution
     * @return always an empty list; the expander performs the translation of the
     *     sub-transformations itself through {@code context}
     */
    private Collection<Integer> translateInternal(SinkTransformation<Input, Output> transformation, TransformationTranslator.Context context, boolean batch) {
        new SinkExpander<>(
                        transformation.getInputStream(),
                        transformation.getSink(),
                        transformation,
                        context,
                        batch)
                .expand();
        return Collections.emptyList();
    }

    private static class SinkExpander<T> {
        private final SinkTransformation<T, ?> transformation;
        private final Sink<T> sink;
        private final TransformationTranslator.Context context;
        private final DataStream<T> inputStream;
        private final StreamExecutionEnvironment executionEnvironment;
        private final Optional<Integer> environmentParallelism;
        private final boolean isBatchMode;
        private final boolean isCheckpointingEnabled;

        public SinkExpander(DataStream<T> inputStream, Sink<T> sink, SinkTransformation<T, ?> transformation, TransformationTranslator.Context context, boolean isBatchMode) {
            this.inputStream = inputStream;
            this.executionEnvironment = inputStream.getExecutionEnvironment();
            this.environmentParallelism = this.executionEnvironment.getConfig().toConfiguration().getOptional(CoreOptions.DEFAULT_PARALLELISM);
            this.isCheckpointingEnabled = this.executionEnvironment.getCheckpointConfig().isCheckpointingEnabled();
            this.transformation = transformation;
            this.sink = sink;
            this.context = context;
            this.isBatchMode = isBatchMode;
        }

        /**
         * Expands the sink into its sub-transformations, translates them, and then removes them
         * from the execution environment again.
         *
         * <p>Steps, in order:
         *
         * <ol>
         *   <li>Apply the sink's pre-write topology, if it declares one.
         *   <li>Verify that a sink declaring a pre-commit or post-commit topology also implements
         *       {@code SupportsCommitter}.
         *   <li>Add either the full committing topology or a lone "Writer" operator.
         *   <li>Translate every transformation registered since this method started, repeating
         *       until the translation result no longer changes.
         *   <li>Disable unaligned checkpoints on the partitioners found downstream of the writer.
         * </ol>
         *
         * <p>The transformations registered during the expansion are removed in a
         * {@code finally} block so that a failure part-way through does not leave them behind in
         * the environment's transformation list.
         *
         * @throws IllegalArgumentException if the sink declares a pre-/post-commit topology
         *     without implementing {@code SupportsCommitter}
         */
        private void expand() {
            final int sizeBefore = executionEnvironment.getTransformations().size();
            try {
                DataStream<T> prewritten = inputStream;
                if (sink instanceof SupportsPreWriteTopology) {
                    // Unchecked: relies on the sink's pre-write topology being declared for the
                    // same input type T as the sink itself.
                    @SuppressWarnings("unchecked")
                    SupportsPreWriteTopology<T> preWritingSink = (SupportsPreWriteTopology<T>) sink;
                    prewritten =
                            adjustTransformations(
                                    prewritten,
                                    preWritingSink::addPreWriteTopology,
                                    true,
                                    sink instanceof SupportsConcurrentExecutionAttempts);
                }

                if (sink instanceof SupportsPreCommitTopology) {
                    Preconditions.checkArgument(
                            sink instanceof SupportsCommitter,
                            "Sink with SupportsPreCommitTopology should implement SupportsCommitter");
                }
                if (sink instanceof SupportsPostCommitTopology) {
                    Preconditions.checkArgument(
                            sink instanceof SupportsCommitter,
                            "Sink with SupportsPostCommitTopology should implement SupportsCommitter");
                }

                if (sink instanceof SupportsCommitter) {
                    addCommittingTopology(sink, prewritten);
                } else {
                    // No committer: the writer is the only operator and emits nothing.
                    adjustTransformations(
                            prewritten,
                            input ->
                                    input.transform(
                                            "Writer",
                                            CommittableMessageTypeInfo.noOutput(),
                                            new SinkWriterOperatorFactory<>(sink)),
                            false,
                            sink instanceof SupportsConcurrentExecutionAttempts);
                }

                getSinkTransformations(sizeBefore).forEach(context::transform);

                // Translating may register further transformations, so translate again until the
                // produced result is stable.
                repeatUntilConverged(
                        () ->
                                getSinkTransformations(sizeBefore).stream()
                                        .flatMap(t -> context.transform(t).stream())
                                        .collect(Collectors.toList()));

                disallowUnalignedCheckpoint(getSinkTransformations(sizeBefore));
            } finally {
                // Drop everything this expansion registered, newest first, so the environment's
                // list is back to its original size even if the expansion failed.
                while (executionEnvironment.getTransformations().size() > sizeBefore) {
                    executionEnvironment
                            .getTransformations()
                            .remove(executionEnvironment.getTransformations().size() - 1);
                }
            }
        }

        /**
         * Invokes {@code producer} repeatedly until two consecutive results are equal.
         *
         * <p>The producer is always called at least twice. Equality is decided by calling
         * {@code equals} on the earlier result, so the producer must not return {@code null}
         * for that one.
         *
         * @param producer supplies the value to compare between rounds
         * @param <R> type of the produced value
         */
        private <R> void repeatUntilConverged(Supplier<R> producer) {
            R previous = producer.get();
            for (R current = producer.get(); !previous.equals(current); current = producer.get()) {
                previous = current;
            }
        }

        /**
         * Returns the transformations registered in the execution environment from index
         * {@code sizeBefore} up to the current end of the list.
         *
         * <p>The result is a {@code subList} view of the environment's list, not a copy.
         *
         * @param sizeBefore size of the environment's transformation list before the sink was
         *     expanded
         * @return view of the transformations added since then
         */
        private List<Transformation<?>> getSinkTransformations(int sizeBefore) {
            final List<Transformation<?>> registered = executionEnvironment.getTransformations();
            return registered.subList(sizeBefore, registered.size());
        }

        /**
         * Disables unaligned checkpoints on every {@code PartitionTransformation} reachable
         * from the transformations that follow the writer.
         *
         * <p>Starting from all sink transformations listed after the writer, the inputs are
         * followed breadth-first. The writer itself is marked as seen up front, so the walk does
         * not continue past it.
         *
         * @param sinkTransformations the transformations produced by expanding the sink
         * @throws IllegalStateException if no writer transformation is contained in the list
         */
        private void disallowUnalignedCheckpoint(List<Transformation<?>> sinkTransformations) {
            Optional<Transformation<?>> writerOpt =
                    sinkTransformations.stream().filter(SinkExpander::isWriter).findFirst();
            Preconditions.checkState(writerOpt.isPresent(), "Writer transformation not found.");
            Transformation<?> writer = writerOpt.get();
            int indexOfWriter = sinkTransformations.indexOf(writer);

            // Ids already queued or visited; seeding it with the writer stops the walk there.
            HashSet<Integer> seen = new HashSet<>(sinkTransformations.size() * 2);
            seen.add(writer.getId());

            ArrayDeque<Transformation<?>> pending =
                    new ArrayDeque<>(
                            sinkTransformations.subList(
                                    indexOfWriter + 1, sinkTransformations.size()));
            while (!pending.isEmpty()) {
                Transformation<?> current = pending.poll();
                seen.add(current.getId());
                for (Transformation<?> input : current.getInputs()) {
                    if (input instanceof PartitionTransformation) {
                        ((PartitionTransformation<?>) input)
                                .getPartitioner()
                                .disableUnalignedCheckpoints();
                    }
                    // add() returns false for ids that were seen before, so each input is
                    // enqueued at most once.
                    if (seen.add(input.getId())) {
                        pending.add(input);
                    }
                }
            }
        }

        /**
         * Tells whether {@code t} is the sink writer transformation.
         *
         * @param t the transformation to inspect
         * @return {@code true} iff {@code t} is a {@code OneInputTransformation} whose operator
         *     factory is a {@code SinkWriterOperatorFactory}
         */
        private static boolean isWriter(Transformation<?> t) {
            return t instanceof OneInputTransformation
                    && ((OneInputTransformation<?, ?>) t).getOperatorFactory()
                            instanceof SinkWriterOperatorFactory;
        }

        /**
         * Adds the writer, the optional pre-commit topology, the committer, and the optional
         * post-commit topology for a sink that implements {@code SupportsCommitter}.
         *
         * <p>When the sink also implements {@code SupportsPreCommitTopology}, the writer emits
         * write results that the pre-commit topology turns into committables; otherwise the
         * writer emits committables directly. A post-commit topology, if declared, is attached
         * behind an additional fail-over region boundary after the committer.
         *
         * @param sink the sink to expand; must implement {@code SupportsCommitter}
         * @param inputStream the stream feeding the writer
         * @param <CommT> committable type of the sink
         * @param <WriteResultT> type emitted by the writer when a pre-commit topology is present
         */
        private <CommT, WriteResultT> void addCommittingTopology(Sink<T> sink, DataStream<T> inputStream) {
            // Unchecked: the committable type is not recoverable from Sink<T>.
            @SuppressWarnings("unchecked")
            SupportsCommitter<CommT> committingSink = (SupportsCommitter<CommT>) sink;
            TypeInformation<CommittableMessage<CommT>> committableTypeInformation =
                    CommittableMessageTypeInfo.of(committingSink::getCommittableSerializer);

            DataStream<CommittableMessage<CommT>> precommitted;
            if (sink instanceof SupportsPreCommitTopology) {
                @SuppressWarnings("unchecked")
                SupportsPreCommitTopology<WriteResultT, CommT> preCommittingSink =
                        (SupportsPreCommitTopology<WriteResultT, CommT>) sink;
                TypeInformation<CommittableMessage<WriteResultT>> writeResultTypeInformation =
                        CommittableMessageTypeInfo.of(preCommittingSink::getWriteResultSerializer);

                DataStream<CommittableMessage<WriteResultT>> writerResult =
                        addWriter(sink, inputStream, writeResultTypeInformation);
                precommitted =
                        adjustTransformations(
                                writerResult, preCommittingSink::addPreCommitTopology, true, false);
            } else {
                precommitted = addWriter(sink, inputStream, committableTypeInformation);
            }

            DataStream<CommittableMessage<CommT>> committed =
                    adjustTransformations(
                            precommitted,
                            pc ->
                                    pc.transform(
                                            "Committer",
                                            committableTypeInformation,
                                            new CommitterOperatorFactory<>(
                                                    committingSink,
                                                    isBatchMode,
                                                    isCheckpointingEnabled)),
                            false,
                            false);

            if (sink instanceof SupportsPostCommitTopology) {
                @SuppressWarnings("unchecked")
                SupportsPostCommitTopology<CommT> postCommittingSink =
                        (SupportsPostCommitTopology<CommT>) sink;
                DataStream<CommittableMessage<CommT>> postcommitted = addFailOverRegion(committed);
                adjustTransformations(
                        postcommitted,
                        pc -> {
                            // The post-commit topology produces no stream to continue from.
                            postCommittingSink.addPostCommitTopology(pc);
                            return null;
                        },
                        true,
                        false);
            }
        }

        /**
         * Adds the "Writer" operator for {@code sink} behind {@code inputStream} and closes it
         * with a fail-over region boundary.
         *
         * @param sink the sink whose writer is added
         * @param inputStream the stream feeding the writer
         * @param typeInformation type information of the messages the writer emits
         * @param <WriteResultT> payload type of the emitted messages
         * @return the writer's output stream, routed through {@code addFailOverRegion}
         */
        private <WriteResultT> DataStream<CommittableMessage<WriteResultT>> addWriter(Sink<T> sink, DataStream<T> inputStream, TypeInformation<CommittableMessage<WriteResultT>> typeInformation) {
            DataStream<CommittableMessage<WriteResultT>> written =
                    adjustTransformations(
                            inputStream,
                            input ->
                                    input.transform(
                                            "Writer",
                                            typeInformation,
                                            new SinkWriterOperatorFactory<>(sink)),
                            false,
                            sink instanceof SupportsConcurrentExecutionAttempts);
            return addFailOverRegion(written);
        }

        /**
         * Wraps {@code input} in a forward-partitioned {@code PartitionTransformation} that uses
         * {@code StreamExchangeMode.BATCH}.
         *
         * <p>NOTE(review): going by the method name, the BATCH exchange is what separates the
         * upstream and downstream operators into distinct fail-over regions; confirm against
         * the scheduler documentation.
         *
         * @param input the stream to wrap
         * @param <I> element type of the stream
         * @return a new stream in the same execution environment, backed by the partition
         *     transformation
         */
        private <I> DataStream<I> addFailOverRegion(DataStream<I> input) {
            PartitionTransformation<I> boundary =
                    new PartitionTransformation<>(
                            input.getTransformation(),
                            new ForwardPartitioner<>(),
                            StreamExchangeMode.BATCH);
            return new DataStream<>(executionEnvironment, boundary);
        }

        /**
         * Runs {@code action} on {@code inputStream} and then makes every transformation that
         * the action registered inherit the settings of the enclosing sink transformation.
         *
         * <p>For each newly registered transformation this sets the operator uid hash (by
         * operator name), derives the uid, prefixes co-location key / name / description with
         * the sink's values, and fills in co-location key, slot sharing group, parallelism and
         * max parallelism where the sub-transformation has none of its own. For physical
         * transformations it also applies the sink's chaining strategy (if set) and overrides the
         * concurrent-execution-attempts flag.
         *
         * <p>The environment parallelism is set to -1 while the action runs, so that
         * sub-transformations without an explicit parallelism can be recognised afterwards. It is
         * restored in a {@code finally} block, which means a failing action or precondition does
         * not leave the environment with the temporary value.
         *
         * @param inputStream the stream handed to {@code action}
         * @param action adds transformations on top of {@code inputStream}
         * @param isExpandedTopology {@code true} for a user-defined topology; such a topology may
         *     set uids only if the sink transformation itself has a uid
         * @param supportsConcurrentExecutionAttempts value forced onto every physical
         *     sub-transformation
         * @param <I> element type of the input stream
         * @param <R> result type of the action
         * @return the result of {@code action}
         * @throws IllegalStateException if an expanded topology sets a uid on an operator while
         *     the sink transformation has none
         */
        private <I, R> R adjustTransformations(DataStream<I> inputStream, Function<DataStream<I>, R> action, boolean isExpandedTopology, boolean supportsConcurrentExecutionAttempts) {
            // -1 is the value compared against below to detect "no parallelism set".
            executionEnvironment.setParallelism(-1);
            try {
                int numTransformsBefore = executionEnvironment.getTransformations().size();
                R result = action.apply(inputStream);
                List<Transformation<?>> transformations = executionEnvironment.getTransformations();
                List<Transformation<?>> expandedTransformations =
                        transformations.subList(numTransformsBefore, transformations.size());

                CustomSinkOperatorUidHashes operatorsUidHashes =
                        this.transformation.getSinkOperatorsUidHashes();
                for (Transformation<?> subTransformation : expandedTransformations) {
                    String subUid = subTransformation.getUid();
                    if (isExpandedTopology && subUid != null && !subUid.isEmpty()) {
                        String sinkUid = this.transformation.getUid();
                        Preconditions.checkState(
                                sinkUid != null && !sinkUid.isEmpty(),
                                "Sink "
                                        + this.transformation.getName()
                                        + " requires to set a uid since its customized topology has set uid for some operators.");
                    }

                    // Uid hashes are matched by operator name and must be applied before the
                    // name is prefixed further down.
                    setOperatorUidHashIfPossible(
                            subTransformation, "Writer", operatorsUidHashes.getWriterUidHash());
                    setOperatorUidHashIfPossible(
                            subTransformation,
                            "Committer",
                            operatorsUidHashes.getCommitterUidHash());
                    setOperatorUidHashIfPossible(
                            subTransformation,
                            "Global Committer",
                            operatorsUidHashes.getGlobalCommitterUidHash());

                    concatUid(subTransformation, subTransformation.getName());
                    concatProperty(
                            subTransformation,
                            Transformation::getCoLocationGroupKey,
                            Transformation::setCoLocationGroupKey);
                    concatProperty(
                            subTransformation, Transformation::getName, Transformation::setName);
                    concatProperty(
                            subTransformation,
                            Transformation::getDescription,
                            Transformation::setDescription);

                    // Fall back to the sink's co-location key when the sub-transformation has none.
                    String coLocationGroupKey = this.transformation.getCoLocationGroupKey();
                    if (coLocationGroupKey != null
                            && subTransformation.getCoLocationGroupKey() == null) {
                        subTransformation.setCoLocationGroupKey(coLocationGroupKey);
                    }

                    Optional<SlotSharingGroup> ssg = this.transformation.getSlotSharingGroup();
                    if (ssg.isPresent() && !subTransformation.getSlotSharingGroup().isPresent()) {
                        subTransformation.setSlotSharingGroup(ssg.get());
                    }

                    // Still -1 means nothing set a parallelism while the action ran, so the
                    // sink's parallelism is inherited.
                    if (subTransformation.getParallelism() == -1) {
                        subTransformation.setParallelism(
                                this.transformation.getParallelism(),
                                this.transformation.isParallelismConfigured());
                    }

                    if (subTransformation.getMaxParallelism() < 0
                            && this.transformation.getMaxParallelism() > 0) {
                        subTransformation.setMaxParallelism(this.transformation.getMaxParallelism());
                    }

                    if (subTransformation instanceof PhysicalTransformation) {
                        PhysicalTransformation<?> physicalSubTransformation =
                                (PhysicalTransformation<?>) subTransformation;
                        if (this.transformation.getChainingStrategy() != null) {
                            physicalSubTransformation.setChainingStrategy(
                                    this.transformation.getChainingStrategy());
                        }
                        // Always overridden with the caller's value.
                        physicalSubTransformation.setSupportsConcurrentExecutionAttempts(
                                supportsConcurrentExecutionAttempts);
                    }
                }
                return result;
            } finally {
                // Put the environment parallelism back to what it was when the expander was
                // created, whether or not the adjustment succeeded.
                if (environmentParallelism.isPresent()) {
                    executionEnvironment.getConfig().setParallelism(environmentParallelism.get());
                } else {
                    executionEnvironment.getConfig().resetParallelism();
                }
            }
        }

        /**
         * Assigns {@code operatorUidHash} to {@code transformation} when a hash is given and the
         * transformation's name equals {@code writerName}; otherwise does nothing.
         *
         * @param transformation the transformation that may receive the uid hash
         * @param writerName the operator name the transformation must carry (despite the
         *     parameter name, callers also pass committer names)
         * @param operatorUidHash the uid hash to set, or {@code null} if none was configured
         */
        private void setOperatorUidHashIfPossible(Transformation<?> transformation, String writerName, @Nullable String operatorUidHash) {
            if (operatorUidHash != null && transformation.getName().equals(writerName)) {
                transformation.setUidHash(operatorUidHash);
            }
        }

        /**
         * Derives the uid of {@code subTransformation} from the uid of the sink transformation.
         *
         * <p>When the sink has a uid and the operator name is one of the well-known ones, a fixed
         * pattern is used:
         *
         * <ul>
         *   <li>{@code "Committer"} becomes {@code "Sink Committer: <sink uid>"}
         *   <li>{@code "Writer"} gets the sink uid unchanged
         *   <li>{@code "Global Committer"} becomes {@code "Sink <sink uid> Global Committer"}
         * </ul>
         *
         * <p>In every other case the uid is handled like any other property by
         * {@code concatProperty}, i.e. prefixed with the sink uid only if both uids are set.
         *
         * @param subTransformation the transformation whose uid is set
         * @param transformationName name of the sub-transformation, or {@code null}
         */
        private void concatUid(Transformation<?> subTransformation, @Nullable String transformationName) {
            final String sinkUid = this.transformation.getUid();
            if (transformationName != null && sinkUid != null) {
                switch (transformationName) {
                    case "Committer":
                        subTransformation.setUid(String.format("Sink Committer: %s", sinkUid));
                        return;
                    case "Writer":
                        subTransformation.setUid(sinkUid);
                        return;
                    case "Global Committer":
                        subTransformation.setUid(
                                String.format("Sink %s Global Committer", sinkUid));
                        return;
                    default:
                        // Not a well-known operator: fall through to the generic handling.
                        break;
                }
            }
            concatProperty(subTransformation, Transformation::getUid, Transformation::setUid);
        }

        /**
         * Prefixes a string property of {@code subTransformation} with the same property of the
         * sink transformation, joined by {@code ": "}.
         *
         * <p>Nothing is changed unless the property is non-null on both transformations.
         *
         * @param subTransformation the transformation whose property is rewritten
         * @param getter reads the property from a transformation
         * @param setter writes the property to a transformation
         */
        private void concatProperty(Transformation<?> subTransformation, Function<Transformation<?>, String> getter, BiConsumer<Transformation<?>, String> setter) {
            final String sinkValue = getter.apply(this.transformation);
            if (sinkValue == null) {
                return;
            }
            final String subValue = getter.apply(subTransformation);
            if (subValue == null) {
                return;
            }
            setter.accept(subTransformation, sinkValue + ": " + subValue);
        }
    }
}

