/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.driver.internal.reactive;

import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.neo4j.bolt.connection.TelemetryApi;
import org.neo4j.driver.AccessMode;
import org.neo4j.driver.BaseSession;
import org.neo4j.driver.Bookmark;
import org.neo4j.driver.Query;
import org.neo4j.driver.TransactionConfig;
import org.neo4j.driver.exceptions.ClientException;
import org.neo4j.driver.exceptions.TransactionNestingException;
import org.neo4j.driver.internal.GqlStatusError;
import org.neo4j.driver.internal.async.NetworkSession;
import org.neo4j.driver.internal.async.UnmanagedTransaction;
import org.neo4j.driver.internal.cursor.RxResultCursor;
import org.neo4j.driver.internal.observation.DriverObservationProvider;
import org.neo4j.driver.internal.observation.Observation;
import org.neo4j.driver.internal.observation.util.ObservationUtil;
import org.neo4j.driver.internal.reactive.RxUtils;
import org.neo4j.driver.internal.telemetry.ApiTelemetryWork;
import org.neo4j.driver.internal.util.Futures;
import org.neo4j.driver.reactive.ReactiveResult;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Shared plumbing for reactive session implementations that sit on top of a {@link NetworkSession}.
 *
 * <p>The class adapts the {@code CompletionStage}-based calls of the wrapped session into
 * {@link Publisher}s and leaves the concrete transaction type to subclasses through
 * {@link #createTransaction(UnmanagedTransaction)} and
 * {@link #closeTransaction(Object, boolean, Observation)}.
 *
 * @param <S> the transaction type that subclasses expose to their callers
 */
public abstract class AbstractReactiveSession<S> {
    /** Error text used when a begin-transaction stage yields {@code null} without an error. */
    private static final String NULL_TRANSACTION_MESSAGE = "Unexpected condition, begin transaction call has completed successfully with transaction being null";

    /** Format of the error raised when transaction work emits an unconsumed result; {@code %s} is the result class name. */
    private static final String INVALID_RETURN_VALUE_FORMAT = "%s is not a valid return value, it should be consumed before producing a return value";

    /** The wrapped session every operation of this class delegates to. */
    protected final NetworkSession session;

    /**
     * Creates a reactive facade over the given session.
     *
     * @param session the session to delegate to
     */
    public AbstractReactiveSession(NetworkSession session) {
        this.session = session;
    }

    /**
     * Wraps a freshly begun unmanaged transaction into the transaction type of the subclass.
     *
     * @param unmanagedTransaction the transaction produced by the wrapped session
     * @return the transaction object to publish
     */
    protected abstract S createTransaction(UnmanagedTransaction unmanagedTransaction);

    /**
     * Closes a transaction previously produced by {@link #createTransaction(UnmanagedTransaction)}.
     *
     * @param transaction the transaction to close
     * @param commit {@code true} is passed on the successful-completion path of
     *     {@code runTransaction}; {@code false} on its error and cancel paths and when a begun
     *     transaction is handed to the third callback of the begin publishers.
     *     NOTE(review): the name is inferred from those call sites (presumably commit vs.
     *     rollback) - confirm against the implementations.
     * @param parentObservation the observation forwarded by the caller
     * @return a publisher representing the close operation
     */
    protected abstract Publisher<Void> closeTransaction(S transaction, boolean commit, Observation parentObservation);

    /**
     * Begins a transaction without an explicit transaction type; delegates to the four-argument
     * overload with a {@code null} {@code txType}.
     *
     * @param config the transaction configuration
     * @param apiTelemetryWork the telemetry work forwarded to the session
     * @param parentObservation the observation forwarded to the session
     * @return a publisher of the created transaction
     */
    Publisher<S> doBeginTransaction(TransactionConfig config, ApiTelemetryWork apiTelemetryWork, Observation parentObservation) {
        return this.doBeginTransaction(config, null, apiTelemetryWork, parentObservation);
    }

    /**
     * Begins a transaction on the wrapped session and publishes it as an {@code S}.
     *
     * <p>The begin call is made inside the supplier handed to
     * {@code RxUtils.createSingleItemPublisher}, not when this method is invoked. The third
     * callback closes the transaction with {@code false}; it is presumably invoked when the
     * subscriber cancels before the item is delivered - confirm against {@code RxUtils}.
     *
     * @param config the transaction configuration
     * @param txType the transaction type forwarded to the session; may be {@code null}
     * @param apiTelemetryWork the telemetry work forwarded to the session
     * @param parentObservation the observation forwarded to the session and to the close call
     * @return a publisher of the created transaction
     */
    protected Publisher<S> doBeginTransaction(TransactionConfig config, String txType, ApiTelemetryWork apiTelemetryWork, Observation parentObservation) {
        return RxUtils.createSingleItemPublisher(
                () -> this.toTransactionStage(this.session.beginTransactionAsync(config, txType, apiTelemetryWork, parentObservation)),
                () -> new IllegalStateException(NULL_TRANSACTION_MESSAGE),
                tx -> Mono.fromDirect(this.closeTransaction(tx, false, parentObservation)).subscribe());
    }

    /**
     * Access-mode flavour of {@link #doBeginTransaction(TransactionConfig, String, ApiTelemetryWork, Observation)}
     * used by {@code runTransaction}; the wiring is otherwise the same.
     */
    private Publisher<S> beginTransaction(AccessMode mode, TransactionConfig config, ApiTelemetryWork apiTelemetryWork, Observation parentObservation) {
        return RxUtils.createSingleItemPublisher(
                () -> this.toTransactionStage(this.session.beginTransactionAsync(mode, config, apiTelemetryWork, parentObservation)),
                () -> new IllegalStateException(NULL_TRANSACTION_MESSAGE),
                tx -> Mono.fromDirect(this.closeTransaction(tx, false, parentObservation)).subscribe());
    }

    /**
     * Converts the outcome of a begin-transaction call into a stage of the subclass transaction type.
     *
     * <p>A non-null transaction is wrapped via {@link #createTransaction(UnmanagedTransaction)};
     * otherwise the failure is routed through {@code releaseConnectionBeforeReturning}.
     *
     * @param beginStage the stage returned by the wrapped session
     * @return a stage completed with the wrapped transaction or completed exceptionally
     */
    private CompletionStage<S> toTransactionStage(CompletionStage<UnmanagedTransaction> beginStage) {
        CompletableFuture<S> txFuture = new CompletableFuture<>();
        beginStage.whenComplete((tx, completionError) -> {
            if (tx != null) {
                txFuture.complete(this.createTransaction(tx));
            } else {
                // NOTE(review): completionError is passed on as-is, including the case where it is
                // null together with tx - verify the Futures helpers tolerate that.
                this.releaseConnectionBeforeReturning(txFuture, completionError);
            }
        });
        return txFuture;
    }

    /**
     * Runs the given work inside a managed transaction, wrapped in the session retry logic.
     *
     * <p>The work's output is filtered so that emitting a {@code ReactiveResult} (either the
     * {@code org.neo4j.driver.reactivestreams} or the {@code org.neo4j.driver.reactive} flavour)
     * fails the stream with a {@link ClientException}. The transaction is closed with
     * {@code true} when the work completes and with {@code false} on error or cancellation.
     *
     * @param mode the access mode used to begin the transaction
     * @param work the function producing the result publisher from the transaction
     * @param config the transaction configuration
     * @param sessionType the session type passed to the observation provider
     * @param observationProvider the provider of the execute observation
     * @param <T> the element type produced by the work
     * @return the observed, retry-wrapped publisher of the work's results
     */
    protected <T> Publisher<T> runTransaction(AccessMode mode, Function<S, ? extends Publisher<T>> work, TransactionConfig config, Class<? extends BaseSession> sessionType, DriverObservationProvider observationProvider) {
        Observation executeObservation = observationProvider.sessionExecute(sessionType, mode);
        Function<S, ? extends Publisher<T>> validatedWork = work.andThen(publisher -> Flux.from(publisher).handle((value, sink) -> {
            Class<?> offendingType = unconsumedResultType(value);
            if (offendingType != null) {
                sink.error(invalidReturnValueError(offendingType));
                return;
            }
            sink.next(value);
        }));
        ApiTelemetryWork apiTelemetryWork = new ApiTelemetryWork(TelemetryApi.MANAGED_TRANSACTION);
        Flux<T> transactionalWork = Flux.usingWhen(
                this.beginTransaction(mode, config, apiTelemetryWork, executeObservation),
                validatedWork,
                tx -> this.closeTransaction(tx, true, executeObservation),
                (tx, error) -> this.closeTransaction(tx, false, executeObservation),
                tx -> this.closeTransaction(tx, false, executeObservation));
        Flux<T> repeatableWork = transactionalWork.contextWrite(executeObservation::writeReactiveContext);
        return ObservationUtil.observeStreams(executeObservation, this.session.retryLogic().retryRx(repeatableWork));
    }

    /**
     * Identifies which result type, if any, the given value is an instance of.
     *
     * @param value a value emitted by transaction work
     * @return the matching {@code ReactiveResult} class (the reactivestreams flavour is checked
     *     first), or {@code null} when the value is neither
     */
    private static Class<?> unconsumedResultType(Object value) {
        if (value instanceof org.neo4j.driver.reactivestreams.ReactiveResult) {
            return org.neo4j.driver.reactivestreams.ReactiveResult.class;
        }
        if (value instanceof ReactiveResult) {
            return ReactiveResult.class;
        }
        return null;
    }

    /**
     * Builds the error reported when transaction work emits a result object instead of consuming it.
     *
     * @param resultType the offending result class, whose name is placed in the message
     * @return the exception to signal downstream
     */
    private static ClientException invalidReturnValueError(Class<?> resultType) {
        String message = String.format(INVALID_RETURN_VALUE_FORMAT, resultType.getName());
        return new ClientException(GqlStatusError.UNKNOWN.getStatus(), GqlStatusError.UNKNOWN.getStatusDescription(message), "N/A", message, GqlStatusError.DIAGNOSTIC_RECORD, null);
    }

    /**
     * Fails {@code returnFuture} with the begin-transaction error.
     *
     * <p>A {@link TransactionNestingException} is propagated directly, without asking the session
     * to release its connection. For any other error the connection release is requested first
     * and the future is failed with the combination of the original and the release error.
     *
     * @param returnFuture the future to complete exceptionally
     * @param completionError the error reported by the begin-transaction stage
     * @param <T> the value type of the future
     */
    private <T> void releaseConnectionBeforeReturning(CompletableFuture<T> returnFuture, Throwable completionError) {
        Throwable error = Futures.completionExceptionCause(completionError);
        if (error instanceof TransactionNestingException) {
            returnFuture.completeExceptionally(error);
        } else {
            this.session.releaseConnectionAsync().whenComplete((ignored, closeError) -> returnFuture.completeExceptionally(Futures.combineErrors(error, closeError)));
        }
    }

    /**
     * Returns the bookmarks reported by the wrapped session.
     *
     * @return the value of {@code session.lastBookmarks()}
     */
    public Set<Bookmark> lastBookmarks() {
        return this.session.lastBookmarks();
    }

    /**
     * Runs a query on the wrapped session and publishes the mapped cursor as a single item.
     *
     * <p>{@code cursorPublishFuture} is handed to the session as the final stage of the run and is
     * completed here: with the cursor once the item is emitted, exceptionally when the publisher
     * errors, and - in the third callback, after rolling the cursor back - with {@code null} or
     * the rollback error. That third callback is presumably the cancellation path of
     * {@code RxUtils.createSingleItemPublisher}; confirm against {@code RxUtils}.
     *
     * @param query the query to run
     * @param config the transaction configuration
     * @param cursorToResult maps the cursor to the published result
     * @param parentObservation the observation forwarded to the session
     * @param <T> the published result type
     * @return a publisher of the mapped result
     */
    protected <T> Publisher<T> run(Query query, TransactionConfig config, Function<RxResultCursor, T> cursorToResult, Observation parentObservation) {
        CompletableFuture<RxResultCursor> cursorPublishFuture = new CompletableFuture<>();
        // Remembers the raw cursor so the callbacks below can reach it after it has been mapped.
        AtomicReference<RxResultCursor> cursorReference = new AtomicReference<>();
        return RxUtils.createSingleItemPublisher(
                        () -> this.runAsStage(query, config, cursorPublishFuture, parentObservation)
                                .thenApply(cursor -> {
                                    cursorReference.set(cursor);
                                    return cursor;
                                })
                                .thenApply(cursorToResult),
                        () -> new IllegalStateException("Unexpected condition, run call has completed successfully with result being null"),
                        value -> {
                            if (value != null) {
                                cursorReference.get().rollback().whenComplete((unused, throwable) -> {
                                    if (throwable != null) {
                                        cursorPublishFuture.completeExceptionally(throwable);
                                    } else {
                                        cursorPublishFuture.complete(null);
                                    }
                                });
                            }
                        })
                .doOnNext(value -> cursorPublishFuture.complete(cursorReference.get()))
                .doOnError(cursorPublishFuture::completeExceptionally);
    }

    /**
     * Starts the run on the wrapped session and normalizes every failure into a failed stage.
     *
     * <p>A synchronous throw from {@code runRx}, a failed stage, and a cursor carrying a run error
     * are all routed through {@code releaseConnectionAndRethrow}.
     *
     * @param query the query to run
     * @param config the transaction configuration
     * @param finalStage the stage forwarded to {@code session.runRx}
     * @param parentObservation the observation forwarded to the session
     * @return a stage of the cursor, failed when the run did not succeed
     */
    private CompletionStage<RxResultCursor> runAsStage(Query query, TransactionConfig config, CompletionStage<RxResultCursor> finalStage, Observation parentObservation) {
        CompletionStage<RxResultCursor> cursorStage;
        try {
            cursorStage = this.session.runRx(query, config, finalStage, parentObservation);
        }
        catch (Throwable t) {
            // Turn a synchronous failure into a failed stage so it takes the same path below.
            cursorStage = CompletableFuture.failedFuture(t);
        }
        return cursorStage.<CompletionStage<RxResultCursor>>handle((cursor, throwable) -> {
            if (throwable != null) {
                return this.releaseConnectionAndRethrow(throwable);
            }
            Throwable runError = cursor.getRunError();
            if (runError != null) {
                return this.releaseConnectionAndRethrow(runError);
            }
            return CompletableFuture.completedFuture(cursor);
        }).thenCompose(stage -> stage);
    }

    /**
     * Requests a connection release from the session and returns a stage that always fails.
     *
     * <p>If the release itself fails, the stage fails with the combination of both errors.
     * Otherwise a {@link RuntimeException} is rethrown as-is and any other throwable is wrapped
     * in a {@link CompletionException}.
     *
     * @param throwable the original failure
     * @param <T> the nominal value type of the returned stage
     * @return a stage that completes exceptionally once the release has finished
     */
    private <T> CompletionStage<T> releaseConnectionAndRethrow(Throwable throwable) {
        return this.session.releaseConnectionAsync().handle((ignored, releaseThrowable) -> {
            if (releaseThrowable != null) {
                throw Futures.combineErrors(throwable, releaseThrowable);
            }
            if (throwable instanceof RuntimeException) {
                throw (RuntimeException) throwable;
            }
            throw new CompletionException(throwable);
        });
    }

    /**
     * Closes the wrapped session.
     *
     * @param parentObservation the observation forwarded to {@code session.closeAsync}
     * @param <T> the nominal element type of the returned publisher
     * @return the publisher created by {@code RxUtils.createEmptyPublisher} around the close call
     */
    protected <T> Publisher<T> doClose(Observation parentObservation) {
        return RxUtils.createEmptyPublisher(() -> this.session.closeAsync(parentObservation));
    }
}

