/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.queryhandling;

import jakarta.annotation.Nonnull;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Predicate;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.BuilderUtils;
import org.axonframework.common.Registration;
import org.axonframework.messaging.MessageDispatchInterceptor;
import org.axonframework.messaging.responsetypes.MultipleInstancesResponseType;
import org.axonframework.messaging.responsetypes.OptionalResponseType;
import org.axonframework.messaging.responsetypes.PublisherResponseType;
import org.axonframework.messaging.unitofwork.CurrentUnitOfWork;
import org.axonframework.messaging.unitofwork.LegacyUnitOfWork;
import org.axonframework.monitoring.MessageMonitor;
import org.axonframework.monitoring.NoOpMessageMonitor;
import org.axonframework.queryhandling.DefaultQueryUpdateEmitterSpanFactory;
import org.axonframework.queryhandling.QueryUpdateEmitter;
import org.axonframework.queryhandling.QueryUpdateEmitterSpanFactory;
import org.axonframework.queryhandling.SinkWrapper;
import org.axonframework.queryhandling.SinksManyWrapper;
import org.axonframework.queryhandling.SubscriptionQueryMessage;
import org.axonframework.queryhandling.SubscriptionQueryUpdateMessage;
import org.axonframework.queryhandling.UpdateHandlerRegistration;
import org.axonframework.tracing.NoOpSpanFactory;
import org.axonframework.tracing.Span;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

