/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka.table;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.regex.Pattern;
import java.util.stream.IntStream;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.config.BoundedMode;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;

/**
 * Utilities for validating the options of the Kafka table connector and for translating them
 * into the values needed to build Kafka sources and sinks (topics, startup/bounded offsets,
 * client properties, partitioner, key/value projections).
 *
 * <p>All members are static; the class cannot be instantiated.
 */
@Internal
class KafkaConnectorOptionsUtil {
    private static final ConfigOption<String> SCHEMA_REGISTRY_SUBJECT =
            ConfigOptions.key("schema-registry.subject").stringType().noDefaultValue();

    // Sink partitioner.
    public static final String SINK_PARTITIONER_VALUE_DEFAULT = "default";
    public static final String SINK_PARTITIONER_VALUE_FIXED = "fixed";
    public static final String SINK_PARTITIONER_VALUE_ROUND_ROBIN = "round-robin";

    /** Prefix of table options that are forwarded (without the prefix) as Kafka client properties. */
    public static final String PROPERTIES_PREFIX = "properties.";

    // Keys used inside a specific-offsets string, e.g. 'partition:0,offset:42'.
    private static final String PARTITION = "partition";
    private static final String OFFSET = "offset";

    protected static final String AVRO_CONFLUENT = "avro-confluent";
    protected static final String DEBEZIUM_AVRO_CONFLUENT = "debezium-avro-confluent";

    /** Formats for which a schema-registry subject is auto-completed. */
    private static final List<String> SCHEMA_REGISTRY_FORMATS =
            Arrays.asList(AVRO_CONFLUENT, DEBEZIUM_AVRO_CONFLUENT);

    private static final String MISSING_FOR_STARTUP_MODE =
            "'%s' is required in '%s' startup mode but missing.";
    private static final String MISSING_FOR_BOUNDED_MODE =
            "'%s' is required in '%s' bounded mode but missing.";

    // --------------------------------------------------------------------------------------------
    // Validation
    // --------------------------------------------------------------------------------------------

    /**
     * Validates the options of a table source: topic selection, startup mode and bounded mode.
     *
     * @param tableOptions the table options to check
     * @throws ValidationException if any of the checks fails
     */
    public static void validateTableSourceOptions(ReadableConfig tableOptions) {
        validateTopic(tableOptions);
        validateScanStartupMode(tableOptions);
        validateScanBoundedMode(tableOptions);
    }

    /**
     * Validates the options of a table sink: topic selection and sink partitioner.
     *
     * @param tableOptions the table options to check
     * @throws ValidationException if any of the checks fails
     */
    public static void validateTableSinkOptions(ReadableConfig tableOptions) {
        validateTopic(tableOptions);
        validateSinkPartitioner(tableOptions);
    }

    /**
     * Checks that exactly one of 'topic' and 'topic-pattern' is set.
     *
     * @param tableOptions the table options to check
     * @throws ValidationException if both or neither of the two options are set
     */
    public static void validateTopic(ReadableConfig tableOptions) {
        final boolean hasTopic = tableOptions.getOptional(KafkaConnectorOptions.TOPIC).isPresent();
        final boolean hasPattern =
                tableOptions.getOptional(KafkaConnectorOptions.TOPIC_PATTERN).isPresent();

        if (hasTopic && hasPattern) {
            throw new ValidationException(
                    "Option 'topic' and 'topic-pattern' shouldn't be set together.");
        }
        if (!hasTopic && !hasPattern) {
            throw new ValidationException("Either 'topic' or 'topic-pattern' must be set.");
        }
    }

