/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.config;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.BuilderUtils;
import org.axonframework.common.annotation.AnnotationUtils;
import org.axonframework.common.transaction.NoTransactionManager;
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.config.Component;
import org.axonframework.config.Configuration;
import org.axonframework.config.EventProcessingConfiguration;
import org.axonframework.config.EventProcessingConfigurer;
import org.axonframework.config.MessageMonitorFactory;
import org.axonframework.config.ModuleConfiguration;
import org.axonframework.config.ProcessingGroup;
import org.axonframework.config.SagaConfiguration;
import org.axonframework.config.SagaConfigurer;
import org.axonframework.eventhandling.DirectEventProcessingStrategy;
import org.axonframework.eventhandling.ErrorHandler;
import org.axonframework.eventhandling.EventHandlerInvoker;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.EventProcessingStrategy;
import org.axonframework.eventhandling.EventProcessor;
import org.axonframework.eventhandling.ListenerInvocationErrorHandler;
import org.axonframework.eventhandling.LoggingErrorHandler;
import org.axonframework.eventhandling.MultiEventHandlerInvoker;
import org.axonframework.eventhandling.PropagatingErrorHandler;
import org.axonframework.eventhandling.SimpleEventHandlerInvoker;
import org.axonframework.eventhandling.SubscribingEventProcessor;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.eventhandling.TrackingEventProcessor;
import org.axonframework.eventhandling.TrackingEventProcessorConfiguration;
import org.axonframework.eventhandling.async.SequencingPolicy;
import org.axonframework.eventhandling.async.SequentialPerAggregatePolicy;
import org.axonframework.eventhandling.tokenstore.TokenStore;
import org.axonframework.eventhandling.tokenstore.inmemory.InMemoryTokenStore;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.MessageHandlerInterceptor;
import org.axonframework.messaging.StreamableMessageSource;
import org.axonframework.messaging.SubscribableMessageSource;
import org.axonframework.messaging.annotation.HandlerDefinition;
import org.axonframework.messaging.interceptors.CorrelationDataInterceptor;
import org.axonframework.messaging.unitofwork.RollbackConfiguration;
import org.axonframework.messaging.unitofwork.RollbackConfigurationType;
import org.axonframework.modelling.saga.repository.SagaStore;
import org.axonframework.modelling.saga.repository.inmemory.InMemorySagaStore;
import org.axonframework.monitoring.MessageMonitor;

