/*
 * Decompiled with CFR 0.152.
 */
package io.awspring.cloud.sqs.listener;

import io.awspring.cloud.sqs.ConfigUtils;
import io.awspring.cloud.sqs.LifecycleHandler;
import io.awspring.cloud.sqs.MessageExecutionThread;
import io.awspring.cloud.sqs.MessageExecutionThreadFactory;
import io.awspring.cloud.sqs.UnsupportedThreadFactoryException;
import io.awspring.cloud.sqs.listener.AbstractMessageListenerContainer;
import io.awspring.cloud.sqs.listener.BackPressureHandler;
import io.awspring.cloud.sqs.listener.ContainerComponentFactory;
import io.awspring.cloud.sqs.listener.ContainerOptions;
import io.awspring.cloud.sqs.listener.ContainerOptionsBuilder;
import io.awspring.cloud.sqs.listener.IdentifiableContainerComponent;
import io.awspring.cloud.sqs.listener.SemaphoreBackPressureHandler;
import io.awspring.cloud.sqs.listener.TaskExecutorAware;
import io.awspring.cloud.sqs.listener.pipeline.AcknowledgementHandlerExecutionStage;
import io.awspring.cloud.sqs.listener.pipeline.AfterProcessingContextInterceptorExecutionStage;
import io.awspring.cloud.sqs.listener.pipeline.AfterProcessingInterceptorExecutionStage;
import io.awspring.cloud.sqs.listener.pipeline.BeforeProcessingContextInterceptorExecutionStage;
import io.awspring.cloud.sqs.listener.pipeline.BeforeProcessingInterceptorExecutionStage;
import io.awspring.cloud.sqs.listener.pipeline.ErrorHandlerExecutionStage;
import io.awspring.cloud.sqs.listener.pipeline.MessageListenerExecutionStage;
import io.awspring.cloud.sqs.listener.pipeline.MessageProcessingConfiguration;
import io.awspring.cloud.sqs.listener.pipeline.MessageProcessingPipeline;
import io.awspring.cloud.sqs.listener.pipeline.MessageProcessingPipelineBuilder;
import io.awspring.cloud.sqs.listener.sink.MessageProcessingPipelineSink;
import io.awspring.cloud.sqs.listener.sink.MessageSink;
import io.awspring.cloud.sqs.listener.source.AcknowledgementProcessingMessageSource;
import io.awspring.cloud.sqs.listener.source.MessageSource;
import io.awspring.cloud.sqs.listener.source.PollingMessageSource;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.lang.Nullable;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

