/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.columnar.client.java;

import com.couchbase.client.core.Core;
import com.couchbase.client.core.CoreContext;
import com.couchbase.client.core.Reactor;
import com.couchbase.client.core.api.manager.CoreBucketAndScope;
import com.couchbase.client.core.cnc.Context;
import com.couchbase.client.core.cnc.RequestSpan;
import com.couchbase.client.core.deps.com.fasterxml.jackson.databind.node.ObjectNode;
import com.couchbase.client.core.error.CoreErrorCodeAndMessageException;
import com.couchbase.client.core.error.CouchbaseException;
import com.couchbase.client.core.error.ErrorCodeAndMessage;
import com.couchbase.client.core.error.InvalidArgumentException;
import com.couchbase.client.core.error.UnambiguousTimeoutException;
import com.couchbase.client.core.error.context.AnalyticsErrorContext;
import com.couchbase.client.core.error.context.ErrorContext;
import com.couchbase.client.core.error.context.GenericRequestErrorContext;
import com.couchbase.client.core.json.Mapper;
import com.couchbase.client.core.msg.Request;
import com.couchbase.client.core.msg.Response;
import com.couchbase.client.core.msg.analytics.AnalyticsChunkTrailer;
import com.couchbase.client.core.msg.analytics.AnalyticsRequest;
import com.couchbase.client.core.msg.analytics.AnalyticsResponse;
import com.couchbase.client.core.retry.AuthErrorDecider;
import com.couchbase.client.core.retry.BestEffortRetryStrategy;
import com.couchbase.client.core.retry.RetryAction;
import com.couchbase.client.core.retry.RetryReason;
import com.couchbase.client.core.retry.RetryStrategy;
import com.couchbase.client.core.util.CbObjects;
import com.couchbase.client.core.util.ConnectionString;
import com.couchbase.client.core.util.Deadline;
import com.couchbase.client.core.util.Golang;
import com.couchbase.columnar.client.java.BackoffCalculator;
import com.couchbase.columnar.client.java.Environment;
import com.couchbase.columnar.client.java.InvalidCredentialException;
import com.couchbase.columnar.client.java.QueryException;
import com.couchbase.columnar.client.java.QueryMetadata;
import com.couchbase.columnar.client.java.QueryOptions;
import com.couchbase.columnar.client.java.QueryPriority;
import com.couchbase.columnar.client.java.QueryResult;
import com.couchbase.columnar.client.java.Row;
import com.couchbase.columnar.client.java.TimeoutException;
import com.couchbase.columnar.client.java.codec.Deserializer;
import com.couchbase.columnar.client.java.internal.ReactorHelper;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.annotation.Nullable;

class QueryExecutor {
    // ---- Class-level constants --------------------------------------------

    /** Logger used when a failed query attempt is about to be retried. */
    private static final Logger log = LoggerFactory.getLogger(QueryExecutor.class);

    /**
     * Ratio between the dispatch timeout and the connect timeout.
     * NOTE(review): not referenced in the visible code; the constructor uses the literal 1.5.
     */
    private static final double dispatchTimeoutFactor = 1.5;

    /**
     * Row count used for blocking streaming.
     * NOTE(review): not referenced in the visible code; the streaming method passes the literal 16.
     */
    private static final int DEFAULT_STREAM_BUFFER_ROWS = 16;

    /**
     * Supplies the delay before each retry attempt. Built from 100 ms and 1 minute;
     * presumably these are the initial and maximum delays -- confirm against BackoffCalculator.
     */
    private static final BackoffCalculator backoff = new BackoffCalculator(Duration.ofMillis(100L), Duration.ofMinutes(1L));

    // ---- Per-instance collaborators ----------------------------------------

    /** Core that requests are sent through. */
    private final Core core;

    /** Source of the default deserializer and the default analytics timeout. */
    private final Environment environment;

    /** Retry strategy attached to every analytics request built by this executor. */
    private final ColumnarRetryStrategy columnarRetryStrategy;

