/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.binder.kafka.config;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.PostConstruct;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.AppInfoParser;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealthIndicator;
import org.springframework.cloud.stream.binder.kafka.KafkaBinderJaasInitializerListener;
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
import org.springframework.cloud.stream.binder.kafka.admin.AdminUtilsOperation;
import org.springframework.cloud.stream.binder.kafka.admin.Kafka09AdminUtilsOperation;
import org.springframework.cloud.stream.binder.kafka.admin.Kafka10AdminUtilsOperation;
import org.springframework.cloud.stream.binder.kafka.properties.JaasLoginModuleConfiguration;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.config.codec.kryo.KryoCodecAutoConfiguration;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Condition;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.core.type.AnnotatedTypeMetadata;
import org.springframework.integration.codec.Codec;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.util.ObjectUtils;

/**
 * Spring configuration that assembles the Kafka binder.
 *
 * <p>It is skipped when a {@link Binder} bean already exists. It registers the topic
 * provisioner, the message channel binder, a default producer listener, a health
 * indicator, a version-specific {@link AdminUtilsOperation} and a JAAS initializer
 * listener. The nested {@link KafkaPropertiesConfiguration} merges Spring Boot's
 * {@link KafkaProperties} into the binder configuration map.
 */
@Configuration
@ConditionalOnMissingBean(value={Binder.class})
@Import(value={KryoCodecAutoConfiguration.class, PropertyPlaceholderAutoConfiguration.class, KafkaPropertiesConfiguration.class})
@EnableConfigurationProperties(value={KafkaBinderConfigurationProperties.class, KafkaExtendedBindingProperties.class})
public class KafkaBinderConfiguration {
    protected static final Log logger = LogFactory.getLog(KafkaBinderConfiguration.class);

    /** Kafka client property that holds the broker list. */
    private static final String BOOTSTRAP_SERVERS = "bootstrap.servers";

    /** Broker address that is treated as "not explicitly configured" when merging Boot properties. */
    private static final String DEFAULT_BROKER = "localhost:9092";

    /** Codec handed to the binder. */
    @Autowired
    private Codec codec;

    /** Binder-level configuration properties. */
    @Autowired
    private KafkaBinderConfigurationProperties configurationProperties;

    /** Per-binding extended properties handed to the binder. */
    @Autowired
    private KafkaExtendedBindingProperties kafkaExtendedBindingProperties;

    /** Producer listener handed to the binder; a logging one is registered below if none exists. */
    @Autowired
    private ProducerListener producerListener;

    @Autowired
    private ApplicationContext context;

    /** Optional: stays {@code null} when neither version-specific bean below is registered. */
    @Autowired(required=false)
    private AdminUtilsOperation adminUtilsOperation;

    /**
     * Creates the topic provisioner from the binder properties and the (possibly
     * {@code null}) admin utils operation.
     *
     * @return the provisioner bean
     */
    @Bean
    KafkaTopicProvisioner provisioningProvider() {
        return new KafkaTopicProvisioner(this.configurationProperties, this.adminUtilsOperation);
    }

    /**
     * Creates the Kafka message channel binder and injects the codec, producer
     * listener and extended binding properties into it.
     *
     * @return the configured binder bean
     */
    @Bean
    @SuppressWarnings("unchecked") // the injected listener is declared as a raw type
    KafkaMessageChannelBinder kafkaMessageChannelBinder() {
        KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(this.configurationProperties, this.provisioningProvider());
        binder.setCodec(this.codec);
        binder.setProducerListener((ProducerListener<byte[], byte[]>)this.producerListener);
        binder.setExtendedBindingProperties(this.kafkaExtendedBindingProperties);
        return binder;
    }

    /**
     * Fallback producer listener, registered only when no other
     * {@link ProducerListener} bean is present.
     *
     * @return a {@link LoggingProducerListener}
     */
    @Bean
    @ConditionalOnMissingBean(value={ProducerListener.class})
    ProducerListener producerListener() {
        return new LoggingProducerListener();
    }

    /**
     * Creates the binder health indicator backed by a byte-array consumer factory.
     *
     * <p>Consumer properties start with byte-array deserializers, are then overlaid
     * with the binder's configuration map, and finally receive the binder's connection
     * string as {@code bootstrap.servers} if that key is still missing.
     *
     * @param kafkaMessageChannelBinder the binder whose health is reported
     * @return the health indicator bean
     */
    @Bean
    KafkaBinderHealthIndicator healthIndicator(KafkaMessageChannelBinder kafkaMessageChannelBinder) {
        Map<String, Object> props = new HashMap<String, Object>();
        props.put("key.deserializer", ByteArrayDeserializer.class);
        props.put("value.deserializer", ByteArrayDeserializer.class);
        if (!ObjectUtils.isEmpty(this.configurationProperties.getConfiguration())) {
            props.putAll(this.configurationProperties.getConfiguration());
        }
        if (!props.containsKey(BOOTSTRAP_SERVERS)) {
            props.put(BOOTSTRAP_SERVERS, this.configurationProperties.getKafkaConnectionString());
        }
        ConsumerFactory<?, ?> consumerFactory = new DefaultKafkaConsumerFactory<Object, Object>(props);
        return new KafkaBinderHealthIndicator(kafkaMessageChannelBinder, consumerFactory);
    }