    /**
     * Checks the options that the configured startup mode depends on. Only the TIMESTAMP and
     * SPECIFIC_OFFSETS modes have dependent options; nothing is checked if no mode is set.
     */
    private static void validateScanStartupMode(ReadableConfig tableOptions) {
        tableOptions
                .getOptional(KafkaConnectorOptions.SCAN_STARTUP_MODE)
                .ifPresent(
                        mode -> {
                            switch (mode) {
                                case TIMESTAMP:
                                    requireOptionForMode(
                                            tableOptions,
                                            KafkaConnectorOptions.SCAN_STARTUP_TIMESTAMP_MILLIS,
                                            mode,
                                            MISSING_FOR_STARTUP_MODE);
                                    break;
                                case SPECIFIC_OFFSETS:
                                    validateSpecificOffsetsOption(
                                            tableOptions,
                                            KafkaConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS,
                                            mode,
                                            MISSING_FOR_STARTUP_MODE);
                                    break;
                                default:
                                    // The remaining modes need no additional options.
                                    break;
                            }
                        });
    }

    /**
     * Checks the options that the configured bounded mode depends on. Only the TIMESTAMP and
     * SPECIFIC_OFFSETS modes have dependent options; nothing is checked if no mode is set.
     *
     * @param tableOptions the table options to check
     * @throws ValidationException if a dependent option is missing or malformed
     */
    static void validateScanBoundedMode(ReadableConfig tableOptions) {
        tableOptions
                .getOptional(KafkaConnectorOptions.SCAN_BOUNDED_MODE)
                .ifPresent(
                        mode -> {
                            switch (mode) {
                                case TIMESTAMP:
                                    requireOptionForMode(
                                            tableOptions,
                                            KafkaConnectorOptions.SCAN_BOUNDED_TIMESTAMP_MILLIS,
                                            mode,
                                            MISSING_FOR_BOUNDED_MODE);
                                    break;
                                case SPECIFIC_OFFSETS:
                                    validateSpecificOffsetsOption(
                                            tableOptions,
                                            KafkaConnectorOptions.SCAN_BOUNDED_SPECIFIC_OFFSETS,
                                            mode,
                                            MISSING_FOR_BOUNDED_MODE);
                                    break;
                                default:
                                    // The remaining modes need no additional options.
                                    break;
                            }
                        });
    }

    /**
     * Throws a {@link ValidationException} if {@code requiredOption} is not set.
     *
     * @param messageTemplate format string taking the option key and the mode, in that order
     */
    private static void requireOptionForMode(
            ReadableConfig tableOptions,
            ConfigOption<?> requiredOption,
            Object mode,
            String messageTemplate) {
        if (!tableOptions.getOptional(requiredOption).isPresent()) {
            throw new ValidationException(
                    String.format(messageTemplate, requiredOption.key(), mode));
        }
    }

    /**
     * Shared SPECIFIC_OFFSETS validation: the offsets option must be set, exactly one topic must
     * be configured, and the offsets string must parse.
     */
    private static void validateSpecificOffsetsOption(
            ReadableConfig tableOptions,
            ConfigOption<String> offsetsOption,
            Object mode,
            String messageTemplate) {
        requireOptionForMode(tableOptions, offsetsOption, mode, messageTemplate);
        if (!isSingleTopic(tableOptions)) {
            throw new ValidationException(
                    "Currently Kafka source only supports specific offset for single topic.");
        }
        // Parsed only for its validation side effect; the result is discarded here.
        parseSpecificOffsets(tableOptions.get(offsetsOption), offsetsOption.key());
    }

    /**
     * Checks the sink partitioner option, if set: 'round-robin' cannot be combined with
     * 'key.fields', and the value must not be empty.
     */
    private static void validateSinkPartitioner(ReadableConfig tableOptions) {
        tableOptions
                .getOptional(KafkaConnectorOptions.SINK_PARTITIONER)
                .ifPresent(
                        partitioner -> {
                            if (partitioner.equals(SINK_PARTITIONER_VALUE_ROUND_ROBIN)
                                    && tableOptions
                                            .getOptional(KafkaConnectorOptions.KEY_FIELDS)
                                            .isPresent()) {
                                throw new ValidationException(
                                        "Currently 'round-robin' partitioner only works when option 'key.fields' is not specified.");
                            }
                            if (partitioner.isEmpty()) {
                                throw new ValidationException(
                                        String.format(
                                                "Option '%s' should be a non-empty string.",
                                                KafkaConnectorOptions.SINK_PARTITIONER.key()));
                            }
                        });
    }