    /**
     * Creates an executor bound to the given core and environment.
     *
     * <p>The retry strategy is built with a dispatch timeout equal to 1.5 times the
     * connect timeout configured on the core's environment.
     *
     * @param core the core used to send requests; must not be null
     * @param environment the columnar environment; must not be null
     * @param connectionString the connection string handed to the retry strategy
     * @throws NullPointerException if {@code core} or {@code environment} is null
     */
    public QueryExecutor(Core core, Environment environment, ConnectionString connectionString) {
        this.core = Objects.requireNonNull(core);
        this.environment = Objects.requireNonNull(environment);

        // Scale in nanoseconds using floating point, then truncate back to a whole number of nanos.
        long connectTimeoutNanos = core.environment().timeoutConfig().connectTimeout().toNanos();
        long dispatchTimeoutNanos = (long) (connectTimeoutNanos * 1.5);
        this.columnarRetryStrategy = new ColumnarRetryStrategy(Duration.ofNanos(dispatchTimeoutNanos), connectionString);
    }

    /**
     * Runs a query and returns the complete, buffered result, retrying via {@code doWithRetry}.
     *
     * @param statement the statement to execute
     * @param optionsCustomizer callback that configures the query options
     * @param scope optional bucket/scope used as the query context
     * @return the buffered query result
     */
    QueryResult queryBuffered(String statement, Consumer<QueryOptions> optionsCustomizer, @Nullable CoreBucketAndScope scope) {
        return this.doWithRetry(optionsCustomizer, (options, remainingTimeout) -> {
            AnalyticsRequest request = this.analyticsRequest(statement, options, remainingTimeout, scope);
            // Per-query deserializer wins; otherwise use the environment's.
            Deserializer deserializer = CbObjects.defaultIfNull(options.deserializer(), this.environment.deserializer());
            CompletableFuture<QueryResult> pending = QueryExecutor.analyticsQueryAsync(this.core, request, deserializer);
            return QueryExecutor.blockAndRewriteStackTrace(pending);
        });
    }

    /**
     * Runs a query and hands each row to {@code rowConsumer}, retrying via {@code doWithRetry}.
     *
     * @param statement the statement to execute
     * @param optionsCustomizer callback that configures the query options
     * @param scope optional bucket/scope used as the query context
     * @param rowConsumer receives every result row
     * @return the query metadata built from the response header and trailer
     */
    QueryMetadata queryStreaming(String statement, Consumer<QueryOptions> optionsCustomizer, @Nullable CoreBucketAndScope scope, Consumer<Row> rowConsumer) {
        return this.doWithRetry(optionsCustomizer, (options, remainingTimeout) -> {
            AnalyticsRequest request = this.analyticsRequest(statement, options, remainingTimeout, scope);
            // Per-query deserializer wins; otherwise use the environment's.
            Deserializer deserializer = CbObjects.defaultIfNull(options.deserializer(), this.environment.deserializer());
            return QueryExecutor.analyticsQueryBlockingStreaming(this.core, request, deserializer, rowConsumer);
        });
    }

    /**
     * Builds the query options, then runs {@code strategy} until it succeeds, fails with a
     * non-retriable error, or the overall timeout leaves no room for another attempt.
     *
     * <p>Only a {@link CoreErrorCodeAndMessageException} whose {@code retriable()} is true
     * triggers a retry. Every other {@link RuntimeException} is translated and thrown, with
     * the most recent retried error (if any) attached as suppressed.
     *
     * @param optionsCustomizer callback that configures the query options
     * @param strategy the operation to execute; receives the built options and the timeout
     *     available for this attempt
     * @param <R> the result type of the strategy
     * @return whatever the strategy returns on its first successful attempt
     * @throws TimeoutException if a retriable failure occurs and the time remaining does not
     *     exceed the next backoff delay
     */
    private <R> R doWithRetry(Consumer<QueryOptions> optionsCustomizer, QueryStrategy<R> strategy) {
        QueryOptions builder = new QueryOptions();
        optionsCustomizer.accept(builder);
        QueryOptions.Unmodifiable builtOpts = builder.build();

        Duration remainingTimeout = this.resolveTimeout(builtOpts);
        Deadline retryDeadline = Deadline.of(remainingTimeout);
        CoreErrorCodeAndMessageException prevError = null;
        int attempt = 0;

        // Every path through the loop body returns, throws, or starts the next iteration,
        // so nothing may follow the try/catch (a trailing statement would be unreachable).
        while (true) {
            try {
                return strategy.apply(builtOpts, remainingTimeout);
            } catch (RuntimeException t) {
                boolean retriable = t instanceof CoreErrorCodeAndMessageException
                    && ((CoreErrorCodeAndMessageException) t).retriable();
                if (!retriable) {
                    throw QueryExecutor.translateException(t, prevError);
                }
                CoreErrorCodeAndMessageException currentError = (CoreErrorCodeAndMessageException) t;

                Duration delay = backoff.delayForAttempt(attempt++);
                remainingTimeout = retryDeadline.remaining().orElse(Duration.ZERO);
                if (remainingTimeout.compareTo(delay) <= 0) {
                    throw QueryExecutor.notEnoughTimeToRetry(attempt, currentError);
                }

                log.debug("Query attempt {} failed; retrying after {}. {}", attempt, delay, QueryExecutor.context(currentError));
                QueryExecutor.sleep(delay);

                // The backoff sleep consumed part of the budget. Without this the next attempt
                // could run past the caller's deadline by up to `delay`. The result is strictly
                // positive because remainingTimeout > delay was checked above.
                remainingTimeout = remainingTimeout.minus(delay);
                prevError = currentError;
            }
        }
    }

