/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.queryhandling;

import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.axonframework.common.Assert;
import org.axonframework.common.FutureUtils;
import org.axonframework.common.infra.ComponentDescriptor;
import org.axonframework.messaging.Context;
import org.axonframework.messaging.MessageStream;
import org.axonframework.messaging.QualifiedName;
import org.axonframework.messaging.responsetypes.ResponseType;
import org.axonframework.messaging.unitofwork.ProcessingContext;
import org.axonframework.messaging.unitofwork.UnitOfWork;
import org.axonframework.messaging.unitofwork.UnitOfWorkFactory;
import org.axonframework.queryhandling.DuplicateQueryHandlerSubscriptionException;
import org.axonframework.queryhandling.GenericSubscriptionQueryResponseMessages;
import org.axonframework.queryhandling.NoHandlerForQueryException;
import org.axonframework.queryhandling.QueryBus;
import org.axonframework.queryhandling.QueryHandler;
import org.axonframework.queryhandling.QueryHandlerName;
import org.axonframework.queryhandling.QueryMessage;
import org.axonframework.queryhandling.QueryResponseMessage;
import org.axonframework.queryhandling.SinkWrapper;
import org.axonframework.queryhandling.SinksManyWrapper;
import org.axonframework.queryhandling.SubscriptionQueryAlreadyRegisteredException;
import org.axonframework.queryhandling.SubscriptionQueryMessage;
import org.axonframework.queryhandling.SubscriptionQueryResponseMessages;
import org.axonframework.queryhandling.SubscriptionQueryUpdateMessage;
import org.axonframework.queryhandling.UpdateHandler;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

/**
 * {@link QueryBus} implementation that keeps query handler subscriptions and subscription-query update sinks in
 * in-memory concurrent maps.
 * <p>
 * Direct queries are handled inside a {@link UnitOfWork} created by the configured {@link UnitOfWorkFactory}.
 * Update-related operations ({@code emitUpdate}, {@code completeSubscriptions},
 * {@code completeSubscriptionsExceptionally}) are executed immediately when no {@link ProcessingContext} is given,
 * and are otherwise queued on the context and run from an after-commit callback.
 */
public class SimpleQueryBus implements QueryBus {

    private static final Logger logger = LoggerFactory.getLogger(SimpleQueryBus.class);

    /** Resource key under which the queued update tasks of a {@link ProcessingContext} are stored. */
    private static final Context.ResourceKey<List<Runnable>> UPDATE_TASKS_KEY =
            Context.ResourceKey.withLabel("update-tasks");

    private final UnitOfWorkFactory unitOfWorkFactory;
    private final ConcurrentMap<QueryHandlerName, QueryHandler> subscriptions = new ConcurrentHashMap<>();
    private final ConcurrentMap<SubscriptionQueryMessage, SinkWrapper<SubscriptionQueryUpdateMessage>> updateHandlers =
            new ConcurrentHashMap<>();

    /**
     * Creates a query bus that uses the given {@code unitOfWorkFactory} to create the unit of work in which each
     * query is handled.
     *
     * @param unitOfWorkFactory the factory for units of work; must not be {@code null}
     * @throws NullPointerException when {@code unitOfWorkFactory} is {@code null}
     */
    public SimpleQueryBus(@Nonnull UnitOfWorkFactory unitOfWorkFactory) {
        this.unitOfWorkFactory =
                Objects.requireNonNull(unitOfWorkFactory, "The UnitOfWorkFactory must be provided.");
    }

    /**
     * Registers {@code queryHandler} under {@code handlerName}. Registering the very same handler instance again
     * under the same name is a no-op.
     *
     * @return this bus, for fluent use
     * @throws DuplicateQueryHandlerSubscriptionException when a different handler is already registered under
     *                                                    {@code handlerName}
     */
    @Override
    public QueryBus subscribe(@Nonnull QueryHandlerName handlerName, @Nonnull QueryHandler queryHandler) {
        logger.debug("Subscribing query handler for name [{}].", handlerName);
        QueryHandler existingHandler = subscriptions.putIfAbsent(handlerName, queryHandler);
        if (existingHandler != null && existingHandler != queryHandler) {
            throw new DuplicateQueryHandlerSubscriptionException(handlerName, existingHandler, queryHandler);
        }
        return this;
    }