    /**
     * Checks that a transactional id prefix is configured when the delivery guarantee is
     * {@link DeliveryGuarantee#EXACTLY_ONCE}.
     *
     * @param tableOptions the table options to check
     * @throws ValidationException if the prefix is missing for exactly-once delivery
     */
    static void validateDeliveryGuarantee(ReadableConfig tableOptions) {
        final boolean exactlyOnce =
                tableOptions.get(KafkaConnectorOptions.DELIVERY_GUARANTEE)
                        == DeliveryGuarantee.EXACTLY_ONCE;
        if (exactlyOnce
                && !tableOptions
                        .getOptional(KafkaConnectorOptions.TRANSACTIONAL_ID_PREFIX)
                        .isPresent()) {
            throw new ValidationException(
                    KafkaConnectorOptions.TRANSACTIONAL_ID_PREFIX.key()
                            + " must be specified when using DeliveryGuarantee.EXACTLY_ONCE.");
        }
    }

    // --------------------------------------------------------------------------------------------
    // Topics
    // --------------------------------------------------------------------------------------------

    /**
     * Returns the value of the 'topic' option.
     *
     * @param tableOptions the table options to read
     * @return the configured topics, or {@code null} if the option is not set
     */
    public static List<String> getTopics(ReadableConfig tableOptions) {
        return tableOptions.getOptional(KafkaConnectorOptions.TOPIC).orElse(null);
    }

    /**
     * Returns the compiled value of the 'topic-pattern' option.
     *
     * @param tableOptions the table options to read
     * @return the compiled pattern, or {@code null} if the option is not set
     */
    public static Pattern getTopicPattern(ReadableConfig tableOptions) {
        return tableOptions
                .getOptional(KafkaConnectorOptions.TOPIC_PATTERN)
                .map(Pattern::compile)
                .orElse(null);
    }

    /** Returns whether the 'topic' option is set and lists exactly one topic. */
    private static boolean isSingleTopic(ReadableConfig tableOptions) {
        return tableOptions
                .getOptional(KafkaConnectorOptions.TOPIC)
                .map(topics -> topics.size() == 1)
                .orElse(false);
    }

    // --------------------------------------------------------------------------------------------
    // Startup / bounded options
    // --------------------------------------------------------------------------------------------

    /**
     * Builds the startup configuration from the table options.
     *
     * <p>The startup mode falls back to {@link StartupMode#GROUP_OFFSETS} when the option is not
     * set. In SPECIFIC_OFFSETS mode the offsets are attached to the first entry of 'topic', so
     * that option must be set (the source validation enforces a single topic for this mode).
     *
     * @param tableOptions the table options to read
     * @return the startup mode together with its specific offsets (empty unless the mode is
     *     SPECIFIC_OFFSETS) and timestamp (only assigned in TIMESTAMP mode)
     */
    public static StartupOptions getStartupOptions(ReadableConfig tableOptions) {
        final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
        final StartupMode startupMode =
                tableOptions
                        .getOptional(KafkaConnectorOptions.SCAN_STARTUP_MODE)
                        .map(KafkaConnectorOptionsUtil::fromOption)
                        .orElse(StartupMode.GROUP_OFFSETS);
        if (startupMode == StartupMode.SPECIFIC_OFFSETS) {
            buildSpecificOffsets(
                    tableOptions,
                    tableOptions.get(KafkaConnectorOptions.TOPIC).get(0),
                    specificOffsets);
        }

        final StartupOptions options = new StartupOptions();
        options.startupMode = startupMode;
        options.specificOffsets = specificOffsets;
        if (startupMode == StartupMode.TIMESTAMP) {
            options.startupTimestampMillis =
                    tableOptions.get(KafkaConnectorOptions.SCAN_STARTUP_TIMESTAMP_MILLIS);
        }
        return options;
    }

