/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.queryhandling;

import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.BuilderUtils;
import org.axonframework.common.Registration;
import org.axonframework.messaging.ClassBasedMessageTypeResolver;
import org.axonframework.messaging.IllegalPayloadAccessException;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.MessageDispatchInterceptor;
import org.axonframework.messaging.MessageTypeResolver;
import org.axonframework.messaging.responsetypes.ResponseType;
import org.axonframework.queryhandling.DefaultSubscriptionQueryResult;
import org.axonframework.queryhandling.GenericQueryMessage;
import org.axonframework.queryhandling.GenericQueryResponseMessage;
import org.axonframework.queryhandling.GenericStreamingQueryMessage;
import org.axonframework.queryhandling.GenericSubscriptionQueryMessage;
import org.axonframework.queryhandling.QueryBus;
import org.axonframework.queryhandling.QueryGateway;
import org.axonframework.queryhandling.QueryMessage;
import org.axonframework.queryhandling.QueryResponseMessage;
import org.axonframework.queryhandling.StreamingQueryMessage;
import org.axonframework.queryhandling.SubscriptionQueryMessage;
import org.axonframework.queryhandling.SubscriptionQueryResult;
import org.axonframework.queryhandling.SubscriptionQueryUpdateMessage;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;

/**
 * {@link QueryGateway} implementation that wraps query objects into the matching query message type, passes
 * them through the registered {@link MessageDispatchInterceptor dispatch interceptors} and dispatches the
 * outcome on the configured {@link QueryBus}.
 * <p>
 * Instances are created through the {@link Builder}; see {@link #builder()}.
 */
public class DefaultQueryGateway implements QueryGateway {

    private final QueryBus queryBus;
    private final List<MessageDispatchInterceptor<? super QueryMessage<?, ?>>> dispatchInterceptors;
    private final MessageTypeResolver messageTypeResolver;

    /**
     * Instantiates a gateway from the given {@code builder}, after invoking {@link Builder#validate()} on it.
     *
     * @param builder the builder holding the query bus, dispatch interceptors and message type resolver
     * @throws AxonConfigurationException if the builder's validation fails
     */
    protected DefaultQueryGateway(Builder builder) {
        builder.validate();
        this.queryBus = builder.queryBus;
        this.dispatchInterceptors = builder.dispatchInterceptors;
        this.messageTypeResolver = builder.messageTypeResolver;
    }

    /**
     * Creates a new, empty {@link Builder}.
     *
     * @return a fresh builder for a {@code DefaultQueryGateway}
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Dispatches the given {@code query} on the query bus and exposes the payload of the response as a
     * {@link CompletableFuture}.
     * <p>
     * A failure of the bus future, an exceptional response message, or an exception thrown while reading the
     * response all complete the returned future exceptionally. If the returned future completes (for example
     * because the caller cancels it) before the bus future is done, the bus future is cancelled.
     *
     * @param query        the query object, or a {@link Message} whose contents are used for the query message
     * @param responseType the expected response type
     * @param <R>          the response payload type
     * @param <Q>          the query payload type
     * @return a future completing with the response payload, or exceptionally on failure
     */
    @Override
    public <R, Q> CompletableFuture<R> query(@Nonnull Q query, @Nonnull ResponseType<R> responseType) {
        QueryMessage<Q, R> queryMessage = asQueryMessage(query, responseType);
        CompletableFuture<QueryResponseMessage<R>> queryResponse =
                queryBus.query(processInterceptors(queryMessage));

        CompletableFuture<R> result = new CompletableFuture<>();
        // Propagate early completion/cancellation of the returned future to the bus future.
        result.whenComplete((r, e) -> {
            if (!queryResponse.isDone()) {
                queryResponse.cancel(true);
            }
        });

        // Turn a failed bus future into an exceptional response message so both paths share one handler.
        queryResponse
                .exceptionally(cause -> asResponseMessage(responseType.responseMessagePayloadType(), cause))
                .thenAccept(queryResponseMessage -> {
                    try {
                        if (queryResponseMessage.isExceptional()) {
                            result.completeExceptionally(queryResponseMessage.exceptionResult());
                        } else {
                            result.complete(queryResponseMessage.getPayload());
                        }
                    } catch (Exception e) {
                        // Reading the response failed; surface that failure to the caller.
                        result.completeExceptionally(e);
                    }
                });
        return result;
    }

