/*
 * Decompiled with CFR 0.152.
 */
package org.apache.calcite.adapter.kafka;

import java.lang.reflect.InvocationTargetException;
import java.util.Locale;
import java.util.Map;
import org.apache.calcite.adapter.kafka.KafkaRowConverter;
import org.apache.calcite.adapter.kafka.KafkaRowConverterImpl;
import org.apache.calcite.adapter.kafka.KafkaStreamTable;
import org.apache.calcite.adapter.kafka.KafkaTableOptions;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.TableFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.checkerframework.checker.nullness.qual.Nullable;

public class KafkaTableFactory
implements TableFactory<KafkaStreamTable> {
    public KafkaStreamTable create(SchemaPlus schema, String name, Map<String, Object> operand, @Nullable RelDataType rowType) {
        KafkaRowConverter<byte[], byte[]> rowConverter;
        KafkaTableOptions tableOptionBuilder = new KafkaTableOptions();
        tableOptionBuilder.setBootstrapServers(operand.getOrDefault("bootstrap.servers", null));
        tableOptionBuilder.setTopicName(operand.getOrDefault("topic.name", null));
        if (operand.containsKey("row.converter")) {
            String rowConverterClass = (String)operand.get("row.converter");
            try {
                Class<?> klass = Class.forName(rowConverterClass);
                rowConverter = (KafkaRowConverter)klass.getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
            }
            catch (ClassNotFoundException | IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
                String details = String.format(Locale.ROOT, "Failed to create table '%s' with configuration:\n'%s'\nKafkaRowConverter '%s' is invalid", name, operand, rowConverterClass);
                throw new RuntimeException(details, e);
            }
        } else {
            rowConverter = new KafkaRowConverterImpl();
        }
        tableOptionBuilder.setRowConverter(rowConverter);
        if (operand.containsKey("consumer.params")) {
            tableOptionBuilder.setConsumerParams((Map)operand.get("consumer.params"));
        }
        if (operand.containsKey("consumer.cust")) {
            String custConsumerClass = (String)operand.get("consumer.cust");
            try {
                tableOptionBuilder.setConsumer((Consumer)Class.forName(custConsumerClass).getConstructor(OffsetResetStrategy.class).newInstance(OffsetResetStrategy.NONE));
            }
            catch (ClassNotFoundException | IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
                String details = String.format(Locale.ROOT, "Fail to create table '%s' with configuration:\n'%s'\nKafkaCustConsumer '%s' is invalid", name, operand, custConsumerClass);
                throw new RuntimeException(details, e);
            }
        }
        return new KafkaStreamTable(tableOptionBuilder);
    }
}

