/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.boot.autoconfig;

import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.axonframework.boot.autoconfig.AxonAutoConfiguration;
import org.axonframework.boot.autoconfig.KafkaProperties;
import org.axonframework.eventhandling.EventBus;
import org.axonframework.kafka.eventhandling.DefaultKafkaMessageConverter;
import org.axonframework.kafka.eventhandling.KafkaMessageConverter;
import org.axonframework.kafka.eventhandling.consumer.AsyncFetcher;
import org.axonframework.kafka.eventhandling.consumer.ConsumerFactory;
import org.axonframework.kafka.eventhandling.consumer.DefaultConsumerFactory;
import org.axonframework.kafka.eventhandling.consumer.Fetcher;
import org.axonframework.kafka.eventhandling.consumer.KafkaMessageSource;
import org.axonframework.kafka.eventhandling.consumer.SortedKafkaMessageBuffer;
import org.axonframework.kafka.eventhandling.producer.ConfirmationMode;
import org.axonframework.kafka.eventhandling.producer.DefaultProducerFactory;
import org.axonframework.kafka.eventhandling.producer.KafkaPublisher;
import org.axonframework.kafka.eventhandling.producer.KafkaPublisherConfiguration;
import org.axonframework.kafka.eventhandling.producer.ProducerFactory;
import org.axonframework.messaging.SubscribableMessageSource;
import org.axonframework.serialization.Serializer;
import org.axonframework.spring.config.AxonConfiguration;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring Boot auto-configuration that wires the Axon Kafka publishing and
 * consuming beans from the bound {@link KafkaProperties}.
 *
 * <p>The configuration is only considered when {@link KafkaPublisher} is on the
 * classpath and is ordered after {@link AxonAutoConfiguration}. Every bean is
 * declared with {@code @ConditionalOnMissingBean}, so an application-defined
 * bean of the same type takes precedence.
 */
@Configuration
@ConditionalOnClass(value={KafkaPublisher.class})
@EnableConfigurationProperties(value={KafkaProperties.class})
@AutoConfigureAfter(value={AxonAutoConfiguration.class})
public class KafkaAutoConfiguration {
    /** Bound Kafka settings used to build the producer, consumer and fetcher beans. */
    private final KafkaProperties properties;

    /**
     * Creates the configuration around the bound Kafka properties.
     *
     * @param properties the Kafka settings to build beans from
     */
    public KafkaAutoConfiguration(KafkaProperties properties) {
        this.properties = properties;
    }

    /**
     * Creates a transactional {@link ProducerFactory}, conditional on the
     * {@code axon.kafka.producer.transaction-id-prefix} property.
     *
     * @return a producer factory using {@link ConfirmationMode#TRANSACTIONAL}
     * @throws IllegalStateException if the configured transactional id prefix
     *         is {@code null} or empty
     */
    @ConditionalOnMissingBean
    @ConditionalOnProperty(value={"axon.kafka.producer.transaction-id-prefix"})
    @Bean
    public ProducerFactory<String, byte[]> producerFactory() {
        Map<String, Object> producer = this.properties.buildProducerProperties();
        String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();
        // The property can be declared with a blank value, which yields an empty
        // string rather than null; reject both so the check matches its message.
        if (transactionIdPrefix == null || transactionIdPrefix.isEmpty()) {
            throw new IllegalStateException("transactionalIdPrefix cannot be empty");
        }
        return DefaultProducerFactory.builder(producer)
                .withConfirmationMode(ConfirmationMode.TRANSACTIONAL)
                .withTransactionalIdPrefix(transactionIdPrefix)
                .build();
    }

    /**
     * Creates the default {@link ConsumerFactory} from the configured consumer
     * properties, conditional on the {@code axon.kafka.consumer.group-id} property.
     *
     * @return a consumer factory built from {@link KafkaProperties#buildConsumerProperties()}
     */
    @ConditionalOnMissingBean
    @Bean
    @ConditionalOnProperty(value={"axon.kafka.consumer.group-id"})
    public ConsumerFactory<String, byte[]> kafkaConsumerFactory() {
        return new DefaultConsumerFactory(this.properties.buildConsumerProperties());
    }

