/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.composer.flink.translator;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.factories.DataSourceFactory;
import org.apache.flink.cdc.common.factories.Factory;
import org.apache.flink.cdc.common.factories.FactoryHelper;
import org.apache.flink.cdc.common.source.DataSource;
import org.apache.flink.cdc.common.source.EventSourceProvider;
import org.apache.flink.cdc.common.source.FlinkSourceFunctionProvider;
import org.apache.flink.cdc.common.source.FlinkSourceProvider;
import org.apache.flink.cdc.composer.definition.SourceDef;
import org.apache.flink.cdc.composer.flink.FlinkEnvironmentUtils;
import org.apache.flink.cdc.composer.flink.translator.OperatorUidGenerator;
import org.apache.flink.cdc.composer.utils.FactoryDiscoveryUtils;
import org.apache.flink.cdc.runtime.typeutils.EventTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

@Internal
public class DataSourceTranslator {
    public DataStream<Event> translate(SourceDef sourceDef, DataSource dataSource, StreamExecutionEnvironment env, int sourceParallelism, OperatorUidGenerator operatorUidGenerator) {
        EventSourceProvider eventSourceProvider = dataSource.getEventSourceProvider();
        return this.createDataStreamSource(env, eventSourceProvider, sourceDef).setParallelism(sourceParallelism).uid(operatorUidGenerator.generateUid("source"));
    }

    private DataStreamSource<Event> createDataStreamSource(StreamExecutionEnvironment env, EventSourceProvider eventSourceProvider, SourceDef sourceDef) {
        if (eventSourceProvider instanceof FlinkSourceProvider) {
            FlinkSourceProvider sourceProvider = (FlinkSourceProvider)eventSourceProvider;
            return env.fromSource(sourceProvider.getSource(), WatermarkStrategy.noWatermarks(), sourceDef.getName().orElse(this.generateDefaultSourceName(sourceDef)), (TypeInformation)new EventTypeInfo());
        }
        if (eventSourceProvider instanceof FlinkSourceFunctionProvider) {
            FlinkSourceFunctionProvider sourceFunctionProvider = (FlinkSourceFunctionProvider)eventSourceProvider;
            DataStreamSource stream = env.addSource(sourceFunctionProvider.getSourceFunction(), (TypeInformation)new EventTypeInfo());
            sourceDef.getName().ifPresent(arg_0 -> ((DataStreamSource)stream).name(arg_0));
            return stream;
        }
        throw new IllegalStateException(String.format("Unsupported EventSourceProvider type \"%s\"", eventSourceProvider.getClass().getCanonicalName()));
    }

    public DataSource createDataSource(SourceDef sourceDef, Configuration pipelineConfig, StreamExecutionEnvironment env) {
        DataSourceFactory sourceFactory = FactoryDiscoveryUtils.getFactoryByIdentifier(sourceDef.getType(), DataSourceFactory.class);
        FactoryDiscoveryUtils.getJarPathByIdentifier(sourceFactory).ifPresent(jar -> FlinkEnvironmentUtils.addJar(env, jar));
        FactoryHelper.DefaultContext context = new FactoryHelper.DefaultContext(sourceDef.getConfig(), pipelineConfig, Thread.currentThread().getContextClassLoader());
        return sourceFactory.createDataSource((Factory.Context)context);
    }

    private String generateDefaultSourceName(SourceDef sourceDef) {
        return String.format("Flink CDC Event Source: %s", sourceDef.getType());
    }
}

