/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.fabric.executor;

import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.neo4j.bolt.protocol.common.message.AccessMode;
import org.neo4j.cypher.internal.FullyParsedQuery;
import org.neo4j.cypher.internal.compiler.helpers.SignatureResolver;
import org.neo4j.cypher.internal.evaluator.StaticEvaluation;
import org.neo4j.cypher.internal.frontend.phases.InternalSyntaxUsageStats;
import org.neo4j.cypher.internal.frontend.phases.ProcedureSignatureResolver;
import org.neo4j.exceptions.InvalidSemanticsException;
import org.neo4j.fabric.config.FabricConfig;
import org.neo4j.fabric.eval.Catalog;
import org.neo4j.fabric.eval.UseEvaluation;
import org.neo4j.fabric.executor.CallInTransactionsExecutor;
import org.neo4j.fabric.executor.EffectiveQueryType;
import org.neo4j.fabric.executor.FabricSecondaryException;
import org.neo4j.fabric.executor.FragmentResult;
import org.neo4j.fabric.executor.Location;
import org.neo4j.fabric.executor.QueryStatementLifecycles;
import org.neo4j.fabric.executor.QueryTypes;
import org.neo4j.fabric.executor.SingleQueryFragmentExecutor;
import org.neo4j.fabric.executor.StandardQueryExecutor;
import org.neo4j.fabric.planning.FabricPlan;
import org.neo4j.fabric.planning.FabricPlanner;
import org.neo4j.fabric.planning.Fragment;
import org.neo4j.fabric.stream.Prefetcher;
import org.neo4j.fabric.stream.Record;
import org.neo4j.fabric.stream.Records;
import org.neo4j.fabric.stream.StatementResult;
import org.neo4j.fabric.stream.StatementResults;
import org.neo4j.fabric.stream.summary.MergedQueryStatistics;
import org.neo4j.fabric.stream.summary.MergedSummary;
import org.neo4j.fabric.stream.summary.Summary;
import org.neo4j.fabric.transaction.FabricTransaction;
import org.neo4j.graphdb.ExecutionPlanDescription;
import org.neo4j.graphdb.GqlStatusObject;
import org.neo4j.graphdb.Notification;
import org.neo4j.graphdb.QueryExecutionType;
import org.neo4j.internal.kernel.api.Procedures;
import org.neo4j.kernel.api.query.ExecutingQuery;
import org.neo4j.kernel.database.NormalizedDatabaseName;
import org.neo4j.kernel.impl.query.NotificationConfiguration;
import org.neo4j.kernel.impl.query.QueryRoutingMonitor;
import org.neo4j.logging.InternalLog;
import org.neo4j.logging.InternalLogProvider;
import org.neo4j.monitoring.Monitors;
import org.neo4j.values.virtual.MapValue;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import scala.collection.Iterable;
import scala.collection.Seq;
import scala.collection.immutable.List;
import scala.jdk.javaapi.CollectionConverters;

public class FabricExecutor {
    public static final String WRITING_IN_READ_NOT_ALLOWED_MSG = "Writing in read access mode not allowed";
    private final FabricConfig.DataStream dataStreamConfig;
    private final FabricPlanner planner;
    private final UseEvaluation useEvaluation;
    private final InternalLog log;
    private final QueryStatementLifecycles statementLifecycles;
    private final Executor fabricWorkerExecutor;
    private final QueryRoutingMonitor queryRoutingMonitor;
    private final InternalSyntaxUsageStats internalSyntaxUsageStats;

    public FabricExecutor(FabricConfig config, FabricPlanner planner, UseEvaluation useEvaluation, InternalLogProvider internalLog, QueryStatementLifecycles statementLifecycles, Executor fabricWorkerExecutor, Monitors monitors, InternalSyntaxUsageStats internalSyntaxUsageStats) {
        this.dataStreamConfig = config.getDataStream();
        this.planner = planner;
        this.useEvaluation = useEvaluation;
        this.log = internalLog.getLog(this.getClass());
        this.statementLifecycles = statementLifecycles;
        this.fabricWorkerExecutor = fabricWorkerExecutor;
        this.queryRoutingMonitor = (QueryRoutingMonitor)monitors.newMonitor(QueryRoutingMonitor.class, new String[0]);
        this.internalSyntaxUsageStats = internalSyntaxUsageStats;
    }