    /**
     * Builds the timeout exception thrown when a retriable failure cannot be retried in time.
     *
     * @param attempt the attempt number reported in the message
     * @param t the retriable error; its context is appended to the message as JSON, and its
     *     translated form is attached as a suppressed exception
     * @return the exception to throw
     */
    private static TimeoutException notEnoughTimeToRetry(int attempt, CoreErrorCodeAndMessageException t) {
        String contextJson = t.context().exportAsString(Context.ExportFormat.JSON);
        String message = "Query attempt " + attempt + " failed, and there's not enough time left to try again. " + contextJson;

        TimeoutException result = new TimeoutException(message);
        result.addSuppressed(QueryExecutor.translateException(t));
        return result;
    }

    /**
     * Wraps an exception in an object whose {@code toString()} renders the exception's error
     * context as JSON, or {@code "{}"} when there is no context. The JSON is produced only
     * when {@code toString()} is called.
     *
     * @param e the exception whose context should be rendered
     * @return an object suitable as a logging argument
     */
    private static Object context(final CouchbaseException e) {
        return new Object() {
            @Override
            public String toString() {
                ErrorContext errorContext = e.context();
                if (errorContext == null) {
                    return "{}";
                }
                return errorContext.exportAsString(Context.ExportFormat.JSON);
            }
        };
    }

    /**
     * Blocks the calling thread for the given duration, at millisecond precision.
     *
     * <p>An interruption is rethrown as whatever {@code ReactorHelper.propagateAsCancellation}
     * returns.
     *
     * @param d how long to sleep
     */
    private static void sleep(Duration d) {
        long millis = d.toMillis();
        try {
            TimeUnit.MILLISECONDS.sleep(millis);
        } catch (InterruptedException interrupted) {
            throw ReactorHelper.propagateAsCancellation(interrupted);
        }
    }

    /**
     * Translates {@code e} with the single-argument overload and, when {@code suppressMe} is
     * not null, attaches it to the translated exception as suppressed.
     *
     * @param e the exception to translate
     * @param suppressMe optional earlier exception to record alongside the result
     * @return the translated exception
     */
    private static RuntimeException translateException(RuntimeException e, @Nullable Exception suppressMe) {
        RuntimeException translated = QueryExecutor.translateException(e);
        if (suppressMe == null) {
            return translated;
        }
        translated.addSuppressed(suppressMe);
        return translated;
    }

