/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.sql.impl.connector.kafka;

import com.hazelcast.core.HazelcastJsonValue;
import com.hazelcast.jet.sql.SqlTestSupport;
import com.hazelcast.jet.sql.impl.connector.kafka.KafkaSqlTestSupport;
import com.hazelcast.sql.SqlResult;
import com.hazelcast.test.annotation.NightlyTest;
import com.hazelcast.test.annotation.ParallelJVMTest;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(value={NightlyTest.class, ParallelJVMTest.class})
public class KafkaIntegrationSqlTest
extends KafkaSqlTestSupport {
    @Test
    public void should_read_from_imap_to_kafka() {
        String topicName = "testTopic";
        kafkaTestSupport.createTopic(topicName, 1);
        this.createConfluentKafkaMapping(topicName);
        String mapName = "testMap";
        this.createMap(mapName);
        this.executeSql("INSERT INTO " + mapName + " VALUES\n  (1, 'ABCD', 5.5, 10),\n  (2, 'EFGH', 14, 20);");
        KafkaIntegrationSqlTest.assertMapContents(mapName, Map.of(1, new HazelcastJsonValue("{\"ticker\":\"ABCD\",\"price\":\"5.5\",\"amount\":10}"), 2, new HazelcastJsonValue("{\"ticker\":\"EFGH\",\"price\":\"14\",\"amount\":20}")));
        this.executeSql("CREATE JOB testJob\nAS\nSINK INTO " + topicName + "\nSELECT __key, ticker, price, amount FROM " + mapName);
        KafkaIntegrationSqlTest.assertRowsEventuallyInAnyOrder("SELECT __key,this FROM " + topicName, List.of(new SqlTestSupport.Row(1, Map.of("ticker", "ABCD", "price", "5.5", "amount", 10)), new SqlTestSupport.Row(2, Map.of("ticker", "EFGH", "price", "14", "amount", 20))));
        try (KafkaConsumer consumer = kafkaTestSupport.createConsumer(new String[]{topicName});){
            this.assertTopicContentsEventually((KafkaConsumer<Integer, String>)consumer, Map.of(1, "{\"ticker\":\"ABCD\",\"price\":\"5.5\",\"amount\":10}", 2, "{\"ticker\":\"EFGH\",\"price\":\"14\",\"amount\":20}"));
        }
    }

    private static void assertMapContents(String mapName, Map<Integer, HazelcastJsonValue> expected) {
        HashMap mapContents = new HashMap(KafkaIntegrationSqlTest.instance().getMap(mapName));
        KafkaIntegrationSqlTest.assertTrueEventually(() -> Assertions.assertThat((Map)mapContents).containsAllEntriesOf(expected));
    }

    private void createMap(String mapName) {
        this.executeSql("CREATE OR REPLACE MAPPING " + mapName + " (\n            __key INT,\n            ticker VARCHAR,\n            price DECIMAL,\n            amount BIGINT)\n        TYPE IMap\n        OPTIONS (\n            'keyFormat'='int',\n    'valueFormat'='json-flat'\n);");
    }

    private void createConfluentKafkaMapping(String topicName) {
        String createMappingQuery = String.format("CREATE OR REPLACE MAPPING %s (\n                    __key INT,\n                    ticker VARCHAR,\n                    price DECIMAL,\n                    amount BIGINT)\n        TYPE Kafka\n        OPTIONS (\n            'keyFormat'='int',\n            'valueFormat' = 'json-flat',\n            'bootstrap.servers' = '%s',\n            'auto.offset.reset' = 'earliest',\n            'session.timeout.ms' = '45000',\n            'acks' = 'all'\n);", topicName, kafkaTestSupport.getBrokerConnectionString());
        this.executeSql(createMappingQuery);
    }

    private void executeSql(String query) {
        this.logger.info("Execute sql: " + query);
        try {
            SqlResult ignored = sqlService.execute(query, new Object[0]);
            if (ignored != null) {
                ignored.close();
            }
        }
        catch (Exception ex) {
            this.logger.warning("Error while executing SQL: " + query, (Throwable)ex);
            throw ex;
        }
    }

    public void assertTopicContentsEventually(KafkaConsumer<Integer, String> consumer, Map<Integer, String> expected) {
        HashMap collected = new HashMap();
        KafkaIntegrationSqlTest.assertTrueEventually(() -> {
            ConsumerRecords records = consumer.poll(Duration.ofSeconds(5L));
            this.logger.info("Polled records: " + records.count());
            for (ConsumerRecord record : records) {
                collected.put((Integer)record.key(), (String)record.value());
            }
            Assertions.assertThat((Map)collected).containsAllEntriesOf(expected);
        });
    }
}

