/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.sql.impl.client;

import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.client.config.ClientSqlResubmissionMode;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.sql.HazelcastSqlException;
import com.hazelcast.sql.SqlResult;
import com.hazelcast.sql.SqlRow;
import com.hazelcast.sql.SqlStatement;
import com.hazelcast.sql.impl.client.SqlClientResult;
import com.hazelcast.sql.impl.client.SqlResubmissionTestSupport;
import com.hazelcast.test.ClusterFailureTestSupport;
import com.hazelcast.test.HazelcastParametrizedRunner;
import com.hazelcast.test.HazelcastSerialParametersRunnerFactory;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.SlowTest;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.function.Predicate;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=HazelcastParametrizedRunner.class)
@Parameterized.UseParametersRunnerFactory(value=HazelcastSerialParametersRunnerFactory.class)
@Category(value={SlowTest.class, ParallelJVMTest.class})
public class SqlResubmissionTest
extends SqlResubmissionTestSupport {
    private static final int INITIAL_CLUSTER_SIZE = 1;
    @Parameterized.Parameter
    public ClusterFailureTestSupport.SingleFailingInstanceClusterFailure clusterFailure;
    @Parameterized.Parameter(value=1)
    public ClientSqlResubmissionMode resubmissionMode;
    private HazelcastInstance client;
    private volatile State state;
    private volatile boolean done;
    private volatile long lastExecutionTime;
    private final Runnable cyclicFailure = () -> {
        boolean failAfterDone = true;
        while (!this.done) {
            State localState = this.state;
            switch (localState) {
                case BEFORE_FAIL: {
                    try {
                        Thread.sleep(this.lastExecutionTime / 2L);
                    }
                    catch (InterruptedException interruptedException) {
                        // empty catch block
                    }
                    this.state = State.BEFORE_RECOVER;
                    failAfterDone = false;
                    this.clusterFailure.fail();
                    break;
                }
                case BEFORE_RECOVER: {
                    failAfterDone = true;
                    this.clusterFailure.recover();
                    this.state = State.BEFORE_EXECUTE;
                }
            }
        }
        if (failAfterDone) {
            this.clusterFailure.fail();
        }
    };

    @Parameterized.Parameters(name="clusterFailure:{0}, mode:{1}")
    public static Collection<Object[]> parameters() {
        ArrayList<Object[]> res = new ArrayList<Object[]>();
        List<ClusterFailureTestSupport.SingleFailingInstanceClusterFailure> failures = Arrays.asList(new ClusterFailureTestSupport.NodeReplacementClusterFailure(), new ClusterFailureTestSupport.NodeShutdownClusterFailure(), new ClusterFailureTestSupport.NetworkProblemClusterFailure(), new ClusterFailureTestSupport.NodeTerminationClusterFailure());
        for (ClientSqlResubmissionMode mode : ClientSqlResubmissionMode.values()) {
            for (ClusterFailureTestSupport.SingleFailingInstanceClusterFailure failure : failures) {
                res.add(new Object[]{failure, mode});
            }
        }
        return res;
    }

    @Before
    public void initFailure() {
        this.clusterFailure.initialize(1, SqlResubmissionTest.smallInstanceConfig());
        this.createMap(this.clusterFailure.getNotFailingInstance(), COMMON_MAP_NAME, 10000, SqlResubmissionTestSupport.IntHolder::new, SqlResubmissionTestSupport.IntHolder.class);
        ClientConfig clientConfig = new ClientConfig();
        clientConfig.getSqlConfig().setResubmissionMode(this.resubmissionMode);
        this.client = this.clusterFailure.createClient(clientConfig);
        this.state = State.BEFORE_EXECUTE;
        this.done = false;
        this.lastExecutionTime = 0L;
    }

    @Test
    public void when_failingSelectBeforeAnyDataIsFetched() throws InterruptedException {
        SqlStatement statement = new SqlStatement("select * from " + COMMON_MAP_NAME);
        this.testStatement(statement, this.shouldFailBeforeAnyDataIsFetched(this.resubmissionMode));
    }

    @Test
    public void when_failingUpdate() throws InterruptedException {
        SqlStatement statement = new SqlStatement("update " + COMMON_MAP_NAME + " set field = 1");
        this.testStatement(statement, this.shouldFailNonSelectQuery(this.resubmissionMode));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testStatement(SqlStatement statement, boolean shouldFail) throws InterruptedException {
        Thread failureControlThread = new Thread(this.cyclicFailure);
        failureControlThread.start();
        try {
            if (shouldFail) {
                SqlResubmissionTest.assertThrows(HazelcastSqlException.class, () -> this.executeInLoop(statement, result -> false));
            } else {
                this.executeInLoop(statement, result -> ((SqlClientResult)result).wasResubmission());
            }
        }
        finally {
            this.done = true;
            failureControlThread.join();
            this.clusterFailure.cleanUp();
        }
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    private void executeInLoop(SqlStatement statement, Predicate<SqlResult> shouldBreakFunction) {
        while (true) {
            State localState;
            if ((localState = this.state) != State.BEFORE_EXECUTE) {
                continue;
            }
            this.state = State.BEFORE_FAIL;
            try {
                long start = System.nanoTime();
                SqlResult result = this.client.getSql().execute(statement);
                this.lastExecutionTime = (System.nanoTime() - start) / 1000000L;
                if (!shouldBreakFunction.test(result)) continue;
                return;
            }
            catch (RuntimeException e) {
                if (e.getMessage() == null || !e.getMessage().contains("CREATE MAPPING")) throw e;
                continue;
            }
            break;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void when_failingSelectAfterSomeDataIsFetched() {
        SqlStatement statement = new SqlStatement("select * from " + COMMON_MAP_NAME + " ORDER BY __key");
        statement.setCursorBufferSize(1);
        SqlResult rows = this.client.getSql().execute(statement);
        try {
            boolean resubmitted = false;
            int expectedValue = 0;
            int rowsSeen = 0;
            for (SqlRow r : rows) {
                int rowValue = (Integer)r.getObject("__key");
                if (rowsSeen++ == 5000) {
                    this.clusterFailure.fail();
                }
                if (expectedValue > 0 && rowValue == 0) {
                    Assert.assertFalse((String)"rows restarted from 0 for the 2nd time", (boolean)resubmitted);
                    resubmitted = true;
                    expectedValue = 0;
                }
                Assert.assertEquals((long)expectedValue, (long)rowValue);
                ++expectedValue;
            }
            Assert.assertEquals((long)10000L, (long)expectedValue);
            Assert.assertTrue((String)"resubmission didn't happen", (boolean)resubmitted);
        }
        catch (HazelcastSqlException e) {
            if (!this.shouldFailAfterSomeDataIsFetched(this.resubmissionMode)) {
                throw e;
            }
        }
        finally {
            this.clusterFailure.cleanUp();
        }
    }

    static enum State {
        BEFORE_FAIL,
        BEFORE_EXECUTE,
        BEFORE_RECOVER;

    }
}

