/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.fabric.executor;

import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.neo4j.bolt.runtime.AccessMode;
import org.neo4j.bolt.v41.messaging.RoutingContext;
import org.neo4j.configuration.helpers.NormalizedDatabaseName;
import org.neo4j.cypher.internal.FullyParsedQuery;
import org.neo4j.cypher.internal.ast.CatalogName;
import org.neo4j.cypher.internal.ast.GraphSelection;
import org.neo4j.exceptions.InvalidSemanticsException;
import org.neo4j.fabric.config.FabricConfig;
import org.neo4j.fabric.eval.Catalog;
import org.neo4j.fabric.eval.CatalogManager;
import org.neo4j.fabric.eval.UseEvaluation;
import org.neo4j.fabric.executor.EffectiveQueryType;
import org.neo4j.fabric.executor.ExecutionOptions;
import org.neo4j.fabric.executor.FabricException;
import org.neo4j.fabric.executor.FabricExecutionStatementResultImpl;
import org.neo4j.fabric.executor.FabricKernelTransaction;
import org.neo4j.fabric.executor.FabricSecondaryException;
import org.neo4j.fabric.executor.FabricStatementLifecycles;
import org.neo4j.fabric.executor.Location;
import org.neo4j.fabric.executor.QueryTypes;
import org.neo4j.fabric.executor.RemoteExecutionPlanDescription;
import org.neo4j.fabric.executor.TaggingPlanDescriptionWrapper;
import org.neo4j.fabric.planning.FabricPlan;
import org.neo4j.fabric.planning.FabricPlanner;
import org.neo4j.fabric.planning.FabricQuery;
import org.neo4j.fabric.planning.Fragment;
import org.neo4j.fabric.planning.QueryType;
import org.neo4j.fabric.stream.CompletionDelegatingOperator;
import org.neo4j.fabric.stream.Prefetcher;
import org.neo4j.fabric.stream.Record;
import org.neo4j.fabric.stream.Records;
import org.neo4j.fabric.stream.StatementResult;
import org.neo4j.fabric.stream.StatementResults;
import org.neo4j.fabric.stream.summary.MergedQueryStatistics;
import org.neo4j.fabric.stream.summary.MergedSummary;
import org.neo4j.fabric.stream.summary.Summary;
import org.neo4j.fabric.transaction.FabricTransaction;
import org.neo4j.fabric.transaction.TransactionMode;
import org.neo4j.graphdb.ExecutionPlanDescription;
import org.neo4j.graphdb.Notification;
import org.neo4j.graphdb.QueryExecutionType;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.database.NamedDatabaseId;
import org.neo4j.kernel.impl.coreapi.InternalTransaction;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;
import org.neo4j.values.AnyValue;
import org.neo4j.values.storable.Values;
import org.neo4j.values.virtual.MapValue;
import org.neo4j.values.virtual.MapValueBuilder;
import org.neo4j.values.virtual.PathValue;
import org.neo4j.values.virtual.VirtualNodeValue;
import org.neo4j.values.virtual.VirtualRelationshipValue;
import org.neo4j.values.virtual.VirtualValues;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import scala.collection.JavaConverters;
import scala.collection.immutable.List;

public class FabricExecutor {
    private final FabricConfig.DataStream dataStreamConfig;
    private final FabricPlanner planner;
    private final UseEvaluation useEvaluation;
    private final CatalogManager catalogManager;
    private final Log log;
    private final FabricStatementLifecycles statementLifecycles;
    private final Executor fabricWorkerExecutor;
    private final Optional<NormalizedDatabaseName> fabricDatabaseName;

    public FabricExecutor(FabricConfig config, FabricPlanner planner, UseEvaluation useEvaluation, CatalogManager catalogManager, LogProvider internalLog, FabricStatementLifecycles statementLifecycles, Executor fabricWorkerExecutor) {
        this.fabricDatabaseName = config.getFabricDatabaseName();
        this.dataStreamConfig = config.getDataStream();
        this.planner = planner;
        this.useEvaluation = useEvaluation;
        this.catalogManager = catalogManager;
        this.log = internalLog.getLog(this.getClass());
        this.statementLifecycles = statementLifecycles;
        this.fabricWorkerExecutor = fabricWorkerExecutor;
    }

