/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.core.transaction.cleanup;

import com.couchbase.client.core.Core;
import com.couchbase.client.core.Reactor;
import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.cnc.events.transaction.TransactionCleanupAttemptEvent;
import com.couchbase.client.core.cnc.events.transaction.TransactionCleanupEndRunEvent;
import com.couchbase.client.core.cnc.events.transaction.TransactionCleanupStartRunEvent;
import com.couchbase.client.core.cnc.tracing.RequestTracerAndDecorator;
import com.couchbase.client.core.cnc.tracing.TracingAttribute;
import com.couchbase.client.core.cnc.tracing.TracingDecorator;
import com.couchbase.client.core.error.TimeoutException;
import com.couchbase.client.core.error.transaction.internal.ThreadStopRequestedException;
import com.couchbase.client.core.io.CollectionIdentifier;
import com.couchbase.client.core.logging.RedactableArgument;
import com.couchbase.client.core.retry.RetryReason;
import com.couchbase.client.core.retry.reactor.Retry;
import com.couchbase.client.core.transaction.atr.ActiveTransactionRecordIds;
import com.couchbase.client.core.transaction.cleanup.AccessErrorException;
import com.couchbase.client.core.transaction.cleanup.ActiveTransactionRecordStats;
import com.couchbase.client.core.transaction.cleanup.CleanupRequest;
import com.couchbase.client.core.transaction.cleanup.ClientRecord;
import com.couchbase.client.core.transaction.cleanup.ClientRecordDetails;
import com.couchbase.client.core.transaction.cleanup.TransactionsCleaner;
import com.couchbase.client.core.transaction.components.ActiveTransactionRecord;
import com.couchbase.client.core.transaction.components.ActiveTransactionRecordUtil;
import com.couchbase.client.core.transaction.components.ActiveTransactionRecords;
import com.couchbase.client.core.transaction.components.CasMode;
import com.couchbase.client.core.transaction.config.CoreTransactionsConfig;
import com.couchbase.client.core.transaction.support.SpanWrapper;
import com.couchbase.client.core.transaction.support.SpanWrapperUtil;
import com.couchbase.client.core.transaction.util.DebugUtil;
import com.couchbase.client.core.transaction.util.MonoBridge;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuples;

@Stability.Internal
public class LostCleanupDistributed {
    private static final Logger LOGGER = LoggerFactory.getLogger((String)"com.couchbase.transactions.cleanup.lost");
    private final Core core;
    private final ClientRecord clientRecord;
    private final CoreTransactionsConfig config;
    private final Supplier<TransactionsCleaner> cleanerSupplier;
    private volatile boolean stop = false;
    private Disposable cleanupThreadLauncher;
    private final Duration actualCleanupWindow;
    private final String clientUuid = UUID.randomUUID().toString();
    private final String bp;
    private final Set<CollectionIdentifier> cleanupSet = ConcurrentHashMap.newKeySet();
    private final Map<CollectionIdentifier, Disposable> actuallyBeingCleaned = new ConcurrentHashMap<CollectionIdentifier, Disposable>();
    private static final Duration DEFAULT_SAFETY_MARGIN = Duration.ofMillis(1500L);

    public LostCleanupDistributed(Core core, CoreTransactionsConfig config, Supplier<TransactionsCleaner> cleanerSupplier) {
        this.core = Objects.requireNonNull(core);
        this.clientRecord = config.clientRecordFactory().create(core);
        this.config = Objects.requireNonNull(config);
        this.cleanerSupplier = Objects.requireNonNull(cleanerSupplier);
        this.actualCleanupWindow = config.cleanupConfig().cleanupWindow();
        this.bp = "Client " + this.clientUuid.substring(0, 5);
        this.start();
    }

    public void addToCleanupSet(CollectionIdentifier coll) {
        this.cleanupSet.add(coll);
    }

    public Set<CollectionIdentifier> cleanupSet() {
        return new HashSet<CollectionIdentifier>(this.cleanupSet);
    }