    /**
     * Admin utils implementation for Kafka 0.9. Registered only when
     * {@code kafka.admin.AdminUtils} is on the classpath and {@link Kafka09Present} matches.
     *
     * @return the Kafka 0.9 admin utils operation
     */
    @Bean(name={"adminUtilsOperation"})
    @Conditional(value={Kafka09Present.class})
    @ConditionalOnClass(name={"kafka.admin.AdminUtils"})
    public AdminUtilsOperation kafka09AdminUtilsOperation() {
        logger.info("AdminUtils selected: Kafka 0.9 AdminUtils");
        return new Kafka09AdminUtilsOperation();
    }

    /**
     * Admin utils implementation for Kafka 0.10. Registered only when
     * {@code kafka.admin.AdminUtils} is on the classpath and {@link Kafka10Present} matches.
     *
     * @return the Kafka 0.10 admin utils operation
     */
    @Bean(name={"adminUtilsOperation"})
    @Conditional(value={Kafka10Present.class})
    @ConditionalOnClass(name={"kafka.admin.AdminUtils"})
    public AdminUtilsOperation kafka10AdminUtilsOperation() {
        logger.info("AdminUtils selected: Kafka 0.10 AdminUtils");
        return new Kafka10AdminUtilsOperation();
    }

    /**
     * Registers the listener that initializes JAAS for the binder.
     *
     * @return a new {@link KafkaBinderJaasInitializerListener}
     * @throws IOException declared for the listener's construction
     */
    @Bean
    public ApplicationListener<?> jaasInitializer() throws IOException {
        return new KafkaBinderJaasInitializerListener();
    }

    /**
     * Merges Spring Boot's {@link KafkaProperties} into the binder's configuration map.
     * Active only when the Boot {@code KafkaProperties} class is on the classpath.
     */
    @ConditionalOnClass(name={"org.springframework.boot.autoconfigure.kafka.KafkaProperties"})
    public static class KafkaPropertiesConfiguration {
        /** Optional: the merge is skipped entirely when no such bean exists. */
        @Autowired(required=false)
        private KafkaProperties kafkaProperties;

        @Autowired
        private KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties;

        /**
         * Copies Boot's general, producer and consumer properties (in that order) into
         * the binder configuration without overwriting keys that are already present,
         * then makes sure {@code bootstrap.servers} points at the binder's brokers when
         * it is empty or still carries the single default broker.
         */
        @PostConstruct
        @SuppressWarnings({"rawtypes", "unchecked"}) // value type of the binder map is not visible here
        public void init() {
            if (this.kafkaProperties == null) {
                return;
            }
            Map configuration = this.kafkaBinderConfigurationProperties.getConfiguration();
            copyMissingEntries(this.kafkaProperties.getProperties(), configuration);
            copyMissingEntries(this.kafkaProperties.buildProducerProperties(), configuration);
            copyMissingEntries(this.kafkaProperties.buildConsumerProperties(), configuration);

            Object bootstrapServers = configuration.get(BOOTSTRAP_SERVERS);
            if (ObjectUtils.isEmpty(bootstrapServers) || isDefaultBroker(bootstrapServers)) {
                configuration.put(BOOTSTRAP_SERVERS, this.kafkaBinderConfigurationProperties.getKafkaConnectionString());
            }
        }

        /**
         * Tells whether the given {@code bootstrap.servers} value is just the default broker.
         *
         * <p>The value is type-checked instead of being cast to {@code List}: the map
         * also holds binder-supplied entries, so the value may be a plain
         * {@code String}, and an unconditional cast would fail with a
         * {@code ClassCastException}.
         *
         * @param bootstrapServers non-empty value stored under {@code bootstrap.servers}
         * @return {@code true} for a one-element list or a string equal to the default broker
         */
        private static boolean isDefaultBroker(Object bootstrapServers) {
            if (bootstrapServers instanceof List) {
                List<?> servers = (List<?>)bootstrapServers;
                return servers.size() == 1 && DEFAULT_BROKER.equals(servers.get(0));
            }
            return DEFAULT_BROKER.equals(bootstrapServers);
        }

        /**
         * Puts every entry of {@code source} into {@code target} whose key is not
         * already contained in {@code target}.
         */
        @SuppressWarnings({"rawtypes", "unchecked"})
        private static void copyMissingEntries(Map<String, ?> source, Map target) {
            for (Map.Entry<String, ?> entry : source.entrySet()) {
                if (!target.containsKey(entry.getKey())) {
                    target.put(entry.getKey(), entry.getValue());
                }
            }
        }
    }

    /**
     * Holder for the Kafka and ZooKeeper JAAS login module configurations.
     * NOTE(review): no accessors are declared here, so nothing in this file reads
     * or writes these fields - confirm whether this class is still needed.
     */
    public static class JaasConfigurationProperties {
        private JaasLoginModuleConfiguration kafka;
        private JaasLoginModuleConfiguration zookeeper;
    }

    /** Condition that matches when the Kafka client version string starts with {@code 0.9}. */
    static class Kafka09Present
    implements Condition {
        @Override
        public boolean matches(ConditionContext conditionContext, AnnotatedTypeMetadata annotatedTypeMetadata) {
            return AppInfoParser.getVersion().startsWith("0.9");
        }
    }

    /** Condition that matches when the Kafka client version string starts with {@code 0.10}. */
    static class Kafka10Present
    implements Condition {
        @Override
        public boolean matches(ConditionContext conditionContext, AnnotatedTypeMetadata annotatedTypeMetadata) {
            return AppInfoParser.getVersion().startsWith("0.10");
        }
    }
}

