/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.composer.flink.translator;

import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.factories.DataSinkFactory;
import org.apache.flink.cdc.common.factories.Factory;
import org.apache.flink.cdc.common.factories.FactoryHelper;
import org.apache.flink.cdc.common.sink.DataSink;
import org.apache.flink.cdc.common.sink.EventSinkProvider;
import org.apache.flink.cdc.common.sink.FlinkSinkFunctionProvider;
import org.apache.flink.cdc.common.sink.FlinkSinkProvider;
import org.apache.flink.cdc.composer.definition.SinkDef;
import org.apache.flink.cdc.composer.flink.FlinkEnvironmentUtils;
import org.apache.flink.cdc.composer.utils.FactoryDiscoveryUtils;
import org.apache.flink.cdc.runtime.operators.sink.BatchDataSinkFunctionOperator;
import org.apache.flink.cdc.runtime.operators.sink.DataSinkFunctionOperator;
import org.apache.flink.cdc.runtime.operators.sink.DataSinkWriterOperatorFactory;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
import org.apache.flink.streaming.api.connector.sink2.WithPostCommitTopology;
import org.apache.flink.streaming.api.connector.sink2.WithPreCommitTopology;
import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
import org.apache.flink.util.function.SerializableSupplier;

/**
 * Translates a {@link SinkDef} into the Flink operators that consume a stream of {@link Event}s.
 *
 * <p>Two kinds of {@link EventSinkProvider} are handled: {@link FlinkSinkProvider} (Sink V2
 * API, optionally with pre-write / pre-commit / post-commit topologies and a committer) and
 * {@link FlinkSinkFunctionProvider} (legacy {@link SinkFunction}).
 */
@Internal
public class DataSinkTranslator {
    private static final String SINK_WRITER_PREFIX = "Sink Writer: ";
    private static final String SINK_COMMITTER_PREFIX = "Sink Committer: ";

    /**
     * Looks up the {@link DataSinkFactory} matching the sink type and uses it to build the sink.
     *
     * <p>If a jar path can be resolved for the discovered factory, it is registered with the
     * given execution environment before the sink is created.
     *
     * @param sinkDef sink definition providing the factory identifier and the sink configuration
     * @param pipelineConfig pipeline-level configuration handed to the factory context
     * @param env execution environment the factory jar (if any) is added to
     * @return the sink created by the discovered factory
     */
    public DataSink createDataSink(SinkDef sinkDef, Configuration pipelineConfig, StreamExecutionEnvironment env) {
        DataSinkFactory sinkFactory = FactoryDiscoveryUtils.getFactoryByIdentifier(sinkDef.getType(), DataSinkFactory.class);
        FactoryDiscoveryUtils.getJarPathByIdentifier(sinkFactory).ifPresent(jar -> FlinkEnvironmentUtils.addJar(env, jar));
        Factory.Context context = new FactoryHelper.DefaultContext(
                sinkDef.getConfig(), pipelineConfig, Thread.currentThread().getContextClassLoader());
        return sinkFactory.createDataSink(context);
    }

    /**
     * Attaches the sink to {@code input} in streaming (non-batch) mode.
     *
     * @see #translate(SinkDef, DataStream, DataSink, boolean, OperatorID)
     */
    public void translate(SinkDef sinkDef, DataStream<Event> input, DataSink dataSink, OperatorID schemaOperatorID) {
        translate(sinkDef, input, dataSink, false, schemaOperatorID);
    }

