/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.sql_nightly;

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.JetService;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.core.JetTestSupport;
import com.hazelcast.jet.core.JobAssertions;
import com.hazelcast.jet.core.JobStatus;
import com.hazelcast.jet.impl.AbstractJobProxy;
import com.hazelcast.jet.impl.JetServiceBackend;
import com.hazelcast.jet.impl.JobRepository;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.kafka.impl.KafkaTestSupport;
import com.hazelcast.jet.sql.SqlTestSupport;
import com.hazelcast.sql.SqlResult;
import com.hazelcast.sql.SqlRow;
import com.hazelcast.sql.SqlService;
import com.hazelcast.test.HazelcastParametrizedRunner;
import com.hazelcast.test.HazelcastSerialParametersRunnerFactory;
import com.hazelcast.test.HazelcastTestSupport;
import com.hazelcast.test.annotation.NightlyTest;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.assertj.core.api.MapAssert;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=HazelcastParametrizedRunner.class)
@Parameterized.UseParametersRunnerFactory(value=HazelcastSerialParametersRunnerFactory.class)
@Category(value={NightlyTest.class})
public class SqlSTSInnerEquiJoinFaultToleranceStressTest
extends JetTestSupport {
    protected static final int INSTANCE_COUNT = 5;
    protected static final int SNAPSHOT_TIMEOUT_SECONDS = 30;
    protected static final String JOB_NAME = "s2s_join";
    protected static final String EXACTLY_ONCE = "exactlyOnce";
    protected static final String AT_LEAST_ONCE = "atLeastOnce";
    protected HazelcastInstance[] instances;
    protected HazelcastInstance coordinator;
    protected int eventsPerSink = 100;
    protected int sinkCount = 100;
    protected int eventsToProcess = this.eventsPerSink * this.sinkCount;
    private static KafkaTestSupport kafkaTestSupport;
    private static final Duration STREAM_FETCH_TIMEOUT;
    private static final int TOPIC_PARTITION_COUNT = 5;
    private volatile Throwable ex;
    private final Map<String, Integer> resultSet = new HashMap<String, Integer>();
    private SqlService sqlService;
    private Thread kafkaFeedThread;
    private String sourceTopic;
    protected String sinkTopic;
    private JobRestarter jobRestarter;
    protected int expectedEventsCount = this.eventsToProcess;
    protected int firstItemId = 1;
    protected int lastItemId = this.eventsToProcess;
    @Parameterized.Parameter(value=0)
    public String processingGuarantee;
    @Parameterized.Parameter(value=1)
    public boolean restartGraceful;

    @Parameterized.Parameters(name="processingGuarantee:{0}, restartGraceful:{1}")
    public static Collection<Object[]> parameters() {
        return Arrays.asList({AT_LEAST_ONCE, true}, {AT_LEAST_ONCE, false}, {EXACTLY_ONCE, true}, {EXACTLY_ONCE, false});
    }

    @BeforeClass
    public static void beforeClass() throws IOException {
        kafkaTestSupport = KafkaTestSupport.create();
        kafkaTestSupport.createKafkaCluster();
    }

    @AfterClass
    public static void afterClass() {
        kafkaTestSupport.shutdownKafkaCluster();
    }

    @Before
    public void setUp() throws Exception {
        if (Objects.equals(this.processingGuarantee, EXACTLY_ONCE) && !this.restartGraceful) {
            this.logger.warning("Test skipped: https://hazelcast.atlassian.net/browse/HZ-3187");
            return;
        }
        SqlSTSInnerEquiJoinFaultToleranceStressTest.assertTrueEventually(HazelcastTestSupport::assertNoRunningInstances, (long)30L);
        this.instances = this.createHazelcastInstances(5);
        this.coordinator = this.instances[0];
        this.sqlService = this.coordinator.getSql();
        this.sourceTopic = "source_topic_" + SqlTestSupport.randomName();
        kafkaTestSupport.createTopic(this.sourceTopic, 5);
        this.sqlService.execute("CREATE MAPPING " + this.sourceTopic + " TYPE Kafka OPTIONS ( 'keyFormat'='int', 'valueFormat'='varchar', 'bootstrap.servers'='" + kafkaTestSupport.getBrokerConnectionString() + "', 'auto.offset.reset'='earliest')", new Object[0]);
        this.kafkaFeedThread = new Thread(() -> this.createTopicData(this.sqlService, this.sourceTopic));
        this.kafkaFeedThread.start();
        this.sinkTopic = "sink_topic_" + SqlTestSupport.randomName();
        kafkaTestSupport.createTopic(this.sinkTopic, 5);
        this.sqlService.execute("CREATE MAPPING " + this.sinkTopic + " TYPE Kafka  OPTIONS ( 'keyFormat'='int', 'valueFormat'='varchar', 'bootstrap.servers'='" + kafkaTestSupport.getBrokerConnectionString() + "', 'auto.offset.reset'='earliest')", new Object[0]);
        this.sqlService.execute("CREATE VIEW s1 AS SELECT * FROM TABLE(IMPOSE_ORDER(TABLE " + this.sourceTopic + " , DESCRIPTOR(__key), " + this.getAllowedLag() + "))", new Object[0]);
        this.sqlService.execute("CREATE VIEW s2 AS SELECT * FROM TABLE(IMPOSE_ORDER(TABLE " + this.sourceTopic + " , DESCRIPTOR(__key), " + this.getAllowedLag() + "))", new Object[0]);
        this.jobRestarter = new JobRestarter(this.coordinator);
        this.jobRestarter.start();
    }

    @After
    public void after() throws InterruptedException {
        if (Objects.equals(this.processingGuarantee, EXACTLY_ONCE) && !this.restartGraceful) {
            return;
        }
        this.kafkaFeedThread.join();
        this.kafkaFeedThread = null;
        this.jobRestarter.finish();
        this.jobRestarter.join();
        this.jobRestarter = null;
        try {
            super.shutdownFactory();
        }
        catch (Exception e) {
            this.ex = e;
        }
        if (this.ex != null) {
            throw new RuntimeException(this.ex);
        }
        SqlSTSInnerEquiJoinFaultToleranceStressTest.assertTrueEventually(HazelcastTestSupport::assertNoRunningInstances, (long)30L);
    }

    @Test(timeout=900000L)
    public void stressTest() throws Exception {
        if (Objects.equals(this.processingGuarantee, EXACTLY_ONCE) && !this.restartGraceful) {
            return;
        }
        this.sqlService.execute(this.setupFetchingQuery(), new Object[0]);
        try (SqlResult result = this.sqlService.execute("SELECT * FROM " + this.sinkTopic, new Object[0]);){
            NotBlockingIterator notBlockingResult = new NotBlockingIterator(result.iterator(), STREAM_FETCH_TIMEOUT);
            while (notBlockingResult.hasNext()) {
                SqlRow sqlRow = (SqlRow)notBlockingResult.next();
                String s = (String)sqlRow.getObject(1);
                this.resultSet.merge(s, 1, Integer::sum);
                if (this.resultSet.size() != this.expectedEventsCount) continue;
                break;
            }
        }
        ((AbstractIntegerAssert)Assertions.assertThat((int)this.resultSet.size()).as("Unexpected result count", new Object[0])).isEqualTo(this.expectedEventsCount);
        Job job = this.coordinator.getJet().getJob(JOB_NAME);
        this.jobRestarter.finish();
        this.jobRestarter.join();
        Assert.assertNotNull((Object)job);
        JobAssertions.assertThat((Job)job).eventuallyHasStatus(JobStatus.RUNNING);
        job.cancel();
        JobAssertions.assertThat((Job)job).eventuallyHasStatus(JobStatus.FAILED);
        if (this.processingGuarantee.equals(EXACTLY_ONCE) || this.restartGraceful) {
            List duplicates = this.resultSet.entrySet().stream().filter(entry -> (Integer)entry.getValue() > 1).collect(Collectors.toList());
            ((ListAssert)Assertions.assertThat(duplicates).as("Non-unique result count: %d", new Object[]{duplicates.size()})).isEmpty();
        }
        for (int i = this.firstItemId; i <= this.lastItemId; ++i) {
            String key = "value-" + i;
            ((AbstractIntegerAssert)Assertions.assertThat((Integer)this.resultSet.remove(key)).as("Missing element: %s", new Object[]{key})).isNotNull();
        }
        ((MapAssert)Assertions.assertThat(this.resultSet).as("Unexpected items in the result: %s", new Object[]{this.resultSet})).isEmpty();
    }

    protected String setupFetchingQuery() {
        return "CREATE JOB s2s_join OPTIONS ('processingGuarantee'='" + this.processingGuarantee + "', 'snapshotIntervalMillis' = '500') AS SINK INTO " + this.sinkTopic + " SELECT s1.__key, s2.this FROM s1 JOIN s2 ON s1.__key = s2.__key";
    }

    protected int getAllowedLag() {
        return 1;
    }

    private void createTopicData(SqlService sqlService, String topicName) {
        try {
            int itemsSank = 0;
            for (int sink = 1; sink <= this.sinkCount; ++sink) {
                StringBuilder queryBuilder = new StringBuilder("INSERT INTO " + topicName + " VALUES ");
                for (int i = 0; i < this.eventsPerSink; ++i) {
                    queryBuilder.append("(").append(++itemsSank).append(", 'value-").append(itemsSank).append("'),");
                }
                queryBuilder.setLength(queryBuilder.length() - 1);
                Assert.assertEquals((long)itemsSank, (long)(this.eventsPerSink * sink));
                sqlService.execute(queryBuilder.toString(), new Object[0]);
                this.logger.info("Items sank " + itemsSank);
                Thread.sleep(500L);
            }
        }
        catch (Throwable e) {
            this.logger.warning(null, e);
            this.ex = e;
        }
    }

    static {
        STREAM_FETCH_TIMEOUT = Duration.ofMinutes(2L);
    }

    class JobRestarter
    extends Thread {
        private final JetService jetService;
        private final JetServiceBackend jetBackend;
        private volatile boolean finished;

        JobRestarter(HazelcastInstance hazelcastInstance) {
            this.jetService = hazelcastInstance.getJet();
            this.jetBackend = JetTestSupport.getJetServiceBackend((HazelcastInstance)hazelcastInstance);
        }

        @Override
        public void run() {
            try {
                HazelcastTestSupport.assertTrueEventually(() -> Assert.assertNotNull((Object)this.jetService.getJob(SqlSTSInnerEquiJoinFaultToleranceStressTest.JOB_NAME)));
                AbstractJobProxy job = (AbstractJobProxy)this.jetService.getJob(SqlSTSInnerEquiJoinFaultToleranceStressTest.JOB_NAME);
                Assert.assertNotNull((Object)job);
                Long lastExecutionId = null;
                while (!this.finished) {
                    JetTestSupport.waitForNextSnapshot((JobRepository)this.jetBackend.getJobRepository(), (long)job.getId(), (int)30, (boolean)true);
                    job.restart(SqlSTSInnerEquiJoinFaultToleranceStressTest.this.restartGraceful);
                    lastExecutionId = JobAssertions.assertThat((Job)job).eventuallyJobRunning(SqlSTSInnerEquiJoinFaultToleranceStressTest.this.coordinator, lastExecutionId);
                }
            }
            catch (NullPointerException e) {
                System.err.println(e);
            }
            catch (Throwable e) {
                SqlSTSInnerEquiJoinFaultToleranceStressTest.this.logger.warning(null, e);
                SqlSTSInnerEquiJoinFaultToleranceStressTest.this.ex = e;
            }
        }

        public void finish() {
            this.finished = true;
        }
    }

    public static class NotBlockingIterator<T>
    implements Iterator<T> {
        private final Iterator<T> delegate;
        private final ExecutorService executor;
        private final Duration timeout;

        public NotBlockingIterator(Iterator<T> delegate, Duration timeout) {
            this.delegate = delegate;
            this.executor = Executors.newSingleThreadExecutor();
            this.timeout = timeout;
        }

        @Override
        public boolean hasNext() {
            Future<Boolean> f = this.executor.submit(this.delegate::hasNext);
            try {
                return f.get(this.timeout.toMillis(), TimeUnit.MILLISECONDS);
            }
            catch (TimeoutException e) {
                f.cancel(true);
                return false;
            }
            catch (Exception ee) {
                throw ExceptionUtil.rethrow((Throwable)ee);
            }
        }

        @Override
        public T next() {
            Future<Object> f = this.executor.submit(this.delegate::next);
            try {
                return (T)f.get(this.timeout.toMillis(), TimeUnit.MILLISECONDS);
            }
            catch (TimeoutException e) {
                f.cancel(true);
                throw new NoSuchElementException();
            }
            catch (Exception ee) {
                throw ExceptionUtil.rethrow((Throwable)ee);
            }
        }
    }
}

