/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.sql.impl.connector.kafka;

import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.config.Config;
import com.hazelcast.jet.kafka.impl.KafkaTestSupport;
import com.hazelcast.jet.sql.SqlTestSupport;
import com.hazelcast.jet.sql.impl.connector.file.AvroResolver;
import com.hazelcast.sql.SqlResult;
import com.hazelcast.sql.SqlService;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig;
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
import org.junit.BeforeClass;

public abstract class KafkaSqlTestSupport
extends SqlTestSupport {
    protected static KafkaTestSupport kafkaTestSupport;
    protected static SqlService sqlService;

    @BeforeClass
    public static void setup() throws Exception {
        KafkaSqlTestSupport.setup(1, null);
    }

    protected static void setup(int memberCount, Config config) throws Exception {
        KafkaSqlTestSupport.initialize((int)memberCount, (Config)config);
        KafkaSqlTestSupport.createKafkaCluster();
    }

    protected static void setupWithClient(int memberCount, Config config, ClientConfig clientConfig) throws Exception {
        KafkaSqlTestSupport.initializeWithClient((int)memberCount, (Config)config, (ClientConfig)clientConfig);
        KafkaSqlTestSupport.createKafkaCluster();
    }

    private static void createKafkaCluster() throws Exception {
        sqlService = KafkaSqlTestSupport.instance().getSql();
        kafkaTestSupport = KafkaTestSupport.create();
        kafkaTestSupport.createKafkaCluster();
    }

    protected static void createSchemaRegistry() throws Exception {
        Properties properties = new Properties();
        properties.setProperty("listeners", "http://0.0.0.0:0");
        properties.setProperty("kafkastore.bootstrap.servers", kafkaTestSupport.getBrokerConnectionString());
        properties.setProperty("kafkastore.timeout.ms", "5000");
        SchemaRegistryConfig config = new SchemaRegistryConfig(properties);
        kafkaTestSupport.createSchemaRegistry(config);
    }

    @AfterClass
    public static void teardown() throws Exception {
        if (kafkaTestSupport != null) {
            kafkaTestSupport.shutdownSchemaRegistry();
            kafkaTestSupport.shutdownKafkaCluster();
        }
    }

    protected static String createRandomTopic(int partitionCount) {
        String topicName = "t_" + KafkaSqlTestSupport.randomString().replace('-', '_');
        kafkaTestSupport.createTopic(topicName, partitionCount);
        return topicName;
    }

    public static GenericRecord createRecord(Schema schema, SqlTestSupport.Type.Field[] fields, Object[] values) {
        GenericRecordBuilder record = new GenericRecordBuilder(schema);
        for (int i = 0; i < fields.length; ++i) {
            Schema.Field field = schema.getField(fields[i].name);
            if (values[i] == null) {
                record.set(field, null);
                continue;
            }
            Schema fieldSchema = AvroResolver.unwrapNullableType((Schema)field.schema());
            record.set(field, fieldSchema.getType() == Schema.Type.RECORD ? KafkaSqlTestSupport.createRecord(fieldSchema, fields[i].type.fields, (Object[])values[i]) : values[i]);
        }
        return record.build();
    }

    public static GenericRecord createRecord(Schema schema, Object ... values) {
        return KafkaSqlTestSupport.createRecord(schema, new SqlTestSupport.Type((Schema)schema).fields, values);
    }

    protected static void createSqlKafkaDataConnection(String dlName, boolean isShared, String options) {
        try (SqlResult result = KafkaSqlTestSupport.instance().getSql().execute("CREATE DATA CONNECTION " + dlName + " TYPE Kafka " + (isShared ? " SHARED " : " NOT SHARED ") + options, new Object[0]);){
            Assertions.assertThat((long)result.updateCount()).isZero();
        }
    }

    protected static void createSqlKafkaDataConnection(String dlName, boolean isShared) {
        KafkaSqlTestSupport.createSqlKafkaDataConnection(dlName, isShared, isShared ? KafkaSqlTestSupport.defaultSharedOptions() : KafkaSqlTestSupport.defaultNotSharedOptions());
    }

    protected static void createKafkaMappingUsingDataConnection(String name, String dataConnection, String sqlOptions) {
        try (SqlResult result = KafkaSqlTestSupport.instance().getSql().execute("CREATE OR REPLACE MAPPING " + name + " DATA CONNECTION " + KafkaSqlTestSupport.quoteName(dataConnection) + "\n" + sqlOptions, new Object[0]);){
            Assertions.assertThat((long)result.updateCount()).isZero();
        }
    }

    protected static String defaultSharedOptions() {
        return String.format("OPTIONS ( 'bootstrap.servers' = '%s', 'key.serializer' = '%s', 'key.deserializer' = '%s', 'value.serializer' = '%s', 'value.deserializer' = '%s', 'auto.offset.reset' = 'earliest') ", kafkaTestSupport.getBrokerConnectionString(), IntegerSerializer.class.getCanonicalName(), IntegerDeserializer.class.getCanonicalName(), StringSerializer.class.getCanonicalName(), StringDeserializer.class.getCanonicalName());
    }

    protected static String defaultNotSharedOptions() {
        return String.format("OPTIONS ( 'bootstrap.servers' = '%s', 'auto.offset.reset' = 'earliest') ", kafkaTestSupport.getBrokerConnectionString());
    }

    protected static String constructMappingOptions(String keyFormat, String valueFormat) {
        return "OPTIONS ('keyFormat'='" + keyFormat + "', 'valueFormat'='" + valueFormat + "')";
    }
}

