/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.sql.impl.aggregate;

import com.hazelcast.jet.Job;
import com.hazelcast.jet.core.JobAssertions;
import com.hazelcast.jet.core.JobStatus;
import com.hazelcast.jet.kafka.impl.KafkaTestSupport;
import com.hazelcast.jet.sql.SqlTestSupport;
import com.hazelcast.sql.SqlService;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

public class SqlAggregationWithDmlTest
extends SqlTestSupport {
    private static KafkaTestSupport kafkaTestSupport;
    private static SqlService sql;

    @BeforeClass
    public static void beforeClass() throws Exception {
        kafkaTestSupport = KafkaTestSupport.create();
        kafkaTestSupport.createKafkaCluster();
        kafkaTestSupport.createTopic("foo_topic", 1);
        SqlAggregationWithDmlTest.initialize((int)2, null);
        sql = SqlAggregationWithDmlTest.instance().getSql();
    }

    @Before
    public void before() {
        SqlAggregationWithDmlTest.createMapping("foo_map", Long.class, Long.class);
        sql.executeUpdate("CREATE MAPPING foo_topic(\n    tick BIGINT,\n    ticker VARCHAR,\n    price INT)\nTYPE Kafka\nOPTIONS (\n    'keyFormat' = 'int',\n    'valueFormat' = 'json-flat',\n    'bootstrap.servers' = '" + kafkaTestSupport.getBrokerConnectionString() + "')", new Object[0]);
    }

    @AfterClass
    public static void afterClass() {
        kafkaTestSupport.shutdownKafkaCluster();
    }

    @Test
    public void test_sink() {
        this.test_sink_insert("SINK");
    }

    @Test
    public void test_insert() {
        this.test_sink_insert("INSERT");
    }

    private void test_sink_insert(String command) {
        sql.executeUpdate("CREATE JOB jobAVG AS " + command + " INTO foo_map SELECT window_start, window_end FROM TABLE(TUMBLE(  (SELECT * FROM TABLE(IMPOSE_ORDER(TABLE foo_topic, DESCRIPTOR(tick), 2)))  , DESCRIPTOR(tick) ,10)) GROUP BY window_start, window_end", new Object[0]);
        Job job = SqlAggregationWithDmlTest.instance().getJet().getJob("jobAVG");
        Assert.assertNotNull((Object)job);
        JobAssertions.assertThat((Job)job).eventuallyHasStatus(JobStatus.RUNNING);
    }
}

