/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.axonserver.connector.query;

import io.axoniq.axonserver.connector.FlowControl;
import io.axoniq.axonserver.connector.ReplyChannel;
import io.axoniq.axonserver.grpc.ErrorMessage;
import io.axoniq.axonserver.grpc.query.QueryRequest;
import io.axoniq.axonserver.grpc.query.QueryResponse;
import io.grpc.netty.shaded.io.netty.util.internal.OutOfDirectMemoryError;
import java.lang.invoke.MethodHandles;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.axonframework.axonserver.connector.ErrorCode;
import org.axonframework.axonserver.connector.query.QuerySerializer;
import org.axonframework.axonserver.connector.query.StreamableFluxResponse;
import org.axonframework.axonserver.connector.query.StreamableInstanceResponse;
import org.axonframework.axonserver.connector.query.StreamableMultiInstanceResponse;
import org.axonframework.axonserver.connector.query.StreamableResponse;
import org.axonframework.axonserver.connector.util.ExceptionSerializer;
import org.axonframework.axonserver.connector.util.ProcessingInstructionHelper;
import org.axonframework.messaging.responsetypes.MultipleInstancesResponseType;
import org.axonframework.queryhandling.GenericStreamingQueryMessage;
import org.axonframework.queryhandling.QueryBus;
import org.axonframework.queryhandling.QueryBusSpanFactory;
import org.axonframework.queryhandling.QueryMessage;
import org.axonframework.queryhandling.QueryResponseMessage;
import org.axonframework.queryhandling.StreamingQueryMessage;
import org.axonframework.util.ClasspathResolver;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;

