/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.core.service.kv;

import com.couchbase.client.core.Core;
import com.couchbase.client.core.CoreContext;
import com.couchbase.client.core.Reactor;
import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.cnc.RequestSpan;
import com.couchbase.client.core.cnc.events.request.IndividualReplicaGetFailedEvent;
import com.couchbase.client.core.config.BucketConfig;
import com.couchbase.client.core.config.CouchbaseBucketConfig;
import com.couchbase.client.core.env.CoreEnvironment;
import com.couchbase.client.core.error.CommonExceptions;
import com.couchbase.client.core.error.CouchbaseException;
import com.couchbase.client.core.error.DefaultErrorUtil;
import com.couchbase.client.core.error.DocumentUnretrievableException;
import com.couchbase.client.core.error.context.AggregateErrorContext;
import com.couchbase.client.core.error.context.ReducedKeyValueErrorContext;
import com.couchbase.client.core.io.CollectionIdentifier;
import com.couchbase.client.core.msg.kv.GetRequest;
import com.couchbase.client.core.msg.kv.GetResponse;
import com.couchbase.client.core.msg.kv.ReplicaGetRequest;
import com.couchbase.client.core.retry.RetryStrategy;
import com.couchbase.client.core.util.Validators;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Static helpers that fan a key-value "get" out to the active node and every configured
 * replica of a document, exposed in both reactive and {@link CompletableFuture} flavors.
 *
 * <p>This class only contains static methods and cannot be instantiated.
 */
@Stability.Internal
public class ReplicaHelper {
    private ReplicaHelper() {
        throw new AssertionError("not instantiable");
    }

    /**
     * Reactive variant that emits the first successful response from the active node or any replica.
     *
     * <p>Delegates to {@link #getAllReplicasReactive} under a dedicated "get_any_replica" span and
     * takes the first emitted element. Because individual failures are dropped by the delegate, the
     * returned {@link Mono} completes empty when no request succeeds.
     *
     * @param core the core used to dispatch the requests.
     * @param collectionIdentifier identifies the bucket/collection of the document.
     * @param documentId the document id; must not be null or empty.
     * @param timeout the timeout handed to every individual request.
     * @param retryStrategy the retry strategy handed to every individual request.
     * @param clientContext client context attached to every individual request.
     * @param parentSpan parent span for the "get_any_replica" span.
     * @return a mono with the first response to arrive.
     */
    public static Mono<GetReplicaResponse> getAnyReplicaReactive(Core core, CollectionIdentifier collectionIdentifier, String documentId, Duration timeout, RetryStrategy retryStrategy, Map<String, Object> clientContext, RequestSpan parentSpan) {
        RequestSpan getAnySpan = core.context().environment().requestTracer().requestSpan("get_any_replica", parentSpan);
        return getAllReplicasReactive(core, collectionIdentifier, documentId, timeout, retryStrategy, clientContext, getAnySpan)
            .next()
            .doFinally(signalType -> getAnySpan.end());
    }

