/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.cli.parser;

import java.io.InputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Stream;
import org.apache.flink.cdc.cli.parser.PipelineDefinitionParser;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.utils.ChangeEventUtils;
import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.cdc.common.utils.StringUtils;
import org.apache.flink.cdc.composer.definition.ModelDef;
import org.apache.flink.cdc.composer.definition.PipelineDef;
import org.apache.flink.cdc.composer.definition.RouteDef;
import org.apache.flink.cdc.composer.definition.SinkDef;
import org.apache.flink.cdc.composer.definition.SourceDef;
import org.apache.flink.cdc.composer.definition.TransformDef;
import org.apache.flink.cdc.composer.definition.UdfDef;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.yaml.YAMLFactory;

/**
 * {@link PipelineDefinitionParser} that reads a pipeline definition written in YAML and turns it
 * into a {@link PipelineDef}.
 *
 * <p>The top-level sections read by this parser are {@code source} and {@code sink} (both
 * required), and the optional {@code route}, {@code transform} and {@code pipeline} sections.
 * Inside {@code pipeline}, the {@code user-defined-function} and {@code model} entries are
 * extracted into {@link UdfDef}s and {@link ModelDef}s; the remaining entries become the
 * user pipeline configuration.
 */
public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
    private static final String SOURCE_KEY = "source";
    private static final String SINK_KEY = "sink";
    private static final String ROUTE_KEY = "route";
    private static final String TRANSFORM_KEY = "transform";
    private static final String PIPELINE_KEY = "pipeline";
    private static final String MODEL_KEY = "model";
    private static final String TYPE_KEY = "type";
    private static final String NAME_KEY = "name";
    private static final String INCLUDE_SCHEMA_EVOLUTION_TYPES = "include.schema.changes";
    private static final String EXCLUDE_SCHEMA_EVOLUTION_TYPES = "exclude.schema.changes";
    private static final String ROUTE_SOURCE_TABLE_KEY = "source-table";
    private static final String ROUTE_SINK_TABLE_KEY = "sink-table";
    private static final String ROUTE_REPLACE_SYMBOL = "replace-symbol";
    private static final String ROUTE_DESCRIPTION_KEY = "description";
    private static final String TRANSFORM_SOURCE_TABLE_KEY = "source-table";
    private static final String TRANSFORM_PROJECTION_KEY = "projection";
    private static final String TRANSFORM_FILTER_KEY = "filter";
    private static final String TRANSFORM_DESCRIPTION_KEY = "description";
    private static final String TRANSFORM_CONVERTER_AFTER_TRANSFORM_KEY = "converter-after-transform";
    private static final String UDF_KEY = "user-defined-function";
    private static final String UDF_FUNCTION_NAME_KEY = "name";
    private static final String UDF_CLASSPATH_KEY = "classpath";
    private static final String MODEL_NAME_KEY = "model-name";
    private static final String MODEL_CLASS_NAME_KEY = "class-name";
    public static final String TRANSFORM_PRIMARY_KEY_KEY = "primary-keys";
    public static final String TRANSFORM_PARTITION_KEY_KEY = "partition-keys";
    public static final String TRANSFORM_TABLE_OPTION_KEY = "table-options";

    /** Target type used whenever a YAML section is flattened into a string-to-string map. */
    private static final TypeReference<Map<String, String>> STRING_MAP_TYPE =
            new TypeReference<Map<String, String>>() {};

    private final ObjectMapper mapper = new ObjectMapper(new YAMLFactory());

    /**
     * Parses the pipeline definition stored at the given path.
     *
     * @param pipelineDefPath location of the YAML file; its URI selects the {@link FileSystem}
     * @param globalPipelineConfig configuration merged underneath the file's own {@code pipeline}
     *     section
     * @return the parsed pipeline definition
     * @throws Exception if the file cannot be opened or read, or the definition is invalid
     */
    @Override
    public PipelineDef parse(Path pipelineDefPath, Configuration globalPipelineConfig) throws Exception {
        FileSystem fileSystem = FileSystem.get(pipelineDefPath.toUri());
        // Close the stream explicitly instead of relying on the mapper to do it for us.
        try (FSDataInputStream pipelineInStream = fileSystem.open(pipelineDefPath)) {
            return parse(mapper.readTree(pipelineInStream), globalPipelineConfig);
        }
    }

    /**
     * Parses a pipeline definition given directly as YAML text.
     *
     * @param pipelineDefText the YAML document
     * @param globalPipelineConfig configuration merged underneath the document's own
     *     {@code pipeline} section
     * @return the parsed pipeline definition
     * @throws Exception if the text cannot be read as YAML or the definition is invalid
     */
    @Override
    public PipelineDef parse(String pipelineDefText, Configuration globalPipelineConfig) throws Exception {
        return parse(mapper.readTree(pipelineDefText), globalPipelineConfig);
    }

    /**
     * Builds the {@link PipelineDef} from an already-parsed YAML tree.
     *
     * <p>Note that the {@code user-defined-function} and {@code model} entries are removed from
     * the {@code pipeline} node of the given tree before the rest of that node is converted
     * into the pipeline configuration.
     */
    private PipelineDef parse(JsonNode pipelineDefJsonNode, Configuration globalPipelineConfig) throws Exception {
        JsonNode pipelineNode = pipelineDefJsonNode.get(PIPELINE_KEY);

        List<UdfDef> udfDefs = new ArrayList<>();
        List<ModelDef> modelDefs = new ArrayList<>();
        // Only an object node can carry UDF / model entries. An empty "pipeline:" section is a
        // null node rather than an ObjectNode, so it must not be cast blindly.
        if (pipelineNode instanceof ObjectNode) {
            ObjectNode pipelineObject = (ObjectNode) pipelineNode;
            JsonNode udfNodes = pipelineObject.remove(UDF_KEY);
            if (udfNodes != null) {
                for (JsonNode udfNode : udfNodes) {
                    udfDefs.add(toUdfDef(udfNode));
                }
            }
            JsonNode modelNodes = pipelineObject.remove(MODEL_KEY);
            if (modelNodes != null) {
                modelDefs.addAll(parseModels(modelNodes));
            }
        }

        Configuration userPipelineConfig = toPipelineConfig(pipelineNode);
        SchemaChangeBehavior schemaChangeBehavior =
                userPipelineConfig.get(PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR);

        SourceDef sourceDef =
                toSourceDef(
                        Preconditions.checkNotNull(
                                pipelineDefJsonNode.get(SOURCE_KEY),
                                "Missing required field \"%s\" in pipeline definition",
                                SOURCE_KEY));
        SinkDef sinkDef =
                toSinkDef(
                        Preconditions.checkNotNull(
                                pipelineDefJsonNode.get(SINK_KEY),
                                "Missing required field \"%s\" in pipeline definition",
                                SINK_KEY),
                        schemaChangeBehavior);

        List<TransformDef> transformDefs = new ArrayList<>();
        JsonNode transformNodes = pipelineDefJsonNode.get(TRANSFORM_KEY);
        if (transformNodes != null) {
            for (JsonNode transformNode : transformNodes) {
                transformDefs.add(toTransformDef(transformNode));
            }
        }

        List<RouteDef> routeDefs = new ArrayList<>();
        JsonNode routeNodes = pipelineDefJsonNode.get(ROUTE_KEY);
        if (routeNodes != null) {
            for (JsonNode routeNode : routeNodes) {
                routeDefs.add(toRouteDef(routeNode));
            }
        }

        // The user configuration is added after the global one.
        Configuration pipelineConfig = new Configuration();
        pipelineConfig.addAll(globalPipelineConfig);
        pipelineConfig.addAll(userPipelineConfig);

        return new PipelineDef(sourceDef, sinkDef, routeDefs, transformDefs, udfDefs, modelDefs, pipelineConfig);
    }

    /**
     * Converts the {@code source} section into a {@link SourceDef}. The required {@code type}
     * and optional {@code name} entries are taken out; everything left becomes the source
     * configuration.
     */
    private SourceDef toSourceDef(JsonNode sourceNode) {
        Map<String, String> sourceMap = mapper.convertValue(sourceNode, STRING_MAP_TYPE);
        String type =
                Preconditions.checkNotNull(
                        sourceMap.remove(TYPE_KEY),
                        "Missing required field \"%s\" in source configuration",
                        TYPE_KEY);
        String name = sourceMap.remove(NAME_KEY);
        return new SourceDef(type, name, Configuration.fromMap(sourceMap));
    }

    /**
     * Converts the {@code sink} section into a {@link SinkDef}.
     *
     * <p>The {@code include.schema.changes} / {@code exclude.schema.changes} lists are resolved
     * into the set of schema change event types passed to the sink definition, and are removed
     * from the node before the remaining entries are turned into the sink configuration.
     *
     * @param sinkNode the {@code sink} section
     * @param schemaChangeBehavior the pipeline's schema change behavior, used to pick the default
     *     exclusions
     */
    private SinkDef toSinkDef(JsonNode sinkNode, SchemaChangeBehavior schemaChangeBehavior) {
        JsonNode includeNode = sinkNode.get(INCLUDE_SCHEMA_EVOLUTION_TYPES);
        JsonNode excludeNode = sinkNode.get(EXCLUDE_SCHEMA_EVOLUTION_TYPES);

        List<String> includedSETypes = new ArrayList<>();
        if (includeNode != null) {
            for (JsonNode tag : includeNode) {
                includedSETypes.add(tag.asText());
            }
        }
        List<String> excludedSETypes = new ArrayList<>();
        if (excludeNode != null) {
            for (JsonNode tag : excludeNode) {
                excludedSETypes.add(tag.asText());
            }
        }

        // Nothing listed explicitly: include every schema change event type.
        if (includedSETypes.isEmpty()) {
            Arrays.stream(SchemaChangeEventTypeFamily.ALL)
                    .map(SchemaChangeEventType::getTag)
                    .forEach(includedSETypes::add);
        }
        // With LENIENT behavior and no exclude key at all, DROP_TABLE and TRUNCATE_TABLE are
        // excluded; an explicitly present exclude key (even an empty one) disables this default.
        if (excludeNode == null && SchemaChangeBehavior.LENIENT.equals(schemaChangeBehavior)) {
            Stream.of(SchemaChangeEventType.DROP_TABLE, SchemaChangeEventType.TRUNCATE_TABLE)
                    .map(SchemaChangeEventType::getTag)
                    .forEach(excludedSETypes::add);
        }
        Set<SchemaChangeEventType> declaredSETypes =
                ChangeEventUtils.resolveSchemaEvolutionOptions(includedSETypes, excludedSETypes);

        // The include/exclude entries are lists, so drop them before flattening the node into a
        // string-to-string map.
        if (sinkNode instanceof ObjectNode) {
            ObjectNode sinkObject = (ObjectNode) sinkNode;
            sinkObject.remove(INCLUDE_SCHEMA_EVOLUTION_TYPES);
            sinkObject.remove(EXCLUDE_SCHEMA_EVOLUTION_TYPES);
        }

        Map<String, String> sinkMap = mapper.convertValue(sinkNode, STRING_MAP_TYPE);
        String type =
                Preconditions.checkNotNull(
                        sinkMap.remove(TYPE_KEY),
                        "Missing required field \"%s\" in sink configuration",
                        TYPE_KEY);
        String name = sinkMap.remove(NAME_KEY);
        return new SinkDef(type, name, Configuration.fromMap(sinkMap), declaredSETypes);
    }

    /**
     * Converts one entry of the {@code route} section into a {@link RouteDef}.
     * {@code source-table} and {@code sink-table} are required; {@code replace-symbol} and
     * {@code description} are passed as {@code null} when absent.
     */
    private RouteDef toRouteDef(JsonNode routeNode) {
        String sourceTable =
                Preconditions.checkNotNull(
                                routeNode.get(ROUTE_SOURCE_TABLE_KEY),
                                "Missing required field \"%s\" in route configuration",
                                ROUTE_SOURCE_TABLE_KEY)
                        .asText();
        String sinkTable =
                Preconditions.checkNotNull(
                                routeNode.get(ROUTE_SINK_TABLE_KEY),
                                "Missing required field \"%s\" in route configuration",
                                ROUTE_SINK_TABLE_KEY)
                        .asText();
        String replaceSymbol = optionalText(routeNode, ROUTE_REPLACE_SYMBOL);
        String description = optionalText(routeNode, ROUTE_DESCRIPTION_KEY);
        return new RouteDef(sourceTable, sinkTable, replaceSymbol, description);
    }

    /**
     * Converts one entry of {@code user-defined-function} into a {@link UdfDef}. Both
     * {@code name} and {@code classpath} are required.
     */
    private UdfDef toUdfDef(JsonNode udfNode) {
        String functionName =
                Preconditions.checkNotNull(
                                udfNode.get(UDF_FUNCTION_NAME_KEY),
                                "Missing required field \"%s\" in UDF configuration",
                                UDF_FUNCTION_NAME_KEY)
                        .asText();
        String classpath =
                Preconditions.checkNotNull(
                                udfNode.get(UDF_CLASSPATH_KEY),
                                "Missing required field \"%s\" in UDF configuration",
                                UDF_CLASSPATH_KEY)
                        .asText();
        return new UdfDef(functionName, classpath);
    }

    /**
     * Converts one entry of the {@code transform} section into a {@link TransformDef}. Only
     * {@code source-table} is required; every other field is passed as {@code null} when absent.
     */
    private TransformDef toTransformDef(JsonNode transformNode) {
        String sourceTable =
                Preconditions.checkNotNull(
                                transformNode.get(TRANSFORM_SOURCE_TABLE_KEY),
                                "Missing required field \"%s\" in transform configuration",
                                TRANSFORM_SOURCE_TABLE_KEY)
                        .asText();

        String projection = optionalText(transformNode, TRANSFORM_PROJECTION_KEY);
        if (projection != null) {
            // Turn every backslash-escaped star ("\*") in the projection into a plain "*".
            projection = projection.replace("\\*", "*");
        }

        String filter = optionalText(transformNode, TRANSFORM_FILTER_KEY);
        String primaryKeys = optionalText(transformNode, TRANSFORM_PRIMARY_KEY_KEY);
        String partitionKeys = optionalText(transformNode, TRANSFORM_PARTITION_KEY_KEY);
        String tableOptions = optionalText(transformNode, TRANSFORM_TABLE_OPTION_KEY);
        String description = optionalText(transformNode, TRANSFORM_DESCRIPTION_KEY);
        String postTransformConverter = optionalText(transformNode, TRANSFORM_CONVERTER_AFTER_TRANSFORM_KEY);

        return new TransformDef(
                sourceTable,
                projection,
                filter,
                primaryKeys,
                partitionKeys,
                tableOptions,
                description,
                postTransformConverter);
    }

    /**
     * Converts the {@code pipeline} section into a {@link Configuration}.
     *
     * @param pipelineConfigNode the section node; may be {@code null} or a null node, in which
     *     case an empty configuration is returned
     */
    private Configuration toPipelineConfig(JsonNode pipelineConfigNode) {
        if (pipelineConfigNode == null || pipelineConfigNode.isNull()) {
            return new Configuration();
        }
        Map<String, String> pipelineConfigMap = mapper.convertValue(pipelineConfigNode, STRING_MAP_TYPE);
        return Configuration.fromMap(pipelineConfigMap);
    }

    /**
     * Converts the {@code model} entry of the {@code pipeline} section into model definitions.
     * The entry may be a list of models or a single model.
     */
    private List<ModelDef> parseModels(JsonNode modelsNode) {
        Preconditions.checkNotNull(modelsNode, "`model` in `pipeline` should not be empty.");
        List<ModelDef> modelDefs = new ArrayList<>();
        if (modelsNode.isArray()) {
            for (JsonNode modelNode : modelsNode) {
                modelDefs.add(convertJsonNodeToModelDef(modelNode));
            }
        } else {
            modelDefs.add(convertJsonNodeToModelDef(modelsNode));
        }
        return modelDefs;
    }

    /**
     * Converts a single model node into a {@link ModelDef}. {@code model-name} and
     * {@code class-name} are required. The whole node, including those two entries, is handed
     * over as the model's properties.
     */
    private ModelDef convertJsonNodeToModelDef(JsonNode modelNode) {
        String name =
                Preconditions.checkNotNull(
                                modelNode.get(MODEL_NAME_KEY),
                                "Missing required field \"%s\" in `model`",
                                MODEL_NAME_KEY)
                        .asText();
        String model =
                Preconditions.checkNotNull(
                                modelNode.get(MODEL_CLASS_NAME_KEY),
                                "Missing required field \"%s\" in `model`",
                                MODEL_CLASS_NAME_KEY)
                        .asText();
        // NOTE(review): the conversion deliberately stays untyped (Map.class) so that values are
        // handled exactly as before; non-string YAML values are therefore not coerced to String
        // here. Confirm whether ModelDef expects them to be.
        @SuppressWarnings("unchecked")
        Map<String, String> properties = mapper.convertValue(modelNode, Map.class);
        return new ModelDef(name, model, properties);
    }

    /** Returns the text of {@code node.get(key)}, or {@code null} when the key is absent. */
    private static String optionalText(JsonNode node, String key) {
        JsonNode value = node.get(key);
        return value == null ? null : value.asText();
    }
}