    /**
     * Dispatches {@code query} to the handler subscribed for its name and expected response type and returns the
     * handler's response stream.
     * <p>
     * This method blocks on the future returned by the unit of work. Any exception raised while resolving or
     * invoking the handler is returned as a failed stream rather than thrown. A handler result that is completed
     * and either carries no error or carries a {@link NoHandlerForQueryException} is replaced by an empty stream.
     * <p>
     * NOTE(review): the given {@code context} is not used here; the handler receives the context of the newly
     * created unit of work instead.
     */
    @Override
    @Nonnull
    public MessageStream<QueryResponseMessage> query(@Nonnull QueryMessage query,
                                                     @Nullable ProcessingContext context) {
        if (logger.isDebugEnabled()) {
            logger.debug("Dispatching direct-query for query name [{}] and response [{}].",
                         query.type().name(), query.responseType());
        }
        try {
            MessageStream<QueryResponseMessage> responseStream = handle(query, handlerFor(query)).get();
            if (containsResponseOrUserException(responseStream)) {
                return responseStream;
            }
            return MessageStream.empty().cast();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            return MessageStream.failed(e);
        } catch (Exception e) {
            return MessageStream.failed(e);
        }
    }

    /**
     * Invokes {@code handler} for {@code query} inside a freshly created {@link UnitOfWork}. An exception thrown by
     * the handler is converted into a failed stream, so the lambda passed to the unit of work always yields a
     * completed future.
     */
    @Nonnull
    private CompletableFuture<MessageStream<QueryResponseMessage>> handle(@Nonnull QueryMessage query,
                                                                          @Nonnull QueryHandler handler) {
        if (logger.isDebugEnabled()) {
            logger.debug("Handling query [{} {name={},response={}}]",
                         query.identifier(), query.type(), query.responseType());
        }
        UnitOfWork unitOfWork = unitOfWorkFactory.create();
        return unitOfWork.executeWithResult(context -> {
            MessageStream<QueryResponseMessage> result;
            try {
                result = handler.handle(query, context);
            } catch (Exception e) {
                result = MessageStream.failed(e);
            }
            return CompletableFuture.completedFuture(result);
        });
    }

    /**
     * Returns {@code true} when the stream is not yet completed, or when it carries an error that is not a
     * {@link NoHandlerForQueryException}.
     */
    private static boolean containsResponseOrUserException(MessageStream<QueryResponseMessage> responseStream) {
        if (!responseStream.isCompleted()) {
            return true;
        }
        return responseStream.error()
                             .map(e -> !(e instanceof NoHandlerForQueryException))
                             .orElse(false);
    }

    /**
     * Registers {@code query} for updates and returns a response object combining a lazily executed initial
     * query with the stream of updates.
     * <p>
     * The initial query is deferred: it is only dispatched when the initial-result {@link Flux} is subscribed to.
     * Both the response type and the update response type are rejected when they are a {@link Publisher}.
     *
     * @throws SubscriptionQueryAlreadyRegisteredException when a subscription query with the same identifier is
     *                                                     already registered
     */
    @Override
    @Nonnull
    public SubscriptionQueryResponseMessages subscriptionQuery(@Nonnull SubscriptionQueryMessage query,
                                                               @Nullable ProcessingContext context,
                                                               int updateBufferSize) {
        Assert.isFalse(Publisher.class.isAssignableFrom(query.responseType().getExpectedResponseType()),
                       () -> "Subscription Query query does not support Flux as a return type.");
        Assert.isFalse(Publisher.class.isAssignableFrom(query.updatesResponseType().getExpectedResponseType()),
                       () -> "Subscription Query query does not support Flux as an update type.");

        Flux<QueryResponseMessage> initialStream =
                Flux.defer(() -> query(query, context).asFlux())
                    .map(MessageStream.Entry::message)
                    .doOnError(error -> logger.error(
                            "An error happened while trying to report an initial result. Query: {}",
                            query, error
                    ));
        UpdateHandler updateHandler = subscribeToUpdates(query, updateBufferSize);
        return new GenericSubscriptionQueryResponseMessages(initialStream,
                                                            updateHandler.updates(),
                                                            updateHandler::complete);
    }