    public StatementResult run(FabricTransaction fabricTransaction, String statement, MapValue parameters) {
        FabricStatementLifecycles.StatementLifecycle lifecycle = this.statementLifecycles.create(fabricTransaction.getTransactionInfo(), statement, parameters);
        lifecycle.startProcessing();
        fabricTransaction.setLastSubmittedStatement(lifecycle);
        try {
            String defaultGraphName = fabricTransaction.getTransactionInfo().getSessionDatabaseId().name();
            FabricPlanner.PlannerInstance plannerInstance = this.planner.instance(statement, parameters, defaultGraphName);
            UseEvaluation.Instance useEvaluator = this.useEvaluation.instance(statement);
            FabricPlan plan = plannerInstance.plan();
            Fragment query = plan.query();
            lifecycle.doneFabricProcessing(plan);
            AccessMode accessMode = fabricTransaction.getTransactionInfo().getAccessMode();
            RoutingContext routingContext = fabricTransaction.getTransactionInfo().getRoutingContext();
            if (plan.debugOptions().logPlan()) {
                this.log.debug(String.format("Fabric plan: %s", Fragment.pretty().asString(query)));
            }
            StatementResult statementResult = fabricTransaction.execute(ctx -> {
                FabricStatementExecution execution = plan.debugOptions().logRecords() ? new FabricLoggingStatementExecution(plan, plannerInstance, useEvaluator, parameters, accessMode, routingContext, (FabricTransaction.FabricExecutionContext)ctx, this.log, lifecycle, this.dataStreamConfig) : new FabricStatementExecution(plan, plannerInstance, useEvaluator, parameters, accessMode, routingContext, (FabricTransaction.FabricExecutionContext)ctx, lifecycle, this.dataStreamConfig);
                return execution.run();
            });
            StatementResult resultWithErrorMapping = StatementResults.withErrorMapping(statementResult, FabricSecondaryException.class, FabricSecondaryException::getPrimaryException);
            return new FabricExecutionStatementResultImpl(resultWithErrorMapping, failure -> this.rollbackOnFailure(fabricTransaction, (Throwable)failure));
        }
        catch (RuntimeException e) {
            lifecycle.endFailure(e);
            this.rollbackOnFailure(fabricTransaction, e);
            throw e;
        }
    }

    public long clearQueryCachesForDatabase(String databaseName) {
        return this.planner.queryCache().clearByContext(databaseName);
    }

    private void rollbackOnFailure(FabricTransaction fabricTransaction, Throwable failure) {
        block2: {
            try {
                fabricTransaction.rollback();
            }
            catch (Exception rollbackFailure) {
                if (rollbackFailure == failure) break block2;
                failure.addSuppressed(rollbackFailure);
            }
        }
    }

    public boolean isPeriodicCommit(String query) {
        return this.planner.isPeriodicCommit(query);
    }

    @Deprecated
    public InternalTransaction forceKernelTxCreation(FabricTransaction fabricTransaction) {
        try {
            NamedDatabaseId dbId = fabricTransaction.getTransactionInfo().getSessionDatabaseId();
            String dbName = dbId.name();
            Catalog.Graph graph = this.catalogManager.currentCatalog().resolve(CatalogName.apply((String)dbName, (List)List.empty()));
            Location.Local location = (Location.Local)this.catalogManager.locationOf(dbId, graph, false, false);
            CompletableFuture internalTransaction = new CompletableFuture();
            fabricTransaction.execute(ctx -> {
                FabricKernelTransaction fabricKernelTransaction = ctx.getLocal().getOrCreateTx(location, TransactionMode.MAYBE_WRITE);
                internalTransaction.complete(fabricKernelTransaction.getInternalTransaction());
                return StatementResults.initial();
            });
            return (InternalTransaction)internalTransaction.get();
        }
        catch (Exception e) {
            throw new IllegalStateException("Failed to force open local transaction", e);
        }
    }