    /**
     * Reactive variant that emits one response per successful active/replica get.
     *
     * <p>A failing individual request does not fail the flux: an
     * {@link IndividualReplicaGetFailedEvent} is published on the event bus and the element is skipped.
     *
     * @param core the core used to dispatch the requests.
     * @param collectionIdentifier identifies the bucket/collection of the document.
     * @param documentId the document id; validated to be neither null nor empty before anything else happens.
     * @param timeout the timeout handed to every individual request.
     * @param retryStrategy the retry strategy handed to every individual request.
     * @param clientContext client context attached to every individual request.
     * @param parentSpan parent span for the "get_all_replicas" span.
     * @return a flux of the successful responses, each flagged with whether it came from a replica.
     */
    public static Flux<GetReplicaResponse> getAllReplicasReactive(Core core, CollectionIdentifier collectionIdentifier, String documentId, Duration timeout, RetryStrategy retryStrategy, Map<String, Object> clientContext, RequestSpan parentSpan) {
        Validators.notNullOrEmpty(documentId, "Id", () -> ReducedKeyValueErrorContext.create(documentId, collectionIdentifier));
        CoreEnvironment env = core.context().environment();
        RequestSpan getAllSpan = env.requestTracer().requestSpan("get_all_replicas", parentSpan);
        getAllSpan.setAttribute("db.system", "couchbase");
        return Reactor.toMono(() -> getAllReplicasRequests(core, collectionIdentifier, documentId, clientContext, retryStrategy, timeout, getAllSpan))
            .flux()
            .flatMap(Flux::fromStream)
            .flatMap(request -> Reactor.wrap(request, get(core, request), true)
                .onErrorResume(t -> {
                    // One failed node must not fail the whole operation: report it and skip the element.
                    env.eventBus().publish(new IndividualReplicaGetFailedEvent(request.context()));
                    return Mono.empty();
                })
                .map(response -> new GetReplicaResponse((GetResponse) response, request instanceof ReplicaGetRequest)))
            .doFinally(signalType -> getAllSpan.end());
    }

    /**
     * Future-based variant that dispatches a get to the active node and every replica.
     *
     * <p>The outer future completes once all requests have been created and sent; it fails if the
     * requests cannot be built (for example when the bucket is not a Couchbase bucket). Each inner
     * future completes independently with the mapped response or the failure of its request.
     * The "get_all_replicas" span is ended when the last inner future completes, or immediately
     * when no inner future exists.
     *
     * @param core the core used to dispatch the requests.
     * @param collectionIdentifier identifies the bucket/collection of the document.
     * @param documentId the document id; must not be null or empty.
     * @param timeout the timeout handed to every individual request.
     * @param retryStrategy the retry strategy handed to every individual request.
     * @param clientContext client context attached to every individual request.
     * @param parentSpan parent span for the "get_all_replicas" span.
     * @param responseMapper converts each {@link GetReplicaResponse} into the caller's result type.
     * @param <R> the result type produced by {@code responseMapper}.
     * @return a future holding one future per dispatched request.
     */
    public static <R> CompletableFuture<List<CompletableFuture<R>>> getAllReplicasAsync(Core core, CollectionIdentifier collectionIdentifier, String documentId, Duration timeout, RetryStrategy retryStrategy, Map<String, Object> clientContext, RequestSpan parentSpan, Function<GetReplicaResponse, R> responseMapper) {
        CoreEnvironment env = core.context().environment();
        RequestSpan getAllSpan = env.requestTracer().requestSpan("get_all_replicas", parentSpan);
        getAllSpan.setAttribute("db.system", "couchbase");

        CompletableFuture<List<CompletableFuture<R>>> replicaFutures =
            getAllReplicasRequests(core, collectionIdentifier, documentId, clientContext, retryStrategy, timeout, getAllSpan)
                .thenApply(requests -> requests
                    .map(request -> get(core, request)
                        .thenApply(response -> new GetReplicaResponse(response, request instanceof ReplicaGetRequest))
                        .thenApply(responseMapper))
                    .collect(Collectors.toList()));

        return replicaFutures.whenComplete((futures, throwable) -> {
            // On failure `futures` is null; with no futures nothing would ever end the span.
            if (throwable != null || futures.isEmpty()) {
                getAllSpan.end();
                return;
            }
            AtomicInteger toComplete = new AtomicInteger(futures.size());
            for (CompletableFuture<R> cf : futures) {
                cf.whenComplete((result, error) -> {
                    if (toComplete.decrementAndGet() == 0) {
                        getAllSpan.end();
                    }
                });
            }
        });
    }