    /**
     * Registers a replay sink, limited to {@code updateBufferSize} elements, for updates to {@code query}.
     * The registration is removed again when the returned update flux is cancelled or terminates.
     * <p>
     * NOTE(review): the duplicate check and the registration are two separate steps and are not atomic with
     * respect to concurrent callers using the same query identifier.
     *
     * @throws SubscriptionQueryAlreadyRegisteredException when a subscription query with the same identifier is
     *                                                     already registered
     */
    @Override
    @Nonnull
    public UpdateHandler subscribeToUpdates(@Nonnull SubscriptionQueryMessage query, int updateBufferSize) {
        if (hasHandlerFor(query.identifier())) {
            throw new SubscriptionQueryAlreadyRegisteredException(query.identifier());
        }
        Runnable removeHandler = () -> updateHandlers.remove(query);
        Sinks.Many<SubscriptionQueryUpdateMessage> sink = Sinks.many().replay().limit(updateBufferSize);
        SinksManyWrapper<SubscriptionQueryUpdateMessage> sinksManyWrapper = new SinksManyWrapper<>(sink);
        updateHandlers.put(query, sinksManyWrapper);
        Flux<SubscriptionQueryUpdateMessage> updateMessageFlux = sink.asFlux()
                                                                     .doOnCancel(removeHandler)
                                                                     .doOnTerminate(removeHandler);
        return new UpdateHandler(updateMessageFlux, removeHandler, sinksManyWrapper::complete);
    }

    /** Tells whether an update sink is registered for a subscription query with the given identifier. */
    private boolean hasHandlerFor(String queryId) {
        return updateHandlers.keySet()
                             .stream()
                             .anyMatch(m -> m.identifier().equals(queryId));
    }

    /**
     * Resolves the handler subscribed for the query's qualified name and expected response type.
     *
     * @throws NoHandlerForQueryException when no such handler is subscribed
     */
    @Nonnull
    private QueryHandler handlerFor(@Nonnull QueryMessage query) {
        ResponseType<?> responseType = query.responseType();
        QueryHandlerName handlerName = new QueryHandlerName(
                query.type().qualifiedName(), new QualifiedName(responseType.getExpectedResponseType())
        );
        // Single lookup: avoids a second map access and the gap between containsKey and get.
        QueryHandler handler = subscriptions.get(handlerName);
        if (handler == null) {
            throw NoHandlerForQueryException.forBus(query);
        }
        return handler;
    }

    /**
     * Emits the supplied update to every registered subscription query matching {@code filter}, either
     * immediately (no context) or after the given context commits.
     */
    @Override
    @Nonnull
    public CompletableFuture<Void> emitUpdate(@Nonnull Predicate<SubscriptionQueryMessage> filter,
                                              @Nonnull Supplier<SubscriptionQueryUpdateMessage> updateSupplier,
                                              @Nullable ProcessingContext context) {
        return runAfterCommitOrImmediately(context, () -> emitUpdate(filter, updateSupplier));
    }

    /**
     * Pushes one update to all sinks whose query matches {@code filter}. The supplier is only invoked when at
     * least one sink matches. A sink that throws while receiving the update is deregistered and signalled with
     * the thrown exception.
     */
    private void emitUpdate(Predicate<SubscriptionQueryMessage> filter,
                            Supplier<SubscriptionQueryUpdateMessage> updateSupplier) {
        // Snapshot the matching entries so the loop below may remove from updateHandlers.
        Map<SubscriptionQueryMessage, SinkWrapper<SubscriptionQueryUpdateMessage>> matchingHandlers =
                updateHandlers.entrySet()
                              .stream()
                              .filter(entry -> filter.test(entry.getKey()))
                              .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        if (matchingHandlers.isEmpty()) {
            return;
        }

        SubscriptionQueryUpdateMessage update = updateSupplier.get();
        matchingHandlers.forEach((query, updateHandler) -> {
            try {
                updateHandler.next(update);
            } catch (Exception e) {
                logger.info("An error occurred while trying to emit an update to a query '{}'. "
                                    + "The subscription will be cancelled. Exception summary: {}",
                            query.type(), e.toString());
                updateHandlers.remove(query);
                emitError(updateHandler, e, query);
            }
        });
    }