    public StatementResult run(FabricTransaction fabricTransaction, String statement, MapValue parameters) {
        ExecutingQuery.TransactionBinding transactionBinding = fabricTransaction.transactionBinding();
        QueryStatementLifecycles.StatementLifecycle lifecycle = this.statementLifecycles.create(fabricTransaction.getTransactionInfo(), statement, parameters, transactionBinding);
        lifecycle.startProcessing();
        Procedures procedures = fabricTransaction.contextlessProcedures();
        ProcedureSignatureResolver signatureResolver = SignatureResolver.from((Procedures)procedures);
        StaticEvaluation.StaticEvaluator evaluator = StaticEvaluation.from((Procedures)procedures);
        try {
            String defaultGraphName = fabricTransaction.getTransactionInfo().getSessionDatabaseReference().alias().name();
            Catalog catalog = fabricTransaction.getCatalogSnapshot();
            FabricPlanner.PlannerInstance plannerInstance = this.planner.instance(signatureResolver, statement, parameters, defaultGraphName, catalog, this.internalSyntaxUsageStats, fabricTransaction.cancellationChecker());
            FabricPlan plan = plannerInstance.plan();
            Fragment query = plan.query();
            lifecycle.doneFabricProcessing(plan, plannerInstance.query().options().offset().offset());
            AccessMode accessMode = fabricTransaction.getTransactionInfo().getAccessMode();
            if (plan.debugOptions().logPlan()) {
                this.log.debug(String.format("Fabric plan: %s", Fragment.pretty().asString(query)));
            }
            StatementResult statementResult = fabricTransaction.execute(ctx -> {
                UseEvaluation.Instance useEvaluator = this.useEvaluation.instance(evaluator, plannerInstance.signatureResolver(), statement, catalog);
                FabricStatementExecution execution = plan.debugOptions().logRecords() ? new FabricLoggingStatementExecution(plan, plannerInstance, useEvaluator, parameters, accessMode, (FabricTransaction.FabricExecutionContext)ctx, this.log, lifecycle, this.dataStreamConfig, fabricTransaction.getTransactionInfo().getQueryExecutionConfiguration().notificationFilters()) : new FabricStatementExecution(plan, plannerInstance, useEvaluator, parameters, accessMode, (FabricTransaction.FabricExecutionContext)ctx, lifecycle, this.dataStreamConfig, fabricTransaction.getTransactionInfo().getQueryExecutionConfiguration().notificationFilters());
                return execution.run();
            });
            return StatementResults.withErrorMapping(statementResult, FabricSecondaryException.class, FabricSecondaryException::getPrimaryException);
        }
        catch (RuntimeException e) {
            lifecycle.endFailure(e);
            throw e;
        }
    }

    public long clearQueryCachesForDatabase(String databaseName) {
        return this.planner.queryCache().clearByContext(databaseName);
    }

