package com.atlassian.bitbucket.internal.search.indexing.util;

import com.atlassian.bitbucket.internal.search.client.ElasticsearchClient;
import com.atlassian.bitbucket.internal.search.client.Requests;
import com.atlassian.bitbucket.internal.search.indexing.exceptions.IndexException;
import com.atlassian.bitbucket.search.mapping.MappingType;
import com.atlassian.elasticsearch.client.ES;
import com.atlassian.elasticsearch.client.search.Hit;
import com.atlassian.elasticsearch.client.search.SearchSourceBuilder;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;

/* loaded from: input_file:WEB-INF/atlassian-bundled-plugins/bitbucket-search-5.16.0.jar:com/atlassian/bitbucket/internal/search/indexing/util/ElderScroll.class */
/**
 * Streams the hits of an Elasticsearch search as an {@link Observable}, paging through the
 * result set with scroll requests issued via the injected {@link ElasticsearchClient}.
 *
 * <p>Each subscription to the observable returned by {@link #search} runs its own
 * {@code ScrollProducer}, so paging state is never shared between subscribers.
 */
public class ElderScroll {
    private static final Logger logger = LoggerFactory.getLogger(ElderScroll.class);
    private final ElasticsearchClient elasticsearchClient;

    /* loaded from: input_file:WEB-INF/atlassian-bundled-plugins/bitbucket-search-5.16.0.jar:com/atlassian/bitbucket/internal/search/indexing/util/ElderScroll$ScrollProducer.class */
    /**
     * Stateful producer that fetches one page of hits at a time and pushes every hit to the
     * subscriber until all hits were delivered, the subscriber unsubscribes, or a request fails.
     */
    private class ScrollProducer implements Observable.OnSubscribe<Hit> {
        private final MappingType mappingType;
        private final Duration scrollIdLifetime;
        private final SearchSourceBuilder searchSource;
        /** Number of hits handed to the subscriber so far. */
        private long fetchedItemCount;
        /** Scroll id to use for the next page; {@code null} until the initial search returned one. */
        private String scrollId;
        /** Most recent scroll id handed out by Elasticsearch that still has to be released. */
        private String scrollIdToDelete;
        /** Total hit count reported by the initial search; unknown (max value) before that. */
        private long totalItemCount = Long.MAX_VALUE;

        ScrollProducer(SearchSourceBuilder searchSourceBuilder, MappingType mappingType, Duration duration) {
            this.searchSource = searchSourceBuilder;
            this.mappingType = mappingType;
            this.scrollIdLifetime = duration;
        }

        /**
         * Fetches pages until every hit was emitted or the subscriber went away.
         *
         * <p>The scroll token is released exactly once, after the paging loop has finished
         * (normally or with an error). Releasing it after every page would invalidate the id
         * that the following scroll request depends on.
         *
         * @param subscriber receiver of the hits and of the terminal event
         */
        @Override // rx.functions.Action1
        public void call(Subscriber<? super Hit> subscriber) {
            try {
                while (!subscriber.isUnsubscribed() && this.fetchedItemCount < this.totalItemCount) {
                    fetchMore(subscriber);
                }
            } catch (Exception e) {
                subscriber.onError(e);
            } finally {
                deleteScrollToken();
            }
        }

        /**
         * Releases the last known scroll token, if any. Failure to do so is only logged because
         * it does not affect the hits that were already delivered.
         */
        private void deleteScrollToken() {
            if (this.scrollIdToDelete != null) {
                boolean deleteFailed = Observables
                        .consumeSingle(ElderScroll.this.elasticsearchClient.execute(ES.scroll().delete().id(this.scrollIdToDelete)))
                        .fold(error -> true, response -> !response.isStatusSuccess());
                if (deleteFailed) {
                    ElderScroll.logger.warn("Unable to remove the scroll token previously reserved. This is not strictly speaking a serious error, but has resource utilisation consequences for Elasticsearch. And should therefore not be allowed to happen often.");
                }
            }
            this.scrollIdToDelete = null;
        }