    /**
     * Builds an exceptional {@link QueryResponseMessage} for the given {@code exception}. The message type is
     * resolved from the exception's class.
     *
     * @param declaredType the declared payload type of the response
     * @param exception    the failure to wrap
     * @param <R>          the declared payload type
     * @return a response message carrying the given {@code exception}
     */
    @Deprecated
    private <R> QueryResponseMessage<R> asResponseMessage(Class<R> declaredType, Throwable exception) {
        return new GenericQueryResponseMessage<>(
                messageTypeResolver.resolve(exception.getClass()), exception, declaredType
        );
    }

    /**
     * Dispatches the given {@code query} as a streaming query and returns a {@link Publisher} of the response
     * payloads. The query message is created through {@link Mono#fromSupplier}, i.e. on subscription rather
     * than when this method is invoked.
     *
     * @param query        the query object, or a {@link Message} whose contents are used for the query message
     * @param responseType the type of the streamed responses
     * @param <R>          the response payload type
     * @param <Q>          the query payload type
     * @return a publisher emitting the payload of each response message
     */
    @Override
    public <R, Q> Publisher<R> streamingQuery(Q query, Class<R> responseType) {
        return Mono.fromSupplier(() -> asStreamingQueryMessage(query, responseType))
                   .flatMapMany(queryMessage -> queryBus.streamingQuery(processInterceptors(queryMessage)))
                   .map(Message::getPayload);
    }

    /**
     * Wraps the given {@code query} in a {@link StreamingQueryMessage}. A {@link Message} is passed to the
     * message-based constructor; any other object becomes the payload, with its message type resolved by the
     * configured {@link MessageTypeResolver}.
     */
    @SuppressWarnings("unchecked") // The caller declares Q as the query type, so a Message is treated as Message<Q>.
    private <R, Q> StreamingQueryMessage<Q, R> asStreamingQueryMessage(Q query, Class<R> responseType) {
        if (query instanceof Message) {
            return new GenericStreamingQueryMessage<>((Message<Q>) query, responseType);
        }
        return new GenericStreamingQueryMessage<>(messageTypeResolver.resolve(query), query, responseType);
    }

    /**
     * Dispatches the given {@code query} through the bus' scatter-gather operation and returns the response
     * payloads as a {@link Stream}.
     *
     * @param query        the query object, or a {@link Message} whose contents are used for the query message
     * @param responseType the expected response type
     * @param timeout      the timeout handed to the query bus
     * @param timeUnit     the unit of {@code timeout}
     * @param <R>          the response payload type
     * @param <Q>          the query payload type
     * @return a stream of the payloads of the responses returned by the bus
     */
    @Override
    public <R, Q> Stream<R> scatterGather(@Nonnull Q query,
                                          @Nonnull ResponseType<R> responseType,
                                          long timeout,
                                          @Nonnull TimeUnit timeUnit) {
        QueryMessage<Q, R> queryMessage = asQueryMessage(query, responseType);
        return queryBus.scatterGather(processInterceptors(queryMessage), timeout, timeUnit)
                       .map(Message::getPayload);
    }

    /**
     * Wraps the given {@code query} in a {@link QueryMessage}. A {@link Message} is passed to the
     * message-based constructor; any other object becomes the payload, with its message type resolved by the
     * configured {@link MessageTypeResolver}.
     */
    @SuppressWarnings("unchecked") // The caller declares Q as the query type, so a Message is treated as Message<Q>.
    private <R, Q> QueryMessage<Q, R> asQueryMessage(Q query, ResponseType<R> responseType) {
        if (query instanceof Message) {
            return new GenericQueryMessage<>((Message<Q>) query, responseType);
        }
        return new GenericQueryMessage<>(messageTypeResolver.resolve(query), query, responseType);
    }