    private class FabricLoggingStatementExecution
    extends FabricStatementExecution {
        private final AtomicInteger step;
        private final InternalLog log;

        FabricLoggingStatementExecution(FabricPlan plan, FabricPlanner.PlannerInstance plannerInstance, UseEvaluation.Instance useEvaluator, MapValue params, AccessMode accessMode, FabricTransaction.FabricExecutionContext ctx, InternalLog log, QueryStatementLifecycles.StatementLifecycle lifecycle, FabricConfig.DataStream dataStreamConfig, NotificationConfiguration notificationConfiguration) {
            super(plan, plannerInstance, useEvaluator, params, accessMode, ctx, lifecycle, dataStreamConfig, notificationConfiguration);
            this.step = new AtomicInteger(0);
            this.log = log;
        }

        @Override
        SingleQueryFragmentExecutor.Tracer tracer() {
            return new SingleQueryFragmentExecutor.Tracer(){

                @Override
                public SingleQueryFragmentExecutor.RecordTracer remoteQueryStart(Location.Remote location, String queryString) {
                    String id = FabricLoggingStatementExecution.this.executionId();
                    FabricLoggingStatementExecution.this.trace(id, "remote " + FabricLoggingStatementExecution.nameString(location), FabricLoggingStatementExecution.this.compact(queryString));
                    return fragmentResult -> FabricLoggingStatementExecution.this.doTraceRecords(id, fragmentResult);
                }

                @Override
                public SingleQueryFragmentExecutor.RecordTracer localQueryStart(Location.Local location, FullyParsedQuery query) {
                    String id = FabricLoggingStatementExecution.this.executionId();
                    FabricLoggingStatementExecution.this.trace(id, "local " + FabricLoggingStatementExecution.nameString(location), FabricLoggingStatementExecution.this.compact(query.description()));
                    return fragmentResult -> FabricLoggingStatementExecution.this.doTraceRecords(id, fragmentResult);
                }
            };
        }

        private static String nameString(Location location) {
            Stream<String> namespace = location.databaseReference().namespace().map(NormalizedDatabaseName::name).stream();
            Stream<String> name = Stream.of(location.databaseReference().alias().name());
            return Stream.concat(namespace, name).collect(Collectors.joining("."));
        }

        private String compact(String in) {
            return in.replaceAll("\\r?\\n", " ").replaceAll("\\s+", " ");
        }

        private FragmentResult doTraceRecords(String id, FragmentResult fragmentResult) {
            Flux records = fragmentResult.records().doOnNext(record -> {
                String rec = IntStream.range(0, record.size()).mapToObj(i -> record.getValue(i).toString()).collect(Collectors.joining(", ", "[", "]"));
                this.trace(id, "output", rec);
            }).doOnError(err -> {
                String rec = err.getClass().getSimpleName() + ": " + err.getMessage();
                this.trace(id, "error", rec);
            }).doOnCancel(() -> this.trace(id, "cancel", "cancel")).doOnComplete(() -> this.trace(id, "complete", "complete"));
            return new FragmentResult((Flux<Record>)records, fragmentResult.planDescription(), fragmentResult.executionType());
        }

        private void trace(String id, String event, String data) {
            this.log.debug(String.format("%s: %s: %s", id, event, data));
        }

        private String executionId() {
            String stmtId = this.idString(this.hashCode());
            String step = this.idString(this.step.getAndIncrement());
            return String.format("%s/%s", stmtId, step);
        }

        private String idString(int code) {
            return String.format("%08X", code);
        }
    }

    private class FabricStatementExecution {
        private final FabricPlan plan;
        private final FabricPlanner.PlannerInstance plannerInstance;
        private final UseEvaluation.Instance useEvaluator;
        private final MapValue queryParams;
        private final FabricTransaction.FabricExecutionContext ctx;
        private final MergedQueryStatistics statistics = new MergedQueryStatistics();
        private final Set<Notification> notifications = ConcurrentHashMap.newKeySet();
        private final Set<GqlStatusObject> gqlStatusObjects = ConcurrentHashMap.newKeySet();
        private final AtomicReference<Collection<GqlStatusObject>> lastAddedGqlStatusObjects = new AtomicReference();
        private final QueryStatementLifecycles.StatementLifecycle lifecycle;
        private final Prefetcher prefetcher;
        private final AccessMode accessMode;
        private final NotificationConfiguration notificationConfiguration;