        /**
         * Requests the next page and forwards its hits to the subscriber. Completes the
         * subscriber when the page is empty or when the reported total has been reached.
         */
        private void fetchMore(Subscriber<? super Hit> subscriber) {
            Observable<List<Hit>> page = this.scrollId == null ? firstPage() : nextPage();
            Observables.consume(page.defaultIfEmpty(Collections.emptyList()), subscriber::onError, hits -> {
                if (hits.isEmpty()) {
                    subscriber.onCompleted();
                    return;
                }
                hits.forEach(subscriber::onNext);
                this.fetchedItemCount += hits.size();
                if (this.fetchedItemCount >= this.totalItemCount) {
                    subscriber.onCompleted();
                }
            });
        }

        /** Issues the initial search-with-scroll request and records total count and scroll id. */
        private Observable<List<Hit>> firstPage() {
            return ElderScroll.this.elasticsearchClient
                    .execute(Requests.request(this.mappingType).search().source(this.searchSource).scroll(this.scrollIdLifetime))
                    .map(searchResponse -> {
                        if (!searchResponse.isStatusSuccess()) {
                            throw new IndexException("The request to Elasticsearch failed with HTTP status code: " + searchResponse.getStatusCode());
                        }
                        this.totalItemCount = searchResponse.getTotalHitsNumber();
                        List<Hit> hits = searchResponse.getHits();
                        if (searchResponse.getScrollId().isPresent()) {
                            // Remember any returned token, even for a single-page result, so that
                            // it is released once paging has finished.
                            this.scrollIdToDelete = searchResponse.getScrollId().get();
                        }
                        if (this.totalItemCount > hits.size()) {
                            if (!searchResponse.getScrollId().isPresent()) {
                                throw new IndexException("No scrollId returned from search with scroll request and multiple pages");
                            }
                            this.scrollId = searchResponse.getScrollId().get();
                        }
                        return hits;
                    });
        }

        /** Issues a follow-up scroll request using the current scroll id. */
        private Observable<List<Hit>> nextPage() {
            return ElderScroll.this.elasticsearchClient
                    .execute(ES.scroll().get().id(this.scrollId).scroll(this.scrollIdLifetime))
                    .map(scrollGetResponse -> {
                        if (!scrollGetResponse.isStatusSuccess()) {
                            throw new IndexException("The request to Elasticsearch failed with HTTP status code: " + scrollGetResponse.getStatusCode());
                        }
                        if (scrollGetResponse.getScrollId().isPresent()) {
                            this.scrollId = scrollGetResponse.getScrollId().get();
                            this.scrollIdToDelete = this.scrollId;
                        } else {
                            // NOTE(review): with no scroll id the next iteration falls back to a
                            // fresh initial search; confirm this restart is intended.
                            this.scrollId = null;
                        }
                        return scrollGetResponse.getHits();
                    });
        }
    }

    /**
     * @param elasticsearchClient client used for all search and scroll requests; must not be {@code null}
     * @throws NullPointerException if {@code elasticsearchClient} is {@code null}
     */
    public ElderScroll(@Nonnull ElasticsearchClient elasticsearchClient) {
        this.elasticsearchClient = Objects.requireNonNull(elasticsearchClient, "elasticsearchClient");
    }

    /**
     * Creates an observable that, on subscription, runs the given search and emits every hit.
     *
     * <p>The arguments are validated when the observable is subscribed to, not when this
     * method is called.
     *
     * @param searchSourceBuilder the search to execute
     * @param mappingType         mapping type the search request is built for
     * @param duration            lifetime requested for the scroll id on every request
     * @return an observable emitting the hits page by page; a new scroll is started per subscription
     */
    public Observable<Hit> search(@Nonnull SearchSourceBuilder searchSourceBuilder, @Nonnull MappingType mappingType, @Nonnull Duration duration) {
        return Observable.create(subscriber -> {
            ScrollProducer producer = new ScrollProducer(
                    Objects.requireNonNull(searchSourceBuilder, "searchSource"),
                    Objects.requireNonNull(mappingType, "mappingType"),
                    Objects.requireNonNull(duration, "scrollIdLifetime"));
            producer.call(subscriber);
        });
    }
}