    /**
     * Maps an exception raised by the core onto the exception this client exposes.
     *
     * <ul>
     *   <li>{@link CoreErrorCodeAndMessageException} with code 20000 becomes
     *       {@link InvalidCredentialException}.</li>
     *   <li>{@link CoreErrorCodeAndMessageException} with code 21002 becomes a timeout: "safe"
     *       when the context is an {@link AnalyticsErrorContext} whose request is idempotent,
     *       ambiguous otherwise.</li>
     *   <li>Any other {@link CoreErrorCodeAndMessageException} becomes a {@link QueryException}
     *       built from the first error not flagged for retry, or the first error if all are.</li>
     *   <li>A core timeout becomes a safe timeout if it is an {@link UnambiguousTimeoutException},
     *       ambiguous otherwise.</li>
     *   <li>{@link InvalidArgumentException} becomes {@link IllegalArgumentException}.</li>
     *   <li>Any other {@link CouchbaseException} is passed through {@code hide}.</li>
     *   <li>Anything else is returned unchanged.</li>
     * </ul>
     *
     * @param e the exception to translate
     * @return the translated exception
     */
    private static RuntimeException translateException(RuntimeException e) {
        if (e instanceof CoreErrorCodeAndMessageException) {
            CoreErrorCodeAndMessageException coded = (CoreErrorCodeAndMessageException) e;

            if (coded.hasCode(20000)) {
                return new InvalidCredentialException(coded.context());
            }

            if (coded.hasCode(21002)) {
                boolean idempotent = coded.context() instanceof AnalyticsErrorContext
                    && ((AnalyticsErrorContext) coded.context()).requestContext().request().idempotent();
                return idempotent
                    ? QueryExecutor.newSafeTimeoutException((Context) coded.context())
                    : QueryExecutor.newAmbiguousTimeoutException((Context) coded.context());
            }

            // Prefer the first error that is not flagged for retry.
            ErrorCodeAndMessage primary = null;
            for (ErrorCodeAndMessage candidate : coded.errors()) {
                if (!candidate.retry()) {
                    primary = candidate;
                    break;
                }
            }
            if (primary == null) {
                primary = coded.errors().get(0);
            }
            return new QueryException(primary, coded.context());
        }

        if (e instanceof com.couchbase.client.core.error.TimeoutException) {
            Context timeoutContext = ((CouchbaseException) e).context();
            if (e instanceof UnambiguousTimeoutException) {
                return QueryExecutor.newSafeTimeoutException(timeoutContext);
            }
            return QueryExecutor.newAmbiguousTimeoutException(timeoutContext);
        }

        if (e instanceof InvalidArgumentException) {
            return new IllegalArgumentException(e.getMessage(), QueryExecutor.hide((CouchbaseException) e));
        }

        if (e instanceof CouchbaseException) {
            return QueryExecutor.hide((CouchbaseException) e);
        }

        return e;
    }

    /**
     * Chooses the timeout for a query: the one set on the options if present, otherwise the
     * environment's analytics timeout.
     *
     * @param opts the built query options
     * @return the timeout to apply
     */
    Duration resolveTimeout(QueryOptions.Unmodifiable opts) {
        Duration requested = opts.timeout();
        if (requested == null) {
            return this.environment.timeoutConfig().analyticsTimeout();
        }
        return requested;
    }

    /**
     * Builds the analytics request for one query attempt.
     *
     * <p>The JSON payload carries the statement, a {@code timeout} field set to the client
     * timeout plus five seconds, an optional {@code query_context} derived from {@code scope},
     * and whatever parameters the options inject. The {@code client_context_id} is read back
     * from the payload, so {@code opts.injectParams} is expected to have set it.
     *
     * @param statement the statement to execute; must not be null
     * @param opts the built query options
     * @param timeout the client-side timeout for this attempt
     * @param scope optional bucket/scope used as the query context
     * @return the request, with the options' client context attached to its request context
     * @throws NullPointerException if {@code statement} is null
     */
    AnalyticsRequest analyticsRequest(String statement, QueryOptions.Unmodifiable opts, Duration timeout, @Nullable CoreBucketAndScope scope) {
        Objects.requireNonNull(statement);

        // The timeout sent in the payload is 5 seconds longer than the client-side one.
        Duration serverTimeout = timeout.plus(Duration.ofSeconds(5L));

        ObjectNode payload = Mapper.createObjectNode();
        payload.put("statement", statement);
        payload.put("timeout", Golang.encodeDurationToMs(serverTimeout));
        if (scope != null) {
            String queryContext = "default:`" + scope.bucketName() + "`.`" + scope.scopeName() + "`";
            payload.put("query_context", queryContext);
        }
        opts.injectParams(payload);

        byte[] payloadBytes = payload.toString().getBytes(StandardCharsets.UTF_8);
        String clientContextId = payload.get("client_context_id").asText();

        RequestSpan span = null;
        // HIGH priority maps to -1; any other priority maps to 0.
        int numericPriority = 0;
        if (opts.priority() == QueryPriority.HIGH) {
            numericPriority = -1;
        }

        CoreContext coreContext = this.core.context();
        AnalyticsRequest request = new AnalyticsRequest(
            timeout,
            coreContext,
            this.columnarRetryStrategy,
            coreContext.authenticator(),
            payloadBytes,
            numericPriority,
            opts.readOnly(),
            clientContextId,
            statement,
            span,
            null,
            null,
            false,
            1
        );
        request.context().clientContext(opts.clientContext());
        return request;
    }