    /**
     * Dispatches the given {@code query} as a subscription query and returns a result exposing the payloads
     * of the initial response and of the subsequent updates.
     *
     * @param query               the query object, or a {@link Message} whose contents are used for the
     *                            query message
     * @param initialResponseType the expected type of the initial response
     * @param updateResponseType  the expected type of the updates
     * @param updateBufferSize    the update buffer size handed to the query bus
     * @param <Q>                 the query payload type
     * @param <I>                 the initial response payload type
     * @param <U>                 the update payload type
     * @return the subscription query result with message wrappers stripped
     */
    @Override
    public <Q, I, U> SubscriptionQueryResult<I, U> subscriptionQuery(@Nonnull Q query,
                                                                     @Nonnull ResponseType<I> initialResponseType,
                                                                     @Nonnull ResponseType<U> updateResponseType,
                                                                     int updateBufferSize) {
        SubscriptionQueryMessage<Q, I, U> subscriptionQueryMessage =
                asSubscriptionQueryMessage(query, initialResponseType, updateResponseType);
        SubscriptionQueryMessage<Q, I, U> interceptedQuery = processInterceptors(subscriptionQueryMessage);
        SubscriptionQueryResult<QueryResponseMessage<I>, SubscriptionQueryUpdateMessage<U>> result =
                queryBus.subscriptionQuery(interceptedQuery, updateBufferSize);
        return getSubscriptionQueryResult(result);
    }

    /**
     * Wraps the given {@code query} in a {@link SubscriptionQueryMessage}. A {@link Message} is passed to the
     * message-based constructor; any other object becomes the payload, with its message type resolved by the
     * configured {@link MessageTypeResolver}.
     */
    @SuppressWarnings("unchecked") // The caller declares Q as the query type, so a Message is treated as Message<Q>.
    private <Q, I, U> SubscriptionQueryMessage<Q, I, U> asSubscriptionQueryMessage(
            Q query, ResponseType<I> initialResponseType, ResponseType<U> updateResponseType
    ) {
        if (query instanceof Message) {
            return new GenericSubscriptionQueryMessage<>((Message<Q>) query, initialResponseType, updateResponseType);
        }
        return new GenericSubscriptionQueryMessage<>(
                messageTypeResolver.resolve(query), query, initialResponseType, updateResponseType
        );
    }

    /**
     * Adapts a message-level subscription query result into a payload-level one. Initial results and updates
     * with a {@code null} payload are filtered out, and an {@link IllegalPayloadAccessException} raised on the
     * initial result is replaced by its cause.
     */
    private <I, U> DefaultSubscriptionQueryResult<I, U> getSubscriptionQueryResult(
            SubscriptionQueryResult<QueryResponseMessage<I>, SubscriptionQueryUpdateMessage<U>> result
    ) {
        return new DefaultSubscriptionQueryResult<>(
                result.initialResult()
                      .filter(initialResult -> Objects.nonNull(initialResult.getPayload()))
                      .map(Message::getPayload)
                      .onErrorMap(e -> e instanceof IllegalPayloadAccessException ? e.getCause() : e),
                result.updates()
                      .filter(update -> Objects.nonNull(update.getPayload()))
                      .map(Message::getPayload),
                result
        );
    }

    /**
     * Registers the given {@code interceptor} so it is applied to every query dispatched through this gateway.
     *
     * @param interceptor the dispatch interceptor to add
     * @return a {@link Registration} that removes the interceptor again when cancelled
     */
    @Override
    public Registration registerDispatchInterceptor(
            @Nonnull MessageDispatchInterceptor<? super QueryMessage<?, ?>> interceptor
    ) {
        dispatchInterceptors.add(interceptor);
        return () -> dispatchInterceptors.remove(interceptor);
    }