    /**
     * Future-based variant that completes with the first non-null result from the active node or any replica.
     *
     * <p>If every individual request finishes without producing a result, the returned future fails
     * with a {@link DocumentUnretrievableException} whose {@link AggregateErrorContext} collects the
     * contexts of the {@link CouchbaseException}s that were observed. If the requests cannot be
     * created at all, the returned future fails with that cause.
     *
     * @param core the core used to dispatch the requests.
     * @param collectionIdentifier identifies the bucket/collection of the document.
     * @param documentId the document id; must not be null or empty.
     * @param timeout the timeout handed to every individual request.
     * @param retryStrategy the retry strategy handed to every individual request.
     * @param clientContext client context attached to every individual request.
     * @param parentSpan parent span for the "get_any_replica" span.
     * @param responseMapper converts the {@link GetReplicaResponse} into the caller's result type.
     * @param <R> the result type produced by {@code responseMapper}.
     * @return a future with the first result to arrive.
     */
    public static <R> CompletableFuture<R> getAnyReplicaAsync(Core core, CollectionIdentifier collectionIdentifier, String documentId, Duration timeout, RetryStrategy retryStrategy, Map<String, Object> clientContext, RequestSpan parentSpan, Function<GetReplicaResponse, R> responseMapper) {
        RequestSpan getAnySpan = core.context().environment().requestTracer().requestSpan("get_any_replica", parentSpan);
        CompletableFuture<List<CompletableFuture<R>>> listOfFutures = getAllReplicasAsync(core, collectionIdentifier, documentId, timeout, retryStrategy, clientContext, getAnySpan, responseMapper);
        CompletableFuture<R> anyReplicaFuture = new CompletableFuture<>();
        listOfFutures.whenComplete((futures, throwable) -> {
            if (throwable != null) {
                anyReplicaFuture.completeExceptionally(throwable);
                // `futures` is null when the outer future failed, so there is nothing left to inspect.
                return;
            }
            AtomicBoolean successCompleted = new AtomicBoolean(false);
            AtomicInteger totalCompleted = new AtomicInteger(0);
            // Raw on purpose: the element type returned by CouchbaseException#context() is not imported here.
            List nestedContexts = Collections.synchronizedList(new ArrayList());
            futures.forEach(individual -> individual.whenComplete((result, error) -> {
                int completed = totalCompleted.incrementAndGet();
                if (error instanceof CompletionException && error.getCause() instanceof CouchbaseException) {
                    nestedContexts.add(((CouchbaseException) error.getCause()).context());
                }
                // Only the first result wins; later ones are ignored.
                if (result != null && successCompleted.compareAndSet(false, true)) {
                    anyReplicaFuture.complete(result);
                }
                // The last request finished and none produced a result.
                if (!successCompleted.get() && completed == futures.size()) {
                    anyReplicaFuture.completeExceptionally(new DocumentUnretrievableException(new AggregateErrorContext(nestedContexts)));
                }
            }));
        });
        return anyReplicaFuture.whenComplete((getReplicaResult, throwable) -> getAnySpan.end());
    }

