/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.sql.impl.connector.kafka;

import com.hazelcast.core.HazelcastException;
import com.hazelcast.jet.sql.SqlTestSupport;
import com.hazelcast.jet.sql.impl.connector.kafka.KafkaSqlTestSupport;
import com.hazelcast.sql.SqlResult;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.annotation.NightlyTest;
import com.hazelcast.test.annotation.ParallelJVMTest;
import java.util.Arrays;
import java.util.Collections;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.AssertionsForClassTypes;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

@RunWith(value=HazelcastSerialClassRunner.class)
@Category(value={NightlyTest.class, ParallelJVMTest.class})
public class KafkaDataConnectionIntegrationTest
extends KafkaSqlTestSupport {
    private static final int PARTITION_COUNT = 1;

    @Test
    public void when_createNonSharedProducer_then_success() {
        String dlName = KafkaDataConnectionIntegrationTest.randomName();
        String name1 = KafkaDataConnectionIntegrationTest.createRandomTopic(1);
        String name2 = KafkaDataConnectionIntegrationTest.createRandomTopic(1);
        KafkaDataConnectionIntegrationTest.createSqlKafkaDataConnection(dlName, false);
        KafkaDataConnectionIntegrationTest.createKafkaMappingUsingDataConnection(name1, dlName, KafkaDataConnectionIntegrationTest.constructMappingOptions("int", "varchar"));
        KafkaDataConnectionIntegrationTest.createKafkaMappingUsingDataConnection(name2, dlName, KafkaDataConnectionIntegrationTest.constructMappingOptions("varchar", "int"));
        sqlService.execute("INSERT INTO " + name1 + " VALUES(0, 'value-0'), (1, 'value-1'), (2, 'value-2'), (10, 'value-10')", new Object[0]);
        KafkaDataConnectionIntegrationTest.assertTipOfStream("SELECT window_start, window_end, MAX(__key) FROM TABLE(TUMBLE(  (SELECT * FROM TABLE(IMPOSE_ORDER(TABLE " + name1 + ", DESCRIPTOR(__key), 2)))  , DESCRIPTOR(__key)  , 2)) GROUP BY window_start, window_end", Arrays.asList(new SqlTestSupport.Row(0, 2, 1), new SqlTestSupport.Row(2, 4, 2)));
        sqlService.execute("INSERT INTO " + name2 + " VALUES('value-0', 0), ('value-1', 1), ('value-2', 2), ('value-10', 10)", new Object[0]);
        KafkaDataConnectionIntegrationTest.assertTipOfStream("SELECT window_start, window_end, COUNT(__key) FROM TABLE(TUMBLE(  (SELECT * FROM TABLE(IMPOSE_ORDER(TABLE " + name2 + ", DESCRIPTOR(this), 2)))  , DESCRIPTOR(this)  , 2)) GROUP BY window_start, window_end", Arrays.asList(new SqlTestSupport.Row(0, 2, 2L), new SqlTestSupport.Row(2, 4, 1L)));
    }

    @Test
    public void when_createSharedProducer_then_success() {
        String dcName = KafkaDataConnectionIntegrationTest.randomName();
        String name = KafkaDataConnectionIntegrationTest.createRandomTopic(1);
        KafkaDataConnectionIntegrationTest.createSqlKafkaDataConnection(dcName, true);
        KafkaDataConnectionIntegrationTest.createKafkaMappingUsingDataConnection(name, dcName, KafkaDataConnectionIntegrationTest.constructMappingOptions("int", "varchar"));
        try (SqlResult r = sqlService.execute("INSERT INTO " + name + " VALUES (0, 'value-0')", new Object[0]);){
            Assertions.assertThat((long)r.updateCount()).isZero();
        }
    }

    @Test
    public void when_createSharedProducerWithOverriddenProperty_then_success() {
        String dlName = KafkaDataConnectionIntegrationTest.randomName();
        String name = KafkaDataConnectionIntegrationTest.createRandomTopic(1);
        KafkaDataConnectionIntegrationTest.createSqlKafkaDataConnection(dlName, true);
        KafkaDataConnectionIntegrationTest.createKafkaMappingUsingDataConnection(name, dlName, "OPTIONS ('keyFormat'='int', 'valueFormat'='varchar', 'auto.offset.reset' = 'latest')");
        AssertionsForClassTypes.assertThatThrownBy(() -> sqlService.execute("INSERT INTO " + name + " VALUES(0, 'value-0')", new Object[0])).hasRootCauseInstanceOf(HazelcastException.class).hasMessageContaining("For shared Kafka producer, please provide all serialization options");
    }

    @Test
    public void when_creatingDataConnectionAndMapping_then_serdeDeterminesAutomatically() {
        String dlName = KafkaDataConnectionIntegrationTest.randomName();
        String name = KafkaDataConnectionIntegrationTest.createRandomTopic(1);
        KafkaDataConnectionIntegrationTest.createSqlKafkaDataConnection(dlName, false);
        KafkaDataConnectionIntegrationTest.createKafkaMappingUsingDataConnection(name, dlName, KafkaDataConnectionIntegrationTest.constructMappingOptions("int", "varchar"));
        try (SqlResult r = sqlService.execute("INSERT INTO " + name + " VALUES (0, 'value-0')", new Object[0]);){
            Assertions.assertThat((long)r.updateCount()).isZero();
        }
        KafkaDataConnectionIntegrationTest.assertTipOfStream("SELECT * FROM " + name, Collections.singletonList(new SqlTestSupport.Row(0, "value-0")));
    }
}