    /**
     * Sends the request and collects the whole response into a {@link QueryResult}.
     *
     * <p>All rows are collected into a list, then combined with the response header and
     * trailer. The pipeline is bounded by the request's timeout; on expiry it switches to
     * the error produced by {@code potentialTimeoutException}.
     *
     * @param core the core used to send the request
     * @param request the request to execute
     * @param deserializer deserializer handed to every {@link Row}
     * @return a future completing with the buffered result
     */
    private static CompletableFuture<QueryResult> analyticsQueryAsync(Core core, AnalyticsRequest request, Deserializer deserializer) {
        Mono<QueryResult> buffered = QueryExecutor.analyticsQueryInternal(core, request)
            .flatMap(response -> response.rows()
                .map(chunk -> new Row(chunk.data(), deserializer))
                .collectList()
                .flatMap(rows -> response.trailer()
                    .map(trailer -> new QueryResult(response.header(), rows, trailer))));

        return buffered
            .timeout(request.timeout(), QueryExecutor.potentialTimeoutException(request))
            .toFuture();
    }

    /**
     * Sends the request through the core and exposes its response future as a {@link Mono}.
     *
     * <p>The request context is marked logically complete when the response arrives, or
     * logically complete with the error when the response fails.
     *
     * @param core the core used to send the request
     * @param request the request to send
     * @return a Mono emitting the analytics response
     */
    private static Mono<AnalyticsResponse> analyticsQueryInternal(Core core, AnalyticsRequest request) {
        core.send(request);
        // No raw-type casts here: casting the future to raw CompletableFuture makes the call to
        // Reactor.wrap unchecked, which erases the result to raw Mono. The doOnError lambda
        // parameter would then be typed Object and could not be passed to logicallyComplete.
        return Reactor.wrap(request, request.response(), true)
            .doOnNext(ignored -> request.context().logicallyComplete())
            .doOnError(err -> request.context().logicallyComplete(err));
    }

    /**
     * Sends the request, blocks for the response, and feeds each row to {@code callback} on
     * the calling thread.
     *
     * <p>A deadline is started from the request's timeout before the request is sent. Once the
     * response arrives, the time still remaining bounds the row stream: when it runs out, the
     * stream is terminated by the error from {@code potentialTimeoutException}.
     *
     * @param core the core used to send the request
     * @param request the request to execute
     * @param deserializer deserializer handed to every {@link Row}
     * @param callback receives each row
     * @return metadata built from the response header and trailer
     */
    private static QueryMetadata analyticsQueryBlockingStreaming(Core core, AnalyticsRequest request, Deserializer deserializer, Consumer<Row> callback) {
        Deadline deadline = Deadline.of(request.timeout());
        AnalyticsResponse response = QueryExecutor.analyticsQueryInternal(core, request).blockOptional().get();

        // Never emits a value; its only purpose is to fail with a timeout error once the
        // time left on the deadline has elapsed.
        Duration timeLeft = deadline.remaining().orElse(Duration.ZERO);
        Mono<Object> streamDeadline = Mono.never().timeout(timeLeft, QueryExecutor.potentialTimeoutException(request));

        Flux<Row> rows = response.rows()
            .map(chunk -> new Row(chunk.data(), deserializer))
            .takeUntilOther(streamDeadline);

        // 16 matches the value of DEFAULT_STREAM_BUFFER_ROWS.
        ReactorHelper.forEachBlocking(rows, 16, callback);

        AnalyticsChunkTrailer trailer = response.trailer().blockOptional().get();
        return new QueryMetadata(response.header(), trailer);
    }

    /**
     * Creates a timeout exception whose message is {@code message}, a space, and the context
     * exported as JSON.
     *
     * @param message the leading part of the exception message
     * @param context the context to append
     * @return the new exception
     */
    private static TimeoutException newSafeTimeoutException(String message, Context context) {
        String contextJson = context.exportAsString(Context.ExportFormat.JSON);
        return new TimeoutException(message + " " + contextJson);
    }

