/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.messaging.eventhandling.processing.streaming.pooled;

import jakarta.annotation.Nonnull;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.IntStream;
import org.axonframework.common.AxonThreadFactory;
import org.axonframework.common.FutureUtils;
import org.axonframework.common.configuration.BaseModule;
import org.axonframework.common.configuration.ComponentBuilder;
import org.axonframework.common.configuration.ComponentDefinition;
import org.axonframework.common.configuration.Configuration;
import org.axonframework.common.configuration.ModuleBuilder;
import org.axonframework.messaging.core.unitofwork.UnitOfWorkFactory;
import org.axonframework.messaging.eventhandling.EventHandlingComponent;
import org.axonframework.messaging.eventhandling.configuration.DefaultEventHandlingComponentsConfigurer;
import org.axonframework.messaging.eventhandling.configuration.EventHandlingComponentsConfigurer;
import org.axonframework.messaging.eventhandling.configuration.EventProcessorConfiguration;
import org.axonframework.messaging.eventhandling.configuration.EventProcessorCustomization;
import org.axonframework.messaging.eventhandling.configuration.EventProcessorModule;
import org.axonframework.messaging.eventhandling.interception.InterceptingEventHandlingComponent;
import org.axonframework.messaging.eventhandling.processing.streaming.pooled.PooledStreamingEventProcessor;
import org.axonframework.messaging.eventhandling.processing.streaming.pooled.PooledStreamingEventProcessorConfiguration;
import org.axonframework.messaging.eventhandling.processing.streaming.segmenting.SequenceCachingEventHandlingComponent;
import org.axonframework.messaging.eventhandling.processing.streaming.token.store.TokenStore;

