/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.java.search.core;

import com.couchbase.client.core.ClusterFacade;
import com.couchbase.client.core.CouchbaseException;
import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.core.message.ResponseStatus;
import com.couchbase.client.core.message.search.SearchQueryRequest;
import com.couchbase.client.core.message.search.SearchQueryResponse;
import com.couchbase.client.core.time.Delay;
import com.couchbase.client.java.bucket.api.Utils;
import com.couchbase.client.java.document.json.JsonObject;
import com.couchbase.client.java.env.CouchbaseEnvironment;
import com.couchbase.client.java.error.CannotRetryException;
import com.couchbase.client.java.search.SearchQuery;
import com.couchbase.client.java.search.result.AsyncSearchQueryResult;
import com.couchbase.client.java.search.result.impl.DefaultAsyncSearchQueryResult;
import com.couchbase.client.java.util.OnSubscribeDeferAndWatch;
import com.couchbase.client.java.util.retry.RetryBuilder;
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action4;
import rx.functions.Func1;

public class SearchQueryExecutor {
    private static CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance(SearchQueryExecutor.class);
    private static final int HTTP_TOO_MANY_REQUESTS = 429;
    private static final int HTTP_PRECONDITION_FAILED = 421;
    private final CouchbaseEnvironment environment;
    private final ClusterFacade core;
    private final String bucket;
    private final String username;
    private final String password;
    private final int upperRetryLimit;
    private final int lowerRetryLimit;

    public SearchQueryExecutor(CouchbaseEnvironment environment, ClusterFacade core, String bucket, String username, String password) {
        this.environment = environment;
        this.core = core;
        this.bucket = bucket;
        this.username = username;
        this.password = password;
        this.upperRetryLimit = Integer.parseInt(System.getProperty("com.couchbase.search.upperRetryLimit", "500"));
        this.lowerRetryLimit = Integer.parseInt(System.getProperty("com.couchbase.search.lowerRetryLimit", "50"));
    }

    public Observable<AsyncSearchQueryResult> execute(final SearchQuery query, final long timeout, final TimeUnit timeUnit) {
        return OnSubscribeDeferAndWatch.deferAndWatch(new Func1<Subscriber, Observable<SearchQueryResponse>>(){

            public Observable<SearchQueryResponse> call(Subscriber subscriber) {
                SearchQueryRequest request = new SearchQueryRequest(query.indexName(), query.export().toString(), SearchQueryExecutor.this.bucket, SearchQueryExecutor.this.username, SearchQueryExecutor.this.password);
                Utils.addRequestSpan(SearchQueryExecutor.this.environment, request, "search");
                request.subscriber(subscriber);
                return Utils.applyTimeout(SearchQueryExecutor.this.core.send(request), request, SearchQueryExecutor.this.environment, timeout, timeUnit);
            }
        }).flatMap((Func1)new Func1<SearchQueryResponse, Observable<SearchQueryResponse>>(){

            public Observable<SearchQueryResponse> call(SearchQueryResponse r) {
                if (SearchQueryExecutor.shouldRetry(r.statusCode())) {
                    return Observable.error((Throwable)new RetryableException(r));
                }
                return Observable.just((Object)r);
            }
        }).retryWhen((Func1)RetryBuilder.anyOf(RetryableException.class).max(10).delay(Delay.exponential(TimeUnit.MILLISECONDS, this.upperRetryLimit, this.lowerRetryLimit)).doOnRetry(new Action4<Integer, Throwable, Long, TimeUnit>(){

            public void call(Integer attempt, Throwable error, Long delay, TimeUnit delayUnit) {
                LOGGER.debug("Retrying {} because of {} (attempt {}, delay {} {})", new Object[]{query.export(), error.getMessage(), attempt, delay, delayUnit});
            }
        }).build()).map((Func1)new Func1<SearchQueryResponse, AsyncSearchQueryResult>(){

            public AsyncSearchQueryResult call(SearchQueryResponse response) {
                if (response.status().isSuccess()) {
                    JsonObject json = JsonObject.fromJson(response.payload());
                    return DefaultAsyncSearchQueryResult.fromJson(json);
                }
                if (response.payload().contains("index not found")) {
                    return DefaultAsyncSearchQueryResult.fromIndexNotFound(query.indexName());
                }
                if (response.status() == ResponseStatus.INVALID_ARGUMENTS) {
                    return DefaultAsyncSearchQueryResult.fromHttp400(response.payload());
                }
                if (response.statusCode() == 421) {
                    return DefaultAsyncSearchQueryResult.fromHttp412();
                }
                throw new CouchbaseException("Could not query search index, " + (Object)((Object)response.status()) + ": " + response.payload());
            }
        }).onErrorResumeNext((Func1)new Func1<Throwable, Observable<? extends AsyncSearchQueryResult>>(){

            public Observable<? extends AsyncSearchQueryResult> call(Throwable throwable) {
                if (throwable instanceof CannotRetryException && throwable.getCause() != null && throwable.getCause() instanceof RetryableException) {
                    RetryableException x = (RetryableException)throwable.getCause();
                    if (x.response().statusCode() == 429) {
                        return Observable.just((Object)DefaultAsyncSearchQueryResult.fromHttp429(x.response().payload()));
                    }
                    return Observable.error((Throwable)new CouchbaseException("Could not query search index, " + (Object)((Object)x.response().status()) + ": " + x.response().payload()));
                }
                return Observable.error((Throwable)throwable);
            }
        });
    }

    private static boolean shouldRetry(int statusCode) {
        return statusCode == 429;
    }

    class RetryableException
    extends CouchbaseException {
        private final SearchQueryResponse response;

        RetryableException(SearchQueryResponse response) {
            super("Retryable Error (" + response + ")");
            this.response = response;
        }

        public SearchQueryResponse response() {
            return this.response;
        }
    }
}