        FabricStatementExecution(FabricPlan plan, FabricPlanner.PlannerInstance plannerInstance, UseEvaluation.Instance useEvaluator, MapValue queryParams, AccessMode accessMode, FabricTransaction.FabricExecutionContext ctx, QueryStatementLifecycles.StatementLifecycle lifecycle, FabricConfig.DataStream dataStreamConfig, NotificationConfiguration notificationConfiguration) {
            this.plan = plan;
            this.plannerInstance = plannerInstance;
            this.useEvaluator = useEvaluator;
            this.queryParams = queryParams;
            this.ctx = ctx;
            this.lifecycle = lifecycle;
            this.prefetcher = new Prefetcher(dataStreamConfig);
            this.accessMode = accessMode;
            this.notificationConfiguration = notificationConfiguration;
        }

        StatementResult run() {
            Flux records;
            java.util.List columns;
            List filteredNotifications = ((Iterable)this.plan.notifications().filter(arg_0 -> ((NotificationConfiguration)this.notificationConfiguration).includes(arg_0))).toList();
            this.notifications.addAll(CollectionConverters.asJava((Seq)filteredNotifications));
            this.gqlStatusObjects.addAll(CollectionConverters.asJava((Seq)filteredNotifications));
            this.lifecycle.startExecution(false);
            Fragment query = this.plan.query();
            if (this.plan.executionType() == FabricPlan.EXPLAIN() && this.plan.inCompositeContext()) {
                this.lifecycle.endSuccess();
                return StatementResults.create(CollectionConverters.asJava(query.outputColumns()), (Flux<Record>)Flux.empty(), (Mono<Summary>)Mono.just((Object)new MergedSummary((Mono<ExecutionPlanDescription>)Mono.just((Object)this.plan.query().description()), this.statistics, this.notifications, this.gqlStatusObjects, this.lastAddedGqlStatusObjects)), (Mono<QueryExecutionType>)Mono.just((Object)EffectiveQueryType.queryExecutionType(this.plan, this.accessMode)));
            }
            FragmentResult fragmentResult = this.run(query, null);
            if (query.producesResults()) {
                columns = CollectionConverters.asJava(query.outputColumns());
                records = fragmentResult.records();
            } else {
                columns = Collections.emptyList();
                records = fragmentResult.records().then(Mono.empty()).flux();
            }
            Mono summary = Mono.just((Object)new MergedSummary(fragmentResult.planDescription(), this.statistics, this.notifications, this.gqlStatusObjects, this.lastAddedGqlStatusObjects));
            return StatementResults.create(columns, (Flux<Record>)records.doOnComplete(this.lifecycle::endSuccess).doOnCancel(this.lifecycle::endSuccess).doOnError(this.lifecycle::endFailure), (Mono<Summary>)summary, fragmentResult.executionType());
        }

        FragmentResult run(Fragment fragment, Record argument) {
            if (fragment instanceof Fragment.Init) {
                return this.runInit();
            }
            if (fragment instanceof Fragment.Apply) {
                Fragment.Apply apply = (Fragment.Apply)fragment;
                if (apply.inTransactionsParameters().isEmpty()) {
                    return this.runApply(apply, argument);
                }
                return this.runCallInTransactions(apply, argument);
            }
            if (fragment instanceof Fragment.Union) {
                return this.runUnion((Fragment.Union)fragment, argument);
            }
            if (fragment instanceof Fragment.Exec) {
                return this.runExec((Fragment.Exec)fragment, argument);
            }
            throw this.notImplemented("Invalid query fragment", fragment);
        }

        FragmentResult runInit() {
            return new FragmentResult((Flux<Record>)Flux.just((Object)Records.empty()), (Mono<ExecutionPlanDescription>)Mono.empty(), (Mono<QueryExecutionType>)Mono.empty());
        }

        FragmentResult runApply(Fragment.Apply apply, Record argument) {
            FragmentResult input = this.run(apply.input(), argument);
            Function<Record, Publisher> runInner = apply.inner().outputColumns().isEmpty() ? record -> this.runAndProduceOnlyRecord(apply.inner(), (Record)record) : record -> this.runAndProduceJoinedResult(apply.inner(), (Record)record);
            Flux resultRecords = input.records().flatMap(runInner, FabricExecutor.this.dataStreamConfig.getConcurrency(), 1);
            Mono executionType = Mono.just((Object)EffectiveQueryType.queryExecutionType(this.plan, this.accessMode));
            return new FragmentResult((Flux<Record>)resultRecords, (Mono<ExecutionPlanDescription>)Mono.empty(), (Mono<QueryExecutionType>)executionType);
        }