    /**
     * Builds the request for the active node plus one request per configured replica.
     *
     * <p>If no configuration is available yet for the bucket, the attempt is rescheduled after
     * 100 ms with the timeout reduced by that delay. If the bucket is not a
     * {@link CouchbaseBucketConfig}, the returned future fails with
     * {@link CommonExceptions#getFromReplicaNotCouchbaseBucket()}.
     *
     * @param core the core providing the context and the cluster configuration.
     * @param collectionIdentifier identifies the bucket/collection of the document.
     * @param documentId the document id; must not be null or empty.
     * @param clientContext client context attached to every created request.
     * @param retryStrategy the retry strategy handed to every created request.
     * @param timeout the timeout handed to every created request.
     * @param parent parent span for the per-request "get" / "get_replica" spans.
     * @return a future with the stream of requests, active request first.
     */
    public static CompletableFuture<Stream<GetRequest>> getAllReplicasRequests(Core core, CollectionIdentifier collectionIdentifier, String documentId, Map<String, Object> clientContext, RetryStrategy retryStrategy, Duration timeout, RequestSpan parent) {
        Validators.notNullOrEmpty(documentId, "Id");
        CoreContext coreContext = core.context();
        CoreEnvironment environment = coreContext.environment();
        BucketConfig config = core.clusterConfig().bucketConfig(collectionIdentifier.bucket());

        if (config instanceof CouchbaseBucketConfig) {
            int numReplicas = ((CouchbaseBucketConfig) config).numberOfReplicas();
            List<GetRequest> requests = new ArrayList<>(numReplicas + 1);

            RequestSpan span = environment.requestTracer().requestSpan("get", parent);
            GetRequest activeRequest = new GetRequest(documentId, timeout, coreContext, collectionIdentifier, retryStrategy, span);
            activeRequest.context().clientContext(clientContext);
            requests.add(activeRequest);

            // Replica indexes start at 1; the active copy is handled above.
            for (short replica = 1; replica <= numReplicas; replica++) {
                RequestSpan replicaSpan = environment.requestTracer().requestSpan("get_replica", parent);
                ReplicaGetRequest replicaRequest = new ReplicaGetRequest(documentId, timeout, coreContext, collectionIdentifier, retryStrategy, replica, replicaSpan);
                replicaRequest.context().clientContext(clientContext);
                requests.add(replicaRequest);
            }
            return CompletableFuture.completedFuture(requests.stream());
        }

        if (config == null) {
            // No config for the bucket (yet): try again shortly, charging the delay against the timeout.
            // NOTE(review): nothing in this method stops rescheduling once the remaining timeout
            // reaches zero or below - confirm whether the deadline is enforced elsewhere.
            Duration retryDelay = Duration.ofMillis(100L);
            CompletableFuture<Stream<GetRequest>> future = new CompletableFuture<>();
            environment.timer().schedule(() -> {
                getAllReplicasRequests(core, collectionIdentifier, documentId, clientContext, retryStrategy, timeout.minus(retryDelay), parent)
                    .whenComplete((requestStream, throwable) -> {
                        if (throwable != null) {
                            future.completeExceptionally(throwable);
                        } else {
                            future.complete(requestStream);
                        }
                    });
            }, retryDelay);
            return future;
        }

        CompletableFuture<Stream<GetRequest>> failed = new CompletableFuture<>();
        failed.completeExceptionally(CommonExceptions.getFromReplicaNotCouchbaseBucket());
        return failed;
    }

    /**
     * Sends a single request and converts a non-successful status into an exception.
     *
     * @param core the core used to send the request.
     * @param request the request to send.
     * @return a future with the successful response; it fails with the exception produced by
     *         {@link DefaultErrorUtil#keyValueStatusToException} when the status is not a success.
     */
    private static CompletableFuture<GetResponse> get(Core core, GetRequest request) {
        core.send(request);
        return request.response()
            .thenApply(response -> {
                if (!response.status().success()) {
                    throw DefaultErrorUtil.keyValueStatusToException(request, response);
                }
                return response;
            })
            // Mark the request logically complete regardless of the outcome.
            .whenComplete((response, failure) -> request.context().logicallyComplete());
    }

    /**
     * Pairs a {@link GetResponse} with a flag telling whether it was served by a replica.
     */
    public static class GetReplicaResponse {
        private final GetResponse response;
        private final boolean fromReplica;

        /**
         * @param response the underlying response; must not be null.
         * @param fromReplica true if the response came from a replica request.
         * @throws NullPointerException if {@code response} is null.
         */
        public GetReplicaResponse(GetResponse response, boolean fromReplica) {
            this.response = Objects.requireNonNull(response);
            this.fromReplica = fromReplica;
        }

        /**
         * @return true if the response came from a replica request.
         */
        public boolean isFromReplica() {
            return this.fromReplica;
        }

        /**
         * @return the underlying response, never null.
         */
        public GetResponse getResponse() {
            return this.response;
        }
    }
}