    /**
     * Creates the {@link KafkaPublisher} that uses the event bus as its message
     * source and publishes to the configured default topic. Only registered when
     * a {@link ProducerFactory} bean exists. The bean's {@code start} and
     * {@code shutDown} methods are registered as init and destroy callbacks.
     *
     * @param producerFactory  factory supplying the Kafka producers
     * @param eventBus         event bus used as the publisher's message source
     * @param messageConverter converter handed to the publisher configuration
     * @param configuration    Axon configuration supplying the message monitor
     * @return the configured publisher
     */
    @ConditionalOnMissingBean
    @Bean(initMethod="start", destroyMethod="shutDown")
    @ConditionalOnBean(value={ProducerFactory.class})
    public KafkaPublisher<String, byte[]> kafkaPublisher(ProducerFactory<String, byte[]> producerFactory, EventBus eventBus, KafkaMessageConverter<String, byte[]> messageConverter, AxonConfiguration configuration) {
        return new KafkaPublisher(KafkaPublisherConfiguration.builder()
                .withTopic(this.properties.getDefaultTopic())
                .withMessageConverter(messageConverter)
                .withProducerFactory(producerFactory)
                .withMessageSource((SubscribableMessageSource)eventBus)
                .withMessageMonitor(configuration.messageMonitor(KafkaPublisher.class, "kafkaPublisher"))
                .build());
    }

    /**
     * Creates the {@link Fetcher} for the configured default topic. Only
     * registered when both a {@link ConsumerFactory} and a
     * {@link KafkaMessageConverter} bean exist. The bean's {@code shutdown}
     * method is registered as the destroy callback.
     *
     * @param consumerFactory  factory supplying the Kafka consumers
     * @param messageConverter converter handed to the fetcher
     * @return the configured fetcher
     */
    @ConditionalOnMissingBean
    @Bean(destroyMethod="shutdown")
    @ConditionalOnBean(value={ConsumerFactory.class, KafkaMessageConverter.class})
    public Fetcher<String, byte[]> kafkaFetcher(ConsumerFactory<String, byte[]> consumerFactory, KafkaMessageConverter<String, byte[]> messageConverter) {
        return AsyncFetcher.builder(consumerFactory)
                .withTopic(this.properties.getDefaultTopic())
                // The configured poll timeout is interpreted as milliseconds.
                .withPollTimeout(this.properties.getFetcher().getPollTimeout(), TimeUnit.MILLISECONDS)
                .withMessageConverter(messageConverter)
                // The buffer size is read each time the factory is invoked.
                .withBufferFactory(() -> new SortedKafkaMessageBuffer(this.properties.getFetcher().getBufferSize()))
                .build();
    }

    /**
     * Creates the {@link KafkaMessageSource} wrapping the given fetcher. Only
     * registered when a {@link ConsumerFactory} bean exists.
     *
     * @param fetcher the fetcher the message source is built on
     * @return the message source
     */
    @ConditionalOnMissingBean
    @Bean
    @ConditionalOnBean(value={ConsumerFactory.class})
    public KafkaMessageSource<String, byte[]> kafkaMessageSource(Fetcher<String, byte[]> fetcher) {
        return new KafkaMessageSource(fetcher);
    }

    /**
     * Creates the default {@link KafkaMessageConverter} using the serializer
     * bean qualified as {@code eventSerializer}.
     *
     * @param eventSerializer serializer passed to the converter
     * @return the default message converter
     */
    @ConditionalOnMissingBean
    @Bean
    public KafkaMessageConverter<String, byte[]> kafkaMessageConverter(@Qualifier(value="eventSerializer") Serializer eventSerializer) {
        return new DefaultKafkaMessageConverter(eventSerializer);
    }
}