    /**
     * Attaches the sink to {@code input}, choosing the wiring from the type of the sink's
     * {@link EventSinkProvider}.
     *
     * @param sinkDef sink definition, used to derive the operator name
     * @param input stream of events to be written
     * @param dataSink sink whose event sink provider is translated
     * @param isBatchMode forwarded to the sink operators
     * @param schemaOperatorID ID of the schema operator, forwarded to the sink operators
     * @throws IllegalArgumentException if the provider is neither a {@link FlinkSinkProvider} nor
     *     a {@link FlinkSinkFunctionProvider}; previously such a provider was silently ignored
     *     and the pipeline ended up without any sink operator
     */
    public void translate(SinkDef sinkDef, DataStream<Event> input, DataSink dataSink, boolean isBatchMode, OperatorID schemaOperatorID) {
        EventSinkProvider eventSinkProvider = dataSink.getEventSinkProvider();
        String sinkName = generateSinkName(sinkDef);
        if (eventSinkProvider instanceof FlinkSinkProvider) {
            // Sink V2 API
            Sink<Event> sink = ((FlinkSinkProvider) eventSinkProvider).getSink();
            sinkTo(input, sink, sinkName, isBatchMode, schemaOperatorID);
        } else if (eventSinkProvider instanceof FlinkSinkFunctionProvider) {
            // Legacy SinkFunction API
            SinkFunction<Event> sinkFunction = ((FlinkSinkFunctionProvider) eventSinkProvider).getSinkFunction();
            sinkTo(input, sinkFunction, sinkName, isBatchMode, schemaOperatorID);
        } else {
            String providerType = eventSinkProvider == null ? "null" : eventSinkProvider.getClass().getName();
            throw new IllegalArgumentException(
                    "Unsupported EventSinkProvider for sink \"" + sinkName + "\": " + providerType);
        }
    }

    /**
     * Wires a Sink V2 {@link Sink} behind {@code input}.
     *
     * <p>The sink's pre-write topology is applied first when it implements
     * {@link WithPreWriteTopology}. A {@link TwoPhaseCommittingSink} then gets a writer plus a
     * committer; any other sink only gets a writer operator that emits no committables.
     */
    @VisibleForTesting
    void sinkTo(DataStream<Event> input, Sink<Event> sink, String sinkName, boolean isBatchMode, OperatorID schemaOperatorID) {
        DataStream<Event> stream = input;
        if (sink instanceof WithPreWriteTopology) {
            stream = ((WithPreWriteTopology<Event>) sink).addPreWriteTopology(stream);
        }
        if (sink instanceof TwoPhaseCommittingSink) {
            addCommittingTopology(sink, stream, sinkName, isBatchMode, schemaOperatorID);
        } else {
            stream.transform(
                    SINK_WRITER_PREFIX + sinkName,
                    CommittableMessageTypeInfo.noOutput(),
                    new DataSinkWriterOperatorFactory<>(sink, isBatchMode, schemaOperatorID));
        }
    }

    /**
     * Wires a legacy {@link SinkFunction} behind {@code input} by registering a
     * {@link LegacySinkTransformation} directly on the execution environment.
     *
     * <p>The transformation uses the environment's parallelism and is created with the
     * "parallelism configured" flag set to {@code false}.
     */
    private void sinkTo(DataStream<Event> input, SinkFunction<Event> sinkFunction, String sinkName, boolean isBatchMode, OperatorID schemaOperatorID) {
        StreamSink<Event> sinkOperator;
        if (isBatchMode) {
            sinkOperator = new BatchDataSinkFunctionOperator(sinkFunction);
        } else {
            sinkOperator = new DataSinkFunctionOperator(sinkFunction, schemaOperatorID);
        }
        StreamExecutionEnvironment executionEnvironment = input.getExecutionEnvironment();
        LegacySinkTransformation<Event> transformation = new LegacySinkTransformation<>(
                input.getTransformation(),
                SINK_WRITER_PREFIX + sinkName,
                sinkOperator,
                executionEnvironment.getParallelism(),
                false);
        executionEnvironment.addOperator(transformation);
    }

