/*
 * Decompiled with CFR 0.152.
 */
package com.mongodb.reactivestreams.client;

import com.mongodb.MongoException;
import com.mongodb.async.AsyncBatchCursor;
import com.mongodb.async.SingleResultCallback;
import com.mongodb.async.client.MongoIterable;
import com.mongodb.reactivestreams.client.SubscriptionSupport;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

class MongoIterablePublisher<TResult>
implements Publisher<TResult> {
    private final MongoIterable<TResult> mongoIterable;

    MongoIterablePublisher(MongoIterable<TResult> mongoIterable) {
        this.mongoIterable = mongoIterable;
    }

    public void subscribe(Subscriber<? super TResult> s) {
        new AsyncBatchCursorSubscription(s).start();
    }

    private class AsyncBatchCursorSubscription
    extends SubscriptionSupport<TResult> {
        private final AtomicBoolean requestedBatchCursorLock;
        private final AtomicBoolean bufferProcessingLock;
        private final AtomicBoolean batchCursorNextLock;
        private final AtomicBoolean cursorCompleted;
        private final AtomicReference<AsyncBatchCursor<TResult>> batchCursor;
        private final AtomicLong wanted;
        private final ConcurrentLinkedQueue<TResult> resultsQueue;

        public AsyncBatchCursorSubscription(Subscriber<? super TResult> subscriber) {
            super(subscriber);
            this.requestedBatchCursorLock = new AtomicBoolean();
            this.bufferProcessingLock = new AtomicBoolean();
            this.batchCursorNextLock = new AtomicBoolean();
            this.cursorCompleted = new AtomicBoolean();
            this.batchCursor = new AtomicReference();
            this.wanted = new AtomicLong();
            this.resultsQueue = new ConcurrentLinkedQueue();
        }

        @Override
        protected void doRequest(long n) {
            this.wanted.addAndGet(n);
            if (this.requestedBatchCursorLock.compareAndSet(false, true)) {
                if (n <= 1L) {
                    MongoIterablePublisher.this.mongoIterable.batchSize(2);
                } else if (n < Integer.MAX_VALUE) {
                    MongoIterablePublisher.this.mongoIterable.batchSize((int)n);
                } else {
                    MongoIterablePublisher.this.mongoIterable.batchSize(Integer.MAX_VALUE);
                }
                MongoIterablePublisher.this.mongoIterable.batchCursor(new SingleResultCallback<AsyncBatchCursor<TResult>>(){

                    public void onResult(AsyncBatchCursor<TResult> result, Throwable t) {
                        if (t != null) {
                            AsyncBatchCursorSubscription.this.onError(t);
                        } else if (result != null) {
                            AsyncBatchCursorSubscription.this.batchCursor.set(result);
                            AsyncBatchCursorSubscription.this.getNextBatch();
                        } else {
                            AsyncBatchCursorSubscription.this.onError((Throwable)new MongoException("Unexpected error, no AsyncBatchCursor returned from the MongoIterable."));
                        }
                    }
                });
            } else if (this.batchCursor.get() != null) {
                this.processResultsQueue();
            }
        }

        @Override
        protected void handleCancel() {
            super.handleCancel();
            AsyncBatchCursor cursor = this.batchCursor.get();
            if (cursor != null) {
                cursor.close();
            }
        }

        void getNextBatch() {
            this.log("getNextBatch");
            if (this.batchCursorNextLock.compareAndSet(false, true)) {
                AsyncBatchCursor cursor = this.batchCursor.get();
                if (cursor.isClosed()) {
                    this.cursorCompleted.set(true);
                    this.batchCursorNextLock.set(false);
                    this.processResultsQueue();
                } else {
                    int batchSize = this.wanted.get() > Integer.MAX_VALUE ? Integer.MAX_VALUE : this.wanted.intValue();
                    cursor.setBatchSize(batchSize);
                    cursor.next(new SingleResultCallback<List<TResult>>(){

                        public void onResult(List<TResult> result, Throwable t) {
                            if (t != null) {
                                AsyncBatchCursorSubscription.this.onError(t);
                                AsyncBatchCursorSubscription.this.batchCursorNextLock.set(false);
                            } else {
                                if (result != null) {
                                    AsyncBatchCursorSubscription.this.resultsQueue.addAll(result);
                                } else {
                                    AsyncBatchCursorSubscription.this.cursorCompleted.set(true);
                                }
                                AsyncBatchCursorSubscription.this.batchCursorNextLock.set(false);
                                AsyncBatchCursorSubscription.this.processResultsQueue();
                            }
                        }
                    });
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void processResultsQueue() {
            if (this.bufferProcessingLock.compareAndSet(false, true)) {
                try {
                    Object item;
                    long i = this.wanted.get();
                    while (i > 0L && (item = this.resultsQueue.poll()) != null) {
                        this.onNext(item);
                        i = this.wanted.decrementAndGet();
                    }
                    if (this.cursorCompleted.get()) {
                        this.onComplete();
                    }
                }
                finally {
                    this.bufferProcessingLock.set(false);
                }
                if (!this.cursorCompleted.get() && this.wanted.get() > (long)this.resultsQueue.size()) {
                    this.getNextBatch();
                } else if (this.resultsQueue.peek() != null) {
                    if (this.wanted.get() > 0L) {
                        this.processResultsQueue();
                    } else if (this.cursorCompleted.get()) {
                        this.onComplete();
                    }
                }
            }
        }
    }
}