    /**
     * Creates a timeout exception with the standard "no data was changed" message followed by
     * the context exported as JSON.
     *
     * @param context the context to append
     * @return the new exception
     */
    private static TimeoutException newSafeTimeoutException(Context context) {
        String message = "The operation timed out. No data was changed on the server.";
        return QueryExecutor.newSafeTimeoutException(message, context);
    }

    /**
     * Creates a timeout exception with the standard "unknown whether data was changed" message
     * followed by the context exported as JSON.
     *
     * @param context the context to append
     * @return the new exception
     */
    private static TimeoutException newAmbiguousTimeoutException(Context context) {
        String contextJson = context.exportAsString(Context.ExportFormat.JSON);
        return new TimeoutException("The operation timed out. It is unknown whether data was changed on the server." + " " + contextJson);
    }

    /**
     * Returns a Mono that, when subscribed, fails with a timeout exception for the request:
     * the "safe" variant if the request is idempotent, the ambiguous variant otherwise.
     * The exception is created lazily, not when this method is called.
     *
     * @param request the request the timeout refers to
     * @param <T> the element type expected by the caller; no element is ever emitted
     * @return the failing Mono
     */
    private static <T> Mono<T> potentialTimeoutException(AnalyticsRequest request) {
        return Mono.defer(() -> Mono.error(() -> {
            if (request.idempotent()) {
                return QueryExecutor.newSafeTimeoutException((Context) request.context());
            }
            return QueryExecutor.newAmbiguousTimeoutException((Context) request.context());
        }));
    }

    /**
     * Converts a {@link CouchbaseException} into a plain {@link RuntimeException} with the
     * same stack trace.
     *
     * <p>The message is the original class's simple name, {@code ": "}, and the original
     * message. The cause and every suppressed exception are carried over, recursively
     * converted when they are {@link CouchbaseException}s themselves.
     *
     * @param e the exception to convert
     * @return the converted exception
     */
    private static RuntimeException hide(CouchbaseException e) {
        Throwable originalCause = e.getCause();
        Throwable cause = originalCause;
        if (originalCause instanceof CouchbaseException) {
            cause = QueryExecutor.hide((CouchbaseException) originalCause);
        }

        String message = e.getClass().getSimpleName() + ": " + e.getMessage();
        RuntimeException converted = new RuntimeException(message, cause);
        converted.setStackTrace(e.getStackTrace());

        for (Throwable suppressed : e.getSuppressed()) {
            Throwable carried = suppressed instanceof CouchbaseException
                ? QueryExecutor.hide((CouchbaseException) suppressed)
                : suppressed;
            converted.addSuppressed(carried);
        }
        return converted;
    }

    /**
     * Waits for the future and returns its value.
     *
     * <p>If the future failed with a {@link RuntimeException}, that exception is rethrown
     * after its stack trace has been rewritten by {@code rewriteStackTrace}. Any other failure
     * cause is wrapped in a new {@link RuntimeException}. An interruption is rethrown as
     * whatever {@code ReactorHelper.propagateAsCancellation} returns.
     *
     * @param future the future to wait on
     * @param <T> the future's value type
     * @return the future's value
     */
    private static <T> T blockAndRewriteStackTrace(CompletableFuture<T> future) {
        try {
            return future.get();
        } catch (InterruptedException interrupted) {
            throw ReactorHelper.propagateAsCancellation(interrupted);
        } catch (ExecutionException failure) {
            Throwable cause = failure.getCause();
            if (!(cause instanceof RuntimeException)) {
                throw new RuntimeException(cause);
            }
            QueryExecutor.rewriteStackTrace(cause);
            throw (RuntimeException) cause;
        }
    }

    /**
     * Replaces {@code t}'s stack trace with the current thread's, and keeps the previous trace
     * on a marker exception added to {@code t} as suppressed.
     *
     * @param t the throwable to modify in place
     */
    private static void rewriteStackTrace(Throwable t) {
        // Capture the old trace before fillInStackTrace() overwrites it.
        StackTraceElement[] originalTrace = t.getStackTrace();
        t.fillInStackTrace();

        Exception marker = new Exception("The above exception was originally thrown by another thread at the following location.");
        marker.setStackTrace(originalTrace);
        t.addSuppressed(marker);
    }