    /**
     * Builds the bounded (stopping) configuration from the table options.
     *
     * <p>In SPECIFIC_OFFSETS mode the offsets are attached to the first entry of 'topic', so that
     * option must be set (the source validation enforces a single topic for this mode).
     *
     * @param tableOptions the table options to read
     * @return the bounded mode together with its specific offsets (empty unless the mode is
     *     SPECIFIC_OFFSETS) and timestamp (only assigned in TIMESTAMP mode)
     */
    public static BoundedOptions getBoundedOptions(ReadableConfig tableOptions) {
        final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
        final BoundedMode boundedMode =
                fromOption(tableOptions.get(KafkaConnectorOptions.SCAN_BOUNDED_MODE));
        if (boundedMode == BoundedMode.SPECIFIC_OFFSETS) {
            buildBoundedOffsets(
                    tableOptions,
                    tableOptions.get(KafkaConnectorOptions.TOPIC).get(0),
                    specificOffsets);
        }

        final BoundedOptions options = new BoundedOptions();
        options.boundedMode = boundedMode;
        options.specificOffsets = specificOffsets;
        if (boundedMode == BoundedMode.TIMESTAMP) {
            options.boundedTimestampMillis =
                    tableOptions.get(KafkaConnectorOptions.SCAN_BOUNDED_TIMESTAMP_MILLIS);
        }
        return options;
    }

    /** Parses the startup specific-offsets option and adds one entry per partition to the map. */
    private static void buildSpecificOffsets(
            ReadableConfig tableOptions,
            String topic,
            Map<KafkaTopicPartition, Long> specificOffsets) {
        collectOffsets(
                tableOptions,
                KafkaConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS,
                topic,
                specificOffsets);
    }

    /**
     * Parses the bounded specific-offsets option and adds one entry per partition to the map.
     *
     * @param tableOptions the table options holding the bounded specific-offsets string
     * @param topic the topic every parsed partition belongs to
     * @param specificOffsets the map that receives the parsed entries
     * @throws ValidationException if the option value is malformed
     */
    public static void buildBoundedOffsets(
            ReadableConfig tableOptions,
            String topic,
            Map<KafkaTopicPartition, Long> specificOffsets) {
        collectOffsets(
                tableOptions,
                KafkaConnectorOptions.SCAN_BOUNDED_SPECIFIC_OFFSETS,
                topic,
                specificOffsets);
    }

    /** Parses {@code offsetsOption} and stores each (topic, partition) to offset pair in {@code target}. */
    private static void collectOffsets(
            ReadableConfig tableOptions,
            ConfigOption<String> offsetsOption,
            String topic,
            Map<KafkaTopicPartition, Long> target) {
        parseSpecificOffsets(tableOptions.get(offsetsOption), offsetsOption.key())
                .forEach(
                        (partition, offset) ->
                                target.put(new KafkaTopicPartition(topic, partition), offset));
    }

    /** Maps the table-level startup mode option to the connector's {@link StartupMode}. */
    private static StartupMode fromOption(KafkaConnectorOptions.ScanStartupMode scanStartupMode) {
        switch (scanStartupMode) {
            case EARLIEST_OFFSET:
                return StartupMode.EARLIEST;
            case LATEST_OFFSET:
                return StartupMode.LATEST;
            case GROUP_OFFSETS:
                return StartupMode.GROUP_OFFSETS;
            case SPECIFIC_OFFSETS:
                return StartupMode.SPECIFIC_OFFSETS;
            case TIMESTAMP:
                return StartupMode.TIMESTAMP;
            default:
                throw new TableException(
                        "Unsupported startup mode. Validator should have checked that.");
        }
    }

    /** Maps the table-level bounded mode option to the connector's {@link BoundedMode}. */
    private static BoundedMode fromOption(KafkaConnectorOptions.ScanBoundedMode scanBoundedMode) {
        switch (scanBoundedMode) {
            case UNBOUNDED:
                return BoundedMode.UNBOUNDED;
            case LATEST_OFFSET:
                return BoundedMode.LATEST;
            case GROUP_OFFSETS:
                return BoundedMode.GROUP_OFFSETS;
            case TIMESTAMP:
                return BoundedMode.TIMESTAMP;
            case SPECIFIC_OFFSETS:
                return BoundedMode.SPECIFIC_OFFSETS;
            default:
                throw new TableException(
                        "Unsupported bounded mode. Validator should have checked that.");
        }
    }