        private Flux<Record> runAndProduceJoinedResult(Fragment fragment, Record record) {
            return this.run(fragment, record).records().map(outputRecord -> Records.join(record, outputRecord));
        }

        private Mono<Record> runAndProduceOnlyRecord(Fragment fragment, Record record) {
            return this.run(fragment, record).records().then(Mono.just((Object)record));
        }

        FragmentResult runUnion(Fragment.Union union, Record argument) {
            FragmentResult lhs = this.run(union.lhs(), argument);
            FragmentResult rhs = this.run(union.rhs(), argument);
            Flux merged = Flux.merge((Publisher[])new Publisher[]{lhs.records(), rhs.records()});
            Mono<QueryExecutionType> executionType = this.mergeExecutionType(lhs.executionType(), rhs.executionType());
            if (union.distinct()) {
                return new FragmentResult((Flux<Record>)merged.distinct(), (Mono<ExecutionPlanDescription>)Mono.empty(), executionType);
            }
            return new FragmentResult((Flux<Record>)merged, (Mono<ExecutionPlanDescription>)Mono.empty(), executionType);
        }

        FragmentResult runExec(Fragment.Exec fragment, Record argument) {
            return new StandardQueryExecutor(fragment, this.plannerInstance, FabricExecutor.this.fabricWorkerExecutor, this.ctx, this.useEvaluator, this.plan, this.queryParams, this.accessMode, this.notifications, this.gqlStatusObjects, this.lastAddedGqlStatusObjects, this.lifecycle, this.prefetcher, FabricExecutor.this.queryRoutingMonitor, this.statistics, this.tracer(), this::run).run(argument);
        }

        FragmentResult runCallInTransactions(Fragment.Apply fragment, Record argument) {
            Flux<Record> resultRecords = new CallInTransactionsExecutor(fragment, this.plannerInstance, FabricExecutor.this.fabricWorkerExecutor, this.ctx, this.useEvaluator, this.plan, this.queryParams, this.accessMode, this.notifications, this.gqlStatusObjects, this.lastAddedGqlStatusObjects, this.lifecycle, this.prefetcher, FabricExecutor.this.queryRoutingMonitor, this.statistics, this.tracer(), this::run).run(argument);
            Mono executionType = Mono.just((Object)EffectiveQueryType.queryExecutionType(this.plan, this.accessMode));
            return new FragmentResult(resultRecords, (Mono<ExecutionPlanDescription>)Mono.empty(), (Mono<QueryExecutionType>)executionType);
        }

        SingleQueryFragmentExecutor.Tracer tracer() {
            return new SingleQueryFragmentExecutor.Tracer(){

                @Override
                public SingleQueryFragmentExecutor.RecordTracer remoteQueryStart(Location.Remote location, String queryString) {
                    return fragmentResult -> fragmentResult;
                }

                @Override
                public SingleQueryFragmentExecutor.RecordTracer localQueryStart(Location.Local location, FullyParsedQuery query) {
                    return fragmentResult -> fragmentResult;
                }
            };
        }

        private Mono<QueryExecutionType> mergeExecutionType(Mono<QueryExecutionType> lhs, Mono<QueryExecutionType> rhs) {
            return Mono.zip(lhs, rhs).map(both -> QueryTypes.merge((QueryExecutionType)both.getT1(), (QueryExecutionType)both.getT2())).switchIfEmpty(lhs).switchIfEmpty(rhs);
        }

        private RuntimeException notImplemented(String msg, Object object) {
            return this.notImplemented(msg, object.toString());
        }

        private RuntimeException notImplemented(String msg, String info) {
            return new InvalidSemanticsException(msg + ": " + info);
        }
    }
}