    private class ColumnarRetryStrategy
    extends BestEffortRetryStrategy {
        private final Duration dispatchTimeout;
        private final Deadline bootstrapDeadline;
        private final String bootstrapTimeoutMessage;
        private final String dispatchTimeoutMessage;
        private final boolean maybeCouchbaseInternalNonProd;

        public ColumnarRetryStrategy(Duration dispatchTimeout, ConnectionString connectionString) {
            this.dispatchTimeout = Objects.requireNonNull(dispatchTimeout);
            this.bootstrapDeadline = Deadline.of((Duration)dispatchTimeout);
            this.bootstrapTimeoutMessage = "Failed to connect to cluster and get topology within " + dispatchTimeout + " (1.5 x connectTimeout). Check connection string. If connecting to a hosted service, check the admin console and make sure this machine's IP is in the list of allowed IPs.";
            this.dispatchTimeoutMessage = "Failed to dispatch request within " + dispatchTimeout + " (1.5 x connectTimeout). Check network status? Check cluster status?";
            this.maybeCouchbaseInternalNonProd = ((ConnectionString.UnresolvedSocket)connectionString.hosts().get(0)).host().endsWith(".nonprod-project-avengers.com");
        }

        private String tlsHandshakeErrorMessage(Throwable tlsHandshakeError) {
            String message = "A TLS handshake problem prevented the client from connecting to the server. Potential causes include the server (or an on-path attacker) presenting a certificate the client is not configured to trust. If connecting to a hosted service, make sure to use a relatively recent SDK version that has up-to-date certificates. Error message from the TLS engine: " + tlsHandshakeError;
            if (this.maybeCouchbaseInternalNonProd) {
                message = "It looks like you might be trying to connect to a Couchbase internal non-production hosted service. If this is true, please make sure you have configured the SDK to trust the non-prod certificate authority, like this:\n\nCluster cluster = Cluster.newInstance(\n  connectionString,\n  Credential.of(username, password),\n  clusterOptions -> clusterOptions\n    .security(it -> it.trustOnlyCertificates(Certificates.getNonProdCertificates()))\n);\n\nWe now return to your regularly scheduled exception message.\n\n" + message;
            }
            return message;
        }

        public CompletableFuture<RetryAction> shouldRetry(Request<? extends Response> request, RetryReason reason) {
            if (reason == RetryReason.AUTHENTICATION_ERROR) {
                Throwable tlsHandshakeError = AuthErrorDecider.getTlsHandshakeFailure((Core)QueryExecutor.this.core);
                if (tlsHandshakeError != null) {
                    return this.failFast(it -> new RuntimeException(this.tlsHandshakeErrorMessage(tlsHandshakeError) + " " + this.getErrorContext((Throwable)it, request), tlsHandshakeError));
                }
                return this.failFast(it -> new InvalidCredentialException(this.getErrorContext((Throwable)it, request)));
            }
            if (reason == RetryReason.GLOBAL_CONFIG_LOAD_IN_PROGRESS && this.bootstrapDeadline.exceeded()) {
                return this.failFast(it -> QueryExecutor.newSafeTimeoutException(this.bootstrapTimeoutMessage, (Context)this.getErrorContext((Throwable)it, request)));
            }
            if (reason == RetryReason.ENDPOINT_NOT_AVAILABLE && this.dispatchTimeoutExpired(request)) {
                return this.failFast(it -> QueryExecutor.newSafeTimeoutException(this.dispatchTimeoutMessage, (Context)this.getErrorContext((Throwable)it, request)));
            }
            return super.shouldRetry(request, reason);
        }

        private CompletableFuture<RetryAction> failFast(Function<Throwable, Throwable> exceptionTranslator) {
            return CompletableFuture.completedFuture(RetryAction.noRetry(exceptionTranslator));
        }

        private boolean dispatchTimeoutExpired(Request<?> request) {
            long nanosSinceCreation = System.nanoTime() - request.createdAt();
            return nanosSinceCreation > this.dispatchTimeout.toNanos();
        }

        private ErrorContext getErrorContext(Throwable t, Request<?> request) {
            CouchbaseException c;
            if (t instanceof CouchbaseException && (c = (CouchbaseException)t).context() != null) {
                return c.context();
            }
            return new GenericRequestErrorContext(request);
        }
    }

    private static interface QueryStrategy<R> {
        public R apply(QueryOptions.Unmodifiable var1, Duration var2);
    }
}