    /**
     * Parses a specific-offsets string of the form
     * {@code partition:0,offset:42;partition:1,offset:300} into a partition-to-offset map.
     *
     * <p>If a partition occurs more than once, the last occurrence wins.
     *
     * @param specificOffsetsStr the option value to parse
     * @param optionKey the key of the option being parsed; only used in the error message
     * @return a map from partition id to offset
     * @throws ValidationException if the string is {@code null} or does not follow the format
     */
    public static Map<Integer, Long> parseSpecificOffsets(
            String specificOffsetsStr, String optionKey) {
        final String validationExceptionMessage =
                String.format(
                        "Invalid properties '%s' should follow the format 'partition:0,offset:42;partition:1,offset:300', but is '%s'.",
                        optionKey, specificOffsetsStr);
        if (specificOffsetsStr == null) {
            // Report a missing value like any other malformed value instead of failing with a
            // bare NullPointerException.
            throw new ValidationException(validationExceptionMessage);
        }

        final String[] pairs = specificOffsetsStr.split(";");
        // split() drops trailing empty strings, so a value made only of ';' yields no pairs.
        if (pairs.length == 0) {
            throw new ValidationException(validationExceptionMessage);
        }

        final String partitionPrefix = PARTITION + ":";
        final String offsetPrefix = OFFSET + ":";
        final Map<Integer, Long> offsetMap = new HashMap<>();
        for (String pair : pairs) {
            // An empty pair or a pair without ',' splits into fewer than two parts and is
            // rejected by the length check.
            final String[] kv = pair.split(",");
            if (kv.length != 2
                    || !kv[0].startsWith(partitionPrefix)
                    || !kv[1].startsWith(offsetPrefix)) {
                throw new ValidationException(validationExceptionMessage);
            }

            final String partitionValue = kv[0].substring(partitionPrefix.length());
            final String offsetValue = kv[1].substring(offsetPrefix.length());
            try {
                offsetMap.put(Integer.valueOf(partitionValue), Long.valueOf(offsetValue));
            } catch (NumberFormatException e) {
                throw new ValidationException(validationExceptionMessage, e);
            }
        }
        return offsetMap;
    }

    // --------------------------------------------------------------------------------------------
    // Kafka client properties and partitioner
    // --------------------------------------------------------------------------------------------

    /**
     * Extracts the Kafka client properties from the table options: every option whose key starts
     * with {@link #PROPERTIES_PREFIX} is copied with the prefix stripped from its key.
     *
     * @param tableOptions the raw table options
     * @return the extracted properties; empty if no option carries the prefix
     */
    public static Properties getKafkaProperties(Map<String, String> tableOptions) {
        final Properties kafkaProperties = new Properties();
        for (Map.Entry<String, String> option : tableOptions.entrySet()) {
            final String key = option.getKey();
            if (key.startsWith(PROPERTIES_PREFIX)) {
                kafkaProperties.put(
                        key.substring(PROPERTIES_PREFIX.length()), option.getValue());
            }
        }
        return kafkaProperties;
    }

    /**
     * Resolves the sink partitioner from the 'sink.partitioner' option.
     *
     * <p>'fixed' yields a {@link FlinkFixedPartitioner}; 'default' and 'round-robin' yield an
     * empty result; any other value is treated as the name of a class to load and instantiate.
     *
     * @param tableOptions the table options to read
     * @param classLoader the class loader used to load a custom partitioner class
     * @return the partitioner, or empty if the option is unset, 'default' or 'round-robin'
     * @throws ValidationException if a custom partitioner class cannot be loaded or instantiated,
     *     or does not implement {@link KafkaPartitioner}
     */
    public static Optional<KafkaPartitioner<RowData>> getFlinkKafkaPartitioner(
            ReadableConfig tableOptions, ClassLoader classLoader) {
        return tableOptions
                .getOptional(KafkaConnectorOptions.SINK_PARTITIONER)
                .flatMap(
                        (String partitioner) -> {
                            switch (partitioner) {
                                case SINK_PARTITIONER_VALUE_FIXED:
                                    return Optional.of(new FlinkFixedPartitioner<>());
                                case SINK_PARTITIONER_VALUE_DEFAULT:
                                case SINK_PARTITIONER_VALUE_ROUND_ROBIN:
                                    return Optional.empty();
                                default:
                                    // Fall back to interpreting the value as a class name.
                                    return Optional.of(
                                            initializePartitioner(partitioner, classLoader));
                            }
                        });
    }