    /**
     * Builds the writer -> (pre-commit) -> committer -> (post-commit) chain for a committing sink.
     *
     * <p>The pre-commit and post-commit steps are only added when the sink implements
     * {@link WithPreCommitTopology} / {@link WithPostCommitTopology} respectively.
     *
     * @param <CommT> committable type produced by the sink writer
     */
    @SuppressWarnings("unchecked") // the committable type of the sink is only known at runtime
    private <CommT> void addCommittingTopology(Sink<Event> sink, DataStream<Event> inputStream, String sinkName, boolean isBatchMode, OperatorID schemaOperatorID) {
        TypeInformation<CommittableMessage<CommT>> typeInformation =
                CommittableMessageTypeInfo.of(() -> getCommittableSerializer(sink));

        DataStream<CommittableMessage<CommT>> written = inputStream.transform(
                SINK_WRITER_PREFIX + sinkName,
                typeInformation,
                new DataSinkWriterOperatorFactory<>(sink, isBatchMode, schemaOperatorID));

        // addPreCommitTopology returns a plain DataStream, so the intermediate streams are
        // typed as DataStream rather than SingleOutputStreamOperator.
        DataStream<CommittableMessage<CommT>> preCommitted = written;
        if (sink instanceof WithPreCommitTopology) {
            preCommitted = ((WithPreCommitTopology<Event, CommT>) sink).addPreCommitTopology(written);
        }

        // Checkpointing is hard-coded as enabled here; it is not derived from the environment.
        boolean isCheckpointingEnabled = true;
        DataStream<CommittableMessage<CommT>> committed = preCommitted.transform(
                SINK_COMMITTER_PREFIX + sinkName,
                typeInformation,
                getCommitterOperatorFactory(sink, isBatchMode, isCheckpointingEnabled));

        if (sink instanceof WithPostCommitTopology) {
            ((WithPostCommitTopology<Event, CommT>) sink).addPostCommitTopology(committed);
        }
    }

    /**
     * Returns the user-defined sink name, or a default derived from the sink type when the
     * definition has no name.
     */
    private String generateSinkName(SinkDef sinkDef) {
        // orElseGet: only format the fallback name when it is actually needed.
        return sinkDef.getName().orElseGet(() -> String.format("Flink CDC Event Sink: %s", sinkDef.getType()));
    }

    /**
     * Reflectively invokes {@code getCommittableSerializer()} on the given sink.
     *
     * <p>{@link Class#getMethod} is used so that a public implementation inherited from a
     * superclass of the sink is found as well; {@code getDeclaredMethod} would only see methods
     * declared on the sink's exact runtime class.
     *
     * @throws RuntimeException if the method is missing, inaccessible, or throws
     */
    @SuppressWarnings("unchecked") // reflective call, the committable type cannot be checked
    private static <CommT> SimpleVersionedSerializer<CommT> getCommittableSerializer(Object sink) {
        try {
            return (SimpleVersionedSerializer<CommT>) sink.getClass().getMethod("getCommittableSerializer").invoke(sink);
        }
        catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
            throw new RuntimeException("Failed to get CommittableSerializer", e);
        }
    }

    /**
     * Reflectively instantiates Flink's {@code CommitterOperatorFactory} for the given sink.
     *
     * <p>The first declared constructor of the class is invoked with
     * {@code (sink, isBatchMode, isCheckpointingEnabled)}.
     * NOTE(review): this presumes that constructor is the three-argument one - confirm against
     * the Flink version in use.
     *
     * @throws RuntimeException if the class cannot be loaded or instantiated
     */
    @SuppressWarnings("unchecked") // reflective instantiation, the generic type cannot be checked
    private static <CommT> OneInputStreamOperatorFactory<CommittableMessage<CommT>, CommittableMessage<CommT>> getCommitterOperatorFactory(Sink<Event> sink, boolean isBatchMode, boolean isCheckpointingEnabled) {
        try {
            return (OneInputStreamOperatorFactory<CommittableMessage<CommT>, CommittableMessage<CommT>>)
                    Class.forName("org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory")
                            .getDeclaredConstructors()[0]
                            .newInstance(sink, isBatchMode, isCheckpointingEnabled);
        }
        catch (ClassNotFoundException | IllegalAccessException | InstantiationException | InvocationTargetException e) {
            throw new RuntimeException("Failed to create CommitterOperatorFactory", e);
        }
    }
}