class QueryProcessingTask
implements Runnable,
FlowControl {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private static final int DIRECT_QUERY_NUMBER_OF_RESULTS = 1;
    private final QueryBus localSegment;
    private final QueryRequest queryRequest;
    private final ReplyChannel<QueryResponse> responseHandler;
    private final QuerySerializer serializer;
    private final String clientId;
    private final AtomicReference<StreamableResponse> streamableResultRef = new AtomicReference();
    private final AtomicLong requestedBeforeInit = new AtomicLong();
    private final AtomicBoolean cancelledBeforeInit = new AtomicBoolean();
    private final boolean supportsStreaming;
    private final Supplier<Boolean> reactorOnClassPath;
    private final QueryBusSpanFactory spanFactory;

    QueryProcessingTask(QueryBus localSegment, QueryRequest queryRequest, ReplyChannel<QueryResponse> responseHandler, QuerySerializer serializer, String clientId, QueryBusSpanFactory spanFactory) {
        this(localSegment, queryRequest, responseHandler, serializer, clientId, ClasspathResolver::projectReactorOnClasspath, spanFactory);
    }

    QueryProcessingTask(QueryBus localSegment, QueryRequest queryRequest, ReplyChannel<QueryResponse> responseHandler, QuerySerializer serializer, String clientId, Supplier<Boolean> reactorOnClassPath, QueryBusSpanFactory spanFactory) {
        this.localSegment = localSegment;
        this.queryRequest = queryRequest;
        this.responseHandler = responseHandler;
        this.serializer = serializer;
        this.clientId = clientId;
        this.supportsStreaming = this.supportsStreaming(queryRequest);
        this.reactorOnClassPath = reactorOnClassPath;
        this.spanFactory = spanFactory;
    }

    @Override
    public void run() {
        try {
            logger.debug("Will process query [{}]", (Object)this.queryRequest.getQuery());
            QueryMessage queryMessage = this.serializer.deserializeRequest(this.queryRequest);
            this.spanFactory.createQueryProcessingSpan(queryMessage).run(() -> {
                if (ProcessingInstructionHelper.numberOfResults(this.queryRequest.getProcessingInstructionsList()) == 1L) {
                    if (this.supportsStreaming && this.reactorOnClassPath.get().booleanValue()) {
                        this.streamingQuery(queryMessage);
                    } else {
                        this.directQuery(queryMessage);
                    }
                } else {
                    this.scatterGather(queryMessage);
                }
            });
        }
        catch (OutOfDirectMemoryError | RuntimeException e) {
            this.sendError(e);
            logger.warn("Query Processor had an exception when processing query [{}]", (Object)this.queryRequest.getQuery(), (Object)e);
        }
    }

    public void request(long requested) {
        if (requested <= 0L) {
            return;
        }
        if (!this.requestIfInitialized(requested)) {
            this.requestedBeforeInit.getAndUpdate(current -> {
                try {
                    return Math.addExact(requested, current);
                }
                catch (ArithmeticException e) {
                    return Long.MAX_VALUE;
                }
            });
            this.requestIfInitialized(this.requestedBeforeInit.get());
        }
    }

    public void cancel() {
        StreamableResponse result = this.streamableResultRef.get();
        if (result != null) {
            result.cancel();
        } else {
            this.cancelledBeforeInit.set(true);
        }
    }

    public boolean resultPending() {
        return this.streamableResultRef.get() == null;
    }

    private <Q, R> void streamingQuery(QueryMessage<Q, R> originalQueryMessage) {
        GenericStreamingQueryMessage streamingQueryMessage = new GenericStreamingQueryMessage(originalQueryMessage, originalQueryMessage.getQueryName(), originalQueryMessage.getResponseType().getExpectedResponseType());
        Publisher resultPublisher = this.localSegment.streamingQuery((StreamingQueryMessage)streamingQueryMessage);
        this.setResult(this.streamableFluxResult(resultPublisher));
    }

    private <Q, R, T> void directQuery(QueryMessage<Q, R> queryMessage) {
        this.localSegment.query(queryMessage).whenComplete((result, e) -> {
            if (e != null) {
                this.sendError((Throwable)e);
            } else {
                try {
                    StreamableResponse streamableResponse = this.supportsStreaming && queryMessage.getResponseType() instanceof MultipleInstancesResponseType ? this.streamableMultiInstanceResult((QueryResponseMessage)result, queryMessage.getResponseType().getExpectedResponseType()) : this.streamableInstanceResult((QueryResponseMessage<?>)result);
                    this.setResult(streamableResponse);
                }
                catch (Throwable t) {
                    this.sendError(t);
                }
            }
        });
    }

    private void setResult(StreamableResponse result) {
        this.streamableResultRef.set(result);
        if (this.cancelledBeforeInit.get()) {
            this.cancel();
        } else {
            this.request(this.requestedBeforeInit.get());
        }
    }

    private <Q, R> void scatterGather(QueryMessage<Q, R> originalQueryMessage) {
        Stream result = this.localSegment.scatterGather(originalQueryMessage, ProcessingInstructionHelper.timeout(this.queryRequest.getProcessingInstructionsList()), TimeUnit.MILLISECONDS);
        result.forEach(r -> this.responseHandler.send((Object)this.serializer.serializeResponse((QueryResponseMessage<?>)r, this.queryRequest.getMessageIdentifier())));
        this.responseHandler.complete();
    }

    private <R> StreamableResponse streamableFluxResult(Publisher<QueryResponseMessage<R>> resultPublisher) {
        return new StreamableFluxResponse(Flux.from(resultPublisher), this.responseHandler, this.serializer, this.queryRequest.getMessageIdentifier(), this.clientId);
    }

    private <R> StreamableMultiInstanceResponse<R> streamableMultiInstanceResult(QueryResponseMessage<List<R>> result, Class<R> responseType) {
        return new StreamableMultiInstanceResponse<R>(result, responseType, this.responseHandler, this.serializer, this.queryRequest.getMessageIdentifier());
    }

    private StreamableInstanceResponse streamableInstanceResult(QueryResponseMessage<?> result) {
        return new StreamableInstanceResponse(result, this.responseHandler, this.serializer, this.queryRequest.getMessageIdentifier());
    }

    private boolean supportsStreaming(QueryRequest queryRequest) {
        boolean axonServerSupportsStreaming = ProcessingInstructionHelper.axonServerSupportsQueryStreaming(queryRequest.getProcessingInstructionsList());
        boolean clientSupportsStreaming = ProcessingInstructionHelper.clientSupportsQueryStreaming(queryRequest.getProcessingInstructionsList());
        return axonServerSupportsStreaming && clientSupportsStreaming;
    }

    private boolean requestIfInitialized(long requested) {
        StreamableResponse result = this.streamableResultRef.get();
        if (result != null) {
            result.request(requested);
            return true;
        }
        return false;
    }

    private void sendError(Throwable t) {
        ErrorMessage ex = ExceptionSerializer.serialize(this.clientId, t);
        QueryResponse response = QueryResponse.newBuilder().setErrorCode(ErrorCode.getQueryExecutionErrorCode(t).errorCode()).setErrorMessage(ex).setRequestIdentifier(this.queryRequest.getMessageIdentifier()).build();
        this.responseHandler.sendLast((Object)response);
    }
}