    /**
     * Passes the given {@code query} through every registered dispatch interceptor, in list order, feeding the
     * output of one interceptor into the next.
     *
     * @param query the query message to intercept
     * @return the message returned by the last interceptor, or {@code query} itself when none are registered
     */
    @SuppressWarnings("unchecked") // Interceptors are expected to return a message of the type they were given.
    private <Q, R, T extends QueryMessage<Q, R>> T processInterceptors(T query) {
        T intercepted = query;
        for (MessageDispatchInterceptor<? super QueryMessage<?, ?>> interceptor : dispatchInterceptors) {
            intercepted = (T) interceptor.handle(intercepted);
        }
        return intercepted;
    }

    /**
     * Builder for a {@link DefaultQueryGateway}.
     * <p>
     * The dispatch interceptors default to an empty list and the {@link MessageTypeResolver} defaults to a
     * {@link ClassBasedMessageTypeResolver}. The {@link QueryBus} has no default and must be provided before
     * {@link #build()} is invoked.
     */
    public static class Builder {

        private QueryBus queryBus;
        private List<MessageDispatchInterceptor<? super QueryMessage<?, ?>>> dispatchInterceptors =
                new CopyOnWriteArrayList<>();
        private MessageTypeResolver messageTypeResolver = new ClassBasedMessageTypeResolver();

        /**
         * Sets the {@link QueryBus} used to dispatch queries.
         *
         * @param queryBus the query bus; asserted to be non-null
         * @return this builder, for method chaining
         */
        public Builder queryBus(@Nonnull QueryBus queryBus) {
            BuilderUtils.assertNonNull(queryBus, "QueryBus may not be null");
            this.queryBus = queryBus;
            return this;
        }

        /**
         * Sets the dispatch interceptors, replacing any that were configured earlier.
         *
         * @param dispatchInterceptors the interceptors to apply to dispatched queries
         * @return this builder, for method chaining
         */
        public Builder dispatchInterceptors(
                MessageDispatchInterceptor<? super QueryMessage<?, ?>>... dispatchInterceptors
        ) {
            return dispatchInterceptors(Arrays.asList(dispatchInterceptors));
        }

        /**
         * Sets the dispatch interceptors, replacing any that were configured earlier. The given list is copied;
         * a {@code null} list results in an empty interceptor list.
         *
         * @param dispatchInterceptors the interceptors to apply to dispatched queries, may be {@code null}
         * @return this builder, for method chaining
         */
        public Builder dispatchInterceptors(
                List<MessageDispatchInterceptor<? super QueryMessage<?, ?>>> dispatchInterceptors
        ) {
            // Copying an empty list already yields an empty list, so only null needs special handling.
            this.dispatchInterceptors = dispatchInterceptors != null
                    ? new CopyOnWriteArrayList<>(dispatchInterceptors)
                    : new CopyOnWriteArrayList<>();
            return this;
        }

        /**
         * Sets the {@link MessageTypeResolver} used to resolve the type of the messages this gateway creates.
         *
         * @param messageTypeResolver the resolver; asserted to be non-null
         * @return this builder, for method chaining
         */
        public Builder messageNameResolver(MessageTypeResolver messageTypeResolver) {
            BuilderUtils.assertNonNull(messageTypeResolver, "MessageNameResolver may not be null");
            this.messageTypeResolver = messageTypeResolver;
            return this;
        }

        /**
         * Builds a {@link DefaultQueryGateway} from this builder's current settings.
         *
         * @return the configured gateway
         */
        public DefaultQueryGateway build() {
            return new DefaultQueryGateway(this);
        }

        /**
         * Checks that all required components have been set; currently only the {@link QueryBus}.
         *
         * @throws AxonConfigurationException if a required component is missing
         */
        protected void validate() throws AxonConfigurationException {
            BuilderUtils.assertNonNull(queryBus, "The QueryBus is a hard requirement and should be provided");
        }
    }
}