    private class FabricLoggingStatementExecution
    extends FabricStatementExecution {
        private final AtomicInteger step;
        private final Log log;

        FabricLoggingStatementExecution(FabricPlan plan, FabricPlanner.PlannerInstance plannerInstance, UseEvaluation.Instance useEvaluator, MapValue params, AccessMode accessMode, RoutingContext routingContext, FabricTransaction.FabricExecutionContext ctx, Log log, FabricStatementLifecycles.StatementLifecycle lifecycle, FabricConfig.DataStream dataStreamConfig) {
            super(plan, plannerInstance, useEvaluator, params, accessMode, routingContext, ctx, lifecycle, dataStreamConfig);
            this.step = new AtomicInteger(0);
            this.log = log;
        }

        @Override
        FragmentResult runLocalQueryAt(Location.Local location, TransactionMode transactionMode, FullyParsedQuery query, MapValue parameters, Flux<Record> input) {
            String id = this.executionId();
            this.trace(id, "local " + location.getGraphId(), this.compact(query.description()));
            return this.traceRecords(id, super.runLocalQueryAt(location, transactionMode, query, parameters, input));
        }

        @Override
        FragmentResult runRemoteQueryAt(Location.Remote location, TransactionMode transactionMode, String queryString, MapValue parameters) {
            String id = this.executionId();
            this.trace(id, "remote " + location.getGraphId(), this.compact(queryString));
            return this.traceRecords(id, super.runRemoteQueryAt(location, transactionMode, queryString, parameters));
        }

        private String compact(String in) {
            return in.replaceAll("\\r?\\n", " ").replaceAll("\\s+", " ");
        }

        private FragmentResult traceRecords(String id, FragmentResult fragmentResult) {
            Flux records = fragmentResult.records.doOnNext(record -> {
                String rec = IntStream.range(0, record.size()).mapToObj(i -> record.getValue(i).toString()).collect(Collectors.joining(", ", "[", "]"));
                this.trace(id, "output", rec);
            }).doOnError(err -> {
                String rec = err.getClass().getSimpleName() + ": " + err.getMessage();
                this.trace(id, "error", rec);
            }).doOnCancel(() -> this.trace(id, "cancel", "cancel")).doOnComplete(() -> this.trace(id, "complete", "complete"));
            return new FragmentResult((Flux<Record>)records, fragmentResult.planDescription, fragmentResult.executionType);
        }

        private void trace(String id, String event, String data) {
            this.log.debug(String.format("%s: %s: %s", id, event, data));
        }

        private String executionId() {
            String stmtId = this.idString(this.hashCode());
            String step = this.idString(this.step.getAndIncrement());
            return String.format("%s/%s", stmtId, step);
        }

        private String idString(int code) {
            return String.format("%08X", code);
        }
    }

    private class FragmentResult {
        private final Flux<Record> records;
        private final Mono<ExecutionPlanDescription> planDescription;
        private final Mono<QueryExecutionType> executionType;

        FragmentResult(Flux<Record> records, Mono<ExecutionPlanDescription> planDescription, Mono<QueryExecutionType> executionType) {
            this.records = records;
            this.planDescription = planDescription;
            this.executionType = executionType;
        }
    }

    private class FabricStatementExecution {
        private final FabricPlan plan;
        private final FabricPlanner.PlannerInstance plannerInstance;
        private final UseEvaluation.Instance useEvaluator;
        private final MapValue queryParams;
        private final FabricTransaction.FabricExecutionContext ctx;
        private final MergedQueryStatistics statistics = new MergedQueryStatistics();
        private final Set<Notification> notifications = ConcurrentHashMap.newKeySet();
        private final FabricStatementLifecycles.StatementLifecycle lifecycle;
        private final Prefetcher prefetcher;
        private final AccessMode accessMode;
        private final RoutingContext routingContext;

        FabricStatementExecution(FabricPlan plan, FabricPlanner.PlannerInstance plannerInstance, UseEvaluation.Instance useEvaluator, MapValue queryParams, AccessMode accessMode, RoutingContext routingContext, FabricTransaction.FabricExecutionContext ctx, FabricStatementLifecycles.StatementLifecycle lifecycle, FabricConfig.DataStream dataStreamConfig) {
            this.plan = plan;
            this.plannerInstance = plannerInstance;
            this.useEvaluator = useEvaluator;
            this.queryParams = queryParams;
            this.ctx = ctx;
            this.lifecycle = lifecycle;
            this.prefetcher = new Prefetcher(dataStreamConfig);
            this.accessMode = accessMode;
            this.routingContext = routingContext;
        }

