/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.config;

import io.debezium.DebeziumException;
import io.debezium.bean.DefaultBeanRegistry;
import io.debezium.bean.spi.BeanRegistry;
import io.debezium.config.ConfigDefinition;
import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
import io.debezium.config.Field;
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.connector.SourceInfoStructMaker;
import io.debezium.data.Envelope;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.heartbeat.HeartbeatConnectionProvider;
import io.debezium.heartbeat.HeartbeatErrorHandler;
import io.debezium.heartbeat.HeartbeatImpl;
import io.debezium.pipeline.notification.channels.SinkNotificationChannel;
import io.debezium.pipeline.txmetadata.DefaultTransactionMetadataFactory;
import io.debezium.pipeline.txmetadata.spi.TransactionMetadataFactory;
import io.debezium.relational.CustomConverterRegistry;
import io.debezium.relational.TableId;
import io.debezium.schema.SchemaNameAdjuster;
import io.debezium.schema.SchemaTopicNamingStrategy;
import io.debezium.service.DefaultServiceRegistry;
import io.debezium.service.spi.ServiceRegistry;
import io.debezium.spi.converter.ConvertedField;
import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.spi.topic.TopicNamingStrategy;
import io.debezium.util.Strings;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class CommonConnectorConfig {
    // ------------------------------------------------------------------
    // Static constants: property names, patterns, logger and defaults.
    // ------------------------------------------------------------------

    public static final String TASK_ID = "task.id";
    /** Accepts one or more ASCII letters, digits, underscores, dots or hyphens. */
    public static final Pattern TOPIC_NAME_PATTERN = Pattern.compile("^[a-zA-Z0-9_.\\-]+$");
    public static final String MULTI_PARTITION_MODE = "multi.partition.mode";
    public static final String SNAPSHOT_MODE_PROPERTY_NAME = "snapshot.mode";
    public static final String SNAPSHOT_LOCKING_MODE_PROPERTY_NAME = "snapshot.locking.mode";
    private static final Logger LOGGER = LoggerFactory.getLogger(CommonConnectorConfig.class);

    // Fully qualified class names of the two Avro converter implementations.
    private static final String CONFLUENT_AVRO_CONVERTER = "io.confluent.connect.avro.AvroConverter";
    private static final String APICURIO_AVRO_CONVERTER = "io.apicurio.registry.utils.converter.AvroConverter";

    // Numeric defaults and timeouts.
    public static final long EXECUTOR_SHUTDOWN_TIMEOUT_SEC = 90L;
    public static final int DEFAULT_MAX_QUEUE_SIZE = 8192;
    public static final int DEFAULT_MAX_BATCH_SIZE = 2048;
    public static final int DEFAULT_QUERY_FETCH_SIZE = 0;
    public static final long DEFAULT_POLL_INTERVAL_MILLIS = 500L;

    // Property-name prefixes and suffixes.
    public static final String DATABASE_CONFIG_PREFIX = "database.";
    public static final String DRIVER_CONFIG_PREFIX = "driver.";
    private static final String CONVERTER_TYPE_SUFFIX = ".type";

    public static final long DEFAULT_RETRIABLE_RESTART_WAIT = 10000L;
    public static final long DEFAULT_MAX_QUEUE_SIZE_IN_BYTES = 0L;
    public static final String NOTIFICATION_CONFIGURATION_FIELD_PREFIX_STRING = "notification.";
    public static final long DEFAULT_CONNECTION_VALIDATION_TIMEOUT_MS = 60000L;
    public static final int DEFAULT_MAX_RETRIES = -1;
    public static final String ERRORS_MAX_RETRIES = "errors.max.retries";

    // ------------------------------------------------------------------
    // Per-instance state.
    // ------------------------------------------------------------------

    protected SnapshotQueryMode snapshotQueryMode;
    protected String snapshotQueryModeCustomName;
    protected String snapshotLockingModeCustomName;
    protected final boolean snapshotModeConfigurationBasedSnapshotData;
    protected final boolean snapshotModeConfigurationBasedSnapshotSchema;
    protected final boolean snapshotModeConfigurationBasedStream;
    protected final boolean snapshotModeConfigurationBasedSnapshotOnSchemaError;
    protected final boolean snapshotModeConfigurationBasedSnapshotOnDataError;
    protected final boolean isLogPositionCheckEnabled;
    protected final boolean isAdvancedMetricsEnabled;
    private final int maxRetriesOnError;
    /**
     * The {@code topic.prefix} option: a required string, checked by
     * {@code validateTopicName}.
     */
    public static final Field TOPIC_PREFIX = Field.create("topic.prefix")
            .withDisplayName("Topic prefix")
            .withType(ConfigDef.Type.STRING)
            .withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 0))
            .withWidth(ConfigDef.Width.MEDIUM)
            .withImportance(ConfigDef.Importance.HIGH)
            .withValidation(CommonConnectorConfig::validateTopicName)
            .required()
            .withDescription("Topic prefix that identifies and provides a namespace for the particular database server/cluster is capturing changes. "
                    + "The topic prefix should be unique across all other connectors, since it is used as a prefix for all Kafka topic names that receive events emitted by this connector. "
                    + "Only alphanumeric characters, hyphens, dots and underscores must be accepted.");
    /**
     * The {@code retriable.restart.connector.wait.ms} option: a positive long,
     * defaulting to 10000.
     */
    public static final Field RETRIABLE_RESTART_WAIT = Field.create("retriable.restart.connector.wait.ms")
            .withDisplayName("Retriable restart wait (ms)")
            .withType(ConfigDef.Type.LONG)
            .withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 18))
            .withWidth(ConfigDef.Width.MEDIUM)
            .withImportance(ConfigDef.Importance.LOW)
            .withDefault(10000L)
            .withDescription("Time to wait before restarting connector after retriable exception occurs. "
                    + "Defaults to 10000ms.")
            .withValidation(Field::isPositiveLong);
    /**
     * The {@code tombstones.on.delete} option: a boolean, defaulting to {@code true}.
     */
    public static final Field TOMBSTONES_ON_DELETE = Field.create("tombstones.on.delete")
            .withDisplayName("Change the behaviour of Debezium with regards to delete operations")
            .withType(ConfigDef.Type.BOOLEAN)
            .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 1))
            .withWidth(ConfigDef.Width.SHORT)
            .withImportance(ConfigDef.Importance.MEDIUM)
            .withDefault(true)
            .withValidation(Field::isBoolean)
            .withDescription("Whether delete operations should be represented by a delete event and a subsequent tombstone event (true) or only by a delete event (false). "
                    + "Emitting the tombstone event (the default behavior) allows Kafka to completely delete all events pertaining to the given key once the source record got deleted.");
    /**
     * The {@code max.queue.size} option: an int defaulting to 8192, checked by
     * {@code validateMaxQueueSize}.
     */
    public static final Field MAX_QUEUE_SIZE = Field.create("max.queue.size")
            .withDisplayName("Change event buffer size")
            .withType(ConfigDef.Type.INT)
            .withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 15))
            .withWidth(ConfigDef.Width.SHORT)
            .withImportance(ConfigDef.Importance.MEDIUM)
            .withDescription("Maximum size of the queue for change events read from the database log but not yet recorded or forwarded. "
                    + "Defaults to 8192, and should always be larger than the maximum batch size.")
            .withDefault(8192)
            .withValidation(CommonConnectorConfig::validateMaxQueueSize);
    /**
     * The {@code max.batch.size} option: a positive int, defaulting to 2048.
     */
    public static final Field MAX_BATCH_SIZE = Field.create("max.batch.size")
            .withDisplayName("Change event batch size")
            .withType(ConfigDef.Type.INT)
            .withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 14))
            .withWidth(ConfigDef.Width.SHORT)
            .withImportance(ConfigDef.Importance.MEDIUM)
            .withDescription("Maximum size of each batch of source records. "
                    + "Defaults to 2048.")
            .withDefault(2048)
            .withValidation(Field::isPositiveInteger);
    /**
     * The {@code poll.interval.ms} option: a positive long, defaulting to 500.
     *
     * <p>The option is declared as {@code LONG}, so it is validated with
     * {@code Field::isPositiveLong}, in line with the other millisecond-valued
     * {@code LONG} options of this class. An integer-only validator would reject
     * values that are legal for the declared type.
     */
    public static final Field POLL_INTERVAL_MS = Field.create("poll.interval.ms")
            .withDisplayName("Poll interval (ms)")
            .withType(ConfigDef.Type.LONG)
            .withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 17))
            .withWidth(ConfigDef.Width.SHORT)
            .withImportance(ConfigDef.Importance.MEDIUM)
            .withDescription("Time to wait for new change events to appear after receiving no events, given in milliseconds. Defaults to 500 ms.")
            .withDefault(500L)
            .withValidation(Field::isPositiveLong);
    /**
     * The {@code max.queue.size.in.bytes} option: a non-negative long, defaulting to 0.
     */
    public static final Field MAX_QUEUE_SIZE_IN_BYTES = Field.create("max.queue.size.in.bytes")
            .withDisplayName("Change event buffer size in bytes")
            .withType(ConfigDef.Type.LONG)
            .withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 16))
            .withWidth(ConfigDef.Width.LONG)
            .withImportance(ConfigDef.Importance.MEDIUM)
            .withDescription("Maximum size of the queue in bytes for change events read from the database log but not yet recorded or forwarded. "
                    + "Defaults to 0. Mean the feature is not enabled")
            .withDefault(0L)
            .withValidation(Field::isNonNegativeLong);
    /**
     * The {@code snapshot.delay.ms} option: a non-negative long, defaulting to 0.
     */
    public static final Field SNAPSHOT_DELAY_MS = Field.create("snapshot.delay.ms")
            .withDisplayName("Snapshot Delay (milliseconds)")
            .withType(ConfigDef.Type.LONG)
            .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_SNAPSHOT, 5))
            .withWidth(ConfigDef.Width.MEDIUM)
            .withImportance(ConfigDef.Importance.LOW)
            .withDescription("A delay period before a snapshot will begin, given in milliseconds. "
                    + "Defaults to 0 ms.")
            .withDefault(0L)
            .withValidation(Field::isNonNegativeLong);
    /**
     * The {@code snapshot.fetch.size} option: a non-negative int with no default
     * declared here.
     */
    public static final Field SNAPSHOT_FETCH_SIZE = Field.create("snapshot.fetch.size")
            .withDisplayName("Snapshot fetch size")
            .withType(ConfigDef.Type.INT)
            .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_SNAPSHOT, 3))
            .withWidth(ConfigDef.Width.MEDIUM)
            .withImportance(ConfigDef.Importance.MEDIUM)
            .withDescription("The maximum number of records that should be loaded into memory while performing a snapshot.")
            .withValidation(Field::isNonNegativeInteger);
    /**
     * The {@code incremental.snapshot.chunk.size} option: a non-negative int,
     * defaulting to 1024.
     */
    public static final Field INCREMENTAL_SNAPSHOT_CHUNK_SIZE = Field.create("incremental.snapshot.chunk.size")
            .withDisplayName("Incremental snapshot chunk size")
            .withType(ConfigDef.Type.INT)
            .withWidth(ConfigDef.Width.MEDIUM)
            .withImportance(ConfigDef.Importance.MEDIUM)
            .withDescription("The maximum size of chunk (number of documents/rows) for incremental snapshotting")
            .withDefault(1024)
            .withValidation(Field::isNonNegativeInteger);
    /**
     * The {@code incremental.snapshot.allow.schema.changes} option: a boolean,
     * defaulting to {@code Boolean.FALSE}.
     */
    public static final Field INCREMENTAL_SNAPSHOT_ALLOW_SCHEMA_CHANGES = Field.create("incremental.snapshot.allow.schema.changes")
            .withDisplayName("Allow schema changes during incremental snapshot if supported.")
            .withType(ConfigDef.Type.BOOLEAN)
            .withWidth(ConfigDef.Width.SHORT)
            .withImportance(ConfigDef.Importance.LOW)
            .withDescription("Detect schema change during an incremental snapshot and re-select a current chunk to avoid locking DDLs. "
                    + "Note that changes to a primary key are not supported and can cause incorrect results if performed during an incremental snapshot. "
                    + "Another limitation is that if a schema change affects only columns' default values, then the change won't be detected until the DDL is processed from the binlog stream. "
                    + "This doesn't affect the snapshot events' values, but the schema of snapshot events may have outdated defaults.")
            .withDefault(Boolean.FALSE);
    /**
     * The {@code snapshot.include.collection.list} option: a list validated as
     * regular expressions.
     */
    public static final Field SNAPSHOT_MODE_TABLES = Field.create("snapshot.include.collection.list")
            .withDisplayName("Snapshot mode include data collection")
            .withType(ConfigDef.Type.LIST)
            .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_SNAPSHOT, 2))
            .withWidth(ConfigDef.Width.LONG)
            .withImportance(ConfigDef.Importance.MEDIUM)
            .withValidation(Field::isListOfRegex)
            .withDescription("This setting must be set to specify a list of tables/collections whose snapshot must be taken on creating or restarting the connector.");
    /**
     * The {@code provide.transaction.metadata} option: a boolean, defaulting to
     * {@code Boolean.FALSE}.
     */
    public static final Field PROVIDE_TRANSACTION_METADATA = Field.create("provide.transaction.metadata")
            .withDisplayName("Store transaction metadata information in a dedicated topic.")
            .withType(ConfigDef.Type.BOOLEAN)
            .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_ADVANCED, 17))
            .withWidth(ConfigDef.Width.SHORT)
            .withImportance(ConfigDef.Importance.LOW)
            .withDescription("Enables transaction metadata extraction together with event counting")
            .withDefault(Boolean.FALSE);
    /**
     * The {@code transaction.metadata.factory} option: a class name, defaulting to
     * the name of {@code DefaultTransactionMetadataFactory}.
     */
    public static final Field TRANSACTION_METADATA_FACTORY = Field.create("transaction.metadata.factory")
            .withDisplayName("Factory class to create transaction context & transaction struct maker classes")
            .withType(ConfigDef.Type.CLASS)
            .withWidth(ConfigDef.Width.MEDIUM)
            .withImportance(ConfigDef.Importance.LOW)
            .withDefault(DefaultTransactionMetadataFactory.class.getName())
            .withDescription("Class to make transaction context & transaction struct/schemas");
    /**
     * The {@code event.processing.failure.handling.mode} option: an
     * {@code EventProcessingFailureHandlingMode} value, defaulting to {@code FAIL}.
     */
    public static final Field EVENT_PROCESSING_FAILURE_HANDLING_MODE = Field.create("event.processing.failure.handling.mode")
            .withDisplayName("Event deserialization failure handling")
            .withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 12))
            .withEnum(EventProcessingFailureHandlingMode.class, EventProcessingFailureHandlingMode.FAIL)
            .withWidth(ConfigDef.Width.SHORT)
            .withImportance(ConfigDef.Importance.MEDIUM)
            .withDescription("Specify how failures during processing of events (i.e. when encountering a corrupted event) should be handled, including: "
                    + "'fail' (the default) an exception indicating the problematic event and its position is raised, causing the connector to be stopped; "
                    + "'warn' the problematic event and its position will be logged and the event will be skipped; "
                    + "'ignore' the problematic event will be skipped.");
    /**
     * The {@code converters} option: a string with no default declared here.
     */
    public static final Field CUSTOM_CONVERTERS = Field.create("converters")
            .withDisplayName("List of prefixes defining custom values converters.")
            .withType(ConfigDef.Type.STRING)
            .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_ADVANCED, 10))
            .withWidth(ConfigDef.Width.MEDIUM)
            .withImportance(ConfigDef.Importance.LOW)
            .withDescription("Optional list of custom converters that would be used instead of default ones. "
                    + "The converters are defined using '<converter.prefix>.type' config option and configured using options '<converter.prefix>.<option>'");
    /**
     * The {@code post.processors} option: a string with no default declared here.
     *
     * <p>The description spells the per-processor option placeholder as
     * {@code <post.processor.prefix>.<option>}, with balanced angle brackets,
     * mirroring the wording used for the {@code converters} option.
     */
    public static final Field CUSTOM_POST_PROCESSORS = Field.create("post.processors")
            .withDisplayName("List of change event post processors.")
            .withType(ConfigDef.Type.STRING)
            .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_ADVANCED, 998))
            .withWidth(ConfigDef.Width.MEDIUM)
            .withImportance(ConfigDef.Importance.LOW)
            .withDescription("Optional list of post processors. "
                    + "The processors are defined using '<post.processor.prefix>.type' config option and configured using options '<post.processor.prefix>.<option>'");
    /**
     * The {@code skipped.operations} option: a list defaulting to {@code "t"},
     * checked by {@code validateSkippedOperation}.
     */
    public static final Field SKIPPED_OPERATIONS = Field.create("skipped.operations")
            .withDisplayName("Skipped Operations")
            .withType(ConfigDef.Type.LIST)
            .withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 11))
            .withWidth(ConfigDef.Width.SHORT)
            .withImportance(ConfigDef.Importance.LOW)
            .withValidation(CommonConnectorConfig::validateSkippedOperation)
            .withDefault("t")
            .withDescription("The comma-separated list of operations to skip during streaming, defined as: 'c' for inserts/create; 'u' for updates; 'd' for deletes, 't' for truncates, and 'none' to indicate nothing skipped. "
                    + "By default, only truncate operations will be skipped.");
    /**
     * The {@code skip.messages.without.change} option: a boolean, defaulting to
     * {@code false}.
     */
    public static final Field SKIP_MESSAGES_WITHOUT_CHANGE = Field.create("skip.messages.without.change")
            .withDisplayName("Enable skipping messages without change")
            .withType(ConfigDef.Type.BOOLEAN)
            .withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED, 0))
            .withDefault(false)
            .withWidth(ConfigDef.Width.SHORT)
            .withImportance(ConfigDef.Importance.MEDIUM)
            // The fragments below are joined without separating spaces on purpose:
            // the emitted text is kept exactly as it was.
            .withDescription("Enable to skip publishing messages when there is no change in included columns."
                    + "This would essentially filter messages to be sent when there is no change in columns included as per column.include.list/column.exclude.list."
                    + "For Postgres - this would require REPLICA IDENTITY of table to be FULL.")
            .withValidation(Field::isBoolean);
    /**
     * The {@code binary.handling.mode} option: a {@code BinaryHandlingMode} value,
     * defaulting to {@code BYTES}.
     */
    public static final Field BINARY_HANDLING_MODE = Field.create("binary.handling.mode")
            .withDisplayName("Binary Handling")
            .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 3))
            .withEnum(BinaryHandlingMode.class, BinaryHandlingMode.BYTES)
            .withWidth(ConfigDef.Width.MEDIUM)
            .withImportance(ConfigDef.Importance.LOW)
            .withDescription("Specify how binary (blob, binary, etc.) columns should be represented in change events, including: "
                    + "'bytes' represents binary data as byte array (default); "
                    + "'base64' represents binary data as base64-encoded string; "
                    + "'base64-url-safe' represents binary data as base64-url-safe-encoded string; "
                    + "'hex' represents binary data as hex-encoded (base16) string");
    /**
     * The {@code schema.name.adjustment.mode} option: a
     * {@code SchemaNameAdjustmentMode} value, defaulting to {@code NONE}.
     */
    public static final Field SCHEMA_NAME_ADJUSTMENT_MODE = Field.create("schema.name.adjustment.mode")
            .withDisplayName("Schema Name Adjustment")
            .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 7))
            .withEnum(SchemaNameAdjustmentMode.class, SchemaNameAdjustmentMode.NONE)
            .withWidth(ConfigDef.Width.MEDIUM)
            .withImportance(ConfigDef.Importance.LOW)
            .withDescription("Specify how schema names should be adjusted for compatibility with the message converter used by the connector, including: "
                    + "'avro' replaces the characters that cannot be used in the Avro type name with underscore; "
                    + "'avro_unicode' replaces the underscore or characters that cannot be used in the Avro type name with corresponding unicode like _uxxxx. Note: _ is an escape sequence like backslash in Java;"
                    + "'none' does not apply any adjustment (default)");
    /**
     * The {@code field.name.adjustment.mode} option: a
     * {@code FieldNameAdjustmentMode} value, defaulting to {@code NONE}.
     */
    public static final Field FIELD_NAME_ADJUSTMENT_MODE = Field.create("field.name.adjustment.mode")
            .withDisplayName("Field Name Adjustment")
            .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 7))
            .withEnum(FieldNameAdjustmentMode.class, FieldNameAdjustmentMode.NONE)
            .withWidth(ConfigDef.Width.MEDIUM)
            .withImportance(ConfigDef.Importance.LOW)
            .withDescription("Specify how field names should be adjusted for compatibility with the message converter used by the connector, including: "
                    + "'avro' replaces the characters that cannot be used in the Avro type name with underscore; "
                    + "'avro_unicode' replaces the underscore or characters that cannot be used in the Avro type name with corresponding unicode like _uxxxx. Note: _ is an escape sequence like backslash in Java;"
                    + "'none' does not apply any adjustment (default)");
    /**
     * The {@code query.fetch.size} option: a non-negative int, defaulting to 0.
     */
    public static final Field QUERY_FETCH_SIZE = Field.create("query.fetch.size")
            .withDisplayName("Query fetch size")
            .withType(ConfigDef.Type.INT)
            .withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 13))
            .withWidth(ConfigDef.Width.MEDIUM)
            .withImportance(ConfigDef.Importance.MEDIUM)
            .withDescription("The maximum number of records that should be loaded into memory while streaming. "
                    + "A value of '0' uses the default JDBC fetch size.")
            .withValidation(Field::isNonNegativeInteger)
            .withDefault(0);
    /**
     * The {@code snapshot.max.threads} option: a positive int, defaulting to 1.
     */
    public static final Field SNAPSHOT_MAX_THREADS = Field.create("snapshot.max.threads")
            .withDisplayName("Snapshot maximum threads")
            .withType(ConfigDef.Type.INT)
            .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_SNAPSHOT, 7))
            .withWidth(ConfigDef.Width.SHORT)
            .withImportance(ConfigDef.Importance.MEDIUM)
            .withDefault(1)
            .withValidation(Field::isPositiveInteger)
            .withDescription("The maximum number of threads used to perform the snapshot. "
                    + "Defaults to 1.");
    /**
     * The {@code signal.data.collection} option: a string with no default
     * declared here.
     */
    public static final Field SIGNAL_DATA_COLLECTION = Field.create("signal.data.collection")
            .withDisplayName("Signaling data collection")
            .withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 20))
            .withType(ConfigDef.Type.STRING)
            .withWidth(ConfigDef.Width.MEDIUM)
            .withImportance(ConfigDef.Importance.MEDIUM)
            .withDescription("The name of the data collection that is used to send signals/commands to Debezium. "
                    + "Signaling is disabled when not set.");
    /**
     * The {@code signal.poll.interval.ms} option: a positive long, defaulting to 5000.
     *
     * <p>The option is declared as {@code LONG}, so it is validated with
     * {@code Field::isPositiveLong}, in line with the other millisecond-valued
     * {@code LONG} options of this class. An integer-only validator would reject
     * values that are legal for the declared type.
     */
    public static final Field SIGNAL_POLL_INTERVAL_MS = Field.create("signal.poll.interval.ms")
            .withDisplayName("Signal processor poll interval")
            .withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 21))
            .withType(ConfigDef.Type.LONG)
            .withWidth(ConfigDef.Width.SHORT)
            .withImportance(ConfigDef.Importance.MEDIUM)
            .withDefault(5000L)
            .withValidation(Field::isPositiveLong)
            .withDescription("Interval for looking for new signals in registered channels, given in milliseconds. Defaults to 5 seconds.");
    /**
     * The {@code signal.enabled.channels} option: a list, defaulting to
     * {@code "source"}.
     */
    public static final Field SIGNAL_ENABLED_CHANNELS = Field.create("signal.enabled.channels")
            .withDisplayName("Enabled channels names")
            .withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 22))
            .withType(ConfigDef.Type.LIST)
            .withWidth(ConfigDef.Width.LONG)
            .withImportance(ConfigDef.Importance.MEDIUM)
            .withDefault("source")
            .withDescription("List of channels names that are enabled. "
                    + "Source channel is enabled by default");
    /**
     * The {@code topic.naming.strategy} option: a class name, defaulting to the
     * name of {@code SchemaTopicNamingStrategy}.
     */
    public static final Field TOPIC_NAMING_STRATEGY = Field.create("topic.naming.strategy")
            .withDisplayName("Topic naming strategy class")
            .withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 23))
            .withType(ConfigDef.Type.CLASS)
            .withWidth(ConfigDef.Width.MEDIUM)
            .withImportance(ConfigDef.Importance.MEDIUM)
            .withDescription("The name of the TopicNamingStrategy class that should be used to determine the topic name for data change, schema change, transaction, heartbeat event etc.")
            .withDefault(SchemaTopicNamingStrategy.class.getName());
    /**
     * The internal {@code custom.retriable.exception} option: a string holding a
     * regular expression.
     *
     * <p>No default is declared. The description states that matching only
     * happens "if set", and a boolean default on a string-typed regular
     * expression would not be a meaningful pattern.
     * NOTE(review): readers of this option are outside this view — confirm they
     * tolerate an absent value.
     */
    public static final Field CUSTOM_RETRIABLE_EXCEPTION = Field.createInternal("custom.retriable.exception")
            .withDisplayName("Regular expression to match the exception message.")
            .withType(ConfigDef.Type.STRING)
            .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_ADVANCED, 999))
            .withWidth(ConfigDef.Width.MEDIUM)
            .withImportance(ConfigDef.Importance.LOW)
            .withDescription("Provide a temporary workaround for an error that should be retriable. "
                    + "If set a stacktrace of non-retriable exception is traversed and messages are matched against this regular expression. "
                    + "If matched the error is changed to retriable.");
    /**
     * The {@code notification.enabled.channels} option: a list with no default
     * declared here.
     */
    public static final Field NOTIFICATION_ENABLED_CHANNELS = Field.create("notification.enabled.channels")
            .withDisplayName("Enabled notification channels names")
            .withType(ConfigDef.Type.LIST)
            .withWidth(ConfigDef.Width.LONG)
            .withImportance(ConfigDef.Importance.MEDIUM)
            .withDescription("List of notification channels names that are enabled.");
    /**
     * The {@code sourceinfo.struct.maker} option: a class name with no default
     * declared here.
     */
    public static final Field SOURCE_INFO_STRUCT_MAKER = Field.create("sourceinfo.struct.maker")
            .withDisplayName("Source info struct maker class")
            .withType(ConfigDef.Type.CLASS)
            .withWidth(ConfigDef.Width.MEDIUM)
            .withImportance(ConfigDef.Importance.LOW)
            .withDescription("The name of the SourceInfoStructMaker class that returns SourceInfo schema and struct.");
    /**
     * The {@code errors.max.retries} option: an int, defaulting to -1.
     */
    public static final Field MAX_RETRIES_ON_ERROR = Field.create("errors.max.retries")
            .withDisplayName("The maximum number of retries")
            .withType(ConfigDef.Type.INT)
            .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_ADVANCED, 24))
            .withWidth(ConfigDef.Width.MEDIUM)
            .withImportance(ConfigDef.Importance.LOW)
            .withDefault(-1)
            .withValidation(Field::isInteger)
            .withDescription("The maximum number of retries on connection errors before failing (-1 = no limit, 0 = disabled, > 0 = num of retries).");
    /**
     * The {@code custom.metric.tags} option: a list validated by
     * {@code Field::isListOfMap}.
     */
    public static final Field CUSTOM_METRIC_TAGS = Field.create("custom.metric.tags")
            .withDisplayName("Customize metric tags")
            .withType(ConfigDef.Type.LIST)
            .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_ADVANCED, 25))
            .withWidth(ConfigDef.Width.MEDIUM)
            .withImportance(ConfigDef.Importance.LOW)
            .withValidation(Field::isListOfMap)
            .withDescription("The custom metric tags will accept key-value pairs to customize the MBean object name which should be appended the end of regular name, "
                    + "each key would represent a tag for the MBean object name, and the corresponding value would be the value of that tag the key is. "
                    + "For example: k1=v1,k2=v2");
    /**
     * The {@code incremental.snapshot.watermarking.strategy} option: a
     * {@code WatermarkStrategy} value, defaulting to {@code INSERT_INSERT}.
     */
    public static final Field INCREMENTAL_SNAPSHOT_WATERMARKING_STRATEGY = Field.create("incremental.snapshot.watermarking.strategy")
            .withDisplayName("Incremental snapshot watermarking strategy")
            .withEnum(WatermarkStrategy.class, WatermarkStrategy.INSERT_INSERT)
            .withWidth(ConfigDef.Width.MEDIUM)
            .withImportance(ConfigDef.Importance.LOW)
            .withDescription("Specify the strategy used for watermarking during an incremental snapshot: "
                    + "'insert_insert' both open and close signal is written into signal data collection (default); "
                    + "'insert_delete' only open signal is written on signal data collection, the close will delete the relative open signal;");
    /**
     * The {@code snapshot.mode.custom.name} option: a string.
     *
     * <p>Its validator reports one problem when {@code snapshot.mode} equals
     * {@code custom} (ignoring case) and this option is empty; otherwise it
     * reports none.
     */
    public static final Field SNAPSHOT_MODE_CUSTOM_NAME = Field.create("snapshot.mode.custom.name")
            .withDisplayName("Snapshot Mode Custom Name")
            .withType(ConfigDef.Type.STRING)
            .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_SNAPSHOT, 12))
            .withWidth(ConfigDef.Width.MEDIUM)
            .withImportance(ConfigDef.Importance.MEDIUM)
            .withValidation((cfg, self, problems) -> {
                // Only relevant when the custom snapshot mode is selected.
                if (!"custom".equalsIgnoreCase(cfg.getString(SNAPSHOT_MODE_PROPERTY_NAME))) {
                    return 0;
                }
                if (!cfg.getString(self, "").isEmpty()) {
                    return 0;
                }
                problems.accept(self, "", "snapshot.mode.custom.name cannot be empty when snapshot.mode 'custom' is defined");
                return 1;
            })
            .withDescription("When 'snapshot.mode' is set as custom, this setting must be set to specify a the name of the custom implementation provided in the 'name()' method. "
                    + "The implementations must implement the 'Snapshotter' interface and is called on each app boot to determine whether to do a snapshot.");
    /**
     * The {@code event.converting.failure.handling.mode} option: an
     * {@code EventConvertingFailureHandlingMode} value, defaulting to {@code WARN}.
     */
    public static final Field EVENT_CONVERTING_FAILURE_HANDLING_MODE = Field.create("event.converting.failure.handling.mode")
            .withDisplayName("Event converting failure handling mode")
            .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_ADVANCED, 26))
            .withEnum(EventConvertingFailureHandlingMode.class, EventConvertingFailureHandlingMode.WARN)
            .withWidth(ConfigDef.Width.MEDIUM)
            .withImportance(ConfigDef.Importance.MEDIUM)
            .withDescription("Specify how failures during converting of event should be handled, including: "
                    + "'fail' throw an exception that the column of event conversion is failed with unmatched schema type, causing the connector to be stopped. it could need schema recovery to covert successfully; "
                    + "'warn' (the default) the value of column of event that conversion failed will be null and be logged with warn level; "
                    + "'skip' the value of column of event that conversion failed will be null and be logged with debug level.");
    /**
     * The {@code streaming.delay.ms} option: a non-negative long, defaulting to 0.
     */
    public static final Field STREAMING_DELAY_MS = Field.create("streaming.delay.ms")
            .withDisplayName("Streaming Delay (milliseconds)")
            .withType(ConfigDef.Type.LONG)
            .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_ADVANCED, 27))
            .withWidth(ConfigDef.Width.MEDIUM)
            .withImportance(ConfigDef.Importance.LOW)
            .withDescription("A delay period after the snapshot is completed and the streaming begins, given in milliseconds. "
                    + "Defaults to 0 ms.")
            .withDefault(0L)
            .withValidation(Field::isNonNegativeLong);
    /**
     * The {@code snapshot.locking.mode.custom.name} option: a string.
     *
     * <p>Its validator reports one problem when {@code snapshot.locking.mode}
     * equals {@code custom} (ignoring case) and this option is empty; otherwise
     * it reports none.
     */
    public static final Field SNAPSHOT_LOCKING_MODE_CUSTOM_NAME = Field.create("snapshot.locking.mode.custom.name")
            .withDisplayName("Snapshot Locking Mode Custom Name")
            .withType(ConfigDef.Type.STRING)
            .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_SNAPSHOT, 14))
            .withWidth(ConfigDef.Width.MEDIUM)
            .withImportance(ConfigDef.Importance.MEDIUM)
            .withValidation((cfg, self, problems) -> {
                // Only relevant when the custom locking mode is selected.
                if (!"custom".equalsIgnoreCase(cfg.getString(SNAPSHOT_LOCKING_MODE_PROPERTY_NAME))) {
                    return 0;
                }
                if (!cfg.getString(self, "").isEmpty()) {
                    return 0;
                }
                problems.accept(self, "", "snapshot.locking.mode.custom.name cannot be empty when snapshot.locking.mode 'custom' is defined");
                return 1;
            })
            .withDescription("When 'snapshot.locking.mode' is set as custom, this setting must be set to specify a the name of the custom implementation provided in the 'name()' method. "
                    + "The implementations must implement the 'SnapshotterLocking' interface and is called to determine how to lock tables during schema snapshot.");
    /**
     * The {@code snapshot.query.mode} option: a {@code SnapshotQueryMode} value,
     * defaulting to {@code SELECT_ALL}.
     */
    public static final Field SNAPSHOT_QUERY_MODE = Field.create("snapshot.query.mode")
            .withDisplayName("Snapshot query mode")
            .withEnum(SnapshotQueryMode.class, SnapshotQueryMode.SELECT_ALL)
            .withWidth(ConfigDef.Width.SHORT)
            .withImportance(ConfigDef.Importance.LOW)
            .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_SNAPSHOT, 15))
            .withDescription("Controls query used during the snapshot");
    /**
     * The {@code snapshot.query.mode.custom.name} option: a string.
     *
     * <p>Its validator reads the query mode through the {@code SNAPSHOT_QUERY_MODE}
     * field and reports one problem when that value equals {@code custom}
     * (ignoring case) and this option is empty; otherwise it reports none.
     */
    public static final Field SNAPSHOT_QUERY_MODE_CUSTOM_NAME = Field.create("snapshot.query.mode.custom.name")
            .withDisplayName("Snapshot Query Mode Custom Name")
            .withType(ConfigDef.Type.STRING)
            .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_SNAPSHOT, 16))
            .withWidth(ConfigDef.Width.MEDIUM)
            .withImportance(ConfigDef.Importance.MEDIUM)
            .withValidation((cfg, self, problems) -> {
                // Only relevant when the custom query mode is selected.
                if (!"custom".equalsIgnoreCase(cfg.getString(SNAPSHOT_QUERY_MODE))) {
                    return 0;
                }
                if (!cfg.getString(self, "").isEmpty()) {
                    return 0;
                }
                problems.accept(self, "", "snapshot.query.mode.custom.name cannot be empty when snapshot.query.mode 'custom' is defined");
                return 1;
            })
            .withDescription("When 'snapshot.query.mode' is set as custom, this setting must be set to specify a the name of the custom implementation provided in the 'name()' method. "
                    + "The implementations must implement the 'SnapshotterQuery' interface and is called to determine how to build queries during snapshot.");
    /**
     * The {@code snapshot.mode.configuration.based.snapshot.data} option: an
     * optional boolean, defaulting to {@code false}.
     */
    public static final Field SNAPSHOT_MODE_CONFIGURATION_BASED_SNAPSHOT_DATA = Field.create("snapshot.mode.configuration.based.snapshot.data")
            .withDisplayName("Snapshot mode property based snapshot data")
            .withType(ConfigDef.Type.BOOLEAN)
            .withDefault(false)
            .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_SNAPSHOT, 17))
            .withWidth(ConfigDef.Width.MEDIUM)
            .withImportance(ConfigDef.Importance.MEDIUM)
            .optional()
            .withDescription("When 'snapshot.mode' is set as configuration_based, this setting permits to specify whenever the data should be snapshotted or not.");
    /**
     * The {@code snapshot.mode.configuration.based.snapshot.schema} option: an
     * optional boolean, defaulting to {@code false}.
     */
    public static final Field SNAPSHOT_MODE_CONFIGURATION_BASED_SNAPSHOT_SCHEMA = Field.create("snapshot.mode.configuration.based.snapshot.schema")
            .withDisplayName("Snapshot mode property based snapshot schema")
            .withType(ConfigDef.Type.BOOLEAN)
            .withDefault(false)
            .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_SNAPSHOT, 18))
            .withWidth(ConfigDef.Width.MEDIUM)
            .withImportance(ConfigDef.Importance.MEDIUM)
            .optional()
            .withDescription("When 'snapshot.mode' is set as configuration_based, this setting permits to specify whenever the schema should be snapshotted or not.");
    /**
     * The {@code snapshot.mode.configuration.based.start.stream} option: an
     * optional boolean, defaulting to {@code false}.
     */
    public static final Field SNAPSHOT_MODE_CONFIGURATION_BASED_START_STREAM = Field.create("snapshot.mode.configuration.based.start.stream")
            .withDisplayName("Snapshot mode property based start stream")
            .withType(ConfigDef.Type.BOOLEAN)
            .withDefault(false)
            .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_SNAPSHOT, 19))
            .withWidth(ConfigDef.Width.MEDIUM)
            .withImportance(ConfigDef.Importance.MEDIUM)
            .optional()
            .withDescription("When 'snapshot.mode' is set as configuration_based, this setting permits to specify whenever the stream should start or not after snapshot.");
    public static final Field SNAPSHOT_MODE_CONFIGURATION_BASED_SNAPSHOT_ON_SCHEMA_ERROR = Field.create("snapshot.mode.configuration.based.snapshot.on.schema.error").withDisplayName("Snapshot mode property based snapshot on schema error").withType(ConfigDef.Type.BOOLEAN).withDefault(false).withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_SNAPSHOT, 20)).withWidth(ConfigDef.Width.MEDIUM).withImportance(ConfigDef.Importance.MEDIUM).optional().withDescription("When 'snapshot.mode' is set as configuration_based, this setting permits to specify whenever the schema should be snapshotted or not in case of error.");
    public static final Field SNAPSHOT_MODE_CONFIGURATION_BASED_SNAPSHOT_ON_DATA_ERROR = Field.create("snapshot.mode.configuration.based.snapshot.on.data.error").withDisplayName("Snapshot mode property based snapshot on data error").withType(ConfigDef.Type.BOOLEAN).withDefault(false).withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_SNAPSHOT, 21)).withWidth(ConfigDef.Width.MEDIUM).withImportance(ConfigDef.Importance.MEDIUM).optional().withDescription("When 'snapshot.mode' is set as configuration_based, this setting permits to specify whenever the data should be snapshotted or not in case of error.");
    /**
     * The internal {@code log.position.check.enable} option: an optional boolean,
     * defaulting to {@code true}.
     */
    public static final Field LOG_POSITION_CHECK_ENABLED = Field.createInternal("log.position.check.enable")
            .withDisplayName("Enable/Disable log position check")
            .withType(ConfigDef.Type.BOOLEAN)
            .withDefault(true)
            .withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 30))
            .withWidth(ConfigDef.Width.MEDIUM)
            .withImportance(ConfigDef.Importance.MEDIUM)
            .optional()
            .withDescription("When enabled the connector checks if the position stored in the offset is still available in the log");
    public static final Field ADVANCED_METRICS_ENABLE = Field.createInternal("advanced.metrics.enable").withDisplayName("Enable/Disable advance metrics").withType(ConfigDef.Type.BOOLEAN).withDefault(false).withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 31)).withWidth(ConfigDef.Width.MEDIUM).withImportance(ConfigDef.Importance.MEDIUM).optional().withDescription("When enabled the connector will emit advanced streaming metrics");
    public static final Field CONNECTION_VALIDATION_TIMEOUT_MS = Field.create("connection.validation.timeout.ms").withDisplayName("Connection validation timeout (ms)").withType(ConfigDef.Type.LONG).withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 13)).withWidth(ConfigDef.Width.SHORT).withImportance(ConfigDef.Importance.LOW).withDefault(60000L).withDescription("The maximum time in milliseconds to wait for connection validation to complete. Defaults to 60 seconds.").withValidation(Field::isPositiveLong);
    /**
     * Shared configuration definition assembled from the fields declared in this class
     * (plus {@code Heartbeat} and {@code SinkNotificationChannel} fields), split into a
     * "connector" group and an "events" group.
     */
    protected static final ConfigDefinition CONFIG_DEFINITION = ConfigDefinition.editor().connector(EVENT_PROCESSING_FAILURE_HANDLING_MODE, MAX_BATCH_SIZE, MAX_QUEUE_SIZE, POLL_INTERVAL_MS, MAX_QUEUE_SIZE_IN_BYTES, PROVIDE_TRANSACTION_METADATA, SKIPPED_OPERATIONS, SNAPSHOT_DELAY_MS, STREAMING_DELAY_MS, SNAPSHOT_MODE_TABLES, SNAPSHOT_FETCH_SIZE, SNAPSHOT_MAX_THREADS, SNAPSHOT_MODE_CUSTOM_NAME, SNAPSHOT_MODE_CONFIGURATION_BASED_SNAPSHOT_DATA, SNAPSHOT_MODE_CONFIGURATION_BASED_SNAPSHOT_SCHEMA, SNAPSHOT_MODE_CONFIGURATION_BASED_START_STREAM, SNAPSHOT_MODE_CONFIGURATION_BASED_SNAPSHOT_ON_SCHEMA_ERROR, SNAPSHOT_MODE_CONFIGURATION_BASED_SNAPSHOT_ON_DATA_ERROR, RETRIABLE_RESTART_WAIT, QUERY_FETCH_SIZE, MAX_RETRIES_ON_ERROR, INCREMENTAL_SNAPSHOT_WATERMARKING_STRATEGY, LOG_POSITION_CHECK_ENABLED, ADVANCED_METRICS_ENABLE, CONNECTION_VALIDATION_TIMEOUT_MS).events(CUSTOM_CONVERTERS, CUSTOM_POST_PROCESSORS, TOMBSTONES_ON_DELETE, Heartbeat.HEARTBEAT_INTERVAL, Heartbeat.HEARTBEAT_TOPICS_PREFIX, SIGNAL_DATA_COLLECTION, SIGNAL_POLL_INTERVAL_MS, SIGNAL_ENABLED_CHANNELS, TOPIC_NAMING_STRATEGY, NOTIFICATION_ENABLED_CHANNELS, SinkNotificationChannel.NOTIFICATION_TOPIC, TRANSACTION_METADATA_FACTORY, CUSTOM_METRIC_TAGS).create();
    // ---- Instance state: each field below is assigned once in the constructor from the supplied Configuration ----

    // The raw configuration this object was built from.
    private final Configuration config;

    // Queueing, batching and polling settings.
    private final boolean emitTombstoneOnDelete;
    private final int maxQueueSize;
    private final int maxBatchSize;
    private final long maxQueueSizeInBytes;
    private final Duration pollInterval;

    // Value of TOPIC_PREFIX; protected so subclasses can read it directly.
    protected final String logicalName;

    // Heartbeat settings.
    private final String heartbeatTopicsPrefix;
    private final Duration heartbeatInterval;

    // Delays and retry wait, stored as Durations built from millisecond values.
    private final Duration snapshotDelay;
    private final Duration streamingDelay;
    private final Duration retriableRestartWait;

    // Snapshot and query sizing.
    private final int snapshotFetchSize;
    private final int incrementalSnapshotChunkSize;
    private final boolean incrementalSnapshotAllowSchemaChanges;
    private final int snapshotMaxThreads;
    private final String snapshotModeCustomName;
    // Boxed Integer, although getQueryFetchSize() returns a primitive int (unboxing happens there).
    private final Integer queryFetchSize;

    // Pluggable collaborators resolved during construction.
    private final SourceInfoStructMaker<? extends AbstractSourceInfo> sourceInfoStructMaker;
    private final TransactionMetadataFactory transactionMetadataFactory;
    private final boolean shouldProvideTransactionMetadata;
    private final EventProcessingFailureHandlingMode eventProcessingFailureHandlingMode;
    private final CustomConverterRegistry customConverterRegistry;

    // Parsed enum-valued options; the parse methods return null for unrecognised values.
    private final BinaryHandlingMode binaryHandlingMode;
    private final SchemaNameAdjustmentMode schemaNameAdjustmentMode;
    private final FieldNameAdjustmentMode fieldNameAdjustmentMode;
    private final EventConvertingFailureHandlingMode eventConvertingFailureHandlingMode;

    // Signalling: the raw collection name, its parsed TableId (null when the name is blank), and channel settings.
    private final String signalingDataCollection;
    private final TableId signalingDataCollectionId;
    private final Duration signalPollInterval;
    private final List<String> signalEnabledChannels;

    private final EnumSet<Envelope.Operation> skippedOperations;
    private final String taskId;
    private final boolean skipMessagesWithoutChange;

    // Notification settings.
    private final String notificationTopicName;
    private final List<String> enabledNotificationChannels;

    private final Map<String, String> customMetricTags;
    // NOTE(review): not final, yet the only assignment visible in this chunk is in the constructor — confirm whether it is reassigned elsewhere.
    private WatermarkStrategy incrementalSnapshotWatermarkingStrategy;

    // Registries: the bean registry is created eagerly; the service registry is built from it in the constructor.
    protected final DefaultBeanRegistry beanRegistry = new DefaultBeanRegistry();
    protected final DefaultServiceRegistry serviceRegistry;

    /**
     * Reads every commonly used option out of {@code config} once and caches it in a field.
     *
     * @param config the connector configuration to read from
     * @param defaultSnapshotFetchSize value passed as the second argument when reading
     *        {@code SNAPSHOT_FETCH_SIZE} (presumably the fallback when the option is unset — confirm
     *        against {@code Configuration.getInteger})
     */
    protected CommonConnectorConfig(Configuration config, int defaultSnapshotFetchSize) {
        // The service registry needs both the configuration and the already-initialised bean registry.
        this.serviceRegistry = new DefaultServiceRegistry(config, this.beanRegistry);
        // Must be set before the helper calls below that read this.config (e.g. getCustomConverters()).
        this.config = config;
        this.emitTombstoneOnDelete = config.getBoolean(TOMBSTONES_ON_DELETE);
        this.maxQueueSize = config.getInteger(MAX_QUEUE_SIZE);
        this.maxBatchSize = config.getInteger(MAX_BATCH_SIZE);
        this.pollInterval = config.getDuration(POLL_INTERVAL_MS, ChronoUnit.MILLIS);
        this.maxQueueSizeInBytes = config.getLong(MAX_QUEUE_SIZE_IN_BYTES);
        this.logicalName = config.getString(TOPIC_PREFIX);
        this.heartbeatTopicsPrefix = config.getString(Heartbeat.HEARTBEAT_TOPICS_PREFIX);
        this.heartbeatInterval = config.getDuration(Heartbeat.HEARTBEAT_INTERVAL, ChronoUnit.MILLIS);
        // Millisecond-valued options converted to Durations.
        this.snapshotDelay = Duration.ofMillis(config.getLong(SNAPSHOT_DELAY_MS));
        this.streamingDelay = Duration.ofMillis(config.getLong(STREAMING_DELAY_MS));
        this.retriableRestartWait = Duration.ofMillis(config.getLong(RETRIABLE_RESTART_WAIT));
        this.snapshotFetchSize = config.getInteger(SNAPSHOT_FETCH_SIZE, defaultSnapshotFetchSize);
        this.snapshotMaxThreads = config.getInteger(SNAPSHOT_MAX_THREADS);
        this.snapshotModeCustomName = config.getString(SNAPSHOT_MODE_CUSTOM_NAME);
        this.queryFetchSize = config.getInteger(QUERY_FETCH_SIZE);
        this.incrementalSnapshotChunkSize = config.getInteger(INCREMENTAL_SNAPSHOT_CHUNK_SIZE);
        this.incrementalSnapshotAllowSchemaChanges = config.getBoolean(INCREMENTAL_SNAPSHOT_ALLOW_SCHEMA_CHANGES);
        // Enum-valued options; each parse(...) returns null when the text matches no constant.
        this.schemaNameAdjustmentMode = SchemaNameAdjustmentMode.parse(config.getString(SCHEMA_NAME_ADJUSTMENT_MODE));
        this.fieldNameAdjustmentMode = FieldNameAdjustmentMode.parse(config.getString(FIELD_NAME_ADJUSTMENT_MODE));
        this.eventConvertingFailureHandlingMode = EventConvertingFailureHandlingMode.parse(config.getString(EVENT_CONVERTING_FAILURE_HANDLING_MODE));
        // NOTE(review): the next two calls invoke overridable methods from a constructor, so a subclass
        // implementation runs before the subclass's own fields are initialised.
        this.sourceInfoStructMaker = this.getSourceInfoStructMaker(Version.V2);
        this.transactionMetadataFactory = this.getTransactionMetadataFactory();
        this.shouldProvideTransactionMetadata = config.getBoolean(PROVIDE_TRANSACTION_METADATA);
        this.eventProcessingFailureHandlingMode = EventProcessingFailureHandlingMode.parse(config.getString(EVENT_PROCESSING_FAILURE_HANDLING_MODE));
        this.customConverterRegistry = new CustomConverterRegistry(this.getCustomConverters());
        this.binaryHandlingMode = BinaryHandlingMode.parse(config.getString(BINARY_HANDLING_MODE));
        this.signalingDataCollection = config.getString(SIGNAL_DATA_COLLECTION);
        this.signalPollInterval = Duration.ofMillis(config.getLong(SIGNAL_POLL_INTERVAL_MS));
        this.signalEnabledChannels = CommonConnectorConfig.getSignalEnabledChannels(config);
        this.skippedOperations = CommonConnectorConfig.determineSkippedOperations(config);
        this.taskId = config.getString(TASK_ID);
        this.notificationTopicName = config.getString(SinkNotificationChannel.NOTIFICATION_TOPIC);
        this.enabledNotificationChannels = config.getList(NOTIFICATION_ENABLED_CHANNELS);
        this.skipMessagesWithoutChange = config.getBoolean(SKIP_MESSAGES_WITHOUT_CHANGE);
        this.maxRetriesOnError = config.getInteger(MAX_RETRIES_ON_ERROR);
        // createCustomMetricTags is public and non-final, i.e. another overridable call from the constructor.
        this.customMetricTags = this.createCustomMetricTags(config);
        this.incrementalSnapshotWatermarkingStrategy = WatermarkStrategy.parse(config.getString(INCREMENTAL_SNAPSHOT_WATERMARKING_STRATEGY));
        this.snapshotLockingModeCustomName = config.getString(SNAPSHOT_LOCKING_MODE_CUSTOM_NAME, "");
        // Two-argument parse: falls back to the field's default text when the configured value matches nothing.
        this.snapshotQueryMode = SnapshotQueryMode.parse(config.getString(SNAPSHOT_QUERY_MODE), SNAPSHOT_QUERY_MODE.defaultValueAsString());
        this.snapshotQueryModeCustomName = config.getString(SNAPSHOT_QUERY_MODE_CUSTOM_NAME, "");
        this.snapshotModeConfigurationBasedSnapshotData = config.getBoolean(SNAPSHOT_MODE_CONFIGURATION_BASED_SNAPSHOT_DATA);
        this.snapshotModeConfigurationBasedSnapshotSchema = config.getBoolean(SNAPSHOT_MODE_CONFIGURATION_BASED_SNAPSHOT_SCHEMA);
        this.snapshotModeConfigurationBasedStream = config.getBoolean(SNAPSHOT_MODE_CONFIGURATION_BASED_START_STREAM);
        this.snapshotModeConfigurationBasedSnapshotOnSchemaError = config.getBoolean(SNAPSHOT_MODE_CONFIGURATION_BASED_SNAPSHOT_ON_SCHEMA_ERROR);
        this.snapshotModeConfigurationBasedSnapshotOnDataError = config.getBoolean(SNAPSHOT_MODE_CONFIGURATION_BASED_SNAPSHOT_ON_DATA_ERROR);
        this.isLogPositionCheckEnabled = config.getBoolean(LOG_POSITION_CHECK_ENABLED);
        this.isAdvancedMetricsEnabled = config.getBoolean(ADVANCED_METRICS_ENABLE);
        // Only parse a TableId when a signalling collection was actually configured.
        this.signalingDataCollectionId = !Strings.isNullOrBlank(this.signalingDataCollection) ? TableId.parse(this.signalingDataCollection) : null;
    }

    /**
     * Resolves the list of enabled signal channels.
     * <p>
     * An explicitly configured value is returned as-is; otherwise the default text declared on
     * {@code SIGNAL_ENABLED_CHANNELS} is split on commas and every entry is trimmed.
     *
     * @param config the configuration to read from
     * @return the channel names
     * @throws NullPointerException if the key is absent and the field declares no default
     */
    private static List<String> getSignalEnabledChannels(Configuration config) {
        if (!config.hasKey(SIGNAL_ENABLED_CHANNELS)) {
            String defaultChannels = Objects.requireNonNull(SIGNAL_ENABLED_CHANNELS.defaultValueAsString());
            return Arrays.stream(defaultChannels.split(","))
                    .map(String::trim)
                    .collect(Collectors.toList());
        }
        return config.getList(SIGNAL_ENABLED_CHANNELS);
    }

    /**
     * Computes the set of operations that should be skipped, from {@code SKIPPED_OPERATIONS}.
     * <p>
     * A missing value, or the single word {@code none} (trimmed, case-insensitive), yields an empty
     * set. Otherwise the value is split on commas and each trimmed token is mapped through
     * {@code Envelope.Operation.forCode}.
     * <p>
     * The codes are collected directly into an {@link EnumSet}. The previous
     * {@code EnumSet.copyOf(Set)} form threw {@link IllegalArgumentException} whenever the split
     * produced no tokens (for example the value {@code ","}), even though such a value passes
     * {@code validateSkippedOperation}.
     *
     * @param config the configuration to read from
     * @return a mutable, possibly empty set of skipped operations
     * @throws NullPointerException if {@code forCode} yields {@code null} for a token (unchanged
     *         from the previous behaviour, since an {@code EnumSet} rejects {@code null})
     */
    private static EnumSet<Envelope.Operation> determineSkippedOperations(Configuration config) {
        String operations = config.getString(SKIPPED_OPERATIONS);
        if (operations == null || operations.trim().equalsIgnoreCase("none")) {
            return EnumSet.noneOf(Envelope.Operation.class);
        }
        return Arrays.stream(operations.split(","))
                .map(String::trim)
                .map(Envelope.Operation::forCode)
                .collect(Collectors.toCollection(() -> EnumSet.noneOf(Envelope.Operation.class)));
    }

    /**
     * Exposes the raw configuration this object was constructed with.
     *
     * @return the underlying configuration
     * @deprecated flagged as deprecated in this class; no replacement is visible in this part of the file.
     */
    @Deprecated
    public Configuration getConfig() {
        return config;
    }

    /**
     * @return the bean registry created together with this configuration object
     */
    public BeanRegistry getBeanRegistry() {
        return beanRegistry;
    }

    /**
     * @return the service registry built in the constructor from the configuration and the bean registry
     */
    public ServiceRegistry getServiceRegistry() {
        return serviceRegistry;
    }

    /**
     * @return the value of {@code TOMBSTONES_ON_DELETE} as read at construction time
     */
    public boolean isEmitTombstoneOnDelete() {
        return emitTombstoneOnDelete;
    }

    /**
     * @return the value of {@code MAX_QUEUE_SIZE} as read at construction time
     */
    public int getMaxQueueSize() {
        return maxQueueSize;
    }

    /**
     * @return the value of {@code MAX_BATCH_SIZE} as read at construction time
     */
    public int getMaxBatchSize() {
        return maxBatchSize;
    }

    /**
     * @return the value of {@code MAX_QUEUE_SIZE_IN_BYTES} as read at construction time
     */
    public long getMaxQueueSizeInBytes() {
        return maxQueueSizeInBytes;
    }

    /**
     * @return the poll interval, read from {@code POLL_INTERVAL_MS} in milliseconds at construction time
     */
    public Duration getPollInterval() {
        return pollInterval;
    }

    /**
     * @return the logical name, i.e. the value of {@code TOPIC_PREFIX} as read at construction time
     */
    public String getLogicalName() {
        return logicalName;
    }

    /**
     * Supplied by each concrete connector configuration.
     *
     * @return the context name (exact usage is defined by callers outside this part of the file)
     */
    public abstract String getContextName();

    /**
     * Supplied by each concrete connector configuration.
     *
     * @return the connector name
     */
    public abstract String getConnectorName();

    /**
     * Supplied by each concrete connector configuration.
     *
     * @return the connector's snapshot mode as an enumerated value
     */
    public abstract EnumeratedValue getSnapshotMode();

    /**
     * Supplied by each concrete connector configuration.
     *
     * @return the connector's snapshot locking mode, wrapped in an {@link Optional}
     */
    public abstract Optional<? extends EnumeratedValue> getSnapshotLockingMode();

    /**
     * @return the value of {@code Heartbeat.HEARTBEAT_TOPICS_PREFIX} as read at construction time
     */
    public String getHeartbeatTopicsPrefix() {
        return heartbeatTopicsPrefix;
    }

    /**
     * @return the heartbeat interval, read from {@code Heartbeat.HEARTBEAT_INTERVAL} in milliseconds
     */
    public Duration getHeartbeatInterval() {
        return heartbeatInterval;
    }

    /**
     * @return the snapshot delay built from the millisecond value of {@code SNAPSHOT_DELAY_MS}
     */
    public Duration getSnapshotDelay() {
        return snapshotDelay;
    }

    /**
     * @return the streaming delay built from the millisecond value of {@code STREAMING_DELAY_MS}
     */
    public Duration getStreamingDelay() {
        return streamingDelay;
    }

    /**
     * @return the snapshot fetch size, read from {@code SNAPSHOT_FETCH_SIZE} together with the
     *         default that was handed to the constructor
     */
    public int getSnapshotFetchSize() {
        return snapshotFetchSize;
    }

    /**
     * @return the value of {@code SNAPSHOT_MAX_THREADS} as read at construction time
     */
    public int getSnapshotMaxThreads() {
        return snapshotMaxThreads;
    }

    /**
     * @return the value of {@code SNAPSHOT_MODE_CUSTOM_NAME} as read at construction time
     */
    public String getSnapshotModeCustomName() {
        return snapshotModeCustomName;
    }

    /**
     * Returns the value of {@code QUERY_FETCH_SIZE} as read at construction time.
     * <p>
     * The backing field is a boxed {@code Integer}, so this unboxes on every call.
     *
     * @return the query fetch size
     * @throws NullPointerException if the stored value is {@code null}
     */
    public int getQueryFetchSize() {
        return queryFetchSize;
    }

    /**
     * @return the value of {@code INCREMENTAL_SNAPSHOT_CHUNK_SIZE} as read at construction time
     */
    public int getIncrementalSnapshotChunkSize() {
        return incrementalSnapshotChunkSize;
    }

    /**
     * @return the value of {@code SinkNotificationChannel.NOTIFICATION_TOPIC} as read at construction time
     */
    public String getNotificationTopic() {
        return notificationTopicName;
    }

    /**
     * @return the list read from {@code NOTIFICATION_ENABLED_CHANNELS} at construction time
     *         (the same list instance on every call)
     */
    public List<String> getEnabledNotificationChannels() {
        return enabledNotificationChannels;
    }

    /**
     * @return the value of {@code PROVIDE_TRANSACTION_METADATA} as read at construction time
     */
    public boolean shouldProvideTransactionMetadata() {
        return shouldProvideTransactionMetadata;
    }

    /**
     * @return the value of {@code SKIP_MESSAGES_WITHOUT_CHANGE} as read at construction time
     */
    public boolean skipMessagesWithoutChange() {
        return skipMessagesWithoutChange;
    }

    /**
     * @return the mode parsed from {@code EVENT_PROCESSING_FAILURE_HANDLING_MODE}, or {@code null}
     *         if the configured text matched no constant
     */
    public EventProcessingFailureHandlingMode getEventProcessingFailureHandlingMode() {
        return eventProcessingFailureHandlingMode;
    }

    /**
     * @return the registry wrapping the custom converters instantiated at construction time
     */
    public CustomConverterRegistry customConverterRegistry() {
        return customConverterRegistry;
    }

    /**
     * Capability flag; this base implementation always answers {@code false}.
     * Being public and non-final, it can be overridden by subclasses.
     *
     * @return {@code false}
     */
    public boolean supportsOperationFiltering() {
        return false;
    }

    /**
     * Capability flag consulted by {@link #isIncrementalSnapshotSchemaChangesEnabled()};
     * this base implementation always answers {@code false}.
     *
     * @return {@code false}
     */
    protected boolean supportsSchemaChangesDuringIncrementalSnapshot() {
        return false;
    }

    /**
     * Schema changes during an incremental snapshot are enabled only when the connector supports
     * them <em>and</em> {@code INCREMENTAL_SNAPSHOT_ALLOW_SCHEMA_CHANGES} was set.
     *
     * @return {@code true} if both conditions hold
     */
    public boolean isIncrementalSnapshotSchemaChangesEnabled() {
        if (!supportsSchemaChangesDuringIncrementalSnapshot()) {
            return false;
        }
        return incrementalSnapshotAllowSchemaChanges;
    }

    /**
     * Convenience overload that instantiates the topic naming strategy with multi-partition mode
     * switched off.
     *
     * @param topicNamingStrategyField the field naming the strategy class
     * @return the instantiated strategy
     */
    public TopicNamingStrategy getTopicNamingStrategy(Field topicNamingStrategyField) {
        final boolean multiPartitionMode = false;
        return getTopicNamingStrategy(topicNamingStrategyField, multiPartitionMode);
    }

    /**
     * Instantiates the topic naming strategy configured under the given field.
     * <p>
     * The strategy receives a copy of the configuration as {@link Properties}, extended with the
     * {@code MULTI_PARTITION_MODE} entry.
     *
     * @param topicNamingStrategyField the field naming the strategy class
     * @param multiPartitionMode value stored under {@code MULTI_PARTITION_MODE}
     * @return the instantiated strategy, never {@code null}
     * @throws ConnectException if the configuration could not produce an instance
     */
    public TopicNamingStrategy getTopicNamingStrategy(Field topicNamingStrategyField, boolean multiPartitionMode) {
        final Properties strategyProps = config.asProperties();
        strategyProps.put(MULTI_PARTITION_MODE, multiPartitionMode);
        final String configuredStrategy = config.getString(topicNamingStrategyField);
        final TopicNamingStrategy strategy = config.getInstance(topicNamingStrategyField, TopicNamingStrategy.class, strategyProps);
        if (strategy != null) {
            LOGGER.info("Loading the custom topic naming strategy plugin: {}", configuredStrategy);
            return strategy;
        }
        throw new ConnectException("Unable to instantiate the topic naming strategy class " + configuredStrategy);
    }

    /**
     * Instantiates and configures every custom converter listed in {@code CUSTOM_CONVERTERS}.
     * <p>
     * The option is a comma-separated list of names. For each name, the class is looked up under
     * {@code name + CONVERTER_TYPE_SUFFIX} and the instance is configured with the properties of
     * the configuration subset for that name.
     *
     * @return the configured converters, in the order they were listed
     * @throws DebeziumException if a listed converter cannot be instantiated
     */
    private List<CustomConverter<SchemaBuilder, ConvertedField>> getCustomConverters() {
        final String converterNameList = config.getString(CUSTOM_CONVERTERS);
        final List<String> converterNames = Strings.listOf(converterNameList, x -> x.split(","), String::trim);
        return converterNames.stream().map(name -> {
            // getInstance only knows the raw class, so the parameterised type has to be asserted here.
            @SuppressWarnings("unchecked")
            CustomConverter<SchemaBuilder, ConvertedField> converter = config.getInstance(name + CONVERTER_TYPE_SUFFIX, CustomConverter.class);
            if (converter == null) {
                // Fail with a descriptive message instead of a NullPointerException on configure().
                throw new DebeziumException("Unable to instantiate the custom converter '" + name + "' configured under " + name + CONVERTER_TYPE_SUFFIX);
            }
            converter.configure(config.subset(name, true).asProperties());
            return converter;
        }).collect(Collectors.toList());
    }

    /**
     * Returns the source-info struct maker resolved at construction time.
     * <p>
     * The field is stored with a wildcard type, so the requested type parameter cannot be checked
     * at runtime; the caller is responsible for asking for the right {@code T}.
     *
     * @param <T> the source-info type the caller expects
     * @return the struct maker created by {@code getSourceInfoStructMaker(Version.V2)}
     */
    @SuppressWarnings("unchecked")
    public <T extends AbstractSourceInfo> SourceInfoStructMaker<T> getSourceInfoStructMaker() {
        // An explicit (unchecked) cast is required to go from the wildcard-typed field to T.
        return (SourceInfoStructMaker<T>) sourceInfoStructMaker;
    }

    /**
     * Instantiates the transaction metadata factory configured under
     * {@code TRANSACTION_METADATA_FACTORY}. A new lookup is performed on every call; the
     * constructor also invokes this method.
     *
     * @return the instantiated factory
     */
    public TransactionMetadataFactory getTransactionMetadataFactory() {
        final Field factoryField = TRANSACTION_METADATA_FACTORY;
        return getTransactionMetadataFactory(factoryField);
    }

    /**
     * @return the set of skipped operations computed at construction time
     *         (the same mutable set instance on every call)
     */
    public EnumSet<Envelope.Operation> getSkippedOperations() {
        return skippedOperations;
    }

    /**
     * Splits the value of {@code SNAPSHOT_MODE_TABLES} with {@code Strings.RegExSplitter}.
     *
     * @return the resulting entries as a fixed-size list, or an empty list when the option is unset
     */
    public List<String> getDataCollectionsToBeSnapshotted() {
        final String tables = config.getString(SNAPSHOT_MODE_TABLES);
        if (tables == null) {
            return Collections.emptyList();
        }
        return Arrays.asList(Strings.RegExSplitter.split(tables));
    }

    /**
     * @return the custom metric tags parsed at construction time
     *         (the same map instance on every call)
     */
    public Map<String, String> getCustomMetricTags() {
        return customMetricTags;
    }

    /**
     * Parses {@code CUSTOM_METRIC_TAGS} into an insertion-ordered map.
     * <p>
     * The option is a comma-separated list of {@code key=value} entries; keys and values are
     * trimmed. When the same key occurs more than once the last value wins and a warning is
     * logged.
     *
     * @param config the configuration to read from
     * @return a mutable map of tag name to tag value; empty when the option is unset or blank
     * @throws DebeziumException if an entry does not have the {@code key=value} form
     */
    public Map<String, String> createCustomMetricTags(Configuration config) {
        final Map<String, String> result = new LinkedHashMap<>();
        final String rawValue = config.getString(CUSTOM_METRIC_TAGS);
        if (Strings.isNullOrBlank(rawValue)) {
            return result;
        }
        final List<String> values = Strings.listOf(rawValue, x -> x.split(","), String::trim);
        for (String v : values) {
            final List<String> items = Strings.listOf(v, x -> x.split("="), String::trim);
            if (items.size() < 2) {
                // Without this guard a malformed entry surfaced as a bare IndexOutOfBoundsException.
                throw new DebeziumException("Invalid custom metric tag '" + v + "' in '" + rawValue + "': expected the form key=value");
            }
            result.put(items.get(0), items.get(1));
        }
        // Fewer map entries than input entries means at least one key was overwritten.
        if (result.size() < values.size()) {
            LOGGER.warn("There are duplicated key-value pairs: {}", (Object) rawValue);
        }
        return result;
    }

    /**
     * @return the strategy parsed from {@code INCREMENTAL_SNAPSHOT_WATERMARKING_STRATEGY}, or
     *         {@code null} if the configured text matched no constant
     */
    public WatermarkStrategy getIncrementalSnapshotWatermarkingStrategy() {
        return incrementalSnapshotWatermarkingStrategy;
    }

    /**
     * @return the mode parsed from {@code EVENT_CONVERTING_FAILURE_HANDLING_MODE}, or {@code null}
     *         if the configured text matched no constant
     */
    public EventConvertingFailureHandlingMode getEventConvertingFailureHandlingMode() {
        return eventConvertingFailureHandlingMode;
    }

    /**
     * Capability flag; this base implementation always answers {@code false}.
     * Being public and non-final, it can be overridden by subclasses.
     *
     * @return {@code false}
     */
    public boolean isSchemaChangesHistoryEnabled() {
        return false;
    }

    /**
     * Capability flag; this base implementation always answers {@code false}.
     * Being public and non-final, it can be overridden by subclasses.
     *
     * @return {@code false}
     */
    public boolean isSchemaCommentsHistoryEnabled() {
        return false;
    }

    /**
     * Delegates validation of the given fields to the underlying configuration.
     *
     * @param fields the fields to validate
     * @param problems receiver for any validation problems
     * @return whatever {@code Configuration.validate} returns for these arguments
     */
    public boolean validate(Iterable<Field> fields, Field.ValidationOutput problems) {
        final boolean outcome = config.validate(fields, problems);
        return outcome;
    }

    /**
     * Delegates to {@code Configuration.validateAndRecord}, passing problem messages to the
     * supplied consumer.
     *
     * @param fields the fields to validate
     * @param problems receiver for problem messages
     * @return whatever {@code Configuration.validateAndRecord} returns for these arguments
     */
    public boolean validateAndRecord(Iterable<Field> fields, Consumer<String> problems) {
        final boolean outcome = config.validateAndRecord(fields, problems);
        return outcome;
    }

    /**
     * Field validator for the maximum queue size.
     * <p>
     * Two independent checks are made, and each failing check is reported separately: the queue
     * size must be positive, and it must be strictly larger than {@code MAX_BATCH_SIZE}.
     *
     * @param config the configuration being validated
     * @param field the queue-size field
     * @param problems receiver for validation problems
     * @return the number of problems reported (0, 1 or 2)
     */
    private static int validateMaxQueueSize(Configuration config, Field field, Field.ValidationOutput problems) {
        final int queueSize = config.getInteger(field);
        final int batchSize = config.getInteger(MAX_BATCH_SIZE);
        int problemCount = 0;
        if (queueSize <= 0) {
            problems.accept(field, queueSize, "A positive queue size is required");
            problemCount++;
        }
        if (queueSize <= batchSize) {
            problems.accept(field, queueSize, "Must be larger than the maximum batch size");
            problemCount++;
        }
        return problemCount;
    }

    /**
     * Field validator for the skipped-operations option.
     * <p>
     * An unset value or the exact text {@code none} is valid. Otherwise every comma-separated,
     * trimmed token must be {@code none} or one of {@code r}, {@code c}, {@code u}, {@code d},
     * {@code t}, and {@code none} must not be combined with any operation code. Validation stops
     * at the first invalid token.
     *
     * @param config the configuration being validated
     * @param field the skipped-operations field
     * @param problems receiver for validation problems
     * @return {@code 1} if a problem was reported, {@code 0} otherwise
     */
    protected static int validateSkippedOperation(Configuration config, Field field, Field.ValidationOutput problems) {
        final String operations = config.getString(field);
        if (operations == null || "none".equals(operations)) {
            return 0;
        }
        boolean sawNone = false;
        boolean sawOperation = false;
        for (String operation : operations.split(",")) {
            switch (operation.trim()) {
                case "none":
                    sawNone = true;
                    break;
                case "r":
                case "c":
                case "u":
                case "d":
                case "t":
                    sawOperation = true;
                    break;
                default:
                    // The untrimmed token is reported, exactly as it appeared in the option.
                    problems.accept(field, operation, "Invalid operation");
                    return 1;
            }
        }
        if (sawNone && sawOperation) {
            problems.accept(field, "none", "'none' cannot be specified with other skipped operation types");
            return 1;
        }
        return 0;
    }

    /**
     * Checks whether either the key or the value converter is one of the two known Avro
     * converters ({@code CONFLUENT_AVRO_CONVERTER} or {@code APICURIO_AVRO_CONVERTER}).
     *
     * @param config the configuration holding {@code key.converter} and {@code value.converter}
     * @return {@code true} if at least one of the two converters is an Avro converter
     */
    private static boolean isUsingAvroConverter(Configuration config) {
        final String keyConverter = config.getString("key.converter");
        final String valueConverter = config.getString("value.converter");
        if (CONFLUENT_AVRO_CONVERTER.equals(keyConverter) || CONFLUENT_AVRO_CONVERTER.equals(valueConverter)) {
            return true;
        }
        return APICURIO_AVRO_CONVERTER.equals(keyConverter) || APICURIO_AVRO_CONVERTER.equals(valueConverter);
    }

    /**
     * @return the value of {@code SNAPSHOT_LOCKING_MODE_CUSTOM_NAME}, read at construction time
     *         with an empty string passed as the second argument
     */
    public String snapshotLockingModeCustomName() {
        return snapshotLockingModeCustomName;
    }

    /**
     * @return the value of {@code SNAPSHOT_MODE_CONFIGURATION_BASED_SNAPSHOT_DATA} as read at construction time
     */
    public boolean snapshotModeConfigurationBasedSnapshotData() {
        return snapshotModeConfigurationBasedSnapshotData;
    }

    /**
     * @return the value of {@code SNAPSHOT_MODE_CONFIGURATION_BASED_SNAPSHOT_SCHEMA} as read at construction time
     */
    public boolean snapshotModeConfigurationBasedSnapshotSchema() {
        return snapshotModeConfigurationBasedSnapshotSchema;
    }

    /**
     * @return the value of {@code SNAPSHOT_MODE_CONFIGURATION_BASED_START_STREAM} as read at construction time
     */
    public boolean snapshotModeConfigurationBasedStream() {
        return snapshotModeConfigurationBasedStream;
    }

    /**
     * @return the value of {@code SNAPSHOT_MODE_CONFIGURATION_BASED_SNAPSHOT_ON_SCHEMA_ERROR} as read at construction time
     */
    public boolean snapshotModeConfigurationBasedSnapshotOnSchemaError() {
        return snapshotModeConfigurationBasedSnapshotOnSchemaError;
    }

    /**
     * @return the value of {@code SNAPSHOT_MODE_CONFIGURATION_BASED_SNAPSHOT_ON_DATA_ERROR} as read at construction time
     */
    public boolean snapshotModeConfigurationBasedSnapshotOnDataError() {
        return snapshotModeConfigurationBasedSnapshotOnDataError;
    }

    /**
     * @return the value of {@code LOG_POSITION_CHECK_ENABLED} as read at construction time
     */
    public boolean isLogPositionCheckEnabled() {
        return isLogPositionCheckEnabled;
    }

    /**
     * @return the value of {@code ADVANCED_METRICS_ENABLE} as read at construction time
     */
    public boolean isAdvancedMetricsEnabled() {
        return isAdvancedMetricsEnabled;
    }

    /**
     * Reads {@code CONNECTION_VALIDATION_TIMEOUT_MS} from the configuration on every call
     * (the value is not cached in a field) and converts it to a {@link Duration}.
     *
     * @return the connection validation timeout
     */
    public Duration getConnectionValidationTimeout() {
        final long timeoutMillis = config.getLong(CONNECTION_VALIDATION_TIMEOUT_MS);
        return Duration.ofMillis(timeoutMillis);
    }

    /**
     * @return the snapshot query mode parsed at construction time from {@code SNAPSHOT_QUERY_MODE},
     *         using the field's default text when the configured value matched no constant
     */
    public EnumeratedValue snapshotQueryMode() {
        return snapshotQueryMode;
    }

    /**
     * @return the value of {@code SNAPSHOT_QUERY_MODE_CUSTOM_NAME}, read at construction time
     *         with an empty string passed as the second argument
     */
    public String snapshotQueryModeCustomName() {
        return snapshotQueryModeCustomName;
    }

    /**
     * Creates the source-info struct maker for the requested version; implemented by each
     * concrete connector configuration.
     * <p>
     * The constructor of this class calls this method with {@code Version.V2}, so implementations
     * run before the subclass's own constructor body has executed.
     *
     * @param var1 the requested version
     * @return the struct maker to use
     */
    protected abstract SourceInfoStructMaker<?> getSourceInfoStructMaker(Version var1);

    /**
     * @return the mode parsed from {@code BINARY_HANDLING_MODE}, or {@code null} if the configured
     *         text matched no constant
     */
    public BinaryHandlingMode binaryHandlingMode() {
        return binaryHandlingMode;
    }

    /**
     * Asks the configured schema-name adjustment mode for its adjuster.
     *
     * @return the adjuster for the configured mode
     * @throws NullPointerException if the configured mode text matched no constant at construction time
     */
    public SchemaNameAdjuster schemaNameAdjuster() {
        final SchemaNameAdjustmentMode mode = schemaNameAdjustmentMode;
        return mode.createAdjuster();
    }

    /**
     * Creates the adjuster for field names.
     * <p>
     * When the configured mode is {@code NONE} but an Avro converter is in use for keys or values,
     * the {@code AVRO} adjuster is used instead of the configured one.
     *
     * @return the field-name adjuster to apply
     */
    public SchemaNameAdjuster fieldNameAdjuster() {
        final boolean forceAvro = fieldNameAdjustmentMode == FieldNameAdjustmentMode.NONE
                && CommonConnectorConfig.isUsingAvroConverter(config);
        final FieldNameAdjustmentMode effectiveMode = forceAvro ? FieldNameAdjustmentMode.AVRO : fieldNameAdjustmentMode;
        return effectiveMode.createAdjuster();
    }

    /**
     * Returns the raw text of {@code SIGNAL_DATA_COLLECTION} as read at construction time.
     * Despite the method name, this is the configured string, not the parsed {@code TableId}.
     *
     * @return the configured signalling data collection name, possibly {@code null}
     */
    public String getSignalingDataCollectionId() {
        return signalingDataCollection;
    }

    /**
     * @return the signal poll interval built from the millisecond value of {@code SIGNAL_POLL_INTERVAL_MS}
     */
    public Duration getSignalPollInterval() {
        return signalPollInterval;
    }

    /**
     * @return the enabled signal channels resolved at construction time
     *         (the same list instance on every call)
     */
    public List<String> getEnabledChannels() {
        return signalEnabledChannels;
    }

    /**
     * Extracts the three string fields of a signal event nested inside {@code value}.
     * <p>
     * The nested struct is looked up by {@code fieldName}; its schema must declare exactly three
     * fields, whose string values are returned in schema order. A warning is logged and an empty
     * result returned when the nested struct is missing or has a different number of fields.
     *
     * @param value the struct containing the signal event
     * @param fieldName the name of the nested struct field
     * @return the three field values, or an empty {@link Optional} if the event is unusable
     */
    public Optional<String[]> parseSignallingMessage(Struct value, String fieldName) {
        final Struct event = value.getStruct(fieldName);
        if (event == null) {
            LOGGER.warn("Field {} part of signal '{}' is missing", fieldName, value);
            return Optional.empty();
        }
        final List<org.apache.kafka.connect.data.Field> fields = event.schema().fields();
        if (fields.size() != 3) {
            LOGGER.warn("The signal event '{}' should have 3 fields but has {}", event, fields.size());
            return Optional.empty();
        }
        final String[] parts = new String[3];
        for (int i = 0; i < parts.length; i++) {
            parts[i] = event.getString(fields.get(i).name());
        }
        return Optional.of(parts);
    }

    /**
     * Tests whether the given id equals the configured signalling data collection.
     *
     * @param dataCollectionId the id to test
     * @return {@code false} when no signalling collection is configured; otherwise the result of
     *         comparing the parsed signalling {@code TableId} with the argument
     */
    public boolean isSignalDataCollection(DataCollectionId dataCollectionId) {
        if (signalingDataCollectionId == null) {
            return false;
        }
        return signalingDataCollectionId.equals(dataCollectionId);
    }

    /**
     * Reads {@code CUSTOM_RETRIABLE_EXCEPTION} from the configuration on every call.
     *
     * @return the configured value, or an empty {@link Optional} when it is unset
     */
    public Optional<String> customRetriableException() {
        final String configured = config.getString(CUSTOM_RETRIABLE_EXCEPTION);
        return Optional.ofNullable(configured);
    }

    /**
     * @return the value of {@code MAX_RETRIES_ON_ERROR} as read at construction time
     */
    public int getMaxRetriesOnError() {
        return maxRetriesOnError;
    }

    /**
     * @return the value of {@code TASK_ID} as read at construction time
     */
    public String getTaskId() {
        return taskId;
    }

    /**
     * Creates the heartbeat for this connector.
     * <p>
     * A zero heartbeat interval yields the shared no-op heartbeat; otherwise a
     * {@code HeartbeatImpl} is built from the interval, the strategy's heartbeat topic, the
     * logical name and the schema name adjuster.
     *
     * @param topicNamingStrategy supplies the heartbeat topic name
     * @param schemaNameAdjuster passed through to the heartbeat implementation
     * @param connectionProvider not used by this implementation
     * @param errorHandler not used by this implementation
     * @return the heartbeat to use
     */
    public Heartbeat createHeartbeat(TopicNamingStrategy topicNamingStrategy, SchemaNameAdjuster schemaNameAdjuster, HeartbeatConnectionProvider connectionProvider, HeartbeatErrorHandler errorHandler) {
        if (!getHeartbeatInterval().isZero()) {
            return new HeartbeatImpl(getHeartbeatInterval(), topicNamingStrategy.heartbeatTopic(), getLogicalName(), schemaNameAdjuster);
        }
        return Heartbeat.DEFAULT_NOOP_HEARTBEAT;
    }

    /**
     * Field validator that tests a topic name against {@code TOPIC_NAME_PATTERN}.
     * An unset value is considered valid.
     *
     * @param config the configuration being validated
     * @param field the field holding the topic name
     * @param problems receiver for validation problems
     * @return {@code 1} if the name was rejected, {@code 0} otherwise
     */
    public static int validateTopicName(Configuration config, Field field, Field.ValidationOutput problems) {
        final String name = config.getString(field);
        if (name == null || TOPIC_NAME_PATTERN.asPredicate().test(name)) {
            return 0;
        }
        problems.accept(field, name, name + " has invalid format (only the underscore, hyphen, dot and alphanumeric characters are allowed)");
        return 1;
    }

    /**
     * Instantiates and initialises the source-info struct maker configured under the given field.
     *
     * @param sourceInfoStructMakerField the field naming the struct maker class
     * @param connector passed to the struct maker's {@code init}
     * @param version passed to the struct maker's {@code init}
     * @param connectorConfig passed to the struct maker's {@code init}
     * @param <T> the source-info type the caller expects; not checked at runtime
     * @return the initialised struct maker, never {@code null}
     * @throws DebeziumException if the configuration could not produce an instance
     */
    public <T extends AbstractSourceInfo> SourceInfoStructMaker<T> getSourceInfoStructMaker(Field sourceInfoStructMakerField, String connector, String version, CommonConnectorConfig connectorConfig) {
        // getInstance only knows the raw class, so the parameterised type has to be asserted here.
        // The local is deliberately not called sourceInfoStructMaker, to avoid shadowing the field.
        @SuppressWarnings("unchecked")
        final SourceInfoStructMaker<T> maker = config.getInstance(sourceInfoStructMakerField, SourceInfoStructMaker.class);
        if (maker == null) {
            throw new DebeziumException("Unable to instantiate the source info struct maker class " + config.getString(sourceInfoStructMakerField));
        }
        LOGGER.info("Loading the custom source info struct maker plugin: {}", maker.getClass().getName());
        maker.init(connector, version, connectorConfig);
        return maker;
    }

    /**
     * Instantiates the transaction metadata factory configured under the given field, handing it
     * the connector configuration.
     *
     * @param transactionMetadataFactoryField the field naming the factory class
     * @return the instantiated factory, never {@code null}
     * @throws DebeziumException if the configuration could not produce an instance; the message
     *         names the value configured for the field that was passed in
     */
    public TransactionMetadataFactory getTransactionMetadataFactory(Field transactionMetadataFactoryField) {
        final TransactionMetadataFactory factory = config.getInstance(transactionMetadataFactoryField, TransactionMetadataFactory.class, config);
        if (factory == null) {
            // Report what was configured for the requested field, not the TRANSACTION_METADATA_FACTORY constant.
            throw new DebeziumException("Unable to instantiate the transaction struct maker class " + config.getString(transactionMetadataFactoryField));
        }
        return factory;
    }

    /**
     * The supported schema-name adjustment modes, each with the text used for it in configuration.
     */
    public static enum SchemaNameAdjustmentMode implements EnumeratedValue
    {
        NONE("none"),
        AVRO("avro"),
        AVRO_UNICODE("avro_unicode");

        private final String value;

        private SchemaNameAdjustmentMode(String value) {
            this.value = value;
        }

        @Override
        public String getValue() {
            return this.value;
        }

        /**
         * Maps this mode to its adjuster. The switch is on the constant itself rather than on
         * {@code ordinal()}, so reordering or inserting constants cannot silently change the mapping.
         *
         * @return {@code SchemaNameAdjuster.AVRO} for {@code AVRO},
         *         {@code SchemaNameAdjuster.AVRO_UNICODE} for {@code AVRO_UNICODE},
         *         and {@code SchemaNameAdjuster.NO_OP} otherwise
         */
        public SchemaNameAdjuster createAdjuster() {
            switch (this) {
                case AVRO:
                    return SchemaNameAdjuster.AVRO;
                case AVRO_UNICODE:
                    return SchemaNameAdjuster.AVRO_UNICODE;
                default:
                    return SchemaNameAdjuster.NO_OP;
            }
        }

        /**
         * Finds the constant whose text equals the trimmed argument, ignoring case.
         *
         * @param value the configured text, may be {@code null}
         * @return the matching constant, or {@code null} if the argument is {@code null} or matches nothing
         */
        public static SchemaNameAdjustmentMode parse(String value) {
            if (value == null) {
                return null;
            }
            value = value.trim();
            for (SchemaNameAdjustmentMode option : SchemaNameAdjustmentMode.values()) {
                if (option.getValue().equalsIgnoreCase(value)) {
                    return option;
                }
            }
            return null;
        }
    }

    /**
     * The supported field-name adjustment modes, each with the text used for it in configuration.
     */
    public static enum FieldNameAdjustmentMode implements EnumeratedValue
    {
        NONE("none"),
        AVRO("avro"),
        AVRO_UNICODE("avro_unicode");

        private final String value;

        private FieldNameAdjustmentMode(String value) {
            this.value = value;
        }

        @Override
        public String getValue() {
            return this.value;
        }

        /**
         * Maps this mode to its adjuster. The switch is on the constant itself rather than on
         * {@code ordinal()}, so reordering or inserting constants cannot silently change the mapping.
         *
         * @return {@code SchemaNameAdjuster.AVRO_FIELD_NAMER} for {@code AVRO},
         *         {@code SchemaNameAdjuster.AVRO_UNICODE_FIELD_NAMER} for {@code AVRO_UNICODE},
         *         and {@code SchemaNameAdjuster.NO_OP} otherwise
         */
        public SchemaNameAdjuster createAdjuster() {
            switch (this) {
                case AVRO:
                    return SchemaNameAdjuster.AVRO_FIELD_NAMER;
                case AVRO_UNICODE:
                    return SchemaNameAdjuster.AVRO_UNICODE_FIELD_NAMER;
                default:
                    return SchemaNameAdjuster.NO_OP;
            }
        }

        /**
         * Finds the constant whose text equals the trimmed argument, ignoring case.
         *
         * @param value the configured text, may be {@code null}
         * @return the matching constant, or {@code null} if the argument is {@code null} or matches nothing
         */
        public static FieldNameAdjustmentMode parse(String value) {
            if (value == null) {
                return null;
            }
            value = value.trim();
            for (FieldNameAdjustmentMode option : FieldNameAdjustmentMode.values()) {
                if (option.getValue().equalsIgnoreCase(value)) {
                    return option;
                }
            }
            return null;
        }
    }

    /**
     * The supported modes for handling event-converting failures, each with the text used for it
     * in configuration.
     */
    public enum EventConvertingFailureHandlingMode implements EnumeratedValue {
        SKIP("skip"),
        WARN("warn"),
        FAIL("fail");

        private final String value;

        EventConvertingFailureHandlingMode(String value) {
            this.value = value;
        }

        @Override
        public String getValue() {
            return value;
        }

        /**
         * Finds the constant whose text equals the trimmed argument, ignoring case.
         *
         * @param value the configured text, may be {@code null}
         * @return the matching constant, or {@code null} if the argument is {@code null} or matches nothing
         */
        public static EventConvertingFailureHandlingMode parse(String value) {
            if (value == null) {
                return null;
            }
            final String wanted = value.trim();
            for (EventConvertingFailureHandlingMode candidate : values()) {
                if (candidate.getValue().equalsIgnoreCase(wanted)) {
                    return candidate;
                }
            }
            return null;
        }
    }

    /**
     * The known versions, each with the text used for it in configuration.
     */
    public enum Version implements EnumeratedValue {
        V1("v1"),
        V2("v2");

        private final String value;

        Version(String value) {
            this.value = value;
        }

        @Override
        public String getValue() {
            return value;
        }

        /**
         * Finds the constant whose text equals the trimmed argument, ignoring case.
         *
         * @param value the configured text, may be {@code null}
         * @return the matching constant, or {@code null} if the argument is {@code null} or matches nothing
         */
        public static Version parse(String value) {
            if (value == null) {
                return null;
            }
            final String wanted = value.trim();
            for (Version candidate : values()) {
                if (candidate.getValue().equalsIgnoreCase(wanted)) {
                    return candidate;
                }
            }
            return null;
        }

        /**
         * Like {@link #parse(String)}, but retries with {@code defaultValue} when the first
         * lookup finds nothing and a default was supplied.
         *
         * @param value the configured text, may be {@code null}
         * @param defaultValue the fallback text, may be {@code null}
         * @return the matching constant, or {@code null} if neither text matches
         */
        public static Version parse(String value, String defaultValue) {
            final Version parsed = parse(value);
            if (parsed != null || defaultValue == null) {
                return parsed;
            }
            return parse(defaultValue);
        }
    }

    /**
     * The supported modes for handling event-processing failures, each with the text used for it
     * in configuration.
     */
    public enum EventProcessingFailureHandlingMode implements EnumeratedValue {
        SKIP("skip"),
        WARN("warn"),
        FAIL("fail"),
        IGNORE("ignore");

        private final String value;

        EventProcessingFailureHandlingMode(String value) {
            this.value = value;
        }

        @Override
        public String getValue() {
            return value;
        }

        /**
         * Finds the constant whose text equals the trimmed argument, ignoring case.
         *
         * @param value the configured text, may be {@code null}
         * @return the matching constant, or {@code null} if the argument is {@code null} or matches nothing
         */
        public static EventProcessingFailureHandlingMode parse(String value) {
            if (value == null) {
                return null;
            }
            final String wanted = value.trim();
            for (EventProcessingFailureHandlingMode candidate : values()) {
                if (candidate.getValue().equalsIgnoreCase(wanted)) {
                    return candidate;
                }
            }
            return null;
        }
    }

    /**
     * The supported binary handling modes. Each constant carries the text used for it in
     * configuration and a supplier of the {@link SchemaBuilder} used for values in that mode:
     * a bytes schema for {@code BYTES}, a string schema for the other modes.
     */
    public enum BinaryHandlingMode implements EnumeratedValue {
        BYTES("bytes", SchemaBuilder::bytes),
        BASE64("base64", SchemaBuilder::string),
        BASE64_URL_SAFE("base64-url-safe", SchemaBuilder::string),
        HEX("hex", SchemaBuilder::string);

        private final String value;
        private final Supplier<SchemaBuilder> schema;

        BinaryHandlingMode(String value, Supplier<SchemaBuilder> schema) {
            this.value = value;
            this.schema = schema;
        }

        @Override
        public String getValue() {
            return value;
        }

        /**
         * @return a schema builder obtained from this mode's supplier on each call
         */
        public SchemaBuilder getSchema() {
            return schema.get();
        }

        /**
         * Finds the constant whose text equals the trimmed argument, ignoring case.
         *
         * @param value the configured text, may be {@code null}
         * @return the matching constant, or {@code null} if the argument is {@code null} or matches nothing
         */
        public static BinaryHandlingMode parse(String value) {
            if (value == null) {
                return null;
            }
            final String wanted = value.trim();
            for (BinaryHandlingMode candidate : values()) {
                if (candidate.getValue().equalsIgnoreCase(wanted)) {
                    return candidate;
                }
            }
            return null;
        }

        /**
         * Like {@link #parse(String)}, but retries with {@code defaultValue} when the first
         * lookup finds nothing and a default was supplied.
         *
         * @param value the configured text, may be {@code null}
         * @param defaultValue the fallback text, may be {@code null}
         * @return the matching constant, or {@code null} if neither text matches
         */
        public static BinaryHandlingMode parse(String value, String defaultValue) {
            final BinaryHandlingMode parsed = parse(value);
            if (parsed != null || defaultValue == null) {
                return parsed;
            }
            return parse(defaultValue);
        }
    }

    /**
     * The supported incremental-snapshot watermarking strategies, each with the text used for it
     * in configuration.
     */
    public enum WatermarkStrategy implements EnumeratedValue {
        INSERT_INSERT("INSERT_INSERT"),
        INSERT_DELETE("INSERT_DELETE");

        private final String value;

        WatermarkStrategy(String value) {
            this.value = value;
        }

        @Override
        public String getValue() {
            return value;
        }

        /**
         * Finds the constant whose text equals the trimmed argument, ignoring case.
         *
         * @param value the configured text, may be {@code null}
         * @return the matching constant, or {@code null} if the argument is {@code null} or matches nothing
         */
        public static WatermarkStrategy parse(String value) {
            if (value == null) {
                return null;
            }
            final String wanted = value.trim();
            for (WatermarkStrategy candidate : values()) {
                if (candidate.getValue().equalsIgnoreCase(wanted)) {
                    return candidate;
                }
            }
            return null;
        }

        /**
         * Like {@link #parse(String)}, but retries with {@code defaultValue} when the first
         * lookup finds nothing and a default was supplied.
         *
         * @param value the configured text, may be {@code null}
         * @param defaultValue the fallback text, may be {@code null}
         * @return the matching constant, or {@code null} if neither text matches
         */
        public static WatermarkStrategy parse(String value, String defaultValue) {
            final WatermarkStrategy parsed = parse(value);
            if (parsed != null || defaultValue == null) {
                return parsed;
            }
            return parse(defaultValue);
        }
    }

    /**
     * The supported snapshot query modes, each with the text used for it in configuration.
     */
    public enum SnapshotQueryMode implements EnumeratedValue {
        SELECT_ALL("select_all"),
        CUSTOM("custom");

        private final String value;

        SnapshotQueryMode(String value) {
            this.value = value;
        }

        @Override
        public String getValue() {
            return value;
        }

        /**
         * Finds the constant whose text equals the trimmed argument, ignoring case.
         *
         * @param value the configured text, may be {@code null}
         * @return the matching constant, or {@code null} if the argument is {@code null} or matches nothing
         */
        public static SnapshotQueryMode parse(String value) {
            if (value == null) {
                return null;
            }
            final String wanted = value.trim();
            for (SnapshotQueryMode candidate : values()) {
                if (candidate.getValue().equalsIgnoreCase(wanted)) {
                    return candidate;
                }
            }
            return null;
        }

        /**
         * Like {@link #parse(String)}, but retries with {@code defaultValue} when the first
         * lookup finds nothing and a default was supplied.
         *
         * @param value the configured text, may be {@code null}
         * @param defaultValue the fallback text, may be {@code null}
         * @return the matching constant, or {@code null} if neither text matches
         */
        public static SnapshotQueryMode parse(String value, String defaultValue) {
            final SnapshotQueryMode parsed = parse(value);
            if (parsed != null || defaultValue == null) {
                return parsed;
            }
            return parse(defaultValue);
        }
    }
}

