/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.transforms;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.data.Envelope;
import io.debezium.transforms.SmtManager;
import io.debezium.util.SchemaNameAdjuster;
import io.debezium.util.Strings;
import java.util.Map;
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.common.cache.Cache;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.common.cache.LRUCache;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.common.cache.SynchronizedCache;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.common.config.ConfigDef;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.Field;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.Schema;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.transforms.Transformation;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.transforms.util.Requirements;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ByLogicalTableRouter<R extends ConnectRecord<R>>
implements Transformation<R> {
    private static final io.debezium.config.Field TOPIC_REGEX = io.debezium.config.Field.create("topic.regex").withDisplayName("Topic regex").withType(ConfigDef.Type.STRING).withWidth(ConfigDef.Width.LONG).withImportance(ConfigDef.Importance.LOW).required().withValidation(io.debezium.config.Field::isRegex).withDescription("The regex used for extracting the name of the logical table from the original topic name.");
    private static final io.debezium.config.Field TOPIC_REPLACEMENT = io.debezium.config.Field.create("topic.replacement").withDisplayName("Topic replacement").withType(ConfigDef.Type.STRING).withWidth(ConfigDef.Width.LONG).withImportance(ConfigDef.Importance.LOW).required().withDescription("The replacement string used in conjunction with " + TOPIC_REGEX.name() + ". This will be used to create the new topic name.");
    private static final io.debezium.config.Field KEY_ENFORCE_UNIQUENESS = io.debezium.config.Field.create("key.enforce.uniqueness").withDisplayName("Add source topic name into key").withType(ConfigDef.Type.BOOLEAN).withWidth(ConfigDef.Width.SHORT).withImportance(ConfigDef.Importance.LOW).withDefault(true).withDescription("Augment each record's key with a field denoting the source topic. This field distinguishes records coming from different physical tables which may otherwise have primary/unique key conflicts. If the source tables are guaranteed to have globally unique keys then this may be set to false to disable key rewriting.");
    private static final io.debezium.config.Field KEY_FIELD_REGEX = io.debezium.config.Field.create("key.field.regex").withDisplayName("Key field regex").withType(ConfigDef.Type.STRING).withWidth(ConfigDef.Width.LONG).withImportance(ConfigDef.Importance.LOW).withValidation(io.debezium.config.Field::isRegex).withDescription("The regex used for extracting the physical table identifier from the original topic name. Now that multiple physical tables can share a topic, the event's key may need to be augmented to include fields other than just those for the record's primary/unique key, since these are not guaranteed to be unique across tables. We need some identifier added to the key that distinguishes the different physical tables.");
    private static final io.debezium.config.Field KEY_FIELD_NAME = io.debezium.config.Field.create("key.field.name").withDisplayName("Key field name").withType(ConfigDef.Type.STRING).withWidth(ConfigDef.Width.LONG).withImportance(ConfigDef.Importance.LOW).withDefault("__dbz__physicalTableIdentifier").withDescription("Each record's key schema will be augmented with this field name. The purpose of this field is to distinguish the different physical tables that can now share a single topic. Make sure not to configure a field name that is at risk of conflict with existing key schema field names.");
    private static final io.debezium.config.Field KEY_FIELD_REPLACEMENT = io.debezium.config.Field.create("key.field.replacement").withDisplayName("Key field replacement").withType(ConfigDef.Type.STRING).withWidth(ConfigDef.Width.LONG).withImportance(ConfigDef.Importance.LOW).withValidation(ByLogicalTableRouter::validateKeyFieldReplacement).withDescription("The replacement string used in conjunction with " + KEY_FIELD_REGEX.name() + ". This will be used to create the physical table identifier in the record's key.");
    private static final io.debezium.config.Field SCHEMA_NAME_ADJUSTMENT_MODE = io.debezium.config.Field.create("schema.name.adjustment.mode").withDisplayName("Schema Name Adjustment").withEnum(CommonConnectorConfig.SchemaNameAdjustmentMode.class, CommonConnectorConfig.SchemaNameAdjustmentMode.AVRO).withWidth(ConfigDef.Width.MEDIUM).withImportance(ConfigDef.Importance.LOW).withDescription("Specify how the message key schema names derived from the resulting topic name should be adjusted for compatibility with the message converter used by the connector, including:'avro' replaces the characters that cannot be used in the Avro type name with underscore (default)'none' does not apply any adjustment");
    private static final Logger LOGGER = LoggerFactory.getLogger(ByLogicalTableRouter.class);
    private SchemaNameAdjuster schemaNameAdjuster;
    private Pattern topicRegex;
    private String topicReplacement;
    private Pattern keyFieldRegex;
    private boolean keyEnforceUniqueness;
    private String keyFieldReplacement;
    private String keyFieldName;
    private final Cache<Schema, Schema> keySchemaUpdateCache = new SynchronizedCache<Schema, Schema>(new LRUCache(16));
    private final Cache<Schema, Schema> envelopeSchemaUpdateCache = new SynchronizedCache<Schema, Schema>(new LRUCache(16));
    private final Cache<String, String> keyRegexReplaceCache = new SynchronizedCache<String, String>(new LRUCache(16));
    private final Cache<String, String> topicRegexReplaceCache = new SynchronizedCache<String, String>(new LRUCache(16));
    private SmtManager<R> smtManager;

    private static int validateKeyFieldReplacement(Configuration config, io.debezium.config.Field field, Field.ValidationOutput problems) {
        String keyFieldReplacement;
        String keyFieldRegex = config.getString(KEY_FIELD_REGEX);
        if (keyFieldRegex != null) {
            keyFieldRegex = keyFieldRegex.trim();
        }
        if ((keyFieldReplacement = config.getString(KEY_FIELD_REPLACEMENT)) != null) {
            keyFieldReplacement = keyFieldReplacement.trim();
        }
        if (!Strings.isNullOrEmpty(keyFieldRegex) && Strings.isNullOrEmpty(keyFieldReplacement)) {
            problems.accept(KEY_FIELD_REPLACEMENT, null, String.format("%s must be non-empty if %s is set.", KEY_FIELD_REPLACEMENT.name(), KEY_FIELD_REGEX.name()));
            return 1;
        }
        return 0;
    }

    @Override
    public void configure(Map<String, ?> props) {
        Configuration config = Configuration.from(props);
        Field.Set configFields = io.debezium.config.Field.setOf(TOPIC_REGEX, TOPIC_REPLACEMENT, KEY_ENFORCE_UNIQUENESS, KEY_FIELD_REGEX, KEY_FIELD_REPLACEMENT, SCHEMA_NAME_ADJUSTMENT_MODE);
        if (!config.validateAndRecord(configFields, LOGGER::error)) {
            throw new ConnectException("Unable to validate config.");
        }
        this.topicRegex = Pattern.compile(config.getString(TOPIC_REGEX));
        this.topicReplacement = config.getString(TOPIC_REPLACEMENT);
        String keyFieldRegexString = config.getString(KEY_FIELD_REGEX);
        if (keyFieldRegexString != null) {
            keyFieldRegexString = keyFieldRegexString.trim();
        }
        if (keyFieldRegexString != null && !keyFieldRegexString.isEmpty()) {
            this.keyFieldRegex = Pattern.compile(config.getString(KEY_FIELD_REGEX));
            this.keyFieldReplacement = config.getString(KEY_FIELD_REPLACEMENT);
        }
        this.keyFieldName = config.getString(KEY_FIELD_NAME);
        this.keyEnforceUniqueness = config.getBoolean(KEY_ENFORCE_UNIQUENESS);
        this.smtManager = new SmtManager(config);
        this.schemaNameAdjuster = CommonConnectorConfig.SchemaNameAdjustmentMode.parse(config.getString(SCHEMA_NAME_ADJUSTMENT_MODE)).createAdjuster();
    }

    @Override
    public R apply(R record) {
        String oldTopic = ((ConnectRecord)record).topic();
        String newTopic = this.determineNewTopic(oldTopic);
        if (newTopic == null) {
            return record;
        }
        if (newTopic.isEmpty()) {
            LOGGER.warn("Routing regex returned an empty topic name, propagating original record");
            return record;
        }
        LOGGER.debug("Applying topic name transformation from {} to {}", (Object)oldTopic, (Object)newTopic);
        Schema newKeySchema = null;
        Struct newKey = null;
        if (((ConnectRecord)record).key() != null) {
            Struct oldKey = Requirements.requireStruct(((ConnectRecord)record).key(), "Updating schema");
            newKeySchema = this.updateKeySchema(oldKey.schema(), newTopic);
            newKey = this.updateKey(newKeySchema, oldKey, oldTopic);
        }
        if (((ConnectRecord)record).value() == null || !this.smtManager.isValidEnvelope(record)) {
            return ((ConnectRecord)record).newRecord(newTopic, ((ConnectRecord)record).kafkaPartition(), newKeySchema, newKey, ((ConnectRecord)record).valueSchema(), ((ConnectRecord)record).value(), ((ConnectRecord)record).timestamp());
        }
        Struct oldEnvelope = Requirements.requireStruct(((ConnectRecord)record).value(), "Updating schema");
        Schema newEnvelopeSchema = this.updateEnvelopeSchema(oldEnvelope.schema(), newTopic);
        Struct newEnvelope = this.updateEnvelope(newEnvelopeSchema, oldEnvelope);
        return ((ConnectRecord)record).newRecord(newTopic, ((ConnectRecord)record).kafkaPartition(), newKeySchema, newKey, newEnvelopeSchema, newEnvelope, ((ConnectRecord)record).timestamp());
    }

    @Override
    public void close() {
    }

    @Override
    public ConfigDef config() {
        ConfigDef config = new ConfigDef();
        io.debezium.config.Field.group(config, null, TOPIC_REGEX, TOPIC_REPLACEMENT, KEY_ENFORCE_UNIQUENESS, KEY_FIELD_REGEX, KEY_FIELD_REPLACEMENT);
        return config;
    }

    private String determineNewTopic(String oldTopic) {
        String newTopic = this.topicRegexReplaceCache.get(oldTopic);
        if (newTopic != null) {
            return newTopic;
        }
        Matcher matcher = this.topicRegex.matcher(oldTopic);
        if (matcher.matches()) {
            newTopic = matcher.replaceFirst(this.topicReplacement);
            this.topicRegexReplaceCache.put(oldTopic, newTopic);
            return newTopic;
        }
        return null;
    }

    private Schema updateKeySchema(Schema oldKeySchema, String newTopicName) {
        Schema newKeySchema = this.keySchemaUpdateCache.get(oldKeySchema);
        if (newKeySchema != null) {
            return newKeySchema;
        }
        SchemaBuilder builder = this.copySchemaExcludingName(oldKeySchema, SchemaBuilder.struct());
        builder.name(this.schemaNameAdjuster.adjust(newTopicName + ".Key"));
        if (this.keyEnforceUniqueness) {
            builder.field(this.keyFieldName, Schema.STRING_SCHEMA);
        }
        newKeySchema = builder.build();
        this.keySchemaUpdateCache.put(oldKeySchema, newKeySchema);
        return newKeySchema;
    }

    private Struct updateKey(Schema newKeySchema, Struct oldKey, String oldTopic) {
        Struct newKey = new Struct(newKeySchema);
        for (Field field : oldKey.schema().fields()) {
            newKey.put(field.name(), oldKey.get(field));
        }
        String physicalTableIdentifier = oldTopic;
        if (this.keyEnforceUniqueness) {
            if (this.keyFieldRegex != null && (physicalTableIdentifier = this.keyRegexReplaceCache.get(oldTopic)) == null) {
                Matcher matcher = this.keyFieldRegex.matcher(oldTopic);
                if (matcher.matches()) {
                    physicalTableIdentifier = matcher.replaceFirst(this.keyFieldReplacement);
                    this.keyRegexReplaceCache.put(oldTopic, physicalTableIdentifier);
                } else {
                    physicalTableIdentifier = oldTopic;
                }
            }
            newKey.put(this.keyFieldName, (Object)physicalTableIdentifier);
        }
        return newKey;
    }

    private Schema updateEnvelopeSchema(Schema oldEnvelopeSchema, String newTopicName) {
        Schema newEnvelopeSchema = this.envelopeSchemaUpdateCache.get(oldEnvelopeSchema);
        if (newEnvelopeSchema != null) {
            return newEnvelopeSchema;
        }
        Schema oldValueSchema = oldEnvelopeSchema.field("before").schema();
        SchemaBuilder valueBuilder = this.copySchemaExcludingName(oldValueSchema, SchemaBuilder.struct());
        valueBuilder.name(this.schemaNameAdjuster.adjust(newTopicName + ".Value"));
        Schema newValueSchema = valueBuilder.build();
        SchemaBuilder envelopeBuilder = this.copySchemaExcludingName(oldEnvelopeSchema, SchemaBuilder.struct(), false);
        for (Field field : oldEnvelopeSchema.fields()) {
            String fieldName = field.name();
            Schema fieldSchema = field.schema();
            if (Objects.equals(fieldName, "before") || Objects.equals(fieldName, "after")) {
                fieldSchema = newValueSchema;
            }
            envelopeBuilder.field(fieldName, fieldSchema);
        }
        envelopeBuilder.name(this.schemaNameAdjuster.adjust(Envelope.schemaName(newTopicName)));
        newEnvelopeSchema = envelopeBuilder.build();
        this.envelopeSchemaUpdateCache.put(oldEnvelopeSchema, newEnvelopeSchema);
        return newEnvelopeSchema;
    }

    private Struct updateEnvelope(Schema newEnvelopeSchema, Struct oldEnvelope) {
        Struct newEnvelope = new Struct(newEnvelopeSchema);
        Schema newValueSchema = newEnvelopeSchema.field("before").schema();
        for (Field field : oldEnvelope.schema().fields()) {
            String fieldName = field.name();
            Object fieldValue = oldEnvelope.get(field);
            if ((Objects.equals(fieldName, "before") || Objects.equals(fieldName, "after")) && fieldValue != null) {
                fieldValue = this.updateValue(newValueSchema, Requirements.requireStruct(fieldValue, "Updating schema"));
            }
            newEnvelope.put(fieldName, fieldValue);
        }
        return newEnvelope;
    }

    private Struct updateValue(Schema newValueSchema, Struct oldValue) {
        Struct newValue = new Struct(newValueSchema);
        for (Field field : oldValue.schema().fields()) {
            newValue.put(field.name(), oldValue.get(field));
        }
        return newValue;
    }

    private SchemaBuilder copySchemaExcludingName(Schema source, SchemaBuilder builder) {
        return this.copySchemaExcludingName(source, builder, true);
    }

    private SchemaBuilder copySchemaExcludingName(Schema source, SchemaBuilder builder, boolean copyFields) {
        builder.version(source.version());
        builder.doc(source.doc());
        Map<String, String> params = source.parameters();
        if (params != null) {
            builder.parameters(params);
        }
        if (source.isOptional()) {
            builder.optional();
        } else {
            builder.required();
        }
        if (copyFields) {
            for (Field field : source.fields()) {
                builder.field(field.name(), field.schema());
            }
        }
        return builder;
    }
}

