/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.messaging.queryhandling;

import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.axonframework.common.FutureUtils;
import org.axonframework.common.infra.ComponentDescriptor;
import org.axonframework.messaging.core.Context;
import org.axonframework.messaging.core.MessageStream;
import org.axonframework.messaging.core.QualifiedName;
import org.axonframework.messaging.core.QueueMessageStream;
import org.axonframework.messaging.core.unitofwork.ProcessingContext;
import org.axonframework.messaging.core.unitofwork.UnitOfWork;
import org.axonframework.messaging.core.unitofwork.UnitOfWorkFactory;
import org.axonframework.messaging.queryhandling.DuplicateQueryHandlerSubscriptionException;
import org.axonframework.messaging.queryhandling.NoHandlerForQueryException;
import org.axonframework.messaging.queryhandling.QueryBus;
import org.axonframework.messaging.queryhandling.QueryExecutionException;
import org.axonframework.messaging.queryhandling.QueryHandler;
import org.axonframework.messaging.queryhandling.QueryMessage;
import org.axonframework.messaging.queryhandling.QueryResponseMessage;
import org.axonframework.messaging.queryhandling.SubscriptionQueryAlreadyRegisteredException;
import org.axonframework.messaging.queryhandling.SubscriptionQueryUpdateMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link QueryBus} implementation that keeps its query handler subscriptions and its subscription-query update
 * streams in two in-memory {@link ConcurrentMap}s.
 * <p>
 * Every query is handled inside a {@link UnitOfWork} created from the configured {@link UnitOfWorkFactory}.
 * Subscription-query updates are buffered per subscriber in a bounded {@link QueueMessageStream}; a subscriber whose
 * buffer rejects an update is completed exceptionally and deregistered.
 */
public class SimpleQueryBus implements QueryBus {

    private static final Logger logger = LoggerFactory.getLogger(SimpleQueryBus.class);

    /**
     * Resource key under which the list of update tasks to run after commit is stored in a
     * {@link ProcessingContext}.
     */
    private static final Context.ResourceKey<List<Runnable>> UPDATE_TASKS_KEY =
            Context.ResourceKey.withLabel("update-tasks");

    private final UnitOfWorkFactory unitOfWorkFactory;
    /** Registered query handlers, keyed by the name they were subscribed with. */
    private final ConcurrentMap<QualifiedName, QueryHandler> subscriptions = new ConcurrentHashMap<>();
    /** Update streams of the currently registered subscription queries, keyed by the subscribing query. */
    private final ConcurrentMap<QueryMessage, QueueMessageStream<SubscriptionQueryUpdateMessage>> updateHandlers =
            new ConcurrentHashMap<>();

    /**
     * Creates a query bus that handles every query in a {@link UnitOfWork} obtained from the given factory.
     *
     * @param unitOfWorkFactory the factory used to create a unit of work per handled query; must not be {@code null}
     * @throws NullPointerException when {@code unitOfWorkFactory} is {@code null}
     */
    public SimpleQueryBus(@Nonnull UnitOfWorkFactory unitOfWorkFactory) {
        this.unitOfWorkFactory =
                Objects.requireNonNull(unitOfWorkFactory, "The UnitOfWorkFactory must be provided.");
    }

    /**
     * Registers {@code queryHandler} under {@code queryName}.
     * <p>
     * Registering the very same handler instance again under the same name is a no-op.
     *
     * @param queryName    the name to register the handler under
     * @param queryHandler the handler to register
     * @return this bus, for chaining
     * @throws DuplicateQueryHandlerSubscriptionException when a different handler instance is already registered
     *                                                    under {@code queryName}
     */
    @Override
    public QueryBus subscribe(@Nonnull QualifiedName queryName, @Nonnull QueryHandler queryHandler) {
        logger.debug("Subscribing query handler for name [{}].", queryName);
        QueryHandler existingHandler = subscriptions.putIfAbsent(queryName, queryHandler);
        // Identity comparison on purpose: only the exact same instance counts as a harmless re-subscription.
        if (existingHandler != null && existingHandler != queryHandler) {
            throw new DuplicateQueryHandlerSubscriptionException(queryName, existingHandler, queryHandler);
        }
        return this;
    }