    /** Loads the named class, checks that it is a {@link KafkaPartitioner} and instantiates it. */
    private static <T> KafkaPartitioner<T> initializePartitioner(
            String name, ClassLoader classLoader) {
        try {
            final Class<?> clazz = Class.forName(name, true, classLoader);
            if (!KafkaPartitioner.class.isAssignableFrom(clazz)) {
                throw new ValidationException(
                        String.format(
                                "Sink partitioner class '%s' should implement the required class %s",
                                name, KafkaPartitioner.class.getName()));
            }
            @SuppressWarnings("unchecked")
            final KafkaPartitioner<T> kafkaPartitioner =
                    InstantiationUtil.instantiate(name, KafkaPartitioner.class, classLoader);
            return kafkaPartitioner;
        } catch (ClassNotFoundException | FlinkException e) {
            throw new ValidationException(
                    String.format(
                            "Could not find and instantiate partitioner class '%s'", name),
                    e);
        }
    }

    // --------------------------------------------------------------------------------------------
    // Key / value projections
    // --------------------------------------------------------------------------------------------

    /**
     * Computes the positions, within the physical row type, of the fields listed in 'key.fields'.
     *
     * @param options the table options to read
     * @param physicalDataType the physical data type of the table; must be a row type
     * @return the field positions in the order given by 'key.fields', or an empty array if no
     *     key format is configured
     * @throws IllegalArgumentException if {@code physicalDataType} is not a row type
     * @throws ValidationException if key format and key fields are not declared together, if a
     *     key field is not a physical column, or if a key field lacks the configured key prefix
     */
    public static int[] createKeyFormatProjection(
            ReadableConfig options, DataType physicalDataType) {
        final LogicalType physicalType = physicalDataType.getLogicalType();
        Preconditions.checkArgument(
                physicalType.is(LogicalTypeRoot.ROW), "Row data type expected.");

        final Optional<String> optionalKeyFormat =
                options.getOptional(KafkaConnectorOptions.KEY_FORMAT);
        final Optional<List<String>> optionalKeyFields =
                options.getOptional(KafkaConnectorOptions.KEY_FIELDS);

        if (!optionalKeyFormat.isPresent() && optionalKeyFields.isPresent()) {
            throw new ValidationException(
                    String.format(
                            "The option '%s' can only be declared if a key format is defined using '%s'.",
                            KafkaConnectorOptions.KEY_FIELDS.key(),
                            KafkaConnectorOptions.KEY_FORMAT.key()));
        }
        if (optionalKeyFormat.isPresent()
                && (!optionalKeyFields.isPresent() || optionalKeyFields.get().isEmpty())) {
            throw new ValidationException(
                    String.format(
                            "A key format '%s' requires the declaration of one or more of key fields using '%s'.",
                            KafkaConnectorOptions.KEY_FORMAT.key(),
                            KafkaConnectorOptions.KEY_FIELDS.key()));
        }
        if (!optionalKeyFormat.isPresent()) {
            return new int[0];
        }

        final String keyPrefix =
                options.getOptional(KafkaConnectorOptions.KEY_FIELDS_PREFIX).orElse("");
        final List<String> keyFields = optionalKeyFields.get();
        final List<String> physicalFields = LogicalTypeChecks.getFieldNames(physicalType);
        return keyFields.stream()
                .mapToInt(
                        keyField -> {
                            final int pos = physicalFields.indexOf(keyField);
                            if (pos < 0) {
                                throw new ValidationException(
                                        String.format(
                                                "Could not find the field '%s' in the table schema for usage in the key format. A key field must be a regular, physical column. The following columns can be selected in the '%s' option:\n%s",
                                                keyField,
                                                KafkaConnectorOptions.KEY_FIELDS.key(),
                                                physicalFields));
                            }
                            if (!keyField.startsWith(keyPrefix)) {
                                throw new ValidationException(
                                        String.format(
                                                "All fields in '%s' must be prefixed with '%s' when option '%s' is set but field '%s' is not prefixed.",
                                                KafkaConnectorOptions.KEY_FIELDS.key(),
                                                keyPrefix,
                                                KafkaConnectorOptions.KEY_FIELDS_PREFIX.key(),
                                                keyField));
                            }
                            return pos;
                        })
                .toArray();
    }

