/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.core.service.kv;

import com.couchbase.client.core.Reactor;
import com.couchbase.client.core.cnc.RequestSpan;
import com.couchbase.client.core.config.BucketConfig;
import com.couchbase.client.core.config.CouchbaseBucketConfig;
import com.couchbase.client.core.error.FeatureNotAvailableException;
import com.couchbase.client.core.error.ReplicaNotConfiguredException;
import com.couchbase.client.core.msg.kv.MutationToken;
import com.couchbase.client.core.msg.kv.ObserveViaSeqnoRequest;
import com.couchbase.client.core.retry.RetryStrategy;
import com.couchbase.client.core.retry.reactor.Repeat;
import com.couchbase.client.core.service.kv.ObserveContext;
import com.couchbase.client.core.service.kv.ObserveItem;
import java.time.Duration;
import java.util.ArrayList;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class Observe {
    public static Mono<Void> poll(ObserveContext ctx) {
        if (ctx.persistTo() == ObservePersistTo.NONE && ctx.replicateTo() == ObserveReplicateTo.NONE) {
            return Mono.empty();
        }
        if (!ctx.environment().ioConfig().mutationTokensEnabled() || !ctx.mutationToken().isPresent()) {
            return Mono.error((Throwable)new FeatureNotAvailableException("To use PersistTo and/or ReplicateTo, mutation tokens must be enabled on the IO configuration"));
        }
        RequestSpan parentSpan = ctx.environment().requestTracer().requestSpan("observe", ctx.parentSpan());
        Flux observed = Flux.defer(() -> {
            BucketConfig config = ctx.core().clusterConfig().bucketConfig(ctx.collectionIdentifier().bucket());
            return Flux.just((Object)Observe.validateReplicas(config, ctx.persistTo(), ctx.replicateTo()));
        }).flatMap(replicas -> Observe.viaMutationToken(replicas, ctx, parentSpan));
        return Observe.maybeRetry((Flux<ObserveItem>)observed, ctx).timeout(ctx.timeout(), ctx.environment().scheduler()).doFinally(t -> parentSpan.end());
    }

    private static Flux<ObserveItem> viaMutationToken(int bucketReplicas, ObserveContext ctx, RequestSpan parent) {
        if (!ctx.mutationToken().isPresent()) {
            throw new IllegalStateException("MutationToken is not present, this is a bug!");
        }
        Duration timeout = ctx.timeout();
        RetryStrategy retryStrategy = ctx.retryStrategy();
        MutationToken mutationToken = ctx.mutationToken().get();
        String id = ctx.key();
        ArrayList<ObserveViaSeqnoRequest> requests = new ArrayList<ObserveViaSeqnoRequest>();
        if (ctx.persistTo() != ObservePersistTo.NONE) {
            RequestSpan span = ctx.environment().requestTracer().requestSpan("cb.observe", parent);
            requests.add(new ObserveViaSeqnoRequest(timeout, ctx, ctx.collectionIdentifier(), retryStrategy, 0, true, mutationToken.partitionUUID(), id, span));
        }
        if (ctx.persistTo().touchesReplica() || ctx.replicateTo().touchesReplica()) {
            for (int i = 1; i <= bucketReplicas; i = (int)((short)(i + 1))) {
                RequestSpan span = ctx.environment().requestTracer().requestSpan("cb.observe", parent);
                requests.add(new ObserveViaSeqnoRequest(timeout, ctx, ctx.collectionIdentifier(), retryStrategy, i, false, mutationToken.partitionUUID(), id, span));
            }
        }
        return Flux.fromIterable(requests).flatMap(request -> {
            ctx.core().send(request);
            return Reactor.wrap(request, request.response(), true).onErrorResume(t -> Mono.empty()).doFinally(signalType -> request.context().logicallyComplete());
        }).map(response -> ObserveItem.fromMutationToken(mutationToken, response));
    }

    private static Mono<Void> maybeRetry(Flux<ObserveItem> observedItems, ObserveContext ctx) {
        return observedItems.scan((Object)ObserveItem.empty(), ObserveItem::add).repeatWhen(Repeat.times(Long.MAX_VALUE).exponentialBackoff(Duration.ofNanos(10000L), Duration.ofMillis(100L))).skipWhile(status -> !status.check(ctx.persistTo(), ctx.replicateTo())).take(1L).then();
    }

    private static int validateReplicas(BucketConfig bucketConfig, ObservePersistTo persistTo, ObserveReplicateTo replicateTo) {
        if (!(bucketConfig instanceof CouchbaseBucketConfig)) {
            throw new FeatureNotAvailableException("Only couchbase buckets support PersistTo and/or ReplicateTo");
        }
        CouchbaseBucketConfig cbc = (CouchbaseBucketConfig)bucketConfig;
        int numReplicas = cbc.numberOfReplicas();
        if (cbc.ephemeral() && persistTo.value() != 0) {
            throw new FeatureNotAvailableException("Ephemeral Buckets do not support PersistTo");
        }
        if (replicateTo.touchesReplica() && replicateTo.value() > numReplicas) {
            throw new ReplicaNotConfiguredException("Not enough replicas configured on the bucket");
        }
        if (persistTo.touchesReplica() && persistTo.value() - 1 > numReplicas) {
            throw new ReplicaNotConfiguredException("Not enough replicas configured on the bucket");
        }
        return numReplicas;
    }

    public static enum ObserveReplicateTo {
        NONE(0),
        ONE(1),
        TWO(2),
        THREE(3);

        private final short value;

        private ObserveReplicateTo(short value) {
            this.value = value;
        }

        public short value() {
            return this.value;
        }

        public boolean touchesReplica() {
            return this.value > 0;
        }
    }

    public static enum ObservePersistTo {
        ACTIVE(-1),
        NONE(0),
        ONE(1),
        TWO(2),
        THREE(3),
        FOUR(4);

        private final short value;

        private ObservePersistTo(short value) {
            this.value = value;
        }

        public short value() {
            return this.value;
        }

        public boolean touchesReplica() {
            return this.value > 0;
        }
    }
}