        StatementResult run() {
            this.notifications.addAll(JavaConverters.seqAsJavaList(this.plan.notifications()));
            this.lifecycle.startExecution(false);
            Fragment query = this.plan.query();
            Flux columns = Flux.fromIterable((Iterable)JavaConverters.asJavaIterable(query.outputColumns()));
            if (this.plan.executionType() == FabricPlan.EXPLAIN() && this.plan.inFabricContext()) {
                this.lifecycle.endSuccess();
                return StatementResults.create((Flux<String>)columns, (Flux<Record>)Flux.empty(), (Mono<Summary>)Mono.just((Object)new MergedSummary((Mono<ExecutionPlanDescription>)Mono.just((Object)this.plan.query().description()), this.statistics, this.notifications)), (Mono<QueryExecutionType>)Mono.just((Object)EffectiveQueryType.queryExecutionType(this.plan, this.accessMode)));
            }
            FragmentResult fragmentResult = this.run(query, null);
            return StatementResults.create((Flux<String>)columns, (Flux<Record>)fragmentResult.records.doOnComplete(this.lifecycle::endSuccess).doOnCancel(this.lifecycle::endSuccess).doOnError(this.lifecycle::endFailure), (Mono<Summary>)Mono.just((Object)new MergedSummary(fragmentResult.planDescription, this.statistics, this.notifications)), fragmentResult.executionType);
        }

        FragmentResult run(Fragment fragment, Record argument) {
            if (fragment instanceof Fragment.Init) {
                return this.runInit();
            }
            if (fragment instanceof Fragment.Apply) {
                return this.runApply((Fragment.Apply)fragment, argument);
            }
            if (fragment instanceof Fragment.Union) {
                return this.runUnion((Fragment.Union)fragment, argument);
            }
            if (fragment instanceof Fragment.Exec) {
                return this.runExec((Fragment.Exec)fragment, argument);
            }
            throw this.notImplemented("Invalid query fragment", fragment);
        }

        FragmentResult runInit() {
            return new FragmentResult((Flux<Record>)Flux.just((Object)Records.empty()), (Mono<ExecutionPlanDescription>)Mono.empty(), (Mono<QueryExecutionType>)Mono.empty());
        }

        FragmentResult runApply(Fragment.Apply apply2, Record argument) {
            FragmentResult input = this.run(apply2.input(), argument);
            Flux resultRecords = input.records.flatMap(record -> this.run((Fragment)apply2.inner(), (Record)record).records.map(outputRecord -> Records.join(record, outputRecord)), FabricExecutor.this.dataStreamConfig.getConcurrency(), 1);
            Mono executionType = Mono.just((Object)EffectiveQueryType.queryExecutionType(this.plan, this.accessMode));
            return new FragmentResult((Flux<Record>)resultRecords, (Mono<ExecutionPlanDescription>)Mono.empty(), (Mono<QueryExecutionType>)executionType);
        }

        FragmentResult runUnion(Fragment.Union union, Record argument) {
            FragmentResult lhs = this.run(union.lhs(), argument);
            FragmentResult rhs = this.run(union.rhs(), argument);
            Flux merged = Flux.merge((Publisher[])new Publisher[]{lhs.records, rhs.records});
            Mono<QueryExecutionType> executionType = this.mergeExecutionType(lhs.executionType, rhs.executionType);
            if (union.distinct()) {
                return new FragmentResult((Flux<Record>)merged.distinct(), (Mono<ExecutionPlanDescription>)Mono.empty(), executionType);
            }
            return new FragmentResult((Flux<Record>)merged, (Mono<ExecutionPlanDescription>)Mono.empty(), executionType);
        }

