/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.sql.impl;

import com.hazelcast.jet.core.Inbox;
import com.hazelcast.jet.core.JetTestSupport;
import com.hazelcast.jet.function.RunnableEx;
import com.hazelcast.jet.impl.util.ArrayDequeInbox;
import com.hazelcast.jet.impl.util.ProgressTracker;
import com.hazelcast.jet.sql.SqlTestSupport;
import com.hazelcast.jet.sql.impl.QueryResultProducerImpl;
import com.hazelcast.sql.impl.QueryException;
import com.hazelcast.sql.impl.ResultIterator;
import com.hazelcast.sql.impl.row.JetSqlRow;
import com.hazelcast.test.HazelcastParallelClassRunner;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.QuickTest;
import java.io.Serializable;
import java.util.NoSuchElementException;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

@RunWith(value=HazelcastParallelClassRunner.class)
@Category(value={QuickTest.class, ParallelJVMTest.class})
public class QueryResultProducerImplTest
extends JetTestSupport {
    private QueryResultProducerImpl producer;
    private ResultIterator<JetSqlRow> iterator;
    private final ArrayDequeInbox inbox = new ArrayDequeInbox(new ProgressTracker());

    private void initProducer(boolean blockForNextItem) {
        this.producer = new QueryResultProducerImpl(blockForNextItem);
        this.iterator = this.producer.iterator();
    }

    @Test
    public void smokeTest_streaming() throws Exception {
        this.initProducer(false);
        Semaphore semaphore = new Semaphore(0);
        Future future = QueryResultProducerImplTest.spawn((RunnableEx & Serializable)() -> {
            try {
                Assertions.assertThat((Comparable)this.iterator.hasNext(0L, TimeUnit.SECONDS)).isEqualTo((Object)ResultIterator.HasNextResult.TIMEOUT);
                semaphore.release();
                Assertions.assertThat((boolean)this.iterator.hasNext()).isTrue();
                QueryResultProducerImplTest.assertInstanceOf(JetSqlRow.class, (Object)this.iterator.next());
                semaphore.release();
                Assertions.assertThat((boolean)this.iterator.hasNext()).isFalse();
                Assertions.assertThatThrownBy(() -> this.iterator.next()).isInstanceOf(NoSuchElementException.class);
                semaphore.release();
            }
            catch (Throwable t) {
                this.logger.info("", t);
                throw t;
            }
        });
        semaphore.acquire();
        QueryResultProducerImplTest.sleepMillis((int)50);
        Assertions.assertThat((int)semaphore.availablePermits()).isZero();
        this.inbox.queue().add(SqlTestSupport.jetRow(new Object[0]));
        this.producer.consume((Inbox)this.inbox);
        semaphore.acquire();
        QueryResultProducerImplTest.sleepMillis((int)50);
        Assertions.assertThat((int)semaphore.availablePermits()).isZero();
        this.producer.done();
        QueryResultProducerImplTest.assertTrueEventually(future::isDone, (long)5L);
        semaphore.acquire();
        future.get();
    }

    @Test
    public void smokeTest_blocking() throws Exception {
        this.initProducer(true);
        Semaphore semaphore = new Semaphore(0);
        Future future = QueryResultProducerImplTest.spawn((RunnableEx & Serializable)() -> {
            try {
                semaphore.release();
                Assertions.assertThat((Comparable)this.iterator.hasNext(0L, TimeUnit.SECONDS)).isEqualTo((Object)ResultIterator.HasNextResult.YES);
                Assertions.assertThat((int)((JetSqlRow)this.iterator.next()).getFieldCount()).isEqualTo(0);
                semaphore.release();
                Assertions.assertThat((Comparable)this.iterator.hasNext(0L, TimeUnit.SECONDS)).isEqualTo((Object)ResultIterator.HasNextResult.DONE);
                semaphore.release();
            }
            catch (Throwable t) {
                this.logger.info("", t);
                throw t;
            }
        });
        semaphore.acquire();
        QueryResultProducerImplTest.sleepMillis((int)50);
        Assertions.assertThat((int)semaphore.availablePermits()).isZero();
        this.inbox.queue().add(SqlTestSupport.jetRow(new Object[0]));
        this.producer.consume((Inbox)this.inbox);
        semaphore.acquire();
        QueryResultProducerImplTest.sleepMillis((int)50);
        Assertions.assertThat((int)semaphore.availablePermits()).isZero();
        this.producer.done();
        QueryResultProducerImplTest.assertTrueEventually(future::isDone, (long)5L);
        semaphore.acquire();
        future.get();
    }

    @Test
    public void when_done_then_remainingItemsIterated() {
        this.initProducer(false);
        this.inbox.queue().add(SqlTestSupport.jetRow(1));
        this.inbox.queue().add(SqlTestSupport.jetRow(2));
        this.producer.consume((Inbox)this.inbox);
        this.producer.done();
        Assertions.assertThat((boolean)this.iterator.hasNext()).isTrue();
        Assertions.assertThat((int)((Integer)((JetSqlRow)this.iterator.next()).get(0))).isEqualTo(1);
        Assertions.assertThat((boolean)this.iterator.hasNext()).isTrue();
        Assertions.assertThat((int)((Integer)((JetSqlRow)this.iterator.next()).get(0))).isEqualTo(2);
        Assertions.assertThat(this.iterator).isExhausted();
    }

    @Test
    public void when_doneWithErrorWhileWaiting_then_throw_async() {
        this.initProducer(false);
        Assertions.assertThat((Comparable)this.iterator.hasNext(0L, TimeUnit.SECONDS)).isEqualTo((Object)ResultIterator.HasNextResult.TIMEOUT);
        this.producer.onError(QueryException.error((String)"mock error"));
        Assertions.assertThatThrownBy(() -> this.iterator.hasNext(0L, TimeUnit.SECONDS)).hasMessageContaining("mock error");
    }

    @Test
    public void when_doneWithErrorWhileWaiting_then_throw_sync() throws Exception {
        this.initProducer(false);
        Future future = QueryResultProducerImplTest.spawn((RunnableEx & Serializable)() -> Assertions.assertThatThrownBy(() -> this.iterator.hasNext(1L, TimeUnit.DAYS)).hasMessageContaining("mock error"));
        QueryResultProducerImplTest.sleepMillis((int)50);
        this.producer.onError(QueryException.error((String)"mock error"));
        future.get();
    }

    @Test
    public void when_nextItemWhileWaiting_then_hasNextReturns() throws Exception {
        this.initProducer(false);
        Future future = QueryResultProducerImplTest.spawn((RunnableEx & Serializable)() -> {
            Assertions.assertThat((Comparable)this.iterator.hasNext(1L, TimeUnit.DAYS)).isEqualTo((Object)ResultIterator.HasNextResult.YES);
            Assertions.assertThat((int)((Integer)((JetSqlRow)this.iterator.next()).get(0))).isEqualTo(42);
        });
        QueryResultProducerImplTest.sleepMillis((int)50);
        this.inbox.queue().add(SqlTestSupport.jetRow(42));
        this.producer.consume((Inbox)this.inbox);
        Assertions.assertThat((Iterable)this.inbox).isEmpty();
        future.get();
    }

    @Test
    public void when_noNextItem_then_timeoutElapses() {
        this.initProducer(false);
        long start = System.nanoTime();
        this.iterator.hasNext(500L, TimeUnit.MILLISECONDS);
        long elapsed = TimeUnit.MILLISECONDS.toNanos(System.nanoTime() - start);
        Assertions.assertThat((elapsed >= 500L ? 1 : 0) != 0).isTrue();
    }

    @Test
    public void when_iteratorRequestedTheSecondTime_then_fail() {
        this.initProducer(false);
        Assertions.assertThatThrownBy(() -> ((QueryResultProducerImpl)this.producer).iterator()).hasMessageContaining("can be requested only once");
    }

    @Test
    public void when_onErrorAfterDone_then_ignored() {
        this.initProducer(false);
        this.producer.done();
        this.producer.onError(QueryException.error((String)"error"));
        Assertions.assertThat((boolean)this.iterator.hasNext()).isFalse();
    }

    @Test
    public void when_onErrorCalledTwice_then_secondIgnored() {
        this.initProducer(false);
        this.producer.onError(QueryException.error((String)"error1"));
        this.producer.onError(QueryException.error((String)"error2"));
        Assertions.assertThatThrownBy(() -> this.iterator.hasNext()).hasMessageContaining("error1");
    }

    @Test
    public void when_doneCalledTwice_then_secondIgnored() {
        this.initProducer(false);
        this.producer.done();
        this.producer.done();
        Assertions.assertThat((boolean)this.iterator.hasNext()).isFalse();
    }

    @Test
    public void when_queueCapacityExceeded_then_inboxNotConsumed() {
        this.initProducer(false);
        int numExcessItems = 2;
        for (int i = 0; i < 4096 + numExcessItems; ++i) {
            this.inbox.queue().add(SqlTestSupport.jetRow(new Object[0]));
        }
        this.producer.consume((Inbox)this.inbox);
        Assertions.assertThat((Iterable)this.inbox).hasSize(2);
    }
}

