package com.microsoft.azure.cosmosdb.rx.internal.query;

import com.microsoft.azure.cosmosdb.BridgeInternal;
import com.microsoft.azure.cosmosdb.ChangeFeedOptions;
import com.microsoft.azure.cosmosdb.FeedOptions;
import com.microsoft.azure.cosmosdb.FeedOptionsBase;
import com.microsoft.azure.cosmosdb.FeedResponse;
import com.microsoft.azure.cosmosdb.Resource;
import com.microsoft.azure.cosmosdb.rx.internal.RxDocumentServiceRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.subjects.BehaviorSubject;

/* loaded from: input_file:com/microsoft/azure/cosmosdb/rx/internal/query/Paginator.class */
public class Paginator {
    private static final Logger logger = LoggerFactory.getLogger(Paginator.class);

    public static <T extends Resource> Observable<FeedResponse<T>> getPatinatedChangeFeedQueryResultAsObservable(ChangeFeedOptions changeFeedOptions, Func2<String, Integer, RxDocumentServiceRequest> func2, Func1<RxDocumentServiceRequest, Observable<FeedResponse<T>>> func1, Class<T> cls, int i) {
        return getPatinatedQueryResultAsObservable(changeFeedOptions, func2, func1, cls, -1, i, true);
    }

    public static <T extends Resource> Observable<FeedResponse<T>> getPatinatedQueryResultAsObservable(FeedOptions feedOptions, Func2<String, Integer, RxDocumentServiceRequest> func2, Func1<RxDocumentServiceRequest, Observable<FeedResponse<T>>> func1, Class<T> cls, int i) {
        return getPatinatedQueryResultAsObservable(feedOptions, func2, func1, cls, -1, i);
    }

    public static <T extends Resource> Observable<FeedResponse<T>> getPatinatedQueryResultAsObservable(FeedOptions feedOptions, Func2<String, Integer, RxDocumentServiceRequest> func2, Func1<RxDocumentServiceRequest, Observable<FeedResponse<T>>> func1, Class<T> cls, int i, int i2) {
        return getPatinatedQueryResultAsObservable(feedOptions, func2, func1, cls, i, i2, false);
    }

    private static <T extends Resource> Observable<FeedResponse<T>> getPatinatedQueryResultAsObservable(FeedOptionsBase feedOptionsBase, Func2<String, Integer, RxDocumentServiceRequest> func2, Func1<RxDocumentServiceRequest, Observable<FeedResponse<T>>> func1, Class<T> cls, int i, int i2, boolean z) {
        return Observable.defer(() -> {
            try {
                logger.debug("Querying/Reading " + cls + "s");
                int min = Math.min(i2, i == -1 ? i2 : i);
                return ((Observable) func1.call((RxDocumentServiceRequest) func2.call(feedOptionsBase.getRequestContinuation(), Integer.valueOf(min)))).single().concatMap(feedResponse -> {
                    logger.debug("got first page");
                    final BehaviorSubject create = BehaviorSubject.create();
                    Observable concatMap = create.asObservable().concatMap(new Func1<FeedResponse<T>, Observable<FeedResponse<T>>>() { // from class: com.microsoft.azure.cosmosdb.rx.internal.query.Paginator.1
                        int innerTop;
                        int innerRequestPageSize;

                        {
                            this.innerTop = i;
                            this.innerRequestPageSize = min;
                        }

                        public Observable<FeedResponse<T>> call(FeedResponse<T> feedResponse) {
                            if (this.innerTop != -1) {
                                this.innerTop -= this.innerRequestPageSize;
                                this.innerRequestPageSize = Math.min(min, this.innerTop);
                            }
                            if (this.innerTop == 0) {
                                Observable empty = Observable.empty();
                                BehaviorSubject behaviorSubject = create;
                                return empty.doOnCompleted(() -> {
                                    behaviorSubject.onCompleted();
                                });
                            }
                            String responseContinuation = feedResponse.getResponseContinuation();
                            if (responseContinuation == null || (z && BridgeInternal.noChanges(feedResponse))) {
                                Paginator.logger.debug("continuation token is null");
                                Observable empty2 = Observable.empty();
                                BehaviorSubject behaviorSubject2 = create;
                                return empty2.doOnCompleted(() -> {
                                    behaviorSubject2.onCompleted();
                                });
                            }
                            Paginator.logger.debug("going for next page continuation token is not null");
                            Observable observable = (Observable) func1.call((RxDocumentServiceRequest) func2.call(responseContinuation, Integer.valueOf(this.innerRequestPageSize)));
                            BehaviorSubject behaviorSubject3 = create;
                            return observable.doOnNext(feedResponse2 -> {
                                Paginator.logger.trace("in do on next");
                                behaviorSubject3.onNext(feedResponse2);
                            });
                        }
                    });
                    logger.debug("invoking subject on next");
                    create.onNext(feedResponse);
                    logger.debug("after invoking subject on next");
                    return Observable.just(feedResponse).concatWith(concatMap);
                });
            } catch (Exception e) {
                logger.debug("Failure in querying/reading " + cls + "s due to [{}]", e.getMessage(), e);
                return Observable.error(e);
            }
        });
    }
}
