/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.sql_slow;

import com.hazelcast.client.test.TestHazelcastFactory;
import com.hazelcast.config.Config;
import com.hazelcast.config.IndexConfig;
import com.hazelcast.config.IndexType;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.core.JetTestSupport;
import com.hazelcast.jet.core.JobAssertions;
import com.hazelcast.jet.sql.SqlTestSupport;
import com.hazelcast.map.IMap;
import com.hazelcast.sql.SqlRow;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.annotation.SlowTest;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

@RunWith(value=HazelcastSerialClassRunner.class)
@Category(value={SlowTest.class})
public class MapScanMigrationStressTest
extends JetTestSupport {
    private static final int ITEM_COUNT = 500000;
    private static final String MAP_NAME = "map";
    private AtomicInteger progress;
    private AtomicReference<Throwable> mutatorException;
    private TestHazelcastFactory factory;
    private HazelcastInstance[] instances;
    private IMap<Integer, Integer> map;
    private MutatorThread mutator;

    @Before
    public void before() throws InterruptedException {
        this.factory = new TestHazelcastFactory();
        this.instances = new HazelcastInstance[4];
        for (int i = 0; i < this.instances.length - 1; ++i) {
            this.instances[i] = this.factory.newHazelcastInstance(MapScanMigrationStressTest.createFastRetryConfig());
        }
        SqlTestSupport.createMapping(this.instances[0], MAP_NAME, Integer.class, Integer.class);
        this.map = this.instances[0].getMap(MAP_NAME);
        this.mutatorException = new AtomicReference<Object>(null);
        this.progress = new AtomicInteger();
    }

    private static Config createFastRetryConfig() {
        return MapScanMigrationStressTest.smallInstanceConfig().setProperty("hazelcast.invocation.retry.pause.millis", "180");
    }

    @After
    public void after() throws Exception {
        if (this.mutator != null) {
            this.mutator.terminate();
            try {
                this.mutator.join(10000L);
            }
            catch (Throwable e) {
                Assert.fail((String)"Failed to stop the MutatorThread, unrelated tests might be affected");
            }
            this.mutator = null;
        }
        this.factory.shutdownAll();
        this.factory = null;
    }

    @Test(timeout=600000L)
    public void stressTest_noIndex() throws InterruptedException {
        ArrayList<SqlTestSupport.Row> expected = new ArrayList<SqlTestSupport.Row>();
        HashMap<Integer, Integer> temp = new HashMap<Integer, Integer>();
        for (int i = 0; i <= 500000; ++i) {
            temp.put(i, 1);
            expected.add(new SqlTestSupport.Row(i, i + "-1"));
        }
        this.map.putAll(temp);
        this.mutator = new MutatorThread(1000);
        this.assertRowsAnyOrder("SELECT __key, Concat_WS('-', __key, this) FROM map", expected, this.mutator, o -> (Integer)o.getValues()[0]);
        this.mutator.terminate();
        this.mutator.join();
        Assertions.assertThat((Throwable)this.mutatorException.get()).isNull();
    }

    @Test(timeout=600000L)
    public void stressTest_hashIndex() throws InterruptedException {
        ArrayList<SqlTestSupport.Row> expected = new ArrayList<SqlTestSupport.Row>();
        HashMap<Integer, Integer> temp = new HashMap<Integer, Integer>();
        for (int i = 0; i <= 500000; ++i) {
            temp.put(i, 1);
            expected.add(new SqlTestSupport.Row(i, 1));
        }
        this.map.putAll(temp);
        IndexConfig indexConfig = new IndexConfig(IndexType.HASH, new String[]{"this"}).setName(MapScanMigrationStressTest.randomName());
        this.map.addIndex(indexConfig);
        this.mutator = new MutatorThread(1000);
        this.assertRowsAnyOrder("SELECT * FROM map WHERE this = 1", expected, this.mutator, o -> (Integer)o.getValues()[0]);
        this.mutator.terminate();
        this.mutator.join();
        Assertions.assertThat((Throwable)this.mutatorException.get()).isNull();
    }

    @Test(timeout=600000L)
    public void stressTest_sortedIndex() throws InterruptedException {
        ArrayList<SqlTestSupport.Row> expected = new ArrayList<SqlTestSupport.Row>();
        HashMap<Integer, Integer> temp = new HashMap<Integer, Integer>();
        for (int i = 0; i <= 500000; ++i) {
            temp.put(i, i);
            expected.add(new SqlTestSupport.Row(500000 - i, 500000 - i));
        }
        this.map.putAll(temp);
        IndexConfig indexConfig = new IndexConfig(IndexType.SORTED, new String[]{"this"}).setName(MapScanMigrationStressTest.randomName());
        this.map.addIndex(indexConfig);
        this.mutator = new MutatorThread(100000);
        this.assertRowsOrdered("SELECT * FROM map ORDER BY this DESC", expected, this.mutator);
        this.mutator.terminate();
        this.mutator.join();
        Assertions.assertThat((Throwable)this.mutatorException.get()).isNull();
    }

    private void assertRowsAnyOrder(String sql, List<SqlTestSupport.Row> expectedRows, Thread mutator, Function<SqlTestSupport.Row, Integer> extractSortKeyFn) {
        List<SqlTestSupport.Row> actualRows = this.executeAndGetResult(sql, mutator);
        actualRows.sort(Comparator.comparing(extractSortKeyFn));
        Assert.assertEquals(expectedRows, actualRows);
    }

    private void assertRowsOrdered(String sql, Collection<SqlTestSupport.Row> expectedRows, Thread mutator) {
        List<SqlTestSupport.Row> actualRows = this.executeAndGetResult(sql, mutator);
        Assert.assertEquals(expectedRows, actualRows);
    }

    private List<SqlTestSupport.Row> executeAndGetResult(String sql, Thread mutator) {
        ArrayList<SqlTestSupport.Row> actualRows = new ArrayList<SqlTestSupport.Row>();
        Iterator rowIterator = this.instances[0].getSql().execute(sql, new Object[0]).iterator();
        Assertions.assertThat((boolean)rowIterator.hasNext()).isTrue();
        JobAssertions.assertThat((Job)((Job)this.instances[0].getJet().getJobs().get(0))).isExecutingOn(this.instances[0]);
        mutator.start();
        while (rowIterator.hasNext()) {
            SqlRow row = (SqlRow)rowIterator.next();
            actualRows.add(new SqlTestSupport.Row(row.getObject(0), row.getObject(1)));
            int i = this.progress.incrementAndGet();
            if (i % 10000 != 0) continue;
            this.logger.info("received " + i + " rows");
        }
        this.logger.info("results done");
        return actualRows;
    }

    private class MutatorThread
    extends Thread {
        private final int minProgressBetweenMutations;
        private boolean firstLaunch = true;
        private volatile boolean active = true;

        private MutatorThread(int minProgressBetweenMutations) {
            this.minProgressBetweenMutations = minProgressBetweenMutations;
        }

        private synchronized void terminate() {
            this.active = false;
        }

        @Override
        public void run() {
            int lastProgressSeen = 0;
            int currentProgress = 0;
            while (this.active) {
                try {
                    if (!this.firstLaunch) {
                        MapScanMigrationStressTest.this.instances[3].shutdown();
                    } else {
                        this.firstLaunch = false;
                    }
                    MapScanMigrationStressTest.this.instances[3] = MapScanMigrationStressTest.this.factory.newHazelcastInstance(MapScanMigrationStressTest.createFastRetryConfig());
                    while (this.active && (currentProgress = MapScanMigrationStressTest.this.progress.get()) < lastProgressSeen + this.minProgressBetweenMutations) {
                        Thread.yield();
                    }
                    MapScanMigrationStressTest.this.logger.info("Mutating");
                    lastProgressSeen = currentProgress;
                }
                catch (Exception e) {
                    MapScanMigrationStressTest.this.mutatorException.set(e);
                    e.printStackTrace();
                }
            }
        }
    }
}