    /**
     * Computes the positions, within the physical row type, of the fields that belong to the
     * value format.
     *
     * <p>With strategy ALL every physical field is included, and a key prefix is rejected. With
     * strategy EXCEPT_KEY the positions returned by
     * {@link #createKeyFormatProjection(ReadableConfig, DataType)} are left out.
     *
     * @param options the table options to read
     * @param physicalDataType the physical data type of the table; must be a row type
     * @return the field positions in ascending order
     * @throws IllegalArgumentException if {@code physicalDataType} is not a row type
     * @throws ValidationException if a key prefix is combined with strategy ALL, or if the key
     *     projection is invalid
     * @throws TableException if the strategy is neither ALL nor EXCEPT_KEY
     */
    public static int[] createValueFormatProjection(
            ReadableConfig options, DataType physicalDataType) {
        final LogicalType physicalType = physicalDataType.getLogicalType();
        Preconditions.checkArgument(
                physicalType.is(LogicalTypeRoot.ROW), "Row data type expected.");

        final int physicalFieldCount = LogicalTypeChecks.getFieldCount(physicalType);
        final IntStream physicalFields = IntStream.range(0, physicalFieldCount);
        final String keyPrefix =
                options.getOptional(KafkaConnectorOptions.KEY_FIELDS_PREFIX).orElse("");
        final KafkaConnectorOptions.ValueFieldsStrategy strategy =
                options.get(KafkaConnectorOptions.VALUE_FIELDS_INCLUDE);

        if (strategy == KafkaConnectorOptions.ValueFieldsStrategy.ALL) {
            if (keyPrefix.length() > 0) {
                throw new ValidationException(
                        String.format(
                                "A key prefix is not allowed when option '%s' is set to '%s'. Set it to '%s' instead to avoid field overlaps.",
                                KafkaConnectorOptions.VALUE_FIELDS_INCLUDE.key(),
                                KafkaConnectorOptions.ValueFieldsStrategy.ALL,
                                KafkaConnectorOptions.ValueFieldsStrategy.EXCEPT_KEY));
            }
            return physicalFields.toArray();
        }
        if (strategy == KafkaConnectorOptions.ValueFieldsStrategy.EXCEPT_KEY) {
            final int[] keyProjection = createKeyFormatProjection(options, physicalDataType);
            return physicalFields
                    .filter(pos -> IntStream.of(keyProjection).noneMatch(k -> k == pos))
                    .toArray();
        }
        throw new TableException("Unknown value fields strategy:" + strategy);
    }

    // --------------------------------------------------------------------------------------------
    // Schema registry subject auto-completion
    // --------------------------------------------------------------------------------------------