public class PooledStreamingEventProcessorModule
extends BaseModule<PooledStreamingEventProcessorModule>
implements EventProcessorModule,
ModuleBuilder<PooledStreamingEventProcessorModule>,
EventProcessorModule.EventHandlingPhase<PooledStreamingEventProcessorModule, PooledStreamingEventProcessorConfiguration>,
EventProcessorModule.CustomizationPhase<PooledStreamingEventProcessorModule, PooledStreamingEventProcessorConfiguration> {
    private final String processorName;
    private List<ComponentBuilder<EventHandlingComponent>> eventHandlingComponentBuilders;
    private ComponentBuilder<PooledStreamingEventProcessorConfiguration> customizedProcessorConfigurationBuilder;

    public PooledStreamingEventProcessorModule(@Nonnull String processorName) {
        super(processorName);
        this.processorName = processorName;
    }

    public PooledStreamingEventProcessorModule build() {
        this.registerCustomizedConfiguration();
        this.registerTokenStore();
        this.registerUnitOfWorkFactory();
        this.registerEventHandlingComponents();
        this.registerEventProcessor();
        return this;
    }

    private void registerCustomizedConfiguration() {
        this.componentRegistry(cr -> cr.registerComponent(ComponentDefinition.ofType(PooledStreamingEventProcessorConfiguration.class).withBuilder(cfg -> {
            PooledStreamingEventProcessorConfiguration configuration = (PooledStreamingEventProcessorConfiguration)this.customizedProcessorConfigurationBuilder.build(cfg);
            configuration.workerExecutor(Optional.ofNullable(configuration.workerExecutor()).orElseGet(() -> PooledStreamingEventProcessorModule.defaultExecutor(4, "WorkPackage[" + this.processorName + "]")));
            configuration.coordinatorExecutor(Optional.ofNullable(configuration.coordinatorExecutor()).orElseGet(() -> PooledStreamingEventProcessorModule.defaultExecutor(1, "Coordinator[" + this.processorName + "]")));
            return configuration;
        }).onShutdown(0, (cfg, processor) -> {
            processor.workerExecutor().shutdown();
            return FutureUtils.emptyCompletedFuture();
        }).onShutdown(0, (cfg, processor) -> {
            processor.coordinatorExecutor().shutdown();
            return FutureUtils.emptyCompletedFuture();
        })));
    }

    private void registerTokenStore() {
        this.componentRegistry(cr -> cr.registerComponent(ComponentDefinition.ofTypeAndName(TokenStore.class, (String)("TokenStore[" + this.processorName + "]")).withBuilder(cfg -> ((PooledStreamingEventProcessorConfiguration)cfg.getComponent(PooledStreamingEventProcessorConfiguration.class)).tokenStore())));
    }

    private void registerUnitOfWorkFactory() {
        this.componentRegistry(cr -> cr.registerComponent(ComponentDefinition.ofTypeAndName(UnitOfWorkFactory.class, (String)("UnitOfWorkFactory[" + this.processorName + "]")).withBuilder(cfg -> ((PooledStreamingEventProcessorConfiguration)cfg.getComponent(PooledStreamingEventProcessorConfiguration.class)).unitOfWorkFactory())));
    }

    private void registerEventProcessor() {
        ComponentDefinition processorComponentDefinition = ComponentDefinition.ofTypeAndName(PooledStreamingEventProcessor.class, (String)this.processorName).withBuilder(cfg -> new PooledStreamingEventProcessor(this.processorName, this.getEventHandlingComponents(cfg), (PooledStreamingEventProcessorConfiguration)cfg.getComponent(PooledStreamingEventProcessorConfiguration.class))).onStart(0x3FFFFFFF, (cfg, component) -> component.start()).onShutdown(0x3FFFFFFF, (cfg, component) -> component.shutdown());
        this.componentRegistry(cr -> cr.registerComponent(processorComponentDefinition));
    }

    private void registerEventHandlingComponents() {
        for (int i = 0; i < this.eventHandlingComponentBuilders.size(); ++i) {
            ComponentBuilder<EventHandlingComponent> componentBuilder = this.eventHandlingComponentBuilders.get(i);
            String componentName = this.processorEventHandlingComponentName(i);
            this.componentRegistry(cr -> {
                cr.registerComponent(EventHandlingComponent.class, componentName, cfg -> {
                    EventHandlingComponent component = (EventHandlingComponent)componentBuilder.build(cfg);
                    return new SequenceCachingEventHandlingComponent(component);
                });
                cr.registerDecorator(EventHandlingComponent.class, componentName, -2147483548, (config, name, delegate) -> {
                    PooledStreamingEventProcessorConfiguration configuration = (PooledStreamingEventProcessorConfiguration)config.getComponent(PooledStreamingEventProcessorConfiguration.class);
                    return new InterceptingEventHandlingComponent(configuration.interceptors(), (EventHandlingComponent)delegate);
                });
            });
        }
    }

    private List<EventHandlingComponent> getEventHandlingComponents(Configuration configuration) {
        return IntStream.range(0, this.eventHandlingComponentBuilders.size()).mapToObj(i -> {
            String componentName = this.processorEventHandlingComponentName(i);
            return (EventHandlingComponent)configuration.getComponent(EventHandlingComponent.class, componentName);
        }).toList();
    }

    @Nonnull
    private String processorEventHandlingComponentName(int index) {
        return "EventHandlingComponent[" + this.processorName + "][" + index + "]";
    }

    private static ScheduledExecutorService defaultExecutor(int poolSize, String factoryName) {
        return Executors.newScheduledThreadPool(poolSize, (ThreadFactory)new AxonThreadFactory(factoryName));
    }

    @Override
    public PooledStreamingEventProcessorModule customized(@Nonnull BiFunction<Configuration, PooledStreamingEventProcessorConfiguration, PooledStreamingEventProcessorConfiguration> instanceCustomization) {
        this.customizedProcessorConfigurationBuilder = cfg -> {
            PooledStreamingEventProcessorConfiguration typeCustomization = (PooledStreamingEventProcessorConfiguration)PooledStreamingEventProcessorModule.typeSpecificCustomizationOrNoOp(cfg).apply(cfg, PooledStreamingEventProcessorModule.defaultEventProcessorsConfiguration(cfg));
            return (PooledStreamingEventProcessorConfiguration)instanceCustomization.apply(cfg, typeCustomization);
        };
        return this;
    }

    private static Customization typeSpecificCustomizationOrNoOp(Configuration cfg) {
        return cfg.getOptionalComponent(Customization.class).orElseGet(Customization::noOp);
    }

    private static PooledStreamingEventProcessorConfiguration defaultEventProcessorsConfiguration(Configuration cfg) {
        return new PooledStreamingEventProcessorConfiguration((EventProcessorConfiguration)PooledStreamingEventProcessorModule.parentSharedCustomizationOrDefault(cfg).apply(cfg, new EventProcessorConfiguration(cfg)), cfg);
    }

    private static EventProcessorCustomization parentSharedCustomizationOrDefault(Configuration cfg) {
        return cfg.getOptionalComponent(EventProcessorCustomization.class).orElseGet(EventProcessorCustomization::noOp);
    }

    @Override
    public PooledStreamingEventProcessorModule notCustomized() {
        if (this.customizedProcessorConfigurationBuilder == null) {
            this.customized((cfg, config) -> config);
        }
        return this;
    }

    @Override
    public EventProcessorModule.CustomizationPhase<PooledStreamingEventProcessorModule, PooledStreamingEventProcessorConfiguration> eventHandlingComponents(@Nonnull Function<EventHandlingComponentsConfigurer.RequiredComponentPhase, EventHandlingComponentsConfigurer.CompletePhase> configurerTask) {
        Objects.requireNonNull(configurerTask, "configurerTask may not be null");
        DefaultEventHandlingComponentsConfigurer componentsConfigurer = new DefaultEventHandlingComponentsConfigurer();
        this.eventHandlingComponentBuilders = configurerTask.apply(componentsConfigurer).toList();
        return this;
    }

    @FunctionalInterface
    public static interface Customization
    extends BiFunction<Configuration, PooledStreamingEventProcessorConfiguration, PooledStreamingEventProcessorConfiguration> {
        public static Customization noOp() {
            return (axonConfig, processorConfig) -> processorConfig;
        }

        default public Customization andThen(@Nonnull Customization other) {
            Objects.requireNonNull(other, "other may not be null");
            return (axonConfig, processorConfig) -> (PooledStreamingEventProcessorConfiguration)other.apply(axonConfig, (PooledStreamingEventProcessorConfiguration)this.apply(axonConfig, processorConfig));
        }
    }
}

