/*
 * Decompiled with CFR 0.152.
 */
package com.mongodb.reactivestreams.client;

import com.mongodb.MongoException;
import com.mongodb.async.AsyncBatchCursor;
import com.mongodb.async.SingleResultCallback;
import com.mongodb.async.client.MongoIterable;
import com.mongodb.reactivestreams.client.SubscriptionSupport;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

class MongoIterablePublisher<TResult>
implements Publisher<TResult> {
    private final MongoIterable<TResult> mongoIterable;

    MongoIterablePublisher(MongoIterable<TResult> mongoIterable) {
        this.mongoIterable = mongoIterable;
    }

    public void subscribe(Subscriber<? super TResult> s) {
        new AsyncBatchCursorSubscription(s).start();
    }

    private class AsyncBatchCursorSubscription
    extends SubscriptionSupport<TResult> {
        private final Lock lock;
        private boolean requestedBatchCursor;
        private boolean isReading;
        private boolean isProcessing;
        private boolean cursorCompleted;
        private long wanted;
        private volatile AsyncBatchCursor<TResult> batchCursor;
        private final ConcurrentLinkedQueue<TResult> resultsQueue;

        public AsyncBatchCursorSubscription(Subscriber<? super TResult> subscriber) {
            super(subscriber);
            this.lock = new ReentrantLock(false);
            this.wanted = 0L;
            this.batchCursor = null;
            this.resultsQueue = new ConcurrentLinkedQueue();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        protected void doRequest(long n) {
            this.lock.lock();
            boolean mustGetCursor = false;
            try {
                this.wanted += n;
                if (!this.requestedBatchCursor) {
                    this.requestedBatchCursor = true;
                    mustGetCursor = true;
                }
            }
            finally {
                this.lock.unlock();
            }
            if (mustGetCursor) {
                this.getBatchCursor();
            } else {
                this.processResultsQueue();
            }
        }

        @Override
        protected void handleCancel() {
            super.handleCancel();
            if (this.batchCursor != null) {
                this.batchCursor.close();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void processResultsQueue() {
            this.log("processResultsQueue");
            this.lock.lock();
            boolean mustProcess = false;
            try {
                if (!this.isProcessing) {
                    this.isProcessing = true;
                    mustProcess = true;
                }
            }
            finally {
                this.lock.unlock();
            }
            if (mustProcess) {
                this.log("processing");
                boolean getNextBatch = false;
                long processedCount = 0L;
                boolean completed = false;
                block8: while (true) {
                    long localWanted = 0L;
                    this.lock.lock();
                    try {
                        this.wanted -= processedCount;
                        if (this.resultsQueue.isEmpty()) {
                            completed = this.cursorCompleted;
                            getNextBatch = this.wanted > 0L;
                            this.isProcessing = false;
                            break;
                        }
                        if (this.wanted == 0L) {
                            this.isProcessing = false;
                            break;
                        }
                        localWanted = this.wanted;
                    }
                    finally {
                        this.lock.unlock();
                    }
                    while (true) {
                        Object item;
                        if (localWanted <= 0L || (item = this.resultsQueue.poll()) == null) continue block8;
                        this.onNext(item);
                        --localWanted;
                        ++processedCount;
                    }
                    break;
                }
                if (completed) {
                    this.onComplete();
                } else if (getNextBatch) {
                    this.getNextBatch();
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void getNextBatch() {
            this.log("getNextBatch");
            this.lock.lock();
            boolean mustRead = false;
            try {
                if (!this.isReading) {
                    this.isReading = true;
                    mustRead = true;
                }
            }
            finally {
                this.lock.unlock();
            }
            if (mustRead) {
                this.batchCursor.setBatchSize(this.getBatchSize());
                this.batchCursor.next(new SingleResultCallback<List<TResult>>(){

                    /*
                     * WARNING - Removed try catching itself - possible behaviour change.
                     */
                    public void onResult(List<TResult> result, Throwable t) {
                        AsyncBatchCursorSubscription.this.lock.lock();
                        try {
                            AsyncBatchCursorSubscription.this.isReading = false;
                            if (t == null && result == null) {
                                AsyncBatchCursorSubscription.this.cursorCompleted = true;
                            }
                        }
                        finally {
                            AsyncBatchCursorSubscription.this.lock.unlock();
                        }
                        if (t != null) {
                            AsyncBatchCursorSubscription.this.onError(t);
                        } else {
                            if (result != null) {
                                AsyncBatchCursorSubscription.this.resultsQueue.addAll(result);
                            }
                            AsyncBatchCursorSubscription.this.processResultsQueue();
                        }
                    }
                });
            }
        }

        private void getBatchCursor() {
            this.log("getBatchCursor");
            MongoIterablePublisher.this.mongoIterable.batchSize(this.getBatchSize());
            MongoIterablePublisher.this.mongoIterable.batchCursor(new SingleResultCallback<AsyncBatchCursor<TResult>>(){

                public void onResult(AsyncBatchCursor<TResult> result, Throwable t) {
                    if (t != null) {
                        AsyncBatchCursorSubscription.this.onError(t);
                    } else if (result != null) {
                        AsyncBatchCursorSubscription.this.batchCursor = result;
                        AsyncBatchCursorSubscription.this.getNextBatch();
                    } else {
                        AsyncBatchCursorSubscription.this.onError((Throwable)new MongoException("Unexpected error, no AsyncBatchCursor returned from the MongoIterable."));
                    }
                }
            });
        }

        private int getBatchSize() {
            long requested = this.wanted;
            if (requested <= 1L) {
                return 2;
            }
            if (requested < Integer.MAX_VALUE) {
                return (int)requested;
            }
            return Integer.MAX_VALUE;
        }
    }
}