        FragmentResult runExec(Fragment.Exec fragment, Record argument) {
            this.ctx.validateStatementType(fragment.statementType());
            Map<String, AnyValue> argumentValues = this.argumentValues(fragment, argument);
            MapValue parameters = this.addParamsFromRecord(this.queryParams, argumentValues, JavaConverters.mapAsJavaMap(fragment.parameters()));
            Catalog.Graph graph = this.evalUse(fragment.use().graphSelection(), argumentValues);
            TransactionMode transactionMode = this.getTransactionMode(fragment.queryType(), graph.toString());
            Location location = FabricExecutor.this.catalogManager.locationOf(this.ctx.getSessionDatabaseId(), graph, transactionMode.requiresWrite(), this.routingContext.isServerRoutingEnabled());
            if (location instanceof Location.Local) {
                Location.Local local = (Location.Local)location;
                FragmentResult input = this.run(fragment.input(), argument);
                if (fragment.executable()) {
                    FabricQuery.LocalQuery localQuery = this.plannerInstance.asLocal(fragment);
                    FragmentResult fragmentResult = this.runLocalQueryAt(local, transactionMode, localQuery.query(), parameters, input.records);
                    Mono<QueryExecutionType> executionType = this.mergeExecutionType(input.executionType, fragmentResult.executionType);
                    return new FragmentResult(fragmentResult.records, fragmentResult.planDescription, executionType);
                }
                return input;
            }
            if (location instanceof Location.Remote) {
                Location.Remote remote = (Location.Remote)location;
                FabricQuery.RemoteQuery remoteQuery = this.plannerInstance.asRemote(fragment);
                MapValue fullParams = this.addParams(parameters, JavaConverters.mapAsJavaMap(remoteQuery.extractedLiterals()));
                return this.runRemoteQueryAt(remote, transactionMode, remoteQuery.query(), fullParams);
            }
            throw this.notImplemented("Invalid graph location", location);
        }

        FragmentResult runLocalQueryAt(Location.Local location, TransactionMode transactionMode, FullyParsedQuery query, MapValue parameters, Flux<Record> input) {
            ExecutionOptions executionOptions = this.plan.inFabricContext() && !this.isFabricDatabase(location) ? new ExecutionOptions(location.getGraphId()) : new ExecutionOptions();
            StatementResult localStatementResult = this.ctx.getLocal().run(location, transactionMode, this.lifecycle, query, parameters, input, executionOptions);
            Flux records = localStatementResult.records().doOnComplete(() -> localStatementResult.summary().subscribe(this::updateSummary));
            Mono planDescription = localStatementResult.summary().map(Summary::executionPlanDescription).map(pd -> new TaggingPlanDescriptionWrapper((ExecutionPlanDescription)pd, location.getDatabaseName()));
            return new FragmentResult((Flux<Record>)records, (Mono<ExecutionPlanDescription>)planDescription, localStatementResult.executionType());
        }

        FragmentResult runRemoteQueryAt(Location.Remote location, TransactionMode transactionMode, String queryString, MapValue parameters) {
            ExecutionOptions executionOptions = this.plan.inFabricContext() ? new ExecutionOptions(location.getGraphId()) : new ExecutionOptions();
            this.lifecycle.startExecution(true);
            Mono<StatementResult> statementResult = this.ctx.getRemote().run(location, executionOptions, queryString, transactionMode, parameters);
            Flux records = statementResult.flatMapMany(sr -> sr.records().doOnComplete(() -> sr.summary().subscribe(this::updateSummary)));
            CompletionDelegatingOperator recordsWithCompletionDelegation = new CompletionDelegatingOperator((Flux<Record>)records, FabricExecutor.this.fabricWorkerExecutor);
            Flux<Record> prefetchedRecords = this.prefetcher.addPrefetch((Flux<Record>)recordsWithCompletionDelegation);
            Mono planDescription = statementResult.flatMap(StatementResult::summary).map(Summary::executionPlanDescription).map(remotePlanDescription -> new RemoteExecutionPlanDescription((ExecutionPlanDescription)remotePlanDescription, location));
            Mono executionType = Mono.just((Object)EffectiveQueryType.queryExecutionType(this.plan, this.accessMode));
            return new FragmentResult(prefetchedRecords, (Mono<ExecutionPlanDescription>)planDescription, (Mono<QueryExecutionType>)executionType);
        }