    /**
     * Dispatches {@code query} to the handler registered for its qualified name and returns the resulting stream.
     * <p>
     * This method blocks until the unit of work wrapping the handler invocation has produced its result. Failures,
     * including a missing handler, are reported through a failed stream rather than thrown. A response stream that
     * has completed without an error, or completed with a {@link NoHandlerForQueryException}, is replaced by an
     * empty stream.
     *
     * @param query   the query to dispatch
     * @param context the active processing context, if any; not used by this implementation, which always creates
     *                a fresh unit of work
     * @return the stream of responses, an empty stream, or a failed stream
     */
    @Override
    @Nonnull
    public MessageStream<QueryResponseMessage> query(@Nonnull QueryMessage query,
                                                     @Nullable ProcessingContext context) {
        if (logger.isDebugEnabled()) {
            logger.debug("Dispatching direct-query for query name [{}].", query.type().name());
        }
        try {
            MessageStream<QueryResponseMessage> responseStream = handle(query, handlerFor(query)).get();
            if (containsResponseOrUserException(responseStream)) {
                return responseStream;
            }
            return MessageStream.empty().cast();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            return MessageStream.failed(e);
        } catch (Exception e) {
            return MessageStream.failed(e);
        }
    }

    /**
     * Invokes {@code handler} for {@code query} inside a newly created {@link UnitOfWork}.
     * <p>
     * An exception thrown by the handler is converted into a failed stream, so the returned future still completes
     * with a stream in that case.
     *
     * @param query   the query to handle
     * @param handler the handler to invoke
     * @return the future returned by the unit of work, carrying the handler's response stream
     */
    @Nonnull
    private CompletableFuture<MessageStream<QueryResponseMessage>> handle(@Nonnull QueryMessage query,
                                                                          @Nonnull QueryHandler handler) {
        if (logger.isDebugEnabled()) {
            logger.debug("Handling query [{}] of type [{}].", query.identifier(), query.type());
        }
        UnitOfWork unitOfWork = unitOfWorkFactory.create();
        return unitOfWork.executeWithResult(context -> {
            MessageStream<QueryResponseMessage> result;
            try {
                result = handler.handle(query, context);
            } catch (Exception e) {
                result = MessageStream.failed(e);
            }
            return CompletableFuture.completedFuture(result);
        });
    }

    /**
     * Tells whether {@code responseStream} is worth handing to the caller as-is.
     *
     * @param responseStream the stream produced by a query handler
     * @return {@code true} when the stream has not completed yet, or when it carries an error that is not a
     * {@link NoHandlerForQueryException}; {@code false} otherwise
     */
    private static boolean containsResponseOrUserException(MessageStream<QueryResponseMessage> responseStream) {
        if (!responseStream.isCompleted()) {
            return true;
        }
        return responseStream.error()
                             .map(error -> !(error instanceof NoHandlerForQueryException))
                             .orElse(false);
    }

    /**
     * Registers {@code query} for updates and dispatches it, returning the initial result followed by the updates.
     * <p>
     * The update registration happens before the initial query is dispatched.
     *
     * @param query            the subscription query
     * @param context          the active processing context, if any; passed on to
     *                         {@link #query(QueryMessage, ProcessingContext)}
     * @param updateBufferSize the capacity of the buffer holding updates that have not been consumed yet
     * @return the initial result stream concatenated with the update stream
     * @throws SubscriptionQueryAlreadyRegisteredException when a subscription with the same query identifier is
     *                                                     already registered
     */
    @Override
    @Nonnull
    public MessageStream<QueryResponseMessage> subscriptionQuery(@Nonnull QueryMessage query,
                                                                 @Nullable ProcessingContext context,
                                                                 int updateBufferSize) {
        MessageStream<SubscriptionQueryUpdateMessage> updates = subscribeToUpdates(query, updateBufferSize);
        MessageStream<QueryResponseMessage> initialResult = query(query, context);
        return initialResult.concatWith(updates.cast());
    }

    /**
     * Registers a bounded update stream for {@code query}.
     * <p>
     * The registration is removed again when the returned stream is closed.
     *
     * @param query            the subscription query to register
     * @param updateBufferSize the capacity of the {@link ArrayBlockingQueue} backing the update stream
     * @return the stream on which updates for {@code query} are delivered
     * @throws SubscriptionQueryAlreadyRegisteredException when a subscription with the same query identifier is
     *                                                     already registered
     * @throws IllegalArgumentException                    when {@code updateBufferSize} is smaller than 1, as
     *                                                     rejected by {@link ArrayBlockingQueue}
     */
    @Override
    @Nonnull
    public MessageStream<SubscriptionQueryUpdateMessage> subscribeToUpdates(@Nonnull QueryMessage query,
                                                                            int updateBufferSize) {
        if (hasHandlerFor(query.identifier())) {
            throw new SubscriptionQueryAlreadyRegisteredException(query.identifier());
        }
        QueueMessageStream<SubscriptionQueryUpdateMessage> output =
                new QueueMessageStream<>(new ArrayBlockingQueue<>(updateBufferSize));
        QueueMessageStream<SubscriptionQueryUpdateMessage> previous = updateHandlers.put(query, output);
        if (previous != null) {
            // A stream registered for an equal query in the meantime is superseded by the new one.
            previous.close();
        }
        // Two-argument remove: only deregister if this exact stream is still the registered one.
        return output.onClose(() -> updateHandlers.remove(query, output));
    }