    public Mono<Void> shutdown(Duration timeout) {
        return Mono.fromCallable(() -> {
            Map<CollectionIdentifier, Disposable> map = this.actuallyBeingCleaned;
            synchronized (map) {
                HashSet<CollectionIdentifier> removeFromClientRecords = new HashSet<CollectionIdentifier>(this.actuallyBeingCleaned.keySet());
                this.stop = true;
                LOGGER.info("{} stopping lost cleanup process, {} threads running", (Object)this.bp, (Object)this.actuallyBeingCleaned.keySet().size());
                long start = System.nanoTime();
                while (true) {
                    if (Duration.ofNanos(System.nanoTime() - start).compareTo(timeout) > 0) {
                        LOGGER.warn("Exceeded timeout of {}ms while waiting for transactions cleanup thread to finish", (Object)timeout.toMillis());
                        break;
                    }
                    if (this.actuallyBeingCleaned.isEmpty()) break;
                    try {
                        Thread.sleep(50L);
                    }
                    catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
                return removeFromClientRecords;
            }
        }).flatMap(removeFromClientRecords -> this.clientRecord.removeClientFromClientRecord(this.clientUuid, (Set<CollectionIdentifier>)removeFromClientRecords).then().onErrorResume(err -> {
            LOGGER.warn("{} failed to remove from cleanup set with err: {}", (Object)this.bp, err);
            return Mono.empty();
        })).doOnTerminate(() -> LOGGER.info("{} stopped lost cleanup process and removed client from client records", (Object)this.bp));
    }

    private static List<String> atrsToHandle(int indexOfThisClient, int numActiveClients, int numAtrs) {
        List<String> allAtrs = ActiveTransactionRecordIds.allAtrs(numAtrs);
        ArrayList<String> out = new ArrayList<String>();
        for (int i = indexOfThisClient; i < allAtrs.size(); i += numActiveClients) {
            out.add(allAtrs.get(i));
        }
        return out;
    }

    private RequestTracerAndDecorator tracer() {
        return this.core.context().coreResources().requestTracerAndDecorator();
    }

    private TracingDecorator tip() {
        return this.core.context().coreResources().tracingDecorator();
    }

    public Flux<TransactionCleanupAttemptEvent> handleATRCleanup(String bp, CollectionIdentifier atrCollection, String atrId, ActiveTransactionRecordStats stats, Duration safetyMargin, SpanWrapper pspan) {
        return Flux.defer(() -> {
            long start = System.nanoTime();
            AtomicLong timeToFetchAtr = new AtomicLong(0L);
            AtomicReference<CasMode> casMode = new AtomicReference<CasMode>(CasMode.UNKNOWN);
            SpanWrapper span = SpanWrapperUtil.createOp(null, this.tracer(), atrCollection, atrId, "transaction_cleanup_atr", pspan);
            TransactionsCleaner cleaner = this.cleanerSupplier.get();
            return cleaner.hooks().beforeAtrGet.apply(atrId).then(ActiveTransactionRecord.getAtr(this.core, atrCollection, atrId, this.core.context().environment().timeoutConfig().kvTimeout(), span)).flatMap(atr -> {
                timeToFetchAtr.set(System.nanoTime());
                if (atr.isPresent()) {
                    casMode.set(((ActiveTransactionRecords)atr.get()).casMode());
                    return Mono.just((Object)((ActiveTransactionRecords)atr.get()));
                }
                return Mono.empty();
            }).doOnError(err -> {
                LOGGER.debug("{} Got error '{}' while getting ATR {}/", new Object[]{bp, err, ActiveTransactionRecordUtil.getAtrDebug(atrCollection, atrId)});
                stats.errored = Optional.of(err);
            }).flatMapMany(atr -> {
                Collection expired;
                stats.numEntries = atr.entries().size();
                stats.exists = true;
                stats.errored = Optional.empty();
                stats.expired = expired = (Collection)atr.entries().stream().filter(v -> v.hasExpired(safetyMargin.toMillis())).collect(Collectors.toList());
                this.tip().provideAttr(TracingAttribute.TRANSACTION_ATR_ENTRIES_COUNT, span.span(), stats.numEntries);
                this.tip().provideAttr(TracingAttribute.TRANSACTION_ATR_ENTRIES_EXPIRED, span.span(), stats.expired.size());
                return Flux.fromIterable((Iterable)expired).publishOn(this.core.context().environment().transactionsSchedulers().schedulerCleanup());
            }).concatMap(atrEntry -> {
                LOGGER.trace("{} Found expired attempt {}, expires after {}, age {} (started {}, now {})", new Object[]{bp, atrEntry.attemptId(), atrEntry.expiresAfterMillis().orElse(-1), atrEntry.ageMillis(), atrEntry.timestampStartMillis().orElse(0L), atrEntry.cas() / 1000000L});
                stats.expiredEntryCleanupTotalAttempts.incrementAndGet();
                CleanupRequest req = CleanupRequest.fromAtrEntry(atrCollection, atrEntry);
                return cleaner.performCleanup(req, false, span).onErrorResume(err -> {
                    stats.expiredEntryCleanupFailedAttempts.incrementAndGet();
                    return Mono.empty();
                });
            }).doOnError(err -> span.finish((Throwable)err)).doOnTerminate(() -> {
                if (LOGGER.isTraceEnabled()) {
                    long now = System.nanoTime();
                    LOGGER.trace("{} processed ATR {} after {}\u00b5s ({} fetching ATR), CAS={}: {}", new Object[]{bp, ActiveTransactionRecordUtil.getAtrDebug(atrCollection, atrId), TimeUnit.NANOSECONDS.toMicros(now - start), TimeUnit.NANOSECONDS.toMicros(timeToFetchAtr.get() - start), casMode.get(), stats});
                }
                span.finish();
            }).onErrorResume(err -> Mono.empty());
        });
    }

    private void start() {
        this.periodicallyCheckCleanupSet();
    }

    Mono<Void> createThreadForCollectionIfNeeded(CollectionIdentifier coll) {
        return Mono.defer(() -> {
            Map<CollectionIdentifier, Disposable> map = this.actuallyBeingCleaned;
            synchronized (map) {
                if (this.stop) {
                    return Mono.empty();
                }
                if (!this.actuallyBeingCleaned.containsKey(coll)) {
                    String collDebug = RedactableArgument.redactMeta(coll.bucket() + "." + coll.scope().orElse("-") + "." + coll.collection().orElse("-")).toString();
                    LOGGER.info("{} will start cleaning lost transactions on collection {}", (Object)this.bp, (Object)collDebug);
                    Disposable thread = this.perCollectionThread(coll).onErrorResume(err -> {
                        if (!(err instanceof ThreadStopRequestedException)) {
                            LOGGER.warn("{} {} lost transactions thread has ended on error {} (will be retried)", new Object[]{this.bp, collDebug, DebugUtil.dbg(err)});
                            return Mono.empty();
                        }
                        return Mono.empty();
                    }).doOnTerminate(() -> {
                        LOGGER.debug("{} {} lost transactions thread has ended", (Object)this.bp, (Object)collDebug);
                        this.actuallyBeingCleaned.remove(coll);
                    }).subscribe();
                    this.actuallyBeingCleaned.put(coll, thread);
                }
            }
            return Mono.empty();
        });
    }

    private void periodicallyCheckCleanupSet() {
        this.cleanupThreadLauncher = Reactor.safeInterval(Duration.ZERO, Duration.ofSeconds(1L), this.core.context().environment().transactionsSchedulers().schedulerCleanup()).concatMap(v -> Flux.fromIterable(this.cleanupSet)).publishOn(this.core.context().environment().transactionsSchedulers().schedulerCleanup()).concatMap(this::createThreadForCollectionIfNeeded).doOnCancel(() -> LOGGER.info("{} has been told to cancel", (Object)this.bp)).subscribe(v -> LOGGER.warn("{} lost transactions cleanup thread(s) ending", (Object)this.bp), err -> {
            if (err instanceof ThreadStopRequestedException) {
                LOGGER.info("{} lost transactions cleanup told to stop", (Object)this.bp);
            } else {
                LOGGER.warn("{} lost transactions cleanup ended with exception " + err, (Object)this.bp);
            }
        });
    }

    private Mono<Void> perCollectionThread(CollectionIdentifier collection) {
        return Mono.defer(() -> {
            String bp = "lost/" + RedactableArgument.redactMeta(collection.bucket() + "." + collection.scope().orElse("-") + "." + collection.collection().orElse("-")) + "/clientId=" + this.clientUuid.substring(0, 5);
            AtomicReference span = new AtomicReference();
            this.core.openBucket(collection.bucket());
            return Mono.fromRunnable(() -> {
                SpanWrapper created = SpanWrapperUtil.createOp(null, this.tracer(), collection, null, "transaction_cleanup_window", null);
                this.tip().provideAttr(TracingAttribute.TRANSACTION_CLEANUP_CLIENT_ID, created.span(), this.clientUuid);
                this.tip().provideAttr(TracingAttribute.TRANSACTION_CLEANUP_WINDOW, created.span(), this.config.cleanupConfig().cleanupWindow().toMillis());
                span.set(created);
            }).publishOn(this.core.context().environment().transactionsSchedulers().schedulerCleanup()).then(this.clientRecord.processClient(this.clientUuid, collection, this.config, (SpanWrapper)span.get())).flatMap(clientDetails -> {
                long startOfRun = System.nanoTime();
                HashMap atrStats = new HashMap();
                List<String> atrsHandledByThisClient = LostCleanupDistributed.atrsToHandle(clientDetails.indexOfThisClient(), clientDetails.numActiveClients(), this.config.numAtrs());
                this.tip().provideAttr(TracingAttribute.TRANSACTION_CLEANUP_NUM_ATRS, ((SpanWrapper)span.get()).span(), atrsHandledByThisClient.size());
                this.tip().provideAttr(TracingAttribute.TRANSACTION_CLEANUP_NUM_ACTIVE, ((SpanWrapper)span.get()).span(), clientDetails.numActiveClients());
                this.tip().provideAttr(TracingAttribute.TRANSACTION_CLEANUP_NUM_EXPIRED, ((SpanWrapper)span.get()).span(), clientDetails.numExpiredClients());
                long checkAtrEveryNNanos = Math.max(1L, this.actualCleanupWindow.toNanos() / (long)atrsHandledByThisClient.size());
                if (atrsHandledByThisClient.size() < this.config.numAtrs()) {
                    atrsHandledByThisClient.forEach(id -> {});
                } else {
                    LOGGER.trace("{} owns all {} ATRs and will check them over next {}mills, checking an ATR every {}nanos", new Object[]{bp, this.config.numAtrs(), this.actualCleanupWindow.toMillis(), checkAtrEveryNNanos});
                }
                TransactionCleanupStartRunEvent ev = new TransactionCleanupStartRunEvent(collection.bucket(), collection.scope().orElse("_default"), collection.collection().orElse("_default"), this.clientUuid, (ClientRecordDetails)clientDetails, this.actualCleanupWindow, atrsHandledByThisClient.size(), this.config.numAtrs(), Duration.ofMillis(checkAtrEveryNNanos));
                this.core.context().environment().eventBus().publish(ev);
                return Flux.zip((Publisher)Flux.fromIterable(atrsHandledByThisClient), Reactor.unsafeInterval(Duration.ofNanos(checkAtrEveryNNanos), this.core.context().environment().scheduler())).publishOn(this.core.context().environment().transactionsSchedulers().schedulerCleanup()).flatMap(v -> {
                    String atrId = (String)v.getT1();
                    LOGGER.trace("{} checking for lost txns in atr {}", (Object)bp, (Object)ActiveTransactionRecordUtil.getAtrDebug(collection, atrId));
                    ActiveTransactionRecordStats stats = new ActiveTransactionRecordStats();
                    Mono out = this.checkIfThreadStopped(collection).thenMany(this.handleATRCleanup(bp, collection, atrId, stats, DEFAULT_SAFETY_MARGIN, (SpanWrapper)span.get())).then(Mono.fromRunnable(() -> atrStats.put(atrId, stats))).thenReturn((Object)atrId);
                    MonoBridge mb = new MonoBridge(out, "", this, null);
                    return mb.external();
                }).onErrorResume(err -> {
                    if (err instanceof TimeoutException) {
                        HashSet<RetryReason> compare = new HashSet<RetryReason>();
                        compare.add(RetryReason.KV_COLLECTION_OUTDATED);
                        boolean collectionDeleted = ((TimeoutException)err).context().requestContext().retryReasons().equals(compare);
                        if (collectionDeleted) {
                            this.cleanupSet.remove(collection);
                            LOGGER.info("{} stopping cleanup on collection {} as it seems to be deleted", (Object)bp, (Object)collection);
                            return Mono.error((Throwable)err);
                        }
                    }
                    if (err instanceof ThreadStopRequestedException) {
                        return Mono.error((Throwable)err);
                    }
                    LOGGER.info("{} lost cleanup thread got error '{}', continuing", (Object)bp, err);
                    return Mono.empty();
                }).then().thenReturn((Object)Tuples.of(atrStats, (Object)ev, (Object)startOfRun));
            }).doOnNext(stats -> {
                Duration timeForRun = Duration.ofNanos(System.nanoTime() - (Long)stats.getT3());
                TransactionCleanupEndRunEvent ev = new TransactionCleanupEndRunEvent((TransactionCleanupStartRunEvent)stats.getT2(), (Map)stats.getT1(), timeForRun);
                this.core.context().environment().eventBus().publish(ev);
            }).doOnNext(v -> ((SpanWrapper)span.get()).finish()).doOnError(err -> ((SpanWrapper)span.get()).finish((Throwable)err)).retryWhen(Retry.allBut(ThreadStopRequestedException.class, AccessErrorException.class).exponentialBackoff(Duration.ofMillis(Math.min(1000L, this.config.cleanupConfig().cleanupWindow().toMillis())), this.config.cleanupConfig().cleanupWindow()).doOnRetry(v -> LOGGER.debug("{} retrying lost cleanup on error {} after {}", new Object[]{bp, DebugUtil.dbg(v.exception()), v.backoff()})).toReactorRetry()).repeat().then();
        });
    }

    private Mono<Void> checkIfThreadStopped(CollectionIdentifier collection) {
        return Mono.defer(() -> {
            if (this.stop) {
                LOGGER.info("{} Stopping background cleanup thread for lost transactions on {}", (Object)this.bp, (Object)collection);
                return Mono.error((Throwable)new ThreadStopRequestedException());
            }
            return Mono.empty();
        });
    }
}

