/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.eventhandling;

import jakarta.annotation.Nonnull;
import java.util.List;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.BuilderUtils;
import org.axonframework.common.Registration;
import org.axonframework.common.transaction.NoTransactionManager;
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.eventhandling.DirectEventProcessingStrategy;
import org.axonframework.eventhandling.ErrorHandler;
import org.axonframework.eventhandling.EventHandlerInvoker;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.EventProcessingException;
import org.axonframework.eventhandling.EventProcessingStrategy;
import org.axonframework.eventhandling.EventProcessor;
import org.axonframework.eventhandling.EventProcessorBuilder;
import org.axonframework.eventhandling.EventProcessorOperations;
import org.axonframework.eventhandling.EventProcessorSpanFactory;
import org.axonframework.messaging.MessageHandlerInterceptor;
import org.axonframework.messaging.SubscribableMessageSource;
import org.axonframework.messaging.unitofwork.TransactionalUnitOfWorkFactory;
import org.axonframework.messaging.unitofwork.UnitOfWork;
import org.axonframework.monitoring.MessageMonitor;

/**
 * {@link EventProcessor} implementation that subscribes to a {@link SubscribableMessageSource} and hands every
 * delivered batch of events to an {@link EventProcessingStrategy}, which in turn is given {@link #process(List)}
 * as the callback that performs the actual handling.
 * <p>
 * Processing of a batch happens inside a {@link UnitOfWork} created by a {@link TransactionalUnitOfWorkFactory}
 * that wraps the configured {@link TransactionManager}. Name, handler interceptors and the handling itself are
 * delegated to an internal {@link EventProcessorOperations} instance.
 * <p>
 * Instances are created through {@link #builder()}.
 */
public class SubscribingEventProcessor implements EventProcessor {

    private final SubscribableMessageSource<? extends EventMessage<?>> messageSource;
    private final EventProcessingStrategy processingStrategy;
    private final TransactionalUnitOfWorkFactory transactionalUnitOfWorkFactory;
    private final EventProcessorOperations eventProcessorOperations;

    /**
     * Handle on the active subscription; {@code null} while this processor is not subscribed.
     * Declared {@code volatile} so the running state written by one thread is visible to others.
     */
    private volatile Registration eventBusRegistration;

    /**
     * Instantiates a processor from the given {@code builder}. The builder is validated first (see
     * {@link Builder#validate()}).
     *
     * @param builder the {@link Builder} carrying the configuration for this processor
     */
    protected SubscribingEventProcessor(Builder builder) {
        builder.validate();
        this.messageSource = builder.messageSource;
        this.processingStrategy = builder.processingStrategy;
        this.transactionalUnitOfWorkFactory = new TransactionalUnitOfWorkFactory(builder.transactionManager);
        this.eventProcessorOperations = new EventProcessorOperations.Builder()
                .name(builder.name())
                .eventHandlerInvoker(builder.eventHandlerInvoker())
                .errorHandler(builder.errorHandler())
                .spanFactory(builder.spanFactory())
                .messageMonitor(builder.messageMonitor())
                .build();
    }

    /**
     * Creates a fresh {@link Builder}. The processing strategy defaults to
     * {@link DirectEventProcessingStrategy#INSTANCE} and the transaction manager to
     * {@link NoTransactionManager#INSTANCE}; the message source has no default and must be set.
     *
     * @return a new {@link Builder}
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * @return the name of this processor, as held by the internal {@link EventProcessorOperations}
     */
    @Override
    public String getName() {
        return this.eventProcessorOperations.name();
    }

    /**
     * @return the handler interceptors as reported by the internal {@link EventProcessorOperations}
     */
    @Override
    public List<MessageHandlerInterceptor<? super EventMessage<?>>> getHandlerInterceptors() {
        return this.eventProcessorOperations.handlerInterceptors();
    }

    /**
     * Subscribes to the message source unless a registration is already held, in which case the call is a
     * no-op. Every list of events delivered by the source is passed to the processing strategy together with
     * {@link #process(List)} as the processing callback.
     */
    @Override
    public void start() {
        if (this.eventBusRegistration != null) {
            // Already subscribed.
            return;
        }
        // NOTE(review): the null check above and the assignment below are not atomic; callers that may
        // invoke start() from several threads at once should serialize those calls.
        this.eventBusRegistration = this.messageSource.subscribe(
                eventMessages -> this.processingStrategy.handle(
                        (List<? extends EventMessage<?>>) eventMessages, this::process
                )
        );
    }

    /**
     * @return {@code true} while a subscription registration is held, {@code false} otherwise
     */
    @Override
    public boolean isRunning() {
        return this.eventBusRegistration != null;
    }

    /**
     * @return always {@code false}; this implementation never reports an error state
     */
    @Override
    public boolean isError() {
        return false;
    }

    /**
     * Processes the given {@code eventMessages} in a newly created {@link UnitOfWork}.
     *
     * @param eventMessages the events to process
     * @throws EventProcessingException when processing fails with a checked exception; the original exception
     *                                  is attached as the cause. Runtime exceptions are rethrown unchanged.
     */
    protected void process(List<? extends EventMessage<?>> eventMessages) {
        try {
            UnitOfWork unitOfWork = this.transactionalUnitOfWorkFactory.create();
            this.eventProcessorOperations.processInUnitOfWork(eventMessages, unitOfWork);
        } catch (RuntimeException e) {
            // Unchecked failures pass through untouched.
            throw e;
        } catch (Exception e) {
            throw new EventProcessingException("Exception occurred while processing events", e);
        }
    }