public abstract class AbstractPipelineMessageListenerContainer<T, O extends ContainerOptions<O, B>, B extends ContainerOptionsBuilder<B, O>>
extends AbstractMessageListenerContainer<T, O, B> {
    private static final Logger logger = LoggerFactory.getLogger(AbstractPipelineMessageListenerContainer.class);
    private Collection<MessageSource<T>> messageSources;
    private MessageSink<T> messageSink;
    private TaskExecutor componentsTaskExecutor;
    @Nullable
    private TaskExecutor acknowledgementResultTaskExecutor;

    protected AbstractPipelineMessageListenerContainer(O options) {
        super(options);
    }

    @Override
    protected void doStart() {
        ContainerComponentFactory componentFactory = this.determineComponentFactory();
        this.messageSources = this.createMessageSources(componentFactory);
        this.messageSink = componentFactory.createMessageSink(this.getContainerOptions());
        this.configureComponents(componentFactory);
        LifecycleHandler.get().start(this.messageSink, this.messageSources);
    }

    private ContainerComponentFactory<T, O> determineComponentFactory() {
        return this.getComponentFactories().stream().filter(factory -> factory.supports(this.getQueueNames(), this.getContainerOptions())).findFirst().orElseThrow(() -> new IllegalArgumentException("No ContainerComponentFactory found for queues " + this.getQueueNames()));
    }

    private Collection<ContainerComponentFactory<T, O>> getComponentFactories() {
        return !this.getContainerComponentFactories().isEmpty() ? this.getContainerComponentFactories() : this.createDefaultComponentFactories();
    }

    protected abstract Collection<ContainerComponentFactory<T, O>> createDefaultComponentFactories();

    protected Collection<MessageSource<T>> createMessageSources(ContainerComponentFactory<T, O> componentFactory) {
        ArrayList<String> queueNames = new ArrayList<String>(this.getQueueNames());
        return IntStream.range(0, queueNames.size()).mapToObj(index -> this.createMessageSource((String)queueNames.get(index), index, componentFactory)).collect(Collectors.toList());
    }

    protected MessageSource<T> createMessageSource(String queueName, int index, ContainerComponentFactory<T, O> componentFactory) {
        MessageSource<T> messageSource = componentFactory.createMessageSource(this.getContainerOptions());
        ConfigUtils.INSTANCE.acceptIfInstance(messageSource, PollingMessageSource.class, pms -> pms.setPollingEndpointName(queueName)).acceptIfInstance(messageSource, IdentifiableContainerComponent.class, icc -> icc.setId(this.getId() + "-" + index));
        return messageSource;
    }

    private void configureComponents(ContainerComponentFactory<T, O> componentFactory) {
        this.getContainerOptions().configure(this.messageSources).configure(this.messageSink);
        this.componentsTaskExecutor = this.resolveComponentsTaskExecutor();
        this.configureMessageSources(componentFactory);
        this.configureMessageSink(this.createMessageProcessingPipeline(componentFactory));
        this.configureContainerComponents();
    }

    private void verifyThreadType() {
        if (!MessageExecutionThread.class.isAssignableFrom(Thread.currentThread().getClass())) {
            throw new UnsupportedThreadFactoryException("Custom TaskExecutors must use a %s.".formatted(MessageExecutionThreadFactory.class.getSimpleName()));
        }
    }

    protected void configureMessageSources(ContainerComponentFactory<T, O> componentFactory) {
        TaskExecutor taskExecutor = this.createSourcesTaskExecutor();
        ConfigUtils.INSTANCE.acceptMany(this.messageSources, source -> source.setMessageSink(this.messageSink)).acceptManyIfInstance(this.messageSources, PollingMessageSource.class, pms -> pms.setBackPressureHandler(this.createBackPressureHandler())).acceptManyIfInstance(this.messageSources, AcknowledgementProcessingMessageSource.class, ams -> ams.setAcknowledgementProcessor(componentFactory.createAcknowledgementProcessor(this.getContainerOptions()))).acceptManyIfInstance(this.messageSources, AcknowledgementProcessingMessageSource.class, ams -> ams.setAcknowledgementResultCallback(this.getAcknowledgementResultCallback())).acceptManyIfInstance(this.messageSources, TaskExecutorAware.class, teac -> teac.setTaskExecutor(taskExecutor));
        this.doConfigureMessageSources(this.messageSources);
    }

    protected void doConfigureMessageSources(Collection<MessageSource<T>> messageSources) {
    }

    protected void configureMessageSink(MessageProcessingPipeline<T> messageProcessingPipeline) {
        ConfigUtils.INSTANCE.acceptIfInstance(this.messageSink, IdentifiableContainerComponent.class, icc -> icc.setId(this.getId())).acceptIfInstance(this.messageSink, TaskExecutorAware.class, teac -> teac.setTaskExecutor(this.getComponentsTaskExecutor())).acceptIfInstance(this.messageSink, MessageProcessingPipelineSink.class, mls -> mls.setMessagePipeline(messageProcessingPipeline));
        this.doConfigureMessageSink(this.messageSink);
    }

    protected void doConfigureMessageSink(MessageSink<T> messageSink) {
    }

    protected void configureContainerComponents() {
        ConfigUtils.INSTANCE.acceptManyIfInstance(this.getMessageInterceptors(), TaskExecutorAware.class, teac -> teac.setTaskExecutor(this.getComponentsTaskExecutor())).acceptIfInstance(this.getMessageListener(), TaskExecutorAware.class, teac -> teac.setTaskExecutor(this.getComponentsTaskExecutor())).acceptIfInstance(this.getErrorHandler(), TaskExecutorAware.class, teac -> teac.setTaskExecutor(this.getComponentsTaskExecutor())).acceptIfInstance(this.getAcknowledgementResultCallback(), TaskExecutorAware.class, teac -> teac.setTaskExecutor(this.getAcknowledgementResultTaskExecutor()));
    }

    protected MessageProcessingPipeline<T> createMessageProcessingPipeline(ContainerComponentFactory<T, O> componentFactory) {
        return MessageProcessingPipelineBuilder.first(BeforeProcessingContextInterceptorExecutionStage::new).then(BeforeProcessingInterceptorExecutionStage::new).then(MessageListenerExecutionStage::new).thenInTheFuture(ErrorHandlerExecutionStage::new).thenInTheFuture(AfterProcessingInterceptorExecutionStage::new).thenInTheFuture(AfterProcessingContextInterceptorExecutionStage::new).thenInTheFuture(AcknowledgementHandlerExecutionStage::new).build(MessageProcessingConfiguration.builder().interceptors(this.getMessageInterceptors()).messageListener(this.getMessageListener()).errorHandler(this.getErrorHandler()).ackHandler(componentFactory.createAcknowledgementHandler(this.getContainerOptions())).build());
    }

    private TaskExecutor resolveComponentsTaskExecutor() {
        return this.getContainerOptions().getComponentsTaskExecutor() != null ? this.validateCustomExecutor(this.getContainerOptions().getComponentsTaskExecutor()) : this.createTaskExecutor();
    }

    private TaskExecutor validateCustomExecutor(TaskExecutor taskExecutor) {
        CompletableFuture.runAsync(this::verifyThreadType, (Executor)taskExecutor).join();
        return taskExecutor;
    }

    protected BackPressureHandler createBackPressureHandler() {
        return SemaphoreBackPressureHandler.builder().batchSize(this.getContainerOptions().getMaxMessagesPerPoll()).totalPermits(this.getContainerOptions().getMaxConcurrentMessages()).acquireTimeout(this.getContainerOptions().getMaxDelayBetweenPolls()).throughputConfiguration(this.getContainerOptions().getBackPressureMode()).build();
    }

    protected TaskExecutor createSourcesTaskExecutor() {
        SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();
        executor.setThreadNamePrefix(this.getId() + "#message_source-");
        return executor;
    }

    protected TaskExecutor createTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        int poolSize = this.getContainerOptions().getMaxConcurrentMessages() * this.messageSources.size();
        executor.setMaxPoolSize(poolSize);
        executor.setCorePoolSize(poolSize);
        executor.setQueueCapacity(poolSize);
        executor.setAllowCoreThreadTimeOut(true);
        executor.setThreadFactory(this.createThreadFactory());
        executor.afterPropertiesSet();
        return executor;
    }

    protected ThreadFactory createThreadFactory() {
        MessageExecutionThreadFactory threadFactory = new MessageExecutionThreadFactory();
        threadFactory.setThreadNamePrefix(this.getId() + "-");
        return threadFactory;
    }

    @Override
    protected void doStop() {
        LifecycleHandler.get().stop(this.messageSources, this.messageSink);
        this.shutdownComponentsTaskExecutor();
        logger.debug("Container {} stopped", (Object)this.getId());
    }

    protected TaskExecutor getComponentsTaskExecutor() {
        return this.componentsTaskExecutor;
    }

    protected TaskExecutor getAcknowledgementResultTaskExecutor() {
        if (this.acknowledgementResultTaskExecutor == null) {
            this.acknowledgementResultTaskExecutor = this.determineAcknowledgementResultExecutor();
        }
        return this.acknowledgementResultTaskExecutor;
    }

    private TaskExecutor determineAcknowledgementResultExecutor() {
        return this.getContainerOptions().getAcknowledgementResultTaskExecutor() != null ? this.validateCustomExecutor(this.getContainerOptions().getAcknowledgementResultTaskExecutor()) : this.createTaskExecutor();
    }

    private void shutdownComponentsTaskExecutor() {
        if (!this.componentsTaskExecutor.equals(this.getContainerOptions().getComponentsTaskExecutor())) {
            LifecycleHandler.get().dispose(this.getComponentsTaskExecutor());
        }
        if (this.acknowledgementResultTaskExecutor != null && !this.acknowledgementResultTaskExecutor.equals(this.getContainerOptions().getAcknowledgementResultTaskExecutor())) {
            LifecycleHandler.get().dispose(this.getAcknowledgementResultTaskExecutor());
        }
    }
}