public class SimpleQueryUpdateEmitter
implements QueryUpdateEmitter {
    private static final Logger logger = LoggerFactory.getLogger(SimpleQueryUpdateEmitter.class);
    private static final String QUERY_UPDATE_TASKS_RESOURCE_KEY = "/update-tasks";
    private final MessageMonitor<? super SubscriptionQueryUpdateMessage> updateMessageMonitor;
    private final QueryUpdateEmitterSpanFactory spanFactory;
    private final ConcurrentMap<SubscriptionQueryMessage<?, ?, ?>, SinkWrapper<?>> updateHandlers = new ConcurrentHashMap();
    private final List<MessageDispatchInterceptor<? super SubscriptionQueryUpdateMessage>> dispatchInterceptors = new CopyOnWriteArrayList<MessageDispatchInterceptor<? super SubscriptionQueryUpdateMessage>>();

    protected SimpleQueryUpdateEmitter(Builder builder) {
        builder.validate();
        this.updateMessageMonitor = builder.updateMessageMonitor;
        this.spanFactory = builder.spanFactory;
    }

    public static Builder builder() {
        return new Builder();
    }

    @Override
    public boolean queryUpdateHandlerRegistered(@Nonnull SubscriptionQueryMessage<?, ?, ?> query) {
        return this.updateHandlers.keySet().stream().anyMatch(m -> m.identifier().equals(query.identifier()));
    }

    @Override
    public <U> UpdateHandlerRegistration registerUpdateHandler(@Nonnull SubscriptionQueryMessage<?, ?, ?> query, int updateBufferSize) {
        Sinks.Many sink = Sinks.many().replay().limit(updateBufferSize);
        SinksManyWrapper sinksManyWrapper = new SinksManyWrapper(sink);
        Runnable removeHandler = () -> this.updateHandlers.remove(query);
        Registration registration = () -> {
            removeHandler.run();
            return true;
        };
        this.updateHandlers.put(query, sinksManyWrapper);
        Flux updateMessageFlux = sink.asFlux().doOnCancel(removeHandler).doOnTerminate(removeHandler);
        return new UpdateHandlerRegistration(registration, (Flux<SubscriptionQueryUpdateMessage>)updateMessageFlux, sinksManyWrapper::complete);
    }

    @Override
    public <U> void emit(@Nonnull Predicate<SubscriptionQueryMessage<?, ?, U>> filter, @Nonnull SubscriptionQueryUpdateMessage update) {
        SubscriptionQueryUpdateMessage updateMessage = this.spanFactory.propagateContext(update);
        Span span = this.spanFactory.createUpdateScheduleEmitSpan(updateMessage);
        span.run(() -> {
            Span doEmitSpan = this.spanFactory.createUpdateEmitSpan(updateMessage);
            this.runOnAfterCommitOrNow(doEmitSpan.wrapRunnable(() -> this.doEmit(filter, this.intercept(this.spanFactory.propagateContext(updateMessage)))));
        });
    }

    private SubscriptionQueryUpdateMessage intercept(SubscriptionQueryUpdateMessage message) {
        return message;
    }

    @Override
    public void complete(@Nonnull Predicate<SubscriptionQueryMessage<?, ?, ?>> filter) {
        this.runOnAfterCommitOrNow(() -> this.doComplete(filter));
    }

    @Override
    public void completeExceptionally(@Nonnull Predicate<SubscriptionQueryMessage<?, ?, ?>> filter, @Nonnull Throwable cause) {
        this.runOnAfterCommitOrNow(() -> this.doCompleteExceptionally(filter, cause));
    }

    @Nonnull
    public Registration registerDispatchInterceptor(@Nonnull MessageDispatchInterceptor<? super SubscriptionQueryUpdateMessage> interceptor) {
        this.dispatchInterceptors.add(interceptor);
        return () -> this.dispatchInterceptors.remove(interceptor);
    }

    private <U> void doEmit(Predicate<SubscriptionQueryMessage<?, ?, U>> filter, SubscriptionQueryUpdateMessage update) {
        this.updateHandlers.keySet().stream().filter(this.payloadMatchesQueryResponseType(update.payloadType())).filter(sqm -> filter.test((SubscriptionQueryMessage)sqm)).forEach(query -> Optional.ofNullable((SinkWrapper)this.updateHandlers.get(query)).ifPresent(uh -> this.doEmit((SubscriptionQueryMessage<?, ?, ?>)query, (SinkWrapper<?>)uh, update)));
    }

    private Predicate<SubscriptionQueryMessage<?, ?, ?>> payloadMatchesQueryResponseType(Class<?> payloadType) {
        return sqm -> {
            if (sqm.updatesResponseType() instanceof MultipleInstancesResponseType) {
                return payloadType.isArray() || Iterable.class.isAssignableFrom(payloadType);
            }
            if (sqm.updatesResponseType() instanceof OptionalResponseType) {
                return Optional.class.isAssignableFrom(payloadType);
            }
            if (sqm.updatesResponseType() instanceof PublisherResponseType) {
                return Publisher.class.isAssignableFrom(payloadType);
            }
            return sqm.updatesResponseType().getExpectedResponseType().isAssignableFrom(payloadType);
        };
    }

    private void doEmit(SubscriptionQueryMessage<?, ?, ?> query, SinkWrapper<?> updateHandler, SubscriptionQueryUpdateMessage update) {
        MessageMonitor.MonitorCallback monitorCallback = this.updateMessageMonitor.onMessageIngested(update);
        try {
            updateHandler.next(update);
            monitorCallback.reportSuccess();
        }
        catch (Exception e) {
            logger.info("An error occurred while trying to emit an update to a query '{}'. The subscription will be cancelled. Exception summary: {}", (Object)query.type(), (Object)e.toString());
            monitorCallback.reportFailure(e);
            this.updateHandlers.remove(query);
            this.emitError(query, e, updateHandler);
        }
    }

    private void doComplete(Predicate<SubscriptionQueryMessage<?, ?, ?>> filter) {
        this.updateHandlers.keySet().stream().filter(filter).forEach(query -> Optional.ofNullable((SinkWrapper)this.updateHandlers.get(query)).ifPresent(updateHandler -> {
            try {
                updateHandler.complete();
            }
            catch (Exception e) {
                this.emitError((SubscriptionQueryMessage<?, ?, ?>)query, e, (SinkWrapper<?>)updateHandler);
            }
        }));
    }

    private void emitError(SubscriptionQueryMessage<?, ?, ?> query, Throwable cause, SinkWrapper<?> updateHandler) {
        try {
            updateHandler.error(cause);
        }
        catch (Exception e) {
            logger.error("An error happened while trying to inform update handler about the error. Query: {}", query);
        }
    }

    private void doCompleteExceptionally(Predicate<SubscriptionQueryMessage<?, ?, ?>> filter, Throwable cause) {
        this.updateHandlers.keySet().stream().filter(filter).forEach(query -> Optional.ofNullable((SinkWrapper)this.updateHandlers.get(query)).ifPresent(updateHandler -> this.emitError((SubscriptionQueryMessage<?, ?, ?>)query, cause, (SinkWrapper<?>)updateHandler)));
    }

    private void runOnAfterCommitOrNow(Runnable queryUpdateTask) {
        if (this.inStartedPhaseOfUnitOfWork()) {
            LegacyUnitOfWork<?> unitOfWork = CurrentUnitOfWork.get();
            unitOfWork.getOrComputeResource(this.toString() + QUERY_UPDATE_TASKS_RESOURCE_KEY, resourceKey -> {
                ArrayList queryUpdateTasks = new ArrayList();
                unitOfWork.afterCommit(uow -> queryUpdateTasks.forEach(Runnable::run));
                return queryUpdateTasks;
            }).add(queryUpdateTask);
        } else {
            queryUpdateTask.run();
        }
    }

    private boolean inStartedPhaseOfUnitOfWork() {
        return CurrentUnitOfWork.isStarted() && LegacyUnitOfWork.Phase.STARTED.equals((Object)CurrentUnitOfWork.get().phase());
    }

    @Override
    public Set<SubscriptionQueryMessage<?, ?, ?>> activeSubscriptions() {
        return Collections.unmodifiableSet(this.updateHandlers.keySet());
    }

    public static class Builder {
        private MessageMonitor<? super SubscriptionQueryUpdateMessage> updateMessageMonitor = NoOpMessageMonitor.INSTANCE;
        private QueryUpdateEmitterSpanFactory spanFactory = DefaultQueryUpdateEmitterSpanFactory.builder().spanFactory(NoOpSpanFactory.INSTANCE).build();

        public Builder updateMessageMonitor(@Nonnull MessageMonitor<? super SubscriptionQueryUpdateMessage> updateMessageMonitor) {
            BuilderUtils.assertNonNull(updateMessageMonitor, "MessageMonitor may not be null");
            this.updateMessageMonitor = updateMessageMonitor;
            return this;
        }

        public Builder spanFactory(@Nonnull QueryUpdateEmitterSpanFactory spanFactory) {
            BuilderUtils.assertNonNull(spanFactory, "SpanFactory may not be null");
            this.spanFactory = spanFactory;
            return this;
        }

        public SimpleQueryUpdateEmitter build() {
            return new SimpleQueryUpdateEmitter(this);
        }

        protected void validate() throws AxonConfigurationException {
        }
    }
}

