/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.queryhandling;

import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.axonframework.common.infra.ComponentDescriptor;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.MessageStream;
import org.axonframework.messaging.MessageType;
import org.axonframework.messaging.MessageTypeResolver;
import org.axonframework.messaging.responsetypes.ResponseType;
import org.axonframework.messaging.responsetypes.ResponseTypes;
import org.axonframework.messaging.unitofwork.ProcessingContext;
import org.axonframework.queryhandling.GenericQueryMessage;
import org.axonframework.queryhandling.GenericStreamingQueryMessage;
import org.axonframework.queryhandling.GenericSubscriptionQueryMessage;
import org.axonframework.queryhandling.GenericSubscriptionQueryResponse;
import org.axonframework.queryhandling.QueryBus;
import org.axonframework.queryhandling.QueryGateway;
import org.axonframework.queryhandling.QueryMessage;
import org.axonframework.queryhandling.QueryPriorityCalculator;
import org.axonframework.queryhandling.QueryResponseMessage;
import org.axonframework.queryhandling.StreamingQueryMessage;
import org.axonframework.queryhandling.SubscriptionQueryMessage;
import org.axonframework.queryhandling.SubscriptionQueryResponse;
import org.axonframework.queryhandling.SubscriptionQueryResponseMessages;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import reactor.util.concurrent.Queues;

public class DefaultQueryGateway
implements QueryGateway {
    private final QueryBus queryBus;
    private final MessageTypeResolver messageTypeResolver;
    private final QueryPriorityCalculator priorityCalculator;

    public DefaultQueryGateway(@Nonnull QueryBus queryBus, @Nonnull MessageTypeResolver messageTypeResolver, @Nonnull QueryPriorityCalculator priorityCalculator) {
        this.queryBus = Objects.requireNonNull(queryBus, "The QueryBus must not be null.");
        this.messageTypeResolver = Objects.requireNonNull(messageTypeResolver, "The MessageTypeResolver must not be null.");
        this.priorityCalculator = Objects.requireNonNull(priorityCalculator, "The QueryPriorityCalculator must not be null.");
    }

    @Override
    @Nonnull
    public <R> CompletableFuture<R> query(@Nonnull Object query, @Nonnull Class<R> responseType, @Nullable ProcessingContext context) {
        QueryMessage queryMessage = this.asQueryMessage(query, ResponseTypes.instanceOf(responseType));
        MessageStream<QueryResponseMessage> resultStream = this.queryBus.query(queryMessage, context);
        CompletionStage resultFuture = ((CompletableFuture)resultStream.first().asCompletableFuture().thenApply(MessageStream.Entry::message)).thenApply(queryResponseMessage -> queryResponseMessage.payloadAs(responseType));
        ((CompletableFuture)resultFuture).whenComplete((r, e) -> {
            if (!resultStream.isCompleted()) {
                resultStream.close();
            }
        });
        return resultFuture;
    }

    @Override
    @Nonnull
    public <R> CompletableFuture<List<R>> queryMany(@Nonnull Object query, @Nonnull Class<R> responseType, @Nullable ProcessingContext context) {
        QueryMessage queryMessage = this.asQueryMessage(query, ResponseTypes.multipleInstancesOf(responseType));
        MessageStream<QueryResponseMessage> resultStream = this.queryBus.query(queryMessage, context);
        CompletableFuture<List<R>> resultFuture = resultStream.reduce(new ArrayList(), (list, entry) -> {
            list.add(((QueryResponseMessage)entry.message()).payloadAs(responseType));
            return list;
        });
        resultFuture.whenComplete((r, e) -> {
            if (!resultStream.isCompleted()) {
                resultStream.close();
            }
        });
        return resultFuture;
    }

    @Override
    @Nonnull
    public <R> Publisher<R> streamingQuery(@Nonnull Object query, @Nonnull Class<R> responseType, @Nullable ProcessingContext context) {
        return Mono.fromSupplier(() -> this.asStreamingQueryMessage(query, responseType)).flatMapMany(queryMessage -> this.queryBus.streamingQuery((StreamingQueryMessage)queryMessage, context)).mapNotNull(m -> m.payloadAs(responseType));
    }

    @Override
    @Nonnull
    public <R> Publisher<R> subscriptionQuery(@Nonnull Object query, @Nonnull Class<R> responseType, @Nullable ProcessingContext context) {
        SubscriptionQueryMessage queryMessage = this.asSubscriptionQueryMessage(query, ResponseTypes.instanceOf(responseType), ResponseTypes.instanceOf(responseType));
        return this.queryBus.subscriptionQuery(queryMessage, context, Queues.SMALL_BUFFER_SIZE).asFlux().mapNotNull(message -> message.payloadAs(responseType));
    }

    @Override
    @Nonnull
    public <I, U> SubscriptionQueryResponse<I, U> subscriptionQuery(@Nonnull Object query, @Nonnull Class<I> initialResponseType, @Nonnull Class<U> updateResponseType, @Nullable ProcessingContext context, int updateBufferSize) {
        SubscriptionQueryMessage queryMessage = this.asSubscriptionQueryMessage(query, ResponseTypes.instanceOf(initialResponseType), ResponseTypes.instanceOf(updateResponseType));
        SubscriptionQueryResponseMessages response = this.queryBus.subscriptionQuery(queryMessage, context, updateBufferSize);
        return new GenericSubscriptionQueryResponse<Object, Object>(response, message -> message.payloadAs(initialResponseType), message -> message.payloadAs(updateResponseType));
    }

    private QueryMessage asQueryMessage(Object query, ResponseType<?> responseType) {
        GenericQueryMessage genericQueryMessage;
        if (query instanceof Message) {
            Message message = (Message)query;
            genericQueryMessage = new GenericQueryMessage(message, responseType);
        } else {
            genericQueryMessage = new GenericQueryMessage(this.resolveType(query), query, responseType);
        }
        return genericQueryMessage;
    }

    private <R> StreamingQueryMessage asStreamingQueryMessage(Object query, Class<R> responseType) {
        GenericStreamingQueryMessage genericStreamingQueryMessage;
        if (query instanceof Message) {
            Message message = (Message)query;
            genericStreamingQueryMessage = new GenericStreamingQueryMessage(message, responseType);
        } else {
            genericStreamingQueryMessage = new GenericStreamingQueryMessage(this.resolveType(query), query, responseType);
        }
        return genericStreamingQueryMessage;
    }

    private <I, U> SubscriptionQueryMessage asSubscriptionQueryMessage(Object query, ResponseType<I> initialType, ResponseType<U> updateType) {
        return query instanceof Message ? new GenericSubscriptionQueryMessage((Message)query, initialType, updateType) : new GenericSubscriptionQueryMessage(this.resolveType(query), query, initialType, updateType);
    }

    private MessageType resolveType(Object query) {
        return this.messageTypeResolver.resolveOrThrow(query);
    }

    @Override
    public void describeTo(@Nonnull ComponentDescriptor descriptor) {
        descriptor.describeProperty("queryBus", this.queryBus);
        descriptor.describeProperty("messageTypeResolver", this.messageTypeResolver);
        descriptor.describeProperty("priorityCalculator", this.priorityCalculator);
    }
}