public class EventProcessingModule
implements ModuleConfiguration,
EventProcessingConfiguration,
EventProcessingConfigurer {
    // Selectors registered through assignHandlerTypesMatching / assignHandlerInstancesMatching.
    private final List<TypeProcessingGroupSelector> typeSelectors = new ArrayList<>();
    private final List<InstanceProcessingGroupSelector> instanceSelectors = new ArrayList<>();
    // Sagas and event handler builders collected until initialize(Configuration) is invoked.
    private final List<SagaConfigurer<?>> sagaConfigurations = new ArrayList<>();
    private final List<Component<Object>> eventHandlerBuilders = new ArrayList<>();
    // Overrides keyed by the name they were registered under (processing group or processor name).
    private final Map<String, Component<ListenerInvocationErrorHandler>> listenerInvocationErrorHandlers = new HashMap<>();
    private final Map<String, Component<ErrorHandler>> errorHandlers = new HashMap<>();
    private final Map<String, EventProcessingConfigurer.EventProcessorBuilder> eventProcessorBuilders = new HashMap<>();
    // Rebuilt on every call to initialize(Configuration).
    private final Map<String, Component<EventProcessor>> eventProcessors = new HashMap<>();
    private final List<BiFunction<Configuration, String, MessageHandlerInterceptor<? super EventMessage<?>>>> defaultHandlerInterceptors = new ArrayList<>();
    private final Map<String, List<Function<Configuration, MessageHandlerInterceptor<? super EventMessage<?>>>>> handlerInterceptorsBuilders = new HashMap<>();
    // Explicit processing group -> processor name assignments.
    private final Map<String, String> processingGroupsAssignments = new HashMap<>();
    private final Map<String, Component<SequencingPolicy<? super EventMessage<?>>>> sequencingPolicies = new HashMap<>();
    private final Map<String, MessageMonitorFactory> messageMonitorFactories = new HashMap<>();
    private final Map<String, Component<TokenStore>> tokenStore = new HashMap<>();
    private final Map<String, Component<RollbackConfiguration>> rollbackConfigurations = new HashMap<>();
    private final Map<String, Component<TransactionManager>> transactionManagers = new HashMap<>();
    // Selector that yields the annotated processing group of a type, or nothing when the lookup is empty.
    private final TypeProcessingGroupSelector annotationGroupSelector = TypeProcessingGroupSelector.defaultSelector(type -> EventProcessingModule.annotatedProcessingGroupOfType(type).orElse(null));
    // Last-resort selectors, replaceable through the byDefaultAssign* methods.
    private TypeProcessingGroupSelector typeFallback = TypeProcessingGroupSelector.defaultSelector(c -> c.getSimpleName() + "Processor");
    private InstanceProcessingGroupSelector instanceFallbackSelector = InstanceProcessingGroupSelector.defaultSelector(EventProcessingModule::packageOfObject);
    // Assigned in initialize(Configuration); null until then (see ensureInitialized()).
    private Configuration configuration;
    // Defaults: each component receives the configuration through a supplier because it is not known at construction time.
    private final Component<ListenerInvocationErrorHandler> defaultListenerInvocationErrorHandler = new Component<>(() -> this.configuration, "listenerInvocationErrorHandler", c -> c.getComponent(ListenerInvocationErrorHandler.class, LoggingErrorHandler::new));
    private final Component<ErrorHandler> defaultErrorHandler = new Component<>(() -> this.configuration, "errorHandler", c -> c.getComponent(ErrorHandler.class, PropagatingErrorHandler::instance));
    private final Component<SequencingPolicy<? super EventMessage<?>>> defaultSequencingPolicy = new Component<>(() -> this.configuration, "sequencingPolicy", c -> SequentialPerAggregatePolicy.instance());
    private final Component<TokenStore> defaultTokenStore = new Component<>(() -> this.configuration, "tokenStore", c -> c.getComponent(TokenStore.class, InMemoryTokenStore::new));
    private final Component<RollbackConfiguration> defaultRollbackConfiguration = new Component<>(() -> this.configuration, "rollbackConfiguration", c -> c.getComponent(RollbackConfiguration.class, () -> RollbackConfigurationType.ANY_THROWABLE));
    private final Component<SagaStore> sagaStore = new Component<>(() -> this.configuration, "sagaStore", c -> c.getComponent(SagaStore.class, InMemorySagaStore::new));
    private final Component<TransactionManager> defaultTransactionManager = new Component<>(() -> this.configuration, "transactionManager", c -> c.getComponent(TransactionManager.class, NoTransactionManager::instance));
    // The event bus is cast without a type check here; defaultEventProcessor only reads this component
    // after an instanceof StreamableMessageSource test on the event bus.
    @SuppressWarnings("unchecked")
    private final Component<StreamableMessageSource<TrackedEventMessage<?>>> defaultStreamableSource = new Component<>(() -> this.configuration, "defaultStreamableMessageSource", c -> (StreamableMessageSource<TrackedEventMessage<?>>) c.eventBus());
    private final Component<SubscribableMessageSource<? extends EventMessage<?>>> defaultSubscribableSource = new Component<>(() -> this.configuration, "defaultSubscribableMessageSource", Configuration::eventBus);
    private final Component<TrackingEventProcessorConfiguration> defaultTrackingEventProcessorConfiguration = new Component<>(() -> this.configuration, "trackingEventProcessorConfiguration", c -> c.getComponent(TrackingEventProcessorConfiguration.class, TrackingEventProcessorConfiguration::forSingleThreadedProcessing));
    // Builder used for processors without an explicitly registered builder.
    private EventProcessingConfigurer.EventProcessorBuilder defaultEventProcessorBuilder = this::defaultEventProcessor;
    // By default a processing group is handled by a processor of the same name.
    private Function<String, String> defaultProcessingGroupAssignment = Function.identity();

    /**
     * Binds this module to the given configuration and rebuilds the event processor components.
     * <p>
     * Previously built processor components are discarded. Both the instance and the type selectors are
     * ordered by descending priority so that the priority supplied on registration decides which selector is
     * consulted first. The sort is stable, so selectors of equal priority keep their registration order.
     *
     * @param configuration the configuration this module resolves its components from
     */
    @Override
    public void initialize(Configuration configuration) {
        this.configuration = configuration;
        eventProcessors.clear();

        instanceSelectors.sort(Comparator.comparing(InstanceProcessingGroupSelector::getPriority).reversed());
        // Without this sort, type selectors would be consulted in registration order and their priority ignored.
        typeSelectors.sort(Comparator.comparing(TypeProcessingGroupSelector::getPriority).reversed());

        Map<String, List<Function<Configuration, EventHandlerInvoker>>> handlerInvokers = new HashMap<>();
        registerSimpleEventHandlerInvokers(handlerInvokers);
        registerSagaManagers(handlerInvokers);

        // One component per processor name; the processor is built by the component's builder function.
        handlerInvokers.forEach((processorName, invokers) -> {
            Component<EventProcessor> processorComponent =
                    new Component<>(configuration, processorName, c -> buildEventProcessor(invokers, processorName));
            eventProcessors.put(processorName, processorComponent);
        });
        initializeProcessors();
    }

    /**
     * Registers a start handler (phase {@code Integer.MIN_VALUE}) that calls {@code get()} on every
     * event processor component.
     */
    private void initializeProcessors() {
        configuration.onStart(Integer.MIN_VALUE, () -> {
            for (Component<EventProcessor> processorComponent : eventProcessors.values()) {
                processorComponent.get();
            }
        });
    }

    /**
     * Determines the processing group for the given type.
     * <p>
     * The registered type selectors are consulted first, then the annotation based selector, and finally
     * the type fallback. The first selector producing a value wins.
     *
     * @param type the type to select a processing group for
     * @return the name of the selected processing group
     * @throws IllegalStateException when no selector yields a processing group
     */
    private String selectProcessingGroupByType(Class<?> type) {
        List<TypeProcessingGroupSelector> candidates = new ArrayList<>(typeSelectors);
        candidates.add(annotationGroupSelector);
        candidates.add(typeFallback);

        for (TypeProcessingGroupSelector candidate : candidates) {
            Optional<String> selected = candidate.select(type);
            if (selected.isPresent()) {
                return selected.get();
            }
        }
        throw new IllegalStateException("Could not select a processing group for type [" + type.getSimpleName() + "]");
    }

    /**
     * Groups all registered event handlers by processing group and adds, per group, a builder for a
     * {@link SimpleEventHandlerInvoker} to the given map under the processor name assigned to that group.
     * <p>
     * Selector order: instance selectors, type selectors (adapted to instances), the annotation based
     * selector and finally the instance fallback selector.
     *
     * @param handlerInvokers map of processor name to invoker builders, extended by this method
     * @throws IllegalStateException when no selector yields a processing group for a handler
     */
    private void registerSimpleEventHandlerInvokers(Map<String, List<Function<Configuration, EventHandlerInvoker>>> handlerInvokers) {
        List<InstanceProcessingGroupSelector> selectors = new ArrayList<>(instanceSelectors);
        for (TypeProcessingGroupSelector typeSelector : typeSelectors) {
            selectors.add(new InstanceToTypeProcessingGroupSelectorAdapter(typeSelector));
        }
        selectors.add(new InstanceToTypeProcessingGroupSelectorAdapter(annotationGroupSelector));
        selectors.add(instanceFallbackSelector);

        Map<String, List<Object>> assignments = new HashMap<>();
        for (Component<Object> handlerBuilder : eventHandlerBuilders) {
            Object handler = handlerBuilder.get();
            String processingGroup = selectors.stream()
                    .map(selector -> selector.select(handler))
                    .filter(Optional::isPresent)
                    .map(Optional::get)
                    .findFirst()
                    .orElseThrow(() -> new IllegalStateException("Could not select a processing group for handler [" + handler.getClass().getSimpleName() + "]"));
            assignments.computeIfAbsent(processingGroup, k -> new ArrayList<>()).add(handler);
        }

        for (Map.Entry<String, List<Object>> assignment : assignments.entrySet()) {
            String processingGroup = assignment.getKey();
            List<Object> handlers = assignment.getValue();
            String processorName = processorNameForProcessingGroup(processingGroup);
            // The invoker itself is only built when the processor component is built.
            handlerInvokers.computeIfAbsent(processorName, k -> new ArrayList<>())
                    .add(c -> SimpleEventHandlerInvoker.builder()
                            .eventHandlers(handlers)
                            .handlerDefinition(retrieveHandlerDefinition(handlers))
                            .parameterResolverFactory(configuration.parameterResolverFactory())
                            .listenerInvocationErrorHandler(listenerInvocationErrorHandler(processingGroup))
                            .sequencingPolicy(sequencingPolicy(processingGroup))
                            .build());
        }
    }

    /**
     * Looks up the handler definition in the configuration, using the class of the first handler in the list.
     *
     * @param handlers the handlers of a single processing group; the first element is read unconditionally
     * @return the handler definition the configuration returns for the first handler's class
     */
    private HandlerDefinition retrieveHandlerDefinition(List<Object> handlers) {
        Class<?> firstHandlerType = handlers.get(0).getClass();
        return configuration.handlerDefinition(firstHandlerType);
    }

    /**
     * Initializes every registered saga configurer and adds a builder returning the saga's manager to the
     * given map, under the processor name assigned to the saga type's processing group.
     *
     * @param handlerInvokers map of processor name to invoker builders, extended by this method
     */
    private void registerSagaManagers(Map<String, List<Function<Configuration, EventHandlerInvoker>>> handlerInvokers) {
        for (SagaConfigurer<?> sagaConfigurer : sagaConfigurations) {
            SagaConfiguration<?> sagaConfig = sagaConfigurer.initialize(configuration);
            String processingGroup = selectProcessingGroupByType(sagaConfig.type());
            String processorName = processorNameForProcessingGroup(processingGroup);
            handlerInvokers.computeIfAbsent(processorName, k -> new ArrayList<>())
                    .add(c -> sagaConfig.manager());
        }
    }

    /**
     * Builds the event processor with the given name from the supplied invoker builders.
     * <p>
     * All invokers are combined into a single {@link MultiEventHandlerInvoker}. The processor is created by
     * the builder registered for the name, or by the default builder when there is none. Interceptors are
     * registered in this order: those registered for this processor name, the non-null results of the default
     * interceptor builders, and finally a {@link CorrelationDataInterceptor}.
     *
     * @param builderFunctions builders of the event handler invokers the processor should invoke
     * @param processorName    the name of the processor to build
     * @return the event processor with its interceptors registered
     */
    private EventProcessor buildEventProcessor(List<Function<Configuration, EventHandlerInvoker>> builderFunctions, String processorName) {
        List<EventHandlerInvoker> invokers = builderFunctions.stream()
                .map(invokerBuilder -> invokerBuilder.apply(configuration))
                .collect(Collectors.toList());
        MultiEventHandlerInvoker multiEventHandlerInvoker = new MultiEventHandlerInvoker(invokers);

        EventProcessor eventProcessor = eventProcessorBuilders
                .getOrDefault(processorName, defaultEventProcessorBuilder)
                .build(processorName, configuration, multiEventHandlerInvoker);

        // Keep the map's element type: a raw List here would turn the lambda parameter into Object.
        handlerInterceptorsBuilders.getOrDefault(processorName, new ArrayList<>())
                .stream()
                .map(interceptorBuilder -> interceptorBuilder.apply(configuration))
                .forEach(eventProcessor::registerHandlerInterceptor);

        // Default interceptor builders that return null for this processor are skipped.
        defaultHandlerInterceptors.stream()
                .map(interceptorBuilder -> interceptorBuilder.apply(configuration, processorName))
                .filter(Objects::nonNull)
                .forEach(eventProcessor::registerHandlerInterceptor);

        eventProcessor.registerHandlerInterceptor(new CorrelationDataInterceptor<>(configuration.correlationDataProviders()));
        return eventProcessor;
    }

    /**
     * Returns the event processor responsible for the given processing group, if one exists.
     *
     * @param processingGroup the name of the processing group
     * @param <T>             the processor type expected by the caller; the cast is unchecked, so a wrong
     *                        expectation only fails where the caller uses the result
     * @return an Optional holding the processor, or empty when there is none under the assigned processor name
     */
    @Override
    public <T extends EventProcessor> Optional<T> eventProcessorByProcessingGroup(String processingGroup) {
        ensureInitialized();
        Map<String, EventProcessor> processors = eventProcessors();
        @SuppressWarnings("unchecked") // T is chosen by the caller and cannot be verified here
        T processor = (T) processors.get(processorNameForProcessingGroup(processingGroup));
        return Optional.ofNullable(processor);
    }

    /**
     * Returns a new map of processor name to event processor, calling {@code get()} on every processor
     * component.
     *
     * @return a fresh {@link HashMap} that is independent of this module's internal registry
     */
    @Override
    public Map<String, EventProcessor> eventProcessors() {
        ensureInitialized();
        Map<String, EventProcessor> result = new HashMap<>(eventProcessors.size());
        for (Map.Entry<String, Component<EventProcessor>> entry : eventProcessors.entrySet()) {
            result.put(entry.getKey(), entry.getValue().get());
        }
        return result;
    }

    /**
     * Returns the processing group selected for the given saga type.
     *
     * @param sagaType the saga class
     * @return the name of the processing group the type selectors choose for {@code sagaType}
     */
    @Override
    public String sagaProcessingGroup(Class<?> sagaType) {
        String processingGroup = selectProcessingGroupByType(sagaType);
        return processingGroup;
    }

    /**
     * Returns the handler interceptors of the processor with the given name.
     *
     * @param processorName the name of the event processor
     * @return the processor's interceptors, or an empty list when no such processor is found
     */
    @Override
    public List<MessageHandlerInterceptor<? super EventMessage<?>>> interceptorsFor(String processorName) {
        ensureInitialized();
        Optional<EventProcessor> processor = eventProcessor(processorName);
        return processor.map(EventProcessor::getHandlerInterceptors)
                .orElse(Collections.emptyList());
    }

    /**
     * Returns the listener invocation error handler registered for the given processing group, or the
     * default one when none was registered.
     *
     * @param processingGroup the name of the processing group
     * @return the applicable {@link ListenerInvocationErrorHandler}
     */
    @Override
    public ListenerInvocationErrorHandler listenerInvocationErrorHandler(String processingGroup) {
        ensureInitialized();
        if (listenerInvocationErrorHandlers.containsKey(processingGroup)) {
            return listenerInvocationErrorHandlers.get(processingGroup).get();
        }
        return defaultListenerInvocationErrorHandler.get();
    }

    /**
     * Returns the sequencing policy registered for the given processing group, or the default one when
     * none was registered.
     *
     * @param processingGroup the name of the processing group
     * @return the applicable {@link SequencingPolicy}
     */
    @Override
    public SequencingPolicy<? super EventMessage<?>> sequencingPolicy(String processingGroup) {
        ensureInitialized();
        if (sequencingPolicies.containsKey(processingGroup)) {
            return sequencingPolicies.get(processingGroup).get();
        }
        return defaultSequencingPolicy.get();
    }

    /**
     * Returns the rollback configuration registered under the given name, or the default one when none
     * was registered.
     *
     * @param componentName the name the rollback configuration was registered under
     * @return the applicable {@link RollbackConfiguration}
     */
    @Override
    public RollbackConfiguration rollbackConfiguration(String componentName) {
        ensureInitialized();
        if (rollbackConfigurations.containsKey(componentName)) {
            return rollbackConfigurations.get(componentName).get();
        }
        return defaultRollbackConfiguration.get();
    }

    /**
     * Returns the error handler registered under the given name, or the default one when none was
     * registered.
     *
     * @param componentName the name the error handler was registered under
     * @return the applicable {@link ErrorHandler}
     */
    @Override
    public ErrorHandler errorHandler(String componentName) {
        ensureInitialized();
        if (errorHandlers.containsKey(componentName)) {
            return errorHandlers.get(componentName).get();
        }
        return defaultErrorHandler.get();
    }

    /**
     * Returns the saga store held by this module's saga store component.
     *
     * @return the configured {@link SagaStore}
     */
    @Override
    public SagaStore sagaStore() {
        ensureInitialized();
        SagaStore store = sagaStore.get();
        return store;
    }

    /**
     * Returns the saga configurations obtained by calling {@code initialize} on every registered saga
     * configurer with the current configuration.
     *
     * @return a new list with one {@link SagaConfiguration} per registered saga, in registration order
     */
    @Override
    public List<SagaConfiguration<?>> sagaConfigurations() {
        ensureInitialized();
        List<SagaConfiguration<?>> initialized = new ArrayList<>();
        for (SagaConfigurer<?> sagaConfigurer : sagaConfigurations) {
            initialized.add(sagaConfigurer.initialize(configuration));
        }
        return initialized;
    }

    /**
     * Resolves the processor name for the given processing group: an explicit assignment wins, otherwise
     * the result of the default assignment rule is used.
     *
     * @param processingGroup the name of the processing group
     * @return the name of the processor handling that group
     */
    private String processorNameForProcessingGroup(String processingGroup) {
        ensureInitialized();
        // The default rule is evaluated unconditionally, also when an explicit assignment exists.
        String ruleBasedName = defaultProcessingGroupAssignment.apply(processingGroup);
        return processingGroupsAssignments.getOrDefault(processingGroup, ruleBasedName);
    }

    /**
     * Returns the message monitor for the given component type and processor name. A factory registered
     * for the processor name is used when present; otherwise the lookup is delegated to the configuration.
     *
     * @param componentType      the type of the component to monitor
     * @param eventProcessorName the name of the event processor
     * @return the applicable {@link MessageMonitor}
     */
    @Override
    public MessageMonitor<? super Message<?>> messageMonitor(Class<?> componentType, String eventProcessorName) {
        ensureInitialized();
        if (!messageMonitorFactories.containsKey(eventProcessorName)) {
            return configuration.messageMonitor(componentType, eventProcessorName);
        }
        MessageMonitorFactory factory = messageMonitorFactories.get(eventProcessorName);
        return factory.create(configuration, componentType, eventProcessorName);
    }

    /**
     * Returns the token store registered under the given name, or the default one when none was
     * registered.
     *
     * @param processorName the name the token store was registered under
     * @return the applicable {@link TokenStore}
     */
    @Override
    public TokenStore tokenStore(String processorName) {
        ensureInitialized();
        if (this.tokenStore.containsKey(processorName)) {
            return this.tokenStore.get(processorName).get();
        }
        return defaultTokenStore.get();
    }

    /**
     * Returns the transaction manager registered under the given name, or the default one when none was
     * registered.
     *
     * @param processingGroup the name the transaction manager was registered under
     * @return the applicable {@link TransactionManager}
     */
    @Override
    public TransactionManager transactionManager(String processingGroup) {
        ensureInitialized();
        if (transactionManagers.containsKey(processingGroup)) {
            return transactionManagers.get(processingGroup).get();
        }
        return defaultTransactionManager.get();
    }

    /**
     * Asserts that a configuration has been assigned, i.e. that {@code initialize(Configuration)} was
     * called before components are resolved.
     */
    private void ensureInitialized() {
        BuilderUtils.assertNonNull(configuration, "Configuration is not initialized yet");
    }

    /**
     * Registers a saga of the given type. A new {@link SagaConfigurer} for the type is handed to the given
     * consumer for customization and then stored.
     *
     * @param sagaType       the saga class
     * @param sagaConfigurer callback that customizes the saga's configurer
     * @param <T>            the saga type
     * @return this configurer, for chaining
     */
    @Override
    public <T> EventProcessingConfigurer registerSaga(Class<T> sagaType, Consumer<SagaConfigurer<T>> sagaConfigurer) {
        SagaConfigurer<T> typedConfigurer = SagaConfigurer.forType(sagaType);
        sagaConfigurer.accept(typedConfigurer);
        sagaConfigurations.add(typedConfigurer);
        return this;
    }

    /**
     * Passes the given builder to the saga store component's {@code update}.
     *
     * @param sagaStoreBuilder builds the {@link SagaStore} from the configuration
     * @return this configurer, for chaining
     */
    @Override
    public EventProcessingConfigurer registerSagaStore(Function<Configuration, SagaStore> sagaStoreBuilder) {
        sagaStore.update(sagaStoreBuilder);
        return this;
    }

    /**
     * Registers a builder for an event handler. The handler is wrapped in a component named
     * {@code "eventHandler"} and assigned to a processing group during initialization.
     *
     * @param eventHandlerBuilder builds the event handler from the configuration
     * @return this configurer, for chaining
     */
    @Override
    public EventProcessingConfigurer registerEventHandler(Function<Configuration, Object> eventHandlerBuilder) {
        Component<Object> handlerComponent = new Component<>(() -> configuration, "eventHandler", eventHandlerBuilder);
        eventHandlerBuilders.add(handlerComponent);
        return this;
    }

    /**
     * Passes the given builder to the default listener invocation error handler component's {@code update}.
     *
     * @param listenerInvocationErrorHandlerBuilder builds the default {@link ListenerInvocationErrorHandler}
     * @return this configurer, for chaining
     */
    @Override
    public EventProcessingConfigurer registerDefaultListenerInvocationErrorHandler(Function<Configuration, ListenerInvocationErrorHandler> listenerInvocationErrorHandlerBuilder) {
        defaultListenerInvocationErrorHandler.update(listenerInvocationErrorHandlerBuilder);
        return this;
    }

    /**
     * Registers a listener invocation error handler builder for the given processing group, replacing any
     * earlier registration under the same name.
     *
     * @param processingGroup                       the name of the processing group
     * @param listenerInvocationErrorHandlerBuilder builds the {@link ListenerInvocationErrorHandler}
     * @return this configurer, for chaining
     */
    @Override
    public EventProcessingConfigurer registerListenerInvocationErrorHandler(String processingGroup, Function<Configuration, ListenerInvocationErrorHandler> listenerInvocationErrorHandlerBuilder) {
        Component<ListenerInvocationErrorHandler> handlerComponent =
                new Component<>(() -> configuration, "listenerInvocationErrorHandler", listenerInvocationErrorHandlerBuilder);
        listenerInvocationErrorHandlers.put(processingGroup, handlerComponent);
        return this;
    }

    /**
     * Passes the given builder to the default streamable message source component's {@code update}.
     *
     * @param defaultSource builds the default {@link StreamableMessageSource}
     * @return this configurer, for chaining
     */
    @Override
    public EventProcessingConfigurer configureDefaultStreamableMessageSource(Function<Configuration, StreamableMessageSource<TrackedEventMessage<?>>> defaultSource) {
        defaultStreamableSource.update(defaultSource);
        return this;
    }

    /**
     * Passes the given builder to the default subscribable message source component's {@code update}.
     *
     * @param defaultSource builds the default {@link SubscribableMessageSource}
     * @return this configurer, for chaining
     */
    @Override
    public EventProcessingConfigurer configureDefaultSubscribableMessageSource(Function<Configuration, SubscribableMessageSource<EventMessage<?>>> defaultSource) {
        defaultSubscribableSource.update(defaultSource);
        return this;
    }

    /**
     * Registers a tracking event processor with the given name and source, using the default tracking
     * event processor configuration.
     *
     * @param name   the name of the processor
     * @param source builds the message source the processor reads from
     * @return this configurer, for chaining
     * @throws AxonConfigurationException when a processor builder is already registered under {@code name}
     */
    @Override
    public EventProcessingConfigurer registerTrackingEventProcessor(String name, Function<Configuration, StreamableMessageSource<TrackedEventMessage<?>>> source) {
        Function<Configuration, TrackingEventProcessorConfiguration> defaultConfiguration =
                c -> defaultTrackingEventProcessorConfiguration.get();
        return registerTrackingEventProcessor(name, source, defaultConfiguration);
    }

    /**
     * Registers a tracking event processor with the given name, source and processor configuration.
     *
     * @param name                   the name of the processor
     * @param source                 builds the message source the processor reads from
     * @param processorConfiguration builds the {@link TrackingEventProcessorConfiguration} for the processor
     * @return this configurer, for chaining
     * @throws AxonConfigurationException when a processor builder is already registered under {@code name}
     */
    @Override
    public EventProcessingConfigurer registerTrackingEventProcessor(String name, Function<Configuration, StreamableMessageSource<TrackedEventMessage<?>>> source, Function<Configuration, TrackingEventProcessorConfiguration> processorConfiguration) {
        registerEventProcessor(
                name,
                (processorName, config, invoker) -> trackingEventProcessor(
                        processorName, invoker, processorConfiguration.apply(config), source.apply(config)));
        return this;
    }

    /**
     * Replaces the builder used for processors that have no builder registered under their own name.
     *
     * @param eventProcessorBuilder the new default processor builder
     * @return this configurer, for chaining
     */
    @Override
    public EventProcessingConfigurer registerEventProcessorFactory(EventProcessingConfigurer.EventProcessorBuilder eventProcessorBuilder) {
        defaultEventProcessorBuilder = eventProcessorBuilder;
        return this;
    }

    /**
     * Registers the builder for the event processor with the given name.
     *
     * @param name                  the name of the processor
     * @param eventProcessorBuilder builds the processor
     * @return this configurer, for chaining
     * @throws AxonConfigurationException when a builder is already registered under {@code name}
     */
    @Override
    public EventProcessingConfigurer registerEventProcessor(String name, EventProcessingConfigurer.EventProcessorBuilder eventProcessorBuilder) {
        boolean alreadyRegistered = eventProcessorBuilders.containsKey(name);
        if (alreadyRegistered) {
            String message = String.format("Event processor with name %s already exists", name);
            throw new AxonConfigurationException(message);
        }
        eventProcessorBuilders.put(name, eventProcessorBuilder);
        return this;
    }

    /**
     * Registers a token store builder under the given name, replacing any earlier registration.
     *
     * @param processingGroup the name to register the token store under
     * @param tokenStore      builds the {@link TokenStore}
     * @return this configurer, for chaining
     */
    @Override
    public EventProcessingConfigurer registerTokenStore(String processingGroup, Function<Configuration, TokenStore> tokenStore) {
        Component<TokenStore> storeComponent = new Component<>(() -> configuration, "tokenStore", tokenStore);
        // 'this.' is required: the parameter shadows the field of the same name.
        this.tokenStore.put(processingGroup, storeComponent);
        return this;
    }

    /**
     * Passes the given builder to the default token store component's {@code update}.
     *
     * @param tokenStore builds the default {@link TokenStore}
     * @return this configurer, for chaining
     */
    @Override
    public EventProcessingConfigurer registerTokenStore(Function<Configuration, TokenStore> tokenStore) {
        defaultTokenStore.update(tokenStore);
        return this;
    }

    /**
     * Makes the default processor builder create subscribing event processors that read from the default
     * subscribable message source.
     *
     * @return this configurer, for chaining
     */
    @Override
    public EventProcessingConfigurer usingSubscribingEventProcessors() {
        defaultEventProcessorBuilder = (processorName, config, invoker) ->
                subscribingEventProcessor(processorName, invoker, defaultSubscribableSource.get());
        return this;
    }

    /**
     * Makes the default processor builder create tracking event processors that use the default tracking
     * configuration and the default streamable message source.
     *
     * @return this configurer, for chaining
     */
    @Override
    public EventProcessingConfigurer usingTrackingEventProcessors() {
        defaultEventProcessorBuilder = (processorName, config, invoker) ->
                trackingEventProcessor(processorName,
                                       invoker,
                                       defaultTrackingEventProcessorConfiguration.get(),
                                       defaultStreamableSource.get());
        return this;
    }

    /**
     * Registers a subscribing event processor with the given name and message source.
     *
     * @param name          the name of the processor
     * @param messageSource builds the message source the processor subscribes to
     * @return this configurer, for chaining
     * @throws AxonConfigurationException when a processor builder is already registered under {@code name}
     */
    @Override
    public EventProcessingConfigurer registerSubscribingEventProcessor(String name, Function<Configuration, SubscribableMessageSource<? extends EventMessage<?>>> messageSource) {
        registerEventProcessor(
                name,
                (processorName, config, invoker) ->
                        subscribingEventProcessor(processorName, invoker, messageSource.apply(config)));
        return this;
    }

    /**
     * Passes the given builder to the default error handler component's {@code update}.
     *
     * @param errorHandlerBuilder builds the default {@link ErrorHandler}
     * @return this configurer, for chaining
     */
    @Override
    public EventProcessingConfigurer registerDefaultErrorHandler(Function<Configuration, ErrorHandler> errorHandlerBuilder) {
        defaultErrorHandler.update(errorHandlerBuilder);
        return this;
    }

    /**
     * Registers an error handler builder under the given processor name, replacing any earlier
     * registration.
     *
     * @param eventProcessorName  the name of the event processor
     * @param errorHandlerBuilder builds the {@link ErrorHandler}
     * @return this configurer, for chaining
     */
    @Override
    public EventProcessingConfigurer registerErrorHandler(String eventProcessorName, Function<Configuration, ErrorHandler> errorHandlerBuilder) {
        Component<ErrorHandler> handlerComponent = new Component<>(() -> configuration, "errorHandler", errorHandlerBuilder);
        errorHandlers.put(eventProcessorName, handlerComponent);
        return this;
    }

    /**
     * Replaces the fallback that assigns handler instances to a processing group when no other selector
     * matches.
     *
     * @param assignmentFunction maps a handler instance to a processing group name
     * @return this configurer, for chaining
     */
    @Override
    public EventProcessingConfigurer byDefaultAssignHandlerInstancesTo(Function<Object, String> assignmentFunction) {
        instanceFallbackSelector = InstanceProcessingGroupSelector.defaultSelector(assignmentFunction);
        return this;
    }

    /**
     * Replaces the fallback that assigns handler types to a processing group when no other selector
     * matches.
     *
     * @param assignmentFunction maps a handler type to a processing group name
     * @return this configurer, for chaining
     */
    @Override
    public EventProcessingConfigurer byDefaultAssignHandlerTypesTo(Function<Class<?>, String> assignmentFunction) {
        typeFallback = TypeProcessingGroupSelector.defaultSelector(assignmentFunction);
        return this;
    }

    /**
     * Adds a selector assigning handler instances that match the criteria to the given processing group.
     *
     * @param processingGroup the processing group to assign matching handlers to
     * @param priority        the selector's priority; instance selectors are sorted by descending priority
     *                        during initialization
     * @param criteria        decides whether a handler instance matches
     * @return this configurer, for chaining
     */
    @Override
    public EventProcessingConfigurer assignHandlerInstancesMatching(String processingGroup, int priority, Predicate<Object> criteria) {
        InstanceProcessingGroupSelector selector = new InstanceProcessingGroupSelector(processingGroup, priority, criteria);
        instanceSelectors.add(selector);
        return this;
    }

    /**
     * Adds a selector assigning handler types that match the criteria to the given processing group.
     *
     * @param processingGroup the processing group to assign matching handler types to
     * @param priority        the priority stored on the selector
     * @param criteria        decides whether a handler type matches
     * @return this configurer, for chaining
     */
    @Override
    public EventProcessingConfigurer assignHandlerTypesMatching(String processingGroup, int priority, Predicate<Class<?>> criteria) {
        TypeProcessingGroupSelector selector = new TypeProcessingGroupSelector(processingGroup, priority, criteria);
        typeSelectors.add(selector);
        return this;
    }

    /**
     * Explicitly assigns the given processing group to the processor with the given name, replacing any
     * earlier assignment for that group.
     *
     * @param processingGroup the name of the processing group
     * @param processorName   the name of the processor that should handle the group
     * @return this configurer, for chaining
     */
    @Override
    public EventProcessingConfigurer assignProcessingGroup(String processingGroup, String processorName) {
        processingGroupsAssignments.put(processingGroup, processorName);
        return this;
    }

    /**
     * Replaces the rule that derives a processor name for processing groups without an explicit assignment.
     *
     * @param assignmentRule maps a processing group name to a processor name
     * @return this configurer, for chaining
     */
    @Override
    public EventProcessingConfigurer assignProcessingGroup(Function<String, String> assignmentRule) {
        defaultProcessingGroupAssignment = assignmentRule;
        return this;
    }

    /**
     * Registers an interceptor builder for the processor with the given name.
     * <p>
     * When this module is initialized and the processor's component reports itself as initialized, the
     * interceptor is also registered on that processor right away. The builder is stored in either case.
     *
     * @param processorName      the name of the event processor
     * @param interceptorBuilder builds the interceptor from the configuration
     * @return this configurer, for chaining
     */
    @Override
    public EventProcessingConfigurer registerHandlerInterceptor(String processorName, Function<Configuration, MessageHandlerInterceptor<? super EventMessage<?>>> interceptorBuilder) {
        if (configuration != null) {
            Component<EventProcessor> processorComponent = eventProcessors.get(processorName);
            if (processorComponent != null && processorComponent.isInitialized()) {
                processorComponent.get().registerHandlerInterceptor(interceptorBuilder.apply(configuration));
            }
        }
        handlerInterceptorsBuilders.computeIfAbsent(processorName, k -> new ArrayList<>())
                .add(interceptorBuilder);
        return this;
    }

    /**
     * Adds an interceptor builder that is applied to every processor built by this module. The builder
     * receives the configuration and the processor name; a {@code null} result is skipped.
     *
     * @param interceptorBuilder builds the interceptor for a given processor name
     * @return this configurer, for chaining
     */
    @Override
    public EventProcessingConfigurer registerDefaultHandlerInterceptor(BiFunction<Configuration, String, MessageHandlerInterceptor<? super EventMessage<?>>> interceptorBuilder) {
        defaultHandlerInterceptors.add(interceptorBuilder);
        return this;
    }

    /**
     * Registers a sequencing policy builder for the given processing group, replacing any earlier
     * registration.
     *
     * @param processingGroup the name of the processing group
     * @param policyBuilder   builds the {@link SequencingPolicy}
     * @return this configurer, for chaining
     */
    @Override
    public EventProcessingConfigurer registerSequencingPolicy(String processingGroup, Function<Configuration, SequencingPolicy<? super EventMessage<?>>> policyBuilder) {
        Component<SequencingPolicy<? super EventMessage<?>>> policyComponent =
                new Component<>(() -> configuration, "sequencingPolicy", policyBuilder);
        sequencingPolicies.put(processingGroup, policyComponent);
        return this;
    }

    /**
     * Passes the given builder to the default sequencing policy component's {@code update}.
     *
     * @param policyBuilder builds the default {@link SequencingPolicy}
     * @return this configurer, for chaining
     */
    @Override
    public EventProcessingConfigurer registerDefaultSequencingPolicy(Function<Configuration, SequencingPolicy<? super EventMessage<?>>> policyBuilder) {
        defaultSequencingPolicy.update(policyBuilder);
        return this;
    }

    /**
     * Registers the message monitor factory for the processor with the given name, replacing any earlier
     * registration.
     *
     * @param eventProcessorName    the name of the event processor
     * @param messageMonitorFactory creates the processor's message monitor
     * @return this configurer, for chaining
     */
    @Override
    public EventProcessingConfigurer registerMessageMonitorFactory(String eventProcessorName, MessageMonitorFactory messageMonitorFactory) {
        messageMonitorFactories.put(eventProcessorName, messageMonitorFactory);
        return this;
    }

    /**
     * Registers a rollback configuration builder under the given name, replacing any earlier registration.
     *
     * @param name                         the name to register the rollback configuration under
     * @param rollbackConfigurationBuilder builds the {@link RollbackConfiguration}
     * @return this configurer, for chaining
     */
    @Override
    public EventProcessingConfigurer registerRollbackConfiguration(String name, Function<Configuration, RollbackConfiguration> rollbackConfigurationBuilder) {
        Component<RollbackConfiguration> rollbackComponent =
                new Component<>(() -> configuration, "rollbackConfiguration", rollbackConfigurationBuilder);
        rollbackConfigurations.put(name, rollbackComponent);
        return this;
    }

    /**
     * Registers a transaction manager builder under the given name, replacing any earlier registration.
     *
     * @param name                      the name to register the transaction manager under
     * @param transactionManagerBuilder builds the {@link TransactionManager}
     * @return this configurer, for chaining
     */
    @Override
    public EventProcessingConfigurer registerTransactionManager(String name, Function<Configuration, TransactionManager> transactionManagerBuilder) {
        Component<TransactionManager> managerComponent =
                new Component<>(() -> configuration, "transactionManager", transactionManagerBuilder);
        transactionManagers.put(name, managerComponent);
        return this;
    }

    /**
     * Passes the given builder to the default tracking event processor configuration component's
     * {@code update}.
     *
     * @param trackingEventProcessorConfigurationBuilder builds the default
     *                                                   {@link TrackingEventProcessorConfiguration}
     * @return this configurer, for chaining
     */
    @Override
    public EventProcessingConfigurer registerTrackingEventProcessorConfiguration(Function<Configuration, TrackingEventProcessorConfiguration> trackingEventProcessorConfigurationBuilder) {
        defaultTrackingEventProcessorConfiguration.update(trackingEventProcessorConfigurationBuilder);
        return this;
    }

    /**
     * Initial default processor builder: creates a tracking processor when the configuration's event bus
     * is a {@link StreamableMessageSource}, and a subscribing processor otherwise.
     *
     * @param name                the name of the processor
     * @param conf                the configuration whose event bus decides the processor type
     * @param eventHandlerInvoker the invoker the processor delegates events to
     * @return the created event processor
     */
    private EventProcessor defaultEventProcessor(String name, Configuration conf, EventHandlerInvoker eventHandlerInvoker) {
        boolean streamable = conf.eventBus() instanceof StreamableMessageSource;
        if (!streamable) {
            return subscribingEventProcessor(name, eventHandlerInvoker, defaultSubscribableSource.get());
        }
        return trackingEventProcessor(name,
                                      eventHandlerInvoker,
                                      defaultTrackingEventProcessorConfiguration.get(),
                                      defaultStreamableSource.get());
    }

    /**
     * Builds a {@link SubscribingEventProcessor} with the given name, invoker and message source. The
     * rollback configuration, error handler, message monitor and transaction manager are looked up by the
     * processor name; the processing strategy is {@link DirectEventProcessingStrategy#INSTANCE}.
     *
     * @param name                the name of the processor
     * @param eventHandlerInvoker the invoker the processor delegates events to
     * @param messageSource       the source the processor subscribes to
     * @return the built processor
     */
    private SubscribingEventProcessor subscribingEventProcessor(String name, EventHandlerInvoker eventHandlerInvoker, SubscribableMessageSource<? extends EventMessage<?>> messageSource) {
        return SubscribingEventProcessor.builder()
                .name(name)
                .eventHandlerInvoker(eventHandlerInvoker)
                .rollbackConfiguration(rollbackConfiguration(name))
                .errorHandler(errorHandler(name))
                .messageMonitor(messageMonitor(SubscribingEventProcessor.class, name))
                .messageSource(messageSource)
                .processingStrategy(DirectEventProcessingStrategy.INSTANCE)
                .transactionManager(transactionManager(name))
                .build();
    }

    /**
     * Builds a {@link TrackingEventProcessor} with the given name, invoker, configuration and source. The
     * rollback configuration, error handler, message monitor, token store and transaction manager are
     * looked up by the processor name.
     *
     * @param name                the name of the processor
     * @param eventHandlerInvoker the invoker the processor delegates events to
     * @param config              the tracking configuration for the processor
     * @param source              the source the processor reads from
     * @return the built processor
     */
    private TrackingEventProcessor trackingEventProcessor(String name, EventHandlerInvoker eventHandlerInvoker, TrackingEventProcessorConfiguration config, StreamableMessageSource<TrackedEventMessage<?>> source) {
        return TrackingEventProcessor.builder()
                .name(name)
                .eventHandlerInvoker(eventHandlerInvoker)
                .rollbackConfiguration(rollbackConfiguration(name))
                .errorHandler(errorHandler(name))
                .messageMonitor(messageMonitor(TrackingEventProcessor.class, name))
                .messageSource(source)
                .tokenStore(this.tokenStore(name))
                .transactionManager(transactionManager(name))
                .trackingEventProcessorConfiguration(config)
                .build();
    }

    /**
     * Returns the package name of the given object's class, used as the fallback processing group for
     * handler instances.
     * <p>
     * The package is everything before the last {@code '.'} of {@link Class#getName()}. Unlike stripping
     * {@code "." + getSimpleName()}, this also works for nested classes (whose binary name ends in
     * {@code Outer$Inner}) and for anonymous classes (whose simple name is empty, so the stripping approach
     * removed every dot from the name).
     * <p>
     * For a class without a package the full class name is returned, so the result is never empty.
     *
     * @param object the object whose class' package is requested; must not be {@code null}
     * @return the package name, or the class name when the class has no package
     */
    protected static String packageOfObject(Object object) {
        String className = object.getClass().getName();
        int lastDot = className.lastIndexOf('.');
        return lastDot < 0 ? className : className.substring(0, lastDot);
    }

    /**
     * Looks up the {@link ProcessingGroup} annotation attributes on the given type and returns the value
     * stored under the {@code "processingGroup"} key.
     *
     * @param type the type to inspect
     * @return the attribute value, or empty when no attributes are found or the key maps to {@code null}
     */
    private static Optional<String> annotatedProcessingGroupOfType(Class<?> type) {
        // Typed (not raw) Optional: the lambda below needs Map#get on the attribute map.
        Optional<Map<String, Object>> attributes = AnnotationUtils.findAnnotationAttributes(type, ProcessingGroup.class);
        return attributes.map(attr -> (String) attr.get("processingGroup"));
    }

    /**
     * Pairs a priority with a function that optionally selects a processing group name for a candidate.
     *
     * @param <T> the kind of candidate the selector inspects
     */
    private static class ProcessingGroupSelector<T> {
        private final int priority;
        private final Function<T, Optional<String>> function;

        /**
         * Creates a selector that delegates to the given function.
         *
         * @param priority         the priority of this selector
         * @param selectorFunction returns the processing group for a candidate, or empty when it does not apply
         */
        private ProcessingGroupSelector(int priority, Function<T, Optional<String>> selectorFunction) {
            this.priority = priority;
            this.function = selectorFunction;
        }

        /**
         * Creates a selector that yields {@code name} for every candidate accepted by {@code criteria}.
         *
         * @param name     the processing group to select for matching candidates
         * @param priority the priority of this selector
         * @param criteria decides whether a candidate matches
         */
        private ProcessingGroupSelector(String name, int priority, Predicate<T> criteria) {
            this(priority, candidate -> criteria.test(candidate) ? Optional.of(name) : Optional.empty());
        }

        /**
         * Applies the selector function to the given candidate.
         *
         * @param handler the candidate to select a processing group for
         * @return the selected processing group, or empty when this selector does not apply
         */
        public Optional<String> select(T handler) {
            return function.apply(handler);
        }

        /**
         * @return the priority this selector was created with
         */
        public int getPriority() {
            return priority;
        }
    }

    /**
     * Lets a type based selector be used where an instance based selector is expected: the delegate is
     * asked to select on the instance's class, and its priority is carried over.
     */
    private static class InstanceToTypeProcessingGroupSelectorAdapter
    extends InstanceProcessingGroupSelector {
        /**
         * @param delegate the type selector to consult with the class of each handler instance
         */
        private InstanceToTypeProcessingGroupSelectorAdapter(TypeProcessingGroupSelector delegate) {
            super(delegate.getPriority(), instance -> delegate.select(instance.getClass()));
        }
    }

    /**
     * Selector that chooses a processing group based on a handler's class.
     */
    private static class TypeProcessingGroupSelector
    extends ProcessingGroupSelector<Class<?>> {
        /**
         * Creates a selector with the lowest possible priority ({@code Integer.MIN_VALUE}). A {@code null}
         * result of the given function is treated as "no selection".
         *
         * @param selectorFunction maps a type to a processing group name, or {@code null}
         * @return the selector wrapping the given function
         */
        private static TypeProcessingGroupSelector defaultSelector(Function<Class<?>, String> selectorFunction) {
            Function<Class<?>, Optional<String>> nullTolerant = selectorFunction.andThen(Optional::ofNullable);
            return new TypeProcessingGroupSelector(Integer.MIN_VALUE, nullTolerant);
        }

        /**
         * @param priority         the priority of this selector
         * @param selectorFunction returns the processing group for a type, or empty when it does not apply
         */
        private TypeProcessingGroupSelector(int priority, Function<Class<?>, Optional<String>> selectorFunction) {
            super(priority, selectorFunction);
        }

        /**
         * @param name     the processing group to select for matching types
         * @param priority the priority of this selector
         * @param criteria decides whether a type matches
         */
        private TypeProcessingGroupSelector(String name, int priority, Predicate<Class<?>> criteria) {
            super(name, priority, criteria);
        }
    }

    /**
     * Selector that chooses a processing group based on a handler instance.
     */
    private static class InstanceProcessingGroupSelector
    extends ProcessingGroupSelector<Object> {
        /**
         * Creates a selector with the lowest possible priority ({@code Integer.MIN_VALUE}). A {@code null}
         * result of the given function is treated as "no selection".
         *
         * @param selectorFunction maps a handler instance to a processing group name, or {@code null}
         * @return the selector wrapping the given function
         */
        private static InstanceProcessingGroupSelector defaultSelector(Function<Object, String> selectorFunction) {
            Function<Object, Optional<String>> nullTolerant = selectorFunction.andThen(Optional::ofNullable);
            return new InstanceProcessingGroupSelector(Integer.MIN_VALUE, nullTolerant);
        }

        /**
         * @param priority         the priority of this selector
         * @param selectorFunction returns the processing group for an instance, or empty when it does not apply
         */
        private InstanceProcessingGroupSelector(int priority, Function<Object, Optional<String>> selectorFunction) {
            super(priority, selectorFunction);
        }

        /**
         * @param name     the processing group to select for matching instances
         * @param priority the priority of this selector
         * @param criteria decides whether an instance matches
         */
        private InstanceProcessingGroupSelector(String name, int priority, Predicate<Object> criteria) {
            super(name, priority, criteria);
        }
    }
}