    /**
     * Checks whether an update stream is registered for a query with the given identifier.
     *
     * @param queryId the query identifier to look for
     * @return {@code true} when any registered query has this identifier
     */
    private boolean hasHandlerFor(String queryId) {
        return updateHandlers.keySet()
                             .stream()
                             .anyMatch(registered -> registered.identifier().equals(queryId));
    }

    /**
     * Looks up the handler registered for the qualified name of {@code query}'s type.
     *
     * @param query the query to find a handler for
     * @return the registered handler
     * @throws NoHandlerForQueryException when no handler is registered for that name
     */
    @Nonnull
    private QueryHandler handlerFor(@Nonnull QueryMessage query) {
        QualifiedName handlerName = query.type().qualifiedName();
        // Single lookup instead of containsKey + get, so the check and the read see the same map state.
        QueryHandler handler = subscriptions.get(handlerName);
        if (handler == null) {
            throw NoHandlerForQueryException.forBus(query);
        }
        return handler;
    }

    /**
     * Emits an update to every registered subscription query accepted by {@code filter}.
     * <p>
     * With a non-null {@code context} the emission is deferred until after that context commits; otherwise it runs
     * immediately on the calling thread.
     *
     * @param filter         selects the subscription queries that should receive the update
     * @param updateSupplier supplies the update; only invoked when at least one subscription matches
     * @param context        the processing context to tie the emission to, or {@code null} to emit right away
     * @return an already completed future
     */
    @Override
    @Nonnull
    public CompletableFuture<Void> emitUpdate(@Nonnull Predicate<QueryMessage> filter,
                                              @Nonnull Supplier<SubscriptionQueryUpdateMessage> updateSupplier,
                                              @Nullable ProcessingContext context) {
        return runAfterCommitOrImmediately(context, () -> emitUpdate(filter, updateSupplier));
    }

    /**
     * Offers a single update to all matching update streams.
     * <p>
     * A stream that rejects the update (full buffer) or throws while accepting it is completed exceptionally and
     * deregistered.
     *
     * @param filter         selects the subscription queries that should receive the update
     * @param updateSupplier supplies the update; only invoked when at least one subscription matches
     */
    private void emitUpdate(Predicate<QueryMessage> filter,
                            Supplier<SubscriptionQueryUpdateMessage> updateSupplier) {
        Map<QueryMessage, QueueMessageStream<SubscriptionQueryUpdateMessage>> matchingHandlers =
                updateHandlers.entrySet()
                              .stream()
                              .filter(entry -> filter.test(entry.getKey()))
                              .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        if (matchingHandlers.isEmpty()) {
            return;
        }
        // Build the update once and share it between all matching subscribers.
        SubscriptionQueryUpdateMessage update = updateSupplier.get();
        matchingHandlers.forEach((query, updateHandler) -> {
            try {
                if (!updateHandler.offer(update, Context.empty())) {
                    updateHandler.completeExceptionally(
                            new QueryExecutionException("Subscription update buffer overflow", null)
                    );
                    updateHandlers.remove(query, updateHandler);
                }
            } catch (Exception e) {
                logger.info("An error occurred while trying to emit an update to a query '{}'. The subscription will be cancelled. Exception summary: {}",
                            query.type(), e.toString());
                updateHandler.completeExceptionally(e);
                updateHandlers.remove(query, updateHandler);
            }
        });
    }

    /**
     * Completes every registered subscription query accepted by {@code filter}.
     * <p>
     * With a non-null {@code context} the completion is deferred until after that context commits; otherwise it
     * runs immediately on the calling thread.
     *
     * @param filter  selects the subscription queries to complete
     * @param context the processing context to tie the completion to, or {@code null} to complete right away
     * @return an already completed future
     */
    @Override
    @Nonnull
    public CompletableFuture<Void> completeSubscriptions(@Nonnull Predicate<QueryMessage> filter,
                                                         @Nullable ProcessingContext context) {
        return runAfterCommitOrImmediately(context, () -> completeSubscriptions(filter));
    }