    /**
     * Cancels the current subscription, if any, and marks this processor as no longer running.
     */
    @Override
    public void shutDown() {
        // Read the volatile field exactly once: a second read could observe null if another thread
        // shuts down concurrently, which would otherwise fail with a NullPointerException on cancel().
        Registration registration = this.eventBusRegistration;
        if (registration != null) {
            registration.cancel();
        }
        this.eventBusRegistration = null;
    }

    /**
     * @return the {@link SubscribableMessageSource} this processor was configured with
     */
    public SubscribableMessageSource<? extends EventMessage<?>> getMessageSource() {
        return this.messageSource;
    }

    /**
     * Registers the given {@code handlerInterceptor} with the internal {@link EventProcessorOperations}.
     *
     * @param handlerInterceptor the interceptor to register
     * @return the {@link Registration} returned by the internal {@link EventProcessorOperations}
     */
    @Override
    public Registration registerHandlerInterceptor(
            @Nonnull MessageHandlerInterceptor<? super EventMessage<?>> handlerInterceptor
    ) {
        return this.eventProcessorOperations.registerHandlerInterceptor(handlerInterceptor);
    }

    /**
     * Builder for a {@link SubscribingEventProcessor}.
     * <p>
     * On top of the settings inherited from {@link EventProcessorBuilder}, it configures the
     * {@link SubscribableMessageSource} (required, no default), the {@link EventProcessingStrategy}
     * (defaults to {@link DirectEventProcessingStrategy#INSTANCE}) and the {@link TransactionManager}
     * (defaults to {@link NoTransactionManager#INSTANCE}). The inherited setters are overridden only to
     * narrow their return type to this builder so calls can be chained.
     */
    public static class Builder extends EventProcessorBuilder {

        private SubscribableMessageSource<? extends EventMessage<?>> messageSource;
        private EventProcessingStrategy processingStrategy = DirectEventProcessingStrategy.INSTANCE;
        private TransactionManager transactionManager = NoTransactionManager.INSTANCE;

        @Override
        public Builder name(@Nonnull String name) {
            super.name(name);
            return this;
        }

        @Override
        public Builder eventHandlerInvoker(@Nonnull EventHandlerInvoker eventHandlerInvoker) {
            super.eventHandlerInvoker(eventHandlerInvoker);
            return this;
        }

        @Override
        public Builder errorHandler(@Nonnull ErrorHandler errorHandler) {
            super.errorHandler(errorHandler);
            return this;
        }

        @Override
        public Builder messageMonitor(@Nonnull MessageMonitor<? super EventMessage<?>> messageMonitor) {
            super.messageMonitor(messageMonitor);
            return this;
        }

        @Override
        public Builder spanFactory(@Nonnull EventProcessorSpanFactory spanFactory) {
            super.spanFactory(spanFactory);
            return this;
        }

        /**
         * Sets the source this processor will subscribe to. The argument is checked for {@code null}
         * through {@link BuilderUtils#assertNonNull}.
         *
         * @param messageSource the {@link SubscribableMessageSource} to subscribe to
         * @return this builder, for chaining
         */
        public Builder messageSource(@Nonnull SubscribableMessageSource<? extends EventMessage<?>> messageSource) {
            BuilderUtils.assertNonNull(messageSource, "SubscribableMessageSource may not be null");
            this.messageSource = messageSource;
            return this;
        }

        /**
         * Sets the strategy that receives each delivered batch of events. The argument is checked for
         * {@code null} through {@link BuilderUtils#assertNonNull}.
         *
         * @param processingStrategy the {@link EventProcessingStrategy} to use
         * @return this builder, for chaining
         */
        public Builder processingStrategy(@Nonnull EventProcessingStrategy processingStrategy) {
            BuilderUtils.assertNonNull(processingStrategy, "EventProcessingStrategy may not be null");
            this.processingStrategy = processingStrategy;
            return this;
        }

        /**
         * Sets the transaction manager handed to the {@link TransactionalUnitOfWorkFactory} of the built
         * processor. The argument is checked for {@code null} through {@link BuilderUtils#assertNonNull}.
         *
         * @param transactionManager the {@link TransactionManager} to use
         * @return this builder, for chaining
         */
        public Builder transactionManager(@Nonnull TransactionManager transactionManager) {
            BuilderUtils.assertNonNull(transactionManager, "TransactionManager may not be null");
            this.transactionManager = transactionManager;
            return this;
        }

        /**
         * Builds a {@link SubscribingEventProcessor} from this builder's current settings.
         *
         * @return the newly constructed {@link SubscribingEventProcessor}
         */
        public SubscribingEventProcessor build() {
            return new SubscribingEventProcessor(this);
        }

        /**
         * Runs the inherited validation and additionally requires that a message source has been set.
         *
         * @throws AxonConfigurationException as declared; presumably raised by the non-null assertion when
         *                                    no message source was configured
         */
        @Override
        protected void validate() throws AxonConfigurationException {
            super.validate();
            BuilderUtils.assertNonNull(
                    this.messageSource,
                    "The SubscribableMessageSource is a hard requirement and should be provided"
            );
        }
    }
}