    /**
     * Returns a context whose table options contain auto-completed schema-registry subject
     * options.
     *
     * @param context the original factory context
     * @return a new context with the completed options if any option was added, otherwise the
     *     given {@code context} itself
     * @throws ValidationException if the topic options are invalid (see
     *     {@link #validateTopic(ReadableConfig)})
     */
    public static DynamicTableFactory.Context autoCompleteSchemaRegistrySubject(
            DynamicTableFactory.Context context) {
        final Map<String, String> tableOptions = context.getCatalogTable().getOptions();
        final Map<String, String> newOptions = autoCompleteSchemaRegistrySubject(tableOptions);
        // Only build a new context when the completion actually added options.
        if (newOptions.size() > tableOptions.size()) {
            return new FactoryUtil.DefaultDynamicTableContext(
                    context.getObjectIdentifier(),
                    context.getCatalogTable().copy(newOptions),
                    context.getEnrichmentOptions(),
                    context.getConfiguration(),
                    context.getClassLoader(),
                    context.isTemporary());
        }
        return context;
    }

    /**
     * Adds missing schema-registry subject options for a single-topic table.
     *
     * <p>For a schema-registry value format the subject defaults to {@code <topic>-value}
     * ('format' takes precedence over 'value.format'); for a schema-registry key format it
     * defaults to {@code <topic>-key}. Subjects that are already set are left untouched.
     */
    private static Map<String, String> autoCompleteSchemaRegistrySubject(
            Map<String, String> options) {
        final Configuration configuration = Configuration.fromMap(options);
        // The subject can only be derived when a concrete topic is known.
        validateTopic(configuration);
        if (configuration.contains(KafkaConnectorOptions.TOPIC) && isSingleTopic(configuration)) {
            final Optional<String> valueFormat =
                    configuration.getOptional(KafkaConnectorOptions.VALUE_FORMAT);
            final Optional<String> keyFormat =
                    configuration.getOptional(KafkaConnectorOptions.KEY_FORMAT);
            final Optional<String> format = configuration.getOptional(FactoryUtil.FORMAT);
            final String topic = configuration.get(KafkaConnectorOptions.TOPIC).get(0);

            if (format.isPresent() && SCHEMA_REGISTRY_FORMATS.contains(format.get())) {
                autoCompleteSubject(configuration, format.get(), topic + "-value");
            } else if (valueFormat.isPresent()
                    && SCHEMA_REGISTRY_FORMATS.contains(valueFormat.get())) {
                autoCompleteSubject(
                        configuration, "value." + valueFormat.get(), topic + "-value");
            }

            if (keyFormat.isPresent() && SCHEMA_REGISTRY_FORMATS.contains(keyFormat.get())) {
                autoCompleteSubject(configuration, "key." + keyFormat.get(), topic + "-key");
            }
        }
        return configuration.toMap();
    }

    /** Sets {@code <format>.schema-registry.subject} to {@code subject} unless it is already set. */
    private static void autoCompleteSubject(
            Configuration configuration, String format, String subject) {
        final ConfigOption<String> subjectOption =
                ConfigOptions.key(format + "." + SCHEMA_REGISTRY_SUBJECT.key())
                        .stringType()
                        .noDefaultValue();
        if (!configuration.getOptional(subjectOption).isPresent()) {
            configuration.setString(subjectOption, subject);
        }
    }

    private KafkaConnectorOptionsUtil() {
    }

    // --------------------------------------------------------------------------------------------
    // Inner classes
    // --------------------------------------------------------------------------------------------

    /** Holder for the startup configuration built by {@link #getStartupOptions(ReadableConfig)}. */
    public static class StartupOptions {
        /** The resolved startup mode. */
        public StartupMode startupMode;
        /** Offsets per topic partition; populated only in SPECIFIC_OFFSETS mode. */
        public Map<KafkaTopicPartition, Long> specificOffsets;
        /** Startup timestamp in milliseconds; assigned only in TIMESTAMP mode, otherwise 0. */
        public long startupTimestampMillis;
    }

    /** Holder for the bounded configuration built by {@link #getBoundedOptions(ReadableConfig)}. */
    public static class BoundedOptions {
        /** The resolved bounded mode. */
        public BoundedMode boundedMode;
        /** Offsets per topic partition; populated only in SPECIFIC_OFFSETS mode. */
        public Map<KafkaTopicPartition, Long> specificOffsets;
        /** Bounded timestamp in milliseconds; assigned only in TIMESTAMP mode, otherwise 0. */
        public long boundedTimestampMillis;
    }
}

