/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.core.transaction;

import com.couchbase.client.core.Core;
import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.api.query.CoreQueryContext;
import com.couchbase.client.core.api.query.CoreQueryMetaData;
import com.couchbase.client.core.api.query.CoreQueryOptions;
import com.couchbase.client.core.api.query.CoreQueryOptionsTransactions;
import com.couchbase.client.core.api.query.CoreQueryResult;
import com.couchbase.client.core.api.query.CoreReactiveQueryResult;
import com.couchbase.client.core.classic.query.ClassicCoreReactiveQueryResult;
import com.couchbase.client.core.cnc.RequestSpan;
import com.couchbase.client.core.deps.com.fasterxml.jackson.databind.node.TextNode;
import com.couchbase.client.core.error.transaction.RetryTransactionException;
import com.couchbase.client.core.error.transaction.TransactionOperationFailedException;
import com.couchbase.client.core.error.transaction.internal.CoreTransactionCommitAmbiguousException;
import com.couchbase.client.core.error.transaction.internal.CoreTransactionExpiredException;
import com.couchbase.client.core.error.transaction.internal.CoreTransactionFailedException;
import com.couchbase.client.core.msg.query.QueryChunkRow;
import com.couchbase.client.core.node.NodeIdentifier;
import com.couchbase.client.core.retry.RetryReason;
import com.couchbase.client.core.retry.reactor.DefaultRetry;
import com.couchbase.client.core.retry.reactor.Jitter;
import com.couchbase.client.core.retry.reactor.RetryContext;
import com.couchbase.client.core.transaction.CoreTransactionAttemptContext;
import com.couchbase.client.core.transaction.CoreTransactionContext;
import com.couchbase.client.core.transaction.CoreTransactionResult;
import com.couchbase.client.core.transaction.config.CoreMergedTransactionConfig;
import com.couchbase.client.core.transaction.config.CoreTransactionOptions;
import com.couchbase.client.core.transaction.config.CoreTransactionsConfig;
import com.couchbase.client.core.transaction.forwards.Supported;
import com.couchbase.client.core.transaction.support.SpanWrapper;
import com.couchbase.client.core.transaction.threadlocal.TransactionMarker;
import com.couchbase.client.core.transaction.util.DebugUtil;
import com.couchbase.client.core.transaction.util.QueryUtil;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.annotation.Nullable;
import reactor.util.retry.Retry;

@Stability.Internal
public class CoreTransactionsReactive {
    /**
     * Attempt cap declared for this class. It is not referenced elsewhere in this class; the retry policy built in
     * {@code executeCreateRetryWhen} passes the same value (100) as a literal.
     * NOTE(review): presumably that literal was meant to be this constant - confirm.
     */
    static final int MAX_ATTEMPTS = 100;
    /** Core handle; this class reads its context, environment (schedulers, client version) and transactions cleanup from it. */
    private final Core core;
    /** Transactions configuration supplied at construction; combined with per-transaction options via {@code CoreMergedTransactionConfig}. */
    private final CoreTransactionsConfig config;

    /**
     * Creates the reactive transactions entry point.
     *
     * @param core   the core instance to run against; must not be {@code null}
     * @param config the transactions configuration; must not be {@code null}
     * @throws NullPointerException if either argument is {@code null}
     */
    public CoreTransactionsReactive(Core core, CoreTransactionsConfig config) {
        // Validate in declaration order so a null core is reported before a null config.
        Objects.requireNonNull(core);
        Objects.requireNonNull(config);
        this.core = core;
        this.config = config;
    }

