/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.axonserver.connector.query;

import io.axoniq.axonserver.connector.AxonServerConnection;
import io.axoniq.axonserver.connector.ErrorCategory;
import io.axoniq.axonserver.connector.FlowControl;
import io.axoniq.axonserver.connector.ReplyChannel;
import io.axoniq.axonserver.connector.ResultStream;
import io.axoniq.axonserver.connector.impl.AsyncRegistration;
import io.axoniq.axonserver.connector.query.QueryDefinition;
import io.axoniq.axonserver.connector.query.QueryHandler;
import io.axoniq.axonserver.connector.query.SubscriptionQueryResult;
import io.axoniq.axonserver.grpc.SerializedObject;
import io.axoniq.axonserver.grpc.query.QueryRequest;
import io.axoniq.axonserver.grpc.query.QueryResponse;
import io.axoniq.axonserver.grpc.query.QueryUpdate;
import io.axoniq.axonserver.grpc.query.SubscriptionQuery;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import java.lang.invoke.MethodHandles;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.LockSupport;
import org.axonframework.axonserver.connector.AxonServerConfiguration;
import org.axonframework.axonserver.connector.query.QueryConverter;
import org.axonframework.axonserver.connector.query.QueryResponseMessageStream;
import org.axonframework.axonserver.connector.query.QueryUpdateMessageStream;
import org.axonframework.common.FutureUtils;
import org.axonframework.common.Registration;
import org.axonframework.common.infra.ComponentDescriptor;
import org.axonframework.lifecycle.ShutdownLatch;
import org.axonframework.messaging.MessageStream;
import org.axonframework.messaging.unitofwork.ProcessingContext;
import org.axonframework.queryhandling.QueryHandlerName;
import org.axonframework.queryhandling.QueryMessage;
import org.axonframework.queryhandling.QueryResponseMessage;
import org.axonframework.queryhandling.SubscriptionQueryMessage;
import org.axonframework.queryhandling.SubscriptionQueryUpdateMessage;
import org.axonframework.queryhandling.distributed.QueryBusConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AxonServerQueryBusConnector
implements QueryBusConnector {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private final AxonServerConnection connection;
    private final String clientId;
    private final String componentName;
    private final LocalSegmentAdapter localSegmentAdapter;
    private final Map<QueryHandlerName, io.axoniq.axonserver.connector.Registration> subscriptions = new ConcurrentHashMap<QueryHandlerName, io.axoniq.axonserver.connector.Registration>();
    private final ShutdownLatch shutdownLatch = new ShutdownLatch();
    private final Duration queryInProgressAwait = Duration.ofSeconds(5L);
    private QueryBusConnector.Handler incomingHandler;

    public AxonServerQueryBusConnector(@Nonnull AxonServerConnection connection, @Nonnull AxonServerConfiguration configuration) {
        this.connection = Objects.requireNonNull(connection, "The AxonServerConnection must not be null.");
        Objects.requireNonNull(configuration, "The AxonServerConfiguration must not be null.");
        this.clientId = configuration.getClientId();
        this.componentName = configuration.getComponentName();
        this.localSegmentAdapter = new LocalSegmentAdapter();
    }

    public void start() {
        this.shutdownLatch.initialize();
        logger.trace("The AxonServerQueryBusConnector started.");
    }

    public CompletableFuture<Void> subscribe(@Nonnull QueryHandlerName name) {
        logger.debug("Subscribing to query handler [{}] with response type [{}]", (Object)name.queryName(), (Object)name.responseName());
        QueryDefinition definition = new QueryDefinition(name.queryName().name(), name.responseName().name());
        io.axoniq.axonserver.connector.Registration registration = this.connection.queryChannel().registerQueryHandler((QueryHandler)this.localSegmentAdapter, new QueryDefinition[]{definition});
        CompletableFuture<Void> registrationComplete = new CompletableFuture<Void>();
        if (registration instanceof AsyncRegistration) {
            AsyncRegistration asyncRegistration = (AsyncRegistration)registration;
            asyncRegistration.onAck(() -> registrationComplete.complete(null));
        } else {
            registrationComplete.complete(null);
        }
        this.subscriptions.put(name, registration);
        return registrationComplete;
    }

    public boolean unsubscribe(@Nonnull QueryHandlerName name) {
        io.axoniq.axonserver.connector.Registration subscription = this.subscriptions.remove(name);
        if (subscription != null) {
            subscription.cancel();
            return true;
        }
        return false;
    }

    public void onIncomingQuery(@Nonnull QueryBusConnector.Handler handler) {
        this.incomingHandler = Objects.requireNonNull(handler, "The incoming query handler must not be null.");
    }

    @Nonnull
    public MessageStream<QueryResponseMessage> query(@Nonnull QueryMessage query, @Nullable ProcessingContext context) {
        this.shutdownLatch.ifShuttingDown("Cannot dispatch new queries as this bus is being shut down");
        try (ShutdownLatch.ActivityHandle queryInTransit = this.shutdownLatch.registerActivity();){
            ResultStream resultStream = this.connection.queryChannel().query(QueryConverter.convertQueryMessage(query, this.clientId, this.componentName));
            MessageStream messageStream = new QueryResponseMessageStream((ResultStream<QueryResponse>)resultStream).onClose(() -> ((ShutdownLatch.ActivityHandle)queryInTransit).end());
            return messageStream;
        }
    }

    @Nonnull
    public MessageStream<QueryResponseMessage> subscriptionQuery(@Nonnull SubscriptionQueryMessage query, @Nullable ProcessingContext context, int updateBufferSize) {
        this.shutdownLatch.ifShuttingDown("Cannot dispatch new queries as this bus is being shut down");
        try (ShutdownLatch.ActivityHandle queryInTransit = this.shutdownLatch.registerActivity();){
            SubscriptionQueryResult result = this.connection.queryChannel().subscriptionQuery(QueryConverter.convertQueryMessage((QueryMessage)query, this.clientId, this.componentName), SerializedObject.getDefaultInstance(), updateBufferSize, Math.min(updateBufferSize / 4, 8));
            MessageStream messageStream = MessageStream.fromFuture((CompletableFuture)result.initialResult().thenApply(QueryConverter::convertQueryResponse)).concatWith((MessageStream)new QueryUpdateMessageStream((ResultStream<QueryUpdate>)result.updates())).onClose(() -> ((ShutdownLatch.ActivityHandle)queryInTransit).end());
            return messageStream;
        }
    }

    public CompletableFuture<Void> disconnect() {
        if (this.connection.isConnected()) {
            logger.trace("Disconnecting the AxonServerQueryBusConnector.");
            this.connection.queryChannel().prepareDisconnect();
        }
        if (!this.localSegmentAdapter.awaitTermination(this.queryInProgressAwait)) {
            logger.info("Awaited termination of queries in progress without success. Going to cancel remaining queries in progress.");
            this.localSegmentAdapter.cancel();
        }
        return FutureUtils.emptyCompletedFuture();
    }

    public CompletableFuture<Void> shutdownDispatching() {
        logger.trace("Shutting down dispatching of AxonServerQueryBusConnector.");
        return this.shutdownLatch.initiateShutdown();
    }

    public void describeTo(@Nonnull ComponentDescriptor descriptor) {
        descriptor.describeProperty("connection", (Object)this.connection);
        descriptor.describeProperty("clientId", this.clientId);
        descriptor.describeProperty("componentName", this.componentName);
    }

    private class LocalSegmentAdapter
    implements QueryHandler {
        private final Map<String, Runnable> queriesInProgress = new ConcurrentHashMap<String, Runnable>();

        private LocalSegmentAdapter() {
        }

        public void handle(QueryRequest query, ReplyChannel<QueryResponse> responseHandler) {
            this.stream(query, responseHandler).request(Long.MAX_VALUE);
        }

        public FlowControl stream(QueryRequest query, ReplyChannel<QueryResponse> responseHandler) {
            final MessageStream result = AxonServerQueryBusConnector.this.incomingHandler.query(QueryConverter.convertQueryRequest(query));
            Runnable previous = this.queriesInProgress.put(query.getMessageIdentifier(), () -> ((MessageStream)result).close());
            if (previous != null) {
                previous.run();
            }
            result.onClose(this.queriesInProgress.remove(query.getMessageIdentifier())).onAvailable(() -> {
                while (result.hasNextAvailable()) {
                    Optional next = result.next();
                    next.ifPresent(i -> responseHandler.send((Object)QueryConverter.convertQueryResponseMessage(query.getMessageIdentifier(), (QueryResponseMessage)i.message())));
                }
                if (result.isCompleted()) {
                    result.error().ifPresentOrElse(error -> responseHandler.completeWithError(ErrorCategory.QUERY_EXECUTION_ERROR, error.getMessage()), () -> ((ReplyChannel)responseHandler).complete());
                }
            });
            return new FlowControl(){

                public void request(long requested) {
                }

                public void cancel() {
                    result.close();
                }
            };
        }

        public io.axoniq.axonserver.connector.Registration registerSubscriptionQuery(SubscriptionQuery query, QueryHandler.UpdateHandler sendUpdate) {
            Registration registration = AxonServerQueryBusConnector.this.incomingHandler.registerUpdateHandler(QueryConverter.convertSubscriptionQueryMessage(query), (QueryBusConnector.UpdateCallback)new AxonServerUpdateCallback(sendUpdate));
            return () -> {
                registration.cancel();
                return FutureUtils.emptyCompletedFuture();
            };
        }

        private boolean awaitTermination(Duration timeout) {
            Instant startAwait = Instant.now();
            Instant endAwait = startAwait.plusSeconds(timeout.getSeconds());
            while (Instant.now().isBefore(endAwait) && !this.queriesInProgress.isEmpty()) {
                this.queriesInProgress.values().stream().findFirst().ifPresent(queryInProgress -> {
                    while (Instant.now().isBefore(endAwait)) {
                        LockSupport.parkNanos(10000000L);
                    }
                });
            }
            return this.queriesInProgress.isEmpty();
        }

        private void cancel() {
            this.queriesInProgress.values().iterator().forEachRemaining(Runnable::run);
        }
    }

    class AxonServerUpdateCallback
    implements QueryBusConnector.UpdateCallback {
        private final QueryHandler.UpdateHandler updateHandler;

        public AxonServerUpdateCallback(QueryHandler.UpdateHandler updateHandler) {
            this.updateHandler = updateHandler;
        }

        @Nonnull
        public CompletableFuture<Void> sendUpdate(@Nonnull SubscriptionQueryUpdateMessage update) {
            this.updateHandler.sendUpdate(QueryConverter.convertQueryUpdate(update));
            return FutureUtils.emptyCompletedFuture();
        }

        public CompletableFuture<Void> complete() {
            this.updateHandler.complete();
            return FutureUtils.emptyCompletedFuture();
        }

        public CompletableFuture<Void> completeExceptionally(@Nonnull Throwable error) {
            this.updateHandler.sendUpdate(QueryConverter.convertQueryUpdate(AxonServerQueryBusConnector.this.clientId, error));
            this.updateHandler.complete();
            return FutureUtils.emptyCompletedFuture();
        }
    }
}