    /**
     * Completes every registered subscription query matching {@code filter}, either immediately (no context) or
     * after the given context commits.
     */
    @Override
    @Nonnull
    public CompletableFuture<Void> completeSubscriptions(@Nonnull Predicate<SubscriptionQueryMessage> filter,
                                                         @Nullable ProcessingContext context) {
        return runAfterCommitOrImmediately(context, () -> completeSubscriptions(filter));
    }

    /**
     * Completes all sinks whose query matches {@code filter}. When completing a sink throws, the exception is
     * forwarded to that sink as an error signal.
     */
    private void completeSubscriptions(Predicate<SubscriptionQueryMessage> filter) {
        updateHandlers.entrySet()
                      .stream()
                      .filter(entry -> filter.test(entry.getKey()))
                      .forEach(entry -> {
                          SinkWrapper<SubscriptionQueryUpdateMessage> updateHandler = entry.getValue();
                          try {
                              updateHandler.complete();
                          } catch (Exception e) {
                              emitError(updateHandler, e, entry.getKey());
                          }
                      });
    }

    /**
     * Signals {@code cause} to every registered subscription query matching {@code filter}, either immediately
     * (no context) or after the given context commits.
     */
    @Override
    @Nonnull
    public CompletableFuture<Void> completeSubscriptionsExceptionally(
            @Nonnull Predicate<SubscriptionQueryMessage> filter,
            @Nonnull Throwable cause,
            @Nullable ProcessingContext context
    ) {
        return runAfterCommitOrImmediately(context, () -> completeSubscriptionsExceptionally(filter, cause));
    }

    /** Signals {@code cause} as an error to all sinks whose query matches {@code filter}. */
    private void completeSubscriptionsExceptionally(Predicate<SubscriptionQueryMessage> filter, Throwable cause) {
        updateHandlers.entrySet()
                      .stream()
                      .filter(entry -> filter.test(entry.getKey()))
                      .forEach(entry -> emitError(entry.getValue(), cause, entry.getKey()));
    }

    /**
     * Runs {@code updateTask} right away when {@code context} is {@code null}. Otherwise the task is appended to
     * a task list stored on the context under {@link #UPDATE_TASKS_KEY}; the first task for a context also
     * registers a single after-commit callback that runs every task in that list.
     *
     * @return an already completed future in both cases
     */
    @Nonnull
    private CompletableFuture<Void> runAfterCommitOrImmediately(@Nullable ProcessingContext context,
                                                                @Nonnull Runnable updateTask) {
        if (context == null) {
            updateTask.run();
            return FutureUtils.emptyCompletedFuture();
        }
        List<Runnable> queuedTasks = context.computeResourceIfAbsent(UPDATE_TASKS_KEY, () -> {
            List<Runnable> tasks = new ArrayList<>();
            context.runOnAfterCommit(c -> tasks.forEach(Runnable::run));
            return tasks;
        });
        queuedTasks.add(updateTask);
        return FutureUtils.emptyCompletedFuture();
    }

    /**
     * Signals {@code cause} to {@code updateHandler}. A failure to deliver the error signal is logged, including
     * the exception that prevented delivery, and is otherwise ignored so remaining subscriptions are unaffected.
     */
    private void emitError(SinkWrapper<?> updateHandler, Throwable cause, SubscriptionQueryMessage query) {
        try {
            updateHandler.error(cause);
        } catch (Exception e) {
            // Pass the exception as the trailing argument so its stack trace ends up in the log.
            logger.error(
                    "An error happened while trying to inform an update handler about the error. Query: {}",
                    query, e
            );
        }
    }

    @Override
    public void describeTo(@Nonnull ComponentDescriptor descriptor) {
        descriptor.describeProperty("unitOfWorkFactory", unitOfWorkFactory);
        descriptor.describeProperty("subscriptions", subscriptions);
        descriptor.describeProperty("updateHandlers", updateHandlers);
    }
}