        private Map<String, AnyValue> argumentValues(Fragment fragment, Record argument) {
            if (argument == null) {
                return Map.of();
            }
            return Records.asMap(argument, JavaConverters.seqAsJavaList(fragment.argumentColumns()));
        }

        private Catalog.Graph evalUse(GraphSelection selection, Map<String, AnyValue> record) {
            return this.useEvaluator.evaluate(selection, this.queryParams, record);
        }

        private MapValue addParamsFromRecord(MapValue params, Map<String, AnyValue> record, Map<String, String> bindings) {
            int resultSize = params.size() + bindings.size();
            if (resultSize == 0) {
                return VirtualValues.EMPTY_MAP;
            }
            MapValueBuilder builder = new MapValueBuilder(resultSize);
            params.foreach((arg_0, arg_1) -> ((MapValueBuilder)builder).add(arg_0, arg_1));
            bindings.forEach((var, par) -> builder.add(par, this.validateValue((AnyValue)record.get(var))));
            return builder.build();
        }

        private MapValue addParams(MapValue params, Map<String, Object> newValues) {
            int resultSize = params.size() + newValues.size();
            if (resultSize == 0 || newValues.size() == 0) {
                return params;
            }
            MapValueBuilder builder = new MapValueBuilder(resultSize);
            params.foreach((arg_0, arg_1) -> ((MapValueBuilder)builder).add(arg_0, arg_1));
            newValues.forEach((key, val) -> builder.add(key, (AnyValue)Values.of((Object)val)));
            return builder.build();
        }

        private AnyValue validateValue(AnyValue value) {
            if (value instanceof VirtualNodeValue) {
                throw new FabricException((Status)Status.Statement.TypeError, "Importing node values in remote subqueries is currently not supported", new Object[0]);
            }
            if (value instanceof VirtualRelationshipValue) {
                throw new FabricException((Status)Status.Statement.TypeError, "Importing relationship values in remote subqueries is currently not supported", new Object[0]);
            }
            if (value instanceof PathValue) {
                throw new FabricException((Status)Status.Statement.TypeError, "Importing path values in remote subqueries is currently not supported", new Object[0]);
            }
            return value;
        }

        private boolean isFabricDatabase(Location.Local location) {
            return FabricExecutor.this.fabricDatabaseName.map(name -> name.name().equals(location.getDatabaseName())).orElse(false);
        }

        private void updateSummary(Summary summary) {
            if (summary != null) {
                this.statistics.add(summary.getQueryStatistics());
                this.notifications.addAll(summary.getNotifications());
            }
        }

        private Mono<QueryExecutionType> mergeExecutionType(Mono<QueryExecutionType> lhs, Mono<QueryExecutionType> rhs) {
            return Mono.zip(lhs, rhs).map(both -> QueryTypes.merge((QueryExecutionType)both.getT1(), (QueryExecutionType)both.getT2())).switchIfEmpty(lhs).switchIfEmpty(rhs);
        }

        private RuntimeException notImplemented(String msg, Object object) {
            return this.notImplemented(msg, object.toString());
        }

        private RuntimeException notImplemented(String msg, String info) {
            return new InvalidSemanticsException(msg + ": " + info);
        }

        private TransactionMode getTransactionMode(QueryType queryType, String graph) {
            if (this.plan.executionType() == FabricPlan.EXPLAIN()) {
                return TransactionMode.DEFINITELY_READ;
            }
            AccessMode queryMode = EffectiveQueryType.effectiveAccessMode(this.accessMode, queryType);
            if (this.accessMode == AccessMode.WRITE) {
                if (queryMode == AccessMode.WRITE) {
                    return TransactionMode.DEFINITELY_WRITE;
                }
                return TransactionMode.MAYBE_WRITE;
            }
            if (queryMode == AccessMode.WRITE) {
                throw new FabricException((Status)Status.Statement.AccessMode, "Writing in read access mode not allowed. Attempted write to %s", graph);
            }
            return TransactionMode.DEFINITELY_READ;
        }
    }
}