    /**
     * Completes and deregisters all matching update streams. If completing a stream throws, that stream is
     * completed exceptionally with the thrown exception instead.
     *
     * @param filter selects the subscription queries to complete
     */
    private void completeSubscriptions(Predicate<QueryMessage> filter) {
        updateHandlers.entrySet()
                      .stream()
                      .filter(entry -> filter.test(entry.getKey()))
                      .forEach(entry -> {
                          QueueMessageStream<SubscriptionQueryUpdateMessage> updateHandler = entry.getValue();
                          try {
                              updateHandler.complete();
                          } catch (Exception e) {
                              updateHandler.completeExceptionally(e);
                          }
                          updateHandlers.remove(entry.getKey(), updateHandler);
                      });
    }

    /**
     * Completes every registered subscription query accepted by {@code filter} with the given {@code cause}.
     * <p>
     * With a non-null {@code context} the completion is deferred until after that context commits; otherwise it
     * runs immediately on the calling thread.
     *
     * @param filter  selects the subscription queries to complete exceptionally
     * @param cause   the error to complete the selected subscriptions with
     * @param context the processing context to tie the completion to, or {@code null} to complete right away
     * @return an already completed future
     */
    @Override
    @Nonnull
    public CompletableFuture<Void> completeSubscriptionsExceptionally(@Nonnull Predicate<QueryMessage> filter,
                                                                      @Nonnull Throwable cause,
                                                                      @Nullable ProcessingContext context) {
        return runAfterCommitOrImmediately(context, () -> completeSubscriptionsExceptionally(filter, cause));
    }

    /**
     * Completes all matching update streams exceptionally and deregisters them.
     *
     * @param filter selects the subscription queries to complete exceptionally
     * @param cause  the error to complete the selected subscriptions with
     */
    private void completeSubscriptionsExceptionally(Predicate<QueryMessage> filter, Throwable cause) {
        updateHandlers.entrySet()
                      .stream()
                      .filter(entry -> filter.test(entry.getKey()))
                      .forEach(entry -> emitError(entry.getValue(), cause, entry.getKey()));
    }

    /**
     * Runs {@code updateTask} right away when there is no {@code context}; otherwise queues it to run after the
     * context commits.
     * <p>
     * All tasks queued on the same context share one list, stored under {@link #UPDATE_TASKS_KEY}, and one
     * after-commit callback that runs them in insertion order. That callback is registered when the list is first
     * created.
     *
     * @param context    the processing context to defer to, or {@code null} to run immediately
     * @param updateTask the task to run
     * @return an already completed future; it does not reflect whether a deferred task has run yet
     */
    @Nonnull
    private CompletableFuture<Void> runAfterCommitOrImmediately(@Nullable ProcessingContext context,
                                                                @Nonnull Runnable updateTask) {
        if (context == null) {
            updateTask.run();
        } else {
            List<Runnable> queuedTasks = context.computeResourceIfAbsent(UPDATE_TASKS_KEY, () -> {
                List<Runnable> tasks = new ArrayList<>();
                context.runOnAfterCommit(c -> tasks.forEach(Runnable::run));
                return tasks;
            });
            queuedTasks.add(updateTask);
        }
        return FutureUtils.emptyCompletedFuture();
    }

    /**
     * Completes {@code updateHandler} exceptionally with {@code cause} and deregisters it.
     * <p>
     * A failure while completing the stream is logged and not propagated. The stream is deregistered either way,
     * matching what {@code completeSubscriptions} and the failure paths of {@code emitUpdate} do, so that a
     * terminated stream does not keep its query identifier registered.
     *
     * @param updateHandler the update stream to terminate
     * @param cause         the error to complete the stream with
     * @param query         the subscription query the stream is registered for
     */
    private void emitError(QueueMessageStream<SubscriptionQueryUpdateMessage> updateHandler,
                           Throwable cause,
                           QueryMessage query) {
        try {
            updateHandler.completeExceptionally(cause);
        } catch (Exception e) {
            // Pass the exception as the trailing argument so SLF4J logs its stack trace.
            logger.error("An error happened while trying to inform an update handler about the error. Query: {}",
                         query, e);
        } finally {
            updateHandlers.remove(query, updateHandler);
        }
    }

    /**
     * Describes this bus by its unit of work factory, its handler subscriptions and its registered update streams.
     *
     * @param descriptor the descriptor to report the properties to
     */
    @Override
    public void describeTo(@Nonnull ComponentDescriptor descriptor) {
        descriptor.describeProperty("unitOfWorkFactory", unitOfWorkFactory);
        descriptor.describeProperty("subscriptions", subscriptions);
        descriptor.describeProperty("updateHandlers", updateHandlers);
    }
}

