/*
 * Decompiled with CFR 0.152.
 */
package com.datastax.dse.driver.internal.core.cql.reactive;

import com.datastax.dse.driver.api.core.cql.reactive.ReactiveRow;
import com.datastax.dse.driver.internal.core.cql.reactive.DefaultReactiveRow;
import com.datastax.dse.driver.internal.core.cql.reactive.ReactiveOperators;
import com.datastax.dse.driver.internal.core.util.concurrent.BoundedConcurrentQueue;
import com.datastax.oss.driver.api.core.AsyncPagingIterable;
import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures;
import com.datastax.oss.driver.shaded.guava.common.collect.Iterators;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.util.Collections;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import net.jcip.annotations.ThreadSafe;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link Subscription} that emits the rows of a paged result set to a main subscriber, honoring
 * the demand signalled through {@link #request(long)}.
 *
 * <p>Pages are fetched asynchronously and parked in a bounded queue; a wait-free drain loop then
 * moves rows from the head page to the main subscriber. Three auxiliary subscribers are notified
 * directly from the page-fetch callback: one receives the execution info of every page, the other
 * two receive the column definitions and the "was applied" flag of the first page only.
 *
 * @param <ResultSetT> the asynchronous, pageable result set type the rows are read from.
 */
@ThreadSafe
public class ReactiveResultSetSubscription<ResultSetT extends AsyncPagingIterable<Row, ResultSetT>>
implements Subscription {
    private static final Logger LOG = LoggerFactory.getLogger(ReactiveResultSetSubscription.class);

    /** Capacity handed to the page queue. */
    private static final int MAX_ENQUEUED_PAGES = 4;

    /** Outstanding demand: raised by {@link #request(long)}, lowered by the drain loop. */
    private final AtomicLong requested = new AtomicLong(0L);

    /** Fetched pages that are not fully consumed yet; rows are always read from the head page. */
    private final BoundedConcurrentQueue<Page<ResultSetT>> pages = new BoundedConcurrentQueue<>(MAX_ENQUEUED_PAGES);

    /**
     * Work-in-progress counter guarding the drain loop: only the thread that moves it away from
     * zero runs the loop, other threads merely record that another round is needed.
     */
    private final AtomicInteger draining = new AtomicInteger(0);

    /** Completed by the first valid {@link #request(long)}; the first page fetch is chained on it. */
    private final CompletableFuture<Void> firstSubscriberRequestArrived = new CompletableFuture<>();

    // The subscriber references are volatile and non-final because clear() nulls them out.
    private volatile Subscriber<? super ReactiveRow> mainSubscriber;
    private volatile Subscriber<ColumnDefinitions> columnDefinitionsSubscriber;
    private volatile Subscriber<ExecutionInfo> executionInfosSubscriber;
    private volatile Subscriber<Boolean> wasAppliedSubscriber;

    /** Set once by {@link #cancel()}; never reset. */
    private volatile boolean cancelled = false;

    /**
     * Creates a subscription bound to the given subscribers. Nothing is fetched until
     * {@link #start(Callable)} has been called and the first demand has been signalled.
     *
     * @param mainSubscriber receives the rows and the terminal signal of the row stream.
     * @param columnDefinitionsSubscriber receives the column definitions of the first page.
     * @param executionInfosSubscriber receives the execution info of every fetched page.
     * @param wasAppliedSubscriber receives the "was applied" flag computed from the first page.
     */
    ReactiveResultSetSubscription(@NonNull Subscriber<? super ReactiveRow> mainSubscriber, @NonNull Subscriber<ColumnDefinitions> columnDefinitionsSubscriber, @NonNull Subscriber<ExecutionInfo> executionInfosSubscriber, @NonNull Subscriber<Boolean> wasAppliedSubscriber) {
        this.mainSubscriber = mainSubscriber;
        this.columnDefinitionsSubscriber = columnDefinitionsSubscriber;
        this.executionInfosSubscriber = executionInfosSubscriber;
        this.wasAppliedSubscriber = wasAppliedSubscriber;
    }

    /**
     * Registers the operation that produces the first page. The operation is not invoked right
     * away: it runs once the first valid request has arrived.
     *
     * @param firstPage the operation to call to obtain the first page.
     */
    void start(@NonNull Callable<CompletionStage<ResultSetT>> firstPage) {
        // The seed page has no rows; it only serves as the "previous page" of the first fetch.
        this.firstSubscriberRequestArrived.thenAccept(aVoid -> this.fetchNextPageAndEnqueue(new Page<>(firstPage), true));
    }

    /**
     * Adds {@code n} to the outstanding demand and triggers a drain round. A non-positive
     * {@code n} is reported to the main subscriber as an {@link IllegalArgumentException}.
     * Calls made after cancellation are ignored.
     *
     * @param n the number of additional rows requested.
     */
    public void request(long n) {
        if (!this.cancelled) {
            if (n < 1L) {
                this.doOnError(new IllegalArgumentException(this.mainSubscriber + " violated the Reactive Streams rule 3.9 by requesting a non-positive number of elements."));
            } else {
                ReactiveOperators.addCap(this.requested, n);
                if (!this.firstSubscriberRequestArrived.isDone()) {
                    this.firstSubscriberRequestArrived.complete(null);
                }
                this.drain();
            }
        }
    }

    /**
     * Marks this subscription as cancelled. Resources are released immediately when no drain
     * round is in progress; otherwise the running drain loop notices the flag and releases them.
     */
    public void cancel() {
        if (!this.cancelled) {
            this.cancelled = true;
            // Taking the drain "lock" here and never giving it back keeps later drain() calls out.
            if (this.draining.getAndIncrement() == 0) {
                this.clear();
            }
        }
    }

    /**
     * Emits as many rows as the current demand and the enqueued pages allow. Only one thread at a
     * time runs the loop; concurrent callers bump {@link #draining} and return, which makes the
     * running thread perform another round.
     */
    private void drain() {
        if (this.draining.getAndIncrement() != 0) {
            return;
        }
        int missed = 1;
        // Every terminal exit from this loop must call clear() itself: cancel() cannot do it
        // while the drain counter is held by this thread.
        do {
            long r = this.requested.get();
            long emitted = 0L;
            while (emitted != r) {
                if (this.cancelled) {
                    this.clear();
                    return;
                }
                Object result;
                try {
                    result = this.tryNext();
                }
                catch (Throwable t) {
                    this.doOnError(t);
                    this.clear();
                    return;
                }
                if (result == null) {
                    // Nothing available right now: either exhausted or waiting for a page.
                    break;
                }
                if (result instanceof Throwable) {
                    // Error pages carry the failure as their single element.
                    this.doOnError((Throwable) result);
                    this.clear();
                    return;
                }
                this.doOnNext((ReactiveRow) result);
                ++emitted;
            }
            if (this.isExhausted()) {
                this.doOnComplete();
                this.clear();
                return;
            }
            if (this.cancelled) {
                this.clear();
                return;
            }
            if (emitted != 0L) {
                ReactiveOperators.subCap(this.requested, emitted);
            }
            // A non-zero remainder means drain() was called again while this round was running.
        } while ((missed = this.draining.addAndGet(-missed)) != 0);
    }

    /**
     * Returns the next element to emit, or {@code null} when none is available at the moment.
     * When the head page has no rows left but is followed by more pages, it is removed from the
     * queue and the next enqueued page (if any) is consulted. A head page without rows and without
     * further pages is left in the queue, which is what {@link #isExhausted()} looks for.
     *
     * @return a row, a {@link Throwable} taken from an error page, or {@code null}.
     */
    @Nullable
    private Object tryNext() {
        Page<ResultSetT> current = this.pages.peek();
        if (current != null) {
            if (current.hasMoreRows()) {
                return current.nextRow();
            }
            if (current.hasMorePages()) {
                if (this.pages.poll() == null) {
                    throw new AssertionError("Queue is empty, this should not happen");
                }
                current = this.pages.peek();
                if (current != null && current.hasMoreRows()) {
                    return current.nextRow();
                }
            }
        }
        return null;
    }

    /**
     * Tells whether the head page is the last one and has been fully consumed.
     *
     * @return {@code true} if a head page exists and has neither rows nor following pages.
     */
    private boolean isExhausted() {
        Page<ResultSetT> current = this.pages.peek();
        return current != null && !current.hasMoreRows() && !current.hasMorePages();
    }

    /**
     * Fetches the page following {@code current}, notifies the auxiliary subscribers, enqueues the
     * resulting page (or an error page if the fetch failed), schedules the fetch of the page after
     * it when there is one and the subscription is not cancelled, and finally triggers a drain
     * round.
     *
     * <p>NOTE(review): the subscriber fields are dereferenced here without a null check although
     * {@link #clear()} nulls them; a fetch completing after the subscription was cleared looks
     * like it would throw a {@link NullPointerException} inside the callback. The stage built
     * here is discarded, so such a failure would go unnoticed - confirm whether that is intended.
     *
     * @param current the page whose successor must be fetched.
     * @param firstPage whether the page being fetched is the first page of the result set.
     */
    private void fetchNextPageAndEnqueue(@NonNull Page<ResultSetT> current, boolean firstPage) {
        current.fetchNextPage().handle((rs, t) -> {
            Page<ResultSetT> page;
            if (t == null) {
                page = this.toPage(rs);
                this.executionInfosSubscriber.onNext(rs.getExecutionInfo());
                if (!page.hasMorePages()) {
                    this.executionInfosSubscriber.onComplete();
                }
                if (firstPage) {
                    this.columnDefinitionsSubscriber.onNext(rs.getColumnDefinitions());
                    this.columnDefinitionsSubscriber.onComplete();
                    // A first page with no remaining rows is reported as applied.
                    boolean wasApplied = rs.remaining() == 0 || rs.wasApplied();
                    this.wasAppliedSubscriber.onNext(wasApplied);
                    this.wasAppliedSubscriber.onComplete();
                }
            } else {
                // Unwrap the wrapper added by combined futures, but never down to null: a null
                // element in the error page would be read as "no element" by the drain loop and
                // the failure would surface as a normal completion.
                Throwable error = t;
                if (t instanceof CompletionException && t.getCause() != null) {
                    error = t.getCause();
                }
                page = this.toErrorPage(error);
                this.executionInfosSubscriber.onError(error);
                if (firstPage) {
                    this.columnDefinitionsSubscriber.onError(error);
                    this.wasAppliedSubscriber.onError(error);
                }
            }
            return page;
        // Presumably offer() completes once the bounded queue has accepted the page - TODO confirm.
        }).thenCompose(this.pages::offer).thenAccept(page -> {
            if (page.hasMorePages() && !this.cancelled) {
                this.fetchNextPageAndEnqueue(page, false);
            }
            this.drain();
        });
    }

    /**
     * Hands a row to the main subscriber. If the subscriber throws, the failure is logged and the
     * subscription is cancelled.
     *
     * @param result the row to emit.
     */
    private void doOnNext(@NonNull ReactiveRow result) {
        try {
            this.mainSubscriber.onNext(result);
        }
        catch (Throwable t) {
            LOG.error(this.mainSubscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onNext.", t);
            this.cancel();
        }
    }

    /**
     * Signals completion to the main subscriber, logging anything it throws, then cancels this
     * subscription.
     */
    private void doOnComplete() {
        try {
            this.mainSubscriber.onComplete();
        }
        catch (Throwable t) {
            LOG.error(this.mainSubscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onComplete.", t);
        }
        this.cancel();
    }

    /**
     * Signals a failure to the main subscriber, then cancels this subscription. If the subscriber
     * itself throws, that exception is logged with the original error attached as suppressed.
     *
     * @param error the failure to report.
     */
    void doOnError(@NonNull Throwable error) {
        try {
            this.mainSubscriber.onError(error);
        }
        catch (Throwable t) {
            t.addSuppressed(error);
            LOG.error(this.mainSubscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onError.", t);
        }
        this.cancel();
    }

    /** Empties the page queue and drops the references to all four subscribers. */
    private void clear() {
        this.pages.clear();
        this.mainSubscriber = null;
        this.columnDefinitionsSubscriber = null;
        this.executionInfosSubscriber = null;
        this.wasAppliedSubscriber = null;
    }

    /**
     * Wraps the current page of a result set. Rows are converted lazily, one at a time, into
     * {@link DefaultReactiveRow} instances carrying the execution info of that result set.
     *
     * @param rs the result set whose current page must be wrapped.
     * @return a page that can fetch its successor only if {@code rs} reports more pages.
     */
    @NonNull
    private Page<ResultSetT> toPage(@NonNull ResultSetT rs) {
        ExecutionInfo executionInfo = rs.getExecutionInfo();
        Iterator<Object> results = Iterators.transform(rs.currentPage().iterator(), row -> new DefaultReactiveRow(Objects.requireNonNull(row), executionInfo));
        Callable<CompletionStage<ResultSetT>> nextPage = rs.hasMorePages() ? () -> rs.fetchNextPage() : null;
        return new Page<>(results, nextPage);
    }

    /**
     * Builds a terminal page whose only element is the given failure.
     *
     * @param t the failure to carry.
     * @return a page with a single element and no successor.
     */
    @NonNull
    private Page<ResultSetT> toErrorPage(@NonNull Throwable t) {
        return new Page<>(Iterators.singletonIterator(t), null);
    }

    /**
     * A batch of elements to emit, together with the optional operation that fetches the batch
     * following it.
     *
     * @param <ResultSetT> the result set type produced when fetching the next page.
     */
    static class Page<ResultSetT extends AsyncPagingIterable<Row, ResultSetT>> {
        /** Elements of this page; an error page holds a single {@link Throwable}. */
        @NonNull
        final Iterator<?> iterator;

        /** Operation fetching the following page, or {@code null} if this is the last page. */
        @Nullable
        final Callable<CompletionStage<ResultSetT>> nextPage;

        /**
         * Creates a page without any element, used only to trigger the given fetch operation.
         *
         * @param nextPage the operation fetching the following page.
         */
        Page(@NonNull Callable<CompletionStage<ResultSetT>> nextPage) {
            this.iterator = Collections.emptyIterator();
            this.nextPage = nextPage;
        }

        /**
         * Creates a page with the given elements.
         *
         * @param iterator the elements of this page.
         * @param nextPage the operation fetching the following page, or {@code null} if none.
         */
        Page(@NonNull Iterator<?> iterator, @Nullable Callable<CompletionStage<ResultSetT>> nextPage) {
            this.iterator = iterator;
            this.nextPage = nextPage;
        }

        /** @return {@code true} if this page knows how to fetch a following page. */
        boolean hasMorePages() {
            return this.nextPage != null;
        }

        /**
         * Invokes the fetch operation. Any {@link Exception} it throws, including the
         * {@link NullPointerException} raised when there is no following page, is returned as a
         * failed stage instead of being propagated.
         *
         * @return the stage producing the next result set.
         */
        @NonNull
        CompletionStage<ResultSetT> fetchNextPage() {
            try {
                return Objects.requireNonNull(this.nextPage).call();
            }
            catch (Exception e) {
                return CompletableFutures.failedFuture(e);
            }
        }

        /** @return {@code true} if this page still has elements to hand out. */
        boolean hasMoreRows() {
            return this.iterator.hasNext();
        }

        /** @return the next element of this page. */
        @NonNull
        Object nextRow() {
            return this.iterator.next();
        }
    }
}