    /**
     * Runs a transaction as a sequence of attempts and returns the result of the final one.
     *
     * <p>For each subscription to {@code createAttempt} the pipeline, in order:
     * <ol>
     *   <li>publishes the attempt context on the {@code schedulerBlocking()} transactions scheduler;</li>
     *   <li>bumps the attempt counter on {@code overall} and logs the attempt;</li>
     *   <li>applies {@code transactionLogic} with a {@link TransactionMarker} for the attempt stored in the Reactor context;</li>
     *   <li>passes any error from the logic through {@code ctx.convertToOperationFailedIfNeeded};</li>
     *   <li>continues with {@code ctx.implicitCommit}, then {@code ctx.lambdaEnd}, then {@code ctx.transactionEnd};</li>
     *   <li>lets {@link RetryTransactionException} and {@link CoreTransactionFailedException} propagate unchanged and
     *       routes every other error through {@code ctx.transactionEnd(err, ...)}.</li>
     * </ol>
     * The whole sequence is re-subscribed according to {@link #executeCreateRetryWhen}. {@code overall.finish} is called
     * with {@code null} when a result is emitted and with the error on failure, and the elapsed time is logged on
     * termination.
     *
     * @param createAttempt              produces a fresh attempt context on every subscription
     * @param config                     merged configuration; not read by this method body
     * @param overall                    transaction-wide context (attempt counter, logger, finish callback)
     * @param transactionLogic           the work to run inside each attempt
     * @param singleQueryTransactionMode forwarded unchanged to the attempt-context calls
     * @return a Mono emitting the transaction result, or failing with the final error
     */
    public Mono<CoreTransactionResult> executeTransaction(Mono<CoreTransactionAttemptContext> createAttempt, CoreMergedTransactionConfig config, CoreTransactionContext overall, Function<CoreTransactionAttemptContext, Mono<Void>> transactionLogic, boolean singleQueryTransactionMode) {
        // Holds System.nanoTime() of the first subscription; null until then.
        AtomicReference<Long> startTime = new AtomicReference<Long>();
        return createAttempt.publishOn(this.core.context().environment().transactionsSchedulers().schedulerBlocking()).doOnSubscribe(v -> {
            // This hook sits upstream of retryWhen, so it fires again on every retry; the null check keeps the first timestamp.
            if (startTime.get() == null) {
                startTime.set(System.nanoTime());
            }
        }).doOnNext(ctx -> {
            overall.incAttempts();
            ctx.LOGGER.info(ctx.attemptId(), "starting attempt %d/%s/%s", overall.numAttempts(), ctx.transactionId(), ctx.attemptId());
        }).flatMap(ctx -> ((Mono)transactionLogic.apply((CoreTransactionAttemptContext)ctx)).contextWrite(reactiveContext -> {
            // Store a marker for this attempt in the Reactor context under the TransactionMarker class key.
            TransactionMarker marker = new TransactionMarker((CoreTransactionAttemptContext)ctx);
            return reactiveContext.put(TransactionMarker.class, (Object)marker);
            // Next line, in order: convert logic errors -> implicitCommit -> on error lambdaEnd(err) -> lambdaEnd(null)
            // -> transactionEnd(null) -> final error filter.
            // NOTE(review): the Monos passed to then(...) are obtained while this chain is assembled; whether any work
            // happens before subscription depends on those methods - confirm.
        }).onErrorResume(err -> Mono.error((Throwable)ctx.convertToOperationFailedIfNeeded((Throwable)err, singleQueryTransactionMode))).then(ctx.implicitCommit(singleQueryTransactionMode)).onErrorResume(err -> ctx.lambdaEnd(this.core().transactionsCleanup(), (Throwable)err, singleQueryTransactionMode)).then(ctx.lambdaEnd(this.core().transactionsCleanup(), null, singleQueryTransactionMode)).then(ctx.transactionEnd(null, singleQueryTransactionMode)).onErrorResume(err -> {
            // Propagate untouched so the retry policy downstream can see it.
            if (err instanceof RetryTransactionException) {
                return Mono.error((Throwable)err);
            }
            // Already a CoreTransactionFailedException: propagate as-is instead of passing it to transactionEnd again.
            if (err instanceof CoreTransactionFailedException) {
                return Mono.error((Throwable)err);
            }
            return ctx.transactionEnd((Throwable)err, singleQueryTransactionMode);
        })).retryWhen(this.executeCreateRetryWhen(overall, startTime)).doOnNext(v -> overall.finish(null)).doOnError(err -> overall.finish((Throwable)err)).doOnTerminate(() -> {
            // Elapsed time since the first subscription, in microseconds.
            long elapsed = TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - (Long)startTime.get());
            overall.LOGGER.info("finished txn in %dus", elapsed);
        });
    }

    /**
     * Builds the retry policy applied around a whole transaction attempt.
     *
     * <p>Only failures that are a {@link RetryTransactionException} are retried. Retries use exponential backoff
     * between 1ms and 100ms with random jitter, and at most 100 retries (the same value as {@code MAX_ATTEMPTS}).
     * Each retry is logged and recorded on {@code overall}.
     *
     * @param overall   transaction-wide context used for logging and retry bookkeeping
     * @param startTime holder of the {@code System.nanoTime()} captured when the transaction was first subscribed
     * @return the policy adapted to Reactor's {@link Retry}
     */
    private Retry executeCreateRetryWhen(CoreTransactionContext overall, AtomicReference<Long> startTime) {
        Predicate<RetryContext> shouldRetry = retryContext -> retryContext.exception() instanceof RetryTransactionException;
        return DefaultRetry.create(shouldRetry)
            .exponentialBackoff(Duration.ofMillis(1L), Duration.ofMillis(100L))
            .doOnRetry(retryContext -> {
                // Measured from the recorded start time, captured before logging.
                long elapsedNanos = System.nanoTime() - (Long)startTime.get();
                overall.LOGGER.info("<>", "retrying transaction after backoff %dmillis", retryContext.backoff().toMillis());
                overall.incrementRetryAttempts(Duration.ofNanos(elapsedNanos), RetryReason.UNKNOWN);
            })
            .jitter(Jitter.random())
            .retryMax(100L)
            .toReactorRetry();
    }

    /**
     * Creates the context for one attempt using the factory carried by the merged configuration.
     *
     * @param overall   the transaction-wide context; its span is passed to the factory wrapped in an {@link Optional}
     * @param config    the merged configuration supplying the attempt-context factory
     * @param attemptId identifier for the new attempt
     * @return the attempt context produced by the factory
     */
    public CoreTransactionAttemptContext createAttemptContext(CoreTransactionContext overall, CoreMergedTransactionConfig config, String attemptId) {
        return config.attemptContextFactory()
            .create(core, overall, config, attemptId, this, Optional.of(overall.span()));
    }

    /**
     * Runs {@code transactionLogic} as a transaction.
     *
     * <p>Everything is deferred until subscription: the per-transaction options are combined with this instance's
     * configuration, a new {@link CoreTransactionContext} with a random UUID transaction id is created, the effective
     * configuration is logged, and the logic is handed to {@link #executeTransaction} with single-query mode off.
     * Each attempt gets its own random UUID attempt id.
     *
     * @param transactionLogic the work to perform in each attempt; only the completion or failure of the returned
     *                         Mono is used, any emitted value is discarded
     * @param perConfig        per-transaction options, or {@code null} to use this instance's configuration alone
     * @return a Mono emitting the transaction result, or failing with the transaction's final error
     */
    public Mono<CoreTransactionResult> run(Function<CoreTransactionAttemptContext, Mono<?>> transactionLogic, @Nullable CoreTransactionOptions perConfig) {
        return Mono.defer(() -> {
            CoreMergedTransactionConfig merged = new CoreMergedTransactionConfig(this.config, Optional.ofNullable(perConfig));
            CoreTransactionContext overall = new CoreTransactionContext(this.core.context(), UUID.randomUUID().toString(), merged, this.core.transactionsCleanup());
            overall.LOGGER.info(CoreTransactionsReactive.configDebug(this.config, perConfig, this.core));
            // fromCallable re-runs on every subscription, so each retry gets a new attempt id and context.
            Mono<CoreTransactionAttemptContext> createAttempt = Mono.fromCallable(
                () -> this.createAttemptContext(overall, merged, UUID.randomUUID().toString()));
            // defer so the user logic is invoked at subscription time; then() drops whatever value it emits.
            Function<CoreTransactionAttemptContext, Mono<Void>> runLogic =
                ctx -> Mono.defer(() -> transactionLogic.apply(ctx)).then();
            return this.executeTransaction(createAttempt, merged, overall, runLogic, false);
        });
    }

    /**
     * Logs, at info level under the attempt's id, the output of {@code DebugUtil.createElidedStacktrace} for {@code err}.
     * NOTE(review): no caller of this private method is visible in this file - confirm whether it is still needed.
     *
     * @param ctx the attempt whose logger and attempt id are used
     * @param err the error whose elided stack trace is logged
     */
    private void logElidedStacktrace(CoreTransactionAttemptContext ctx, Throwable err) {
        ctx.LOGGER.info(ctx.attemptId(), DebugUtil.createElidedStacktrace(err));
    }

    /**
     * Renders a one-line description of the effective transactions configuration for logging.
     *
     * <p>The line contains the client version (or "-" when absent), the ATR count, the metadata collection, the
     * expiry in milliseconds (the per-transaction timeout when present, otherwise the configured expiration time),
     * the configured durability, the per-transaction durability when {@code perConfig} is given, and the
     * {@code Supported.SUPPORTED} value.
     *
     * @param config    the base transactions configuration
     * @param perConfig per-transaction options, or {@code null} if none were supplied
     * @param core      the core whose environment supplies the client version
     * @return the formatted description
     */
    private static String configDebug(CoreTransactionsConfig config, @Nullable CoreTransactionOptions perConfig, Core core) {
        final boolean hasPerTxnConfig = perConfig != null;
        StringBuilder out = new StringBuilder("SDK version: ")
            .append(core.context().environment().clientVersion().orElse("-"))
            .append(" config: ")
            .append("atrs=")
            .append(config.numAtrs())
            .append(", metadataCollection=")
            .append(config.metadataCollection())
            .append(", expiry=");
        if (!hasPerTxnConfig) {
            out.append(config.transactionExpirationTime().toMillis());
        } else {
            // A per-transaction timeout overrides the configured expiration time when present.
            out.append(perConfig.timeout().orElse(config.transactionExpirationTime()).toMillis());
        }
        out.append("ms durability=").append((Object)config.durabilityLevel());
        if (hasPerTxnConfig) {
            out.append(" per-txn config=")
                .append(" durability=")
                .append(perConfig.durabilityLevel());
        }
        return out.append(", supported=").append(Supported.SUPPORTED).toString();
    }

    /**
     * @return the transactions configuration this instance was constructed with
     */
    public CoreTransactionsConfig config() {
        return config;
    }

    /**
     * @return the core instance this object was constructed with
     */
    public Core core() {
        return core;
    }

    /**
     * Runs a single statement through {@link #executeTransaction} in single-query mode and returns a streaming result.
     *
     * <p>On subscription this builds a merged configuration with no per-transaction options, creates a new
     * {@link CoreTransactionContext} with a random UUID id, and logs the configuration. The attempt logic calls
     * {@code ctx.doQueryOperation("single query streaming", ...)}, which in turn calls {@code ctx.queryWrapperLocked}
     * and captures the emitted result. Once {@code executeTransaction} completes, the captured result is wrapped so
     * that errors raised while streaming rows are mapped through
     * {@link #singleQueryHandleErrorDuringRowStreaming}; metadata and {@code lastDispatchedTo} delegate to the
     * captured result unchanged.
     *
     * @param statement      the statement to execute
     * @param queryContext   query context forwarded to {@code queryWrapperLocked}; may be {@code null}
     * @param queryOptions   query options forwarded to {@code queryWrapperLocked}
     * @param parentSpan     optional parent span; when present it is wrapped in a {@link SpanWrapper}, otherwise
     *                       {@code null} is passed to {@code doQueryOperation}
     * @param errorConverter final mapping applied to any error raised while rows are streamed
     * @return a Mono emitting the wrapped result, or failing with the transaction's error; fails with a
     *         {@link CoreTransactionFailedException} (cause {@code IllegalStateException}) if no result was captured
     */
    public Mono<CoreReactiveQueryResult> query(String statement, @Nullable CoreQueryContext queryContext, CoreQueryOptions queryOptions, Optional<RequestSpan> parentSpan, Function<Throwable, RuntimeException> errorConverter) {
        return Mono.defer(() -> {
            // No per-transaction options are applied on this path.
            CoreMergedTransactionConfig merged = new CoreMergedTransactionConfig(this.config, Optional.empty());
            CoreTransactionContext overall = new CoreTransactionContext(this.core.context(), UUID.randomUUID().toString(), merged, this.core.transactionsCleanup());
            overall.LOGGER.info(CoreTransactionsReactive.configDebug(this.config, null, this.core));
            // Evaluated per subscription, so every attempt gets a new attempt id.
            Mono createAttempt = Mono.fromCallable(() -> {
                String attemptId = UUID.randomUUID().toString();
                return this.createAttemptContext(overall, merged, attemptId);
            });
            // Receives the query result emitted inside the attempt so it can be returned after the transaction ends.
            AtomicReference qr = new AtomicReference();
            Function<CoreTransactionAttemptContext, Mono<Void>> runLogic = ctx -> Mono.defer(() -> ctx.doQueryOperation("single query streaming", statement, parentSpan.map(SpanWrapper::new).orElse(null), (sidx, lockToken, span) -> {
                span.attribute("db.couchbase.transaction.single_query", true);
                return ctx.queryWrapperLocked(0, queryContext, statement, queryOptions, "query", false, true, null, null, (SpanWrapper)span, true, null, true).doOnNext(ret -> qr.set(ret));
            }).then());
            final Function<Throwable, RuntimeException> errorHandler = CoreTransactionsReactive.singleQueryHandleErrorDuringRowStreaming(overall, errorConverter);
            return this.executeTransaction((Mono<CoreTransactionAttemptContext>)createAttempt, merged, overall, runLogic, true).then(Mono.defer(() -> {
                // This cast throws ClassCastException if the captured result is of any other type.
                final ClassicCoreReactiveQueryResult orig = (ClassicCoreReactiveQueryResult)qr.get();
                if (orig == null) {
                    return Mono.error((Throwable)new CoreTransactionFailedException(new IllegalStateException("No query has been run"), overall.LOGGER, overall.transactionId()));
                }
                return Mono.just((Object)new CoreReactiveQueryResult(){

                    @Override
                    public Flux<QueryChunkRow> rows() {
                        // Row-stream errors are translated by the handler built above before being re-raised.
                        return orig.rows().onErrorResume(err -> Mono.error((Throwable)((Throwable)errorHandler.apply(err))));
                    }

                    @Override
                    public Mono<CoreQueryMetaData> metaData() {
                        return orig.metaData();
                    }

                    @Override
                    public NodeIdentifier lastDispatchedTo() {
                        return orig.lastDispatchedTo();
                    }
                });
            }));
        });
    }

    /**
     * Builds the function that translates an error raised while streaming rows of a single-query transaction.
     *
     * <p>The error is first passed through {@code QueryUtil.convertQueryError} and a warning is logged. If the
     * converted error is not a {@link TransactionOperationFailedException} it goes straight to
     * {@code errorConverter}. Otherwise its {@code toRaise()} value picks the exception handed to
     * {@code errorConverter}:
     * <ul>
     *   <li>{@code TRANSACTION_FAILED_POST_COMMIT} - {@link CoreTransactionFailedException} with the converted error as cause;</li>
     *   <li>{@code TRANSACTION_EXPIRED} - {@link CoreTransactionExpiredException} with the original error as cause;</li>
     *   <li>{@code TRANSACTION_COMMIT_AMBIGUOUS} - {@link CoreTransactionCommitAmbiguousException} with the original error as cause;</li>
     *   <li>anything else - {@link CoreTransactionFailedException} with the original error as cause.</li>
     * </ul>
     *
     * @param overall        transaction-wide context supplying the logger, transaction id and expiration time
     * @param errorConverter final mapping applied to the chosen exception
     * @return the error-translation function
     */
    private static Function<Throwable, RuntimeException> singleQueryHandleErrorDuringRowStreaming(CoreTransactionContext overall, Function<Throwable, RuntimeException> errorConverter) {
        return streamError -> {
            RuntimeException converted = QueryUtil.convertQueryError(streamError);
            overall.LOGGER.warn("", "got error on rows stream %s, converted from %s", DebugUtil.dbg(converted), DebugUtil.dbg(streamError));

            // Anything that is not a transaction operation failure is handed over as converted.
            if (!(converted instanceof TransactionOperationFailedException)) {
                return errorConverter.apply(converted);
            }

            TransactionOperationFailedException opFailed = (TransactionOperationFailedException)converted;
            RuntimeException mapped;
            switch (opFailed.toRaise()) {
                case TRANSACTION_FAILED_POST_COMMIT:
                    mapped = new CoreTransactionFailedException(opFailed, overall.LOGGER, overall.transactionId());
                    break;
                case TRANSACTION_EXPIRED:
                    String expiredMsg = "Transaction has expired configured timeout of " + overall.expirationTime().toMillis() + "ms.  The transaction is not committed.";
                    mapped = new CoreTransactionExpiredException(streamError, overall.LOGGER, overall.transactionId(), expiredMsg);
                    break;
                case TRANSACTION_COMMIT_AMBIGUOUS:
                    String ambiguousMsg = "It is ambiguous whether the transaction committed";
                    mapped = new CoreTransactionCommitAmbiguousException(streamError, overall.LOGGER, overall.transactionId(), ambiguousMsg);
                    break;
                default:
                    mapped = new CoreTransactionFailedException(streamError, overall.LOGGER, overall.transactionId());
                    break;
            }
            return errorConverter.apply(mapped);
        };
    }

    /**
     * Runs a single statement through {@link #executeTransaction} in single-query mode and returns its result.
     *
     * <p>On subscription this copies {@code queryOptions} into a {@link CoreQueryOptionsTransactions}, sets the
     * {@code "tximplicit"} entry on the copy to the text value {@code "true"}, builds a merged configuration from
     * this instance's configuration plus options created from {@code parentSpan} (when present), and creates a new
     * {@link CoreTransactionContext} with a random UUID id. The attempt logic calls {@code ctx.queryBlocking} and
     * captures the emitted result, which is returned once the transaction has completed.
     *
     * @param statement    the statement to execute
     * @param qc           query context forwarded to {@code ctx.queryBlocking}; may be {@code null}
     * @param queryOptions query options; copied, the caller's instance is not modified by this method
     * @param parentSpan   optional parent span, mapped through {@code CoreTransactionOptions.create}
     * @return a Mono emitting the query result, or failing with the transaction's error; fails with a
     *         {@link CoreTransactionFailedException} (cause {@code IllegalStateException}) if no result was captured
     */
    public Mono<CoreQueryResult> queryBlocking(String statement, @Nullable CoreQueryContext qc, CoreQueryOptions queryOptions, Optional<RequestSpan> parentSpan) {
        return Mono.defer(() -> {
            CoreQueryOptionsTransactions optionsCopy = new CoreQueryOptionsTransactions(queryOptions);
            optionsCopy.put("tximplicit", TextNode.valueOf("true"));
            CoreMergedTransactionConfig merged = new CoreMergedTransactionConfig(this.config, parentSpan.map(CoreTransactionOptions::create));
            CoreTransactionContext overall = new CoreTransactionContext(this.core.context(), UUID.randomUUID().toString(), merged, this.core.transactionsCleanup());
            overall.LOGGER.info(CoreTransactionsReactive.configDebug(this.config, null, this.core));
            // fromCallable re-runs on every subscription, so each retry gets a new attempt id and context.
            Mono<CoreTransactionAttemptContext> createAttempt = Mono.fromCallable(
                () -> this.createAttemptContext(overall, merged, UUID.randomUUID().toString()));
            // Receives the result emitted inside the attempt so it can be returned after the transaction ends.
            AtomicReference<CoreQueryResult> qr = new AtomicReference<>();
            Function<CoreTransactionAttemptContext, Mono<Void>> runLogic =
                ctx -> Mono.defer(() -> ctx.queryBlocking(statement, qc, optionsCopy, true).doOnNext(ret -> qr.set(ret)).then());
            return this.executeTransaction(createAttempt, merged, overall, runLogic, true).then(Mono.defer(() -> {
                CoreQueryResult result = qr.get();
                if (result == null) {
                    return Mono.error(new CoreTransactionFailedException(new IllegalStateException("No query has been run"), overall.LOGGER, overall.transactionId()));
                }
                return Mono.just(result);
            }));
        });
    }
}

