/*
 * Decompiled with CFR 0.152.
 */
package org.sdase.commons.server.kafka;

import com.codahale.metrics.health.HealthCheck;
import io.dropwizard.Configuration;
import io.dropwizard.ConfiguredBundle;
import io.dropwizard.lifecycle.Managed;
import io.dropwizard.setup.Bootstrap;
import io.dropwizard.setup.Environment;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.validation.constraints.NotNull;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.sdase.commons.server.dropwizard.lifecycle.ManagedShutdownListener;
import org.sdase.commons.server.kafka.KafkaConfiguration;
import org.sdase.commons.server.kafka.KafkaConfigurationProvider;
import org.sdase.commons.server.kafka.KafkaProperties;
import org.sdase.commons.server.kafka.builder.MessageListenerRegistration;
import org.sdase.commons.server.kafka.builder.ProducerRegistration;
import org.sdase.commons.server.kafka.config.ConsumerConfig;
import org.sdase.commons.server.kafka.config.ListenerConfig;
import org.sdase.commons.server.kafka.config.ProducerConfig;
import org.sdase.commons.server.kafka.config.TopicConfig;
import org.sdase.commons.server.kafka.consumer.MessageListener;
import org.sdase.commons.server.kafka.exception.ConfigurationException;
import org.sdase.commons.server.kafka.exception.TopicCreationException;
import org.sdase.commons.server.kafka.health.ExternalKafkaHealthCheck;
import org.sdase.commons.server.kafka.health.KafkaHealthCheck;
import org.sdase.commons.server.kafka.producer.KafkaMessageProducer;
import org.sdase.commons.server.kafka.producer.MessageProducer;
import org.sdase.commons.server.kafka.prometheus.ConsumerTopicMessageHistogram;
import org.sdase.commons.server.kafka.prometheus.KafkaConsumerMetrics;
import org.sdase.commons.server.kafka.prometheus.ProducerTopicMessageCounter;
import org.sdase.commons.server.kafka.topicana.ComparisonResult;
import org.sdase.commons.server.kafka.topicana.EvaluationException;
import org.sdase.commons.server.kafka.topicana.ExpectedTopicConfiguration;
import org.sdase.commons.server.kafka.topicana.MismatchedTopicConfigException;
import org.sdase.commons.server.kafka.topicana.TopicComparer;
import org.sdase.commons.server.kafka.topicana.TopicConfigurationBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaBundle<C extends Configuration>
implements ConfiguredBundle<C> {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaBundle.class);
    public static final String HEALTHCHECK_NAME = "kafkaConnection";
    public static final String EXTERNAL_HEALTHCHECK_NAME = "kafkaConnectionExternal";
    private final Function<C, KafkaConfiguration> configurationProvider;
    private KafkaConfiguration kafkaConfiguration;
    private final boolean healthCheckDisabled;
    private ProducerTopicMessageCounter topicProducerCounterSpec;
    private ConsumerTopicMessageHistogram topicConsumerHistogram;
    private final List<MessageListener<?, ?>> messageListeners = new ArrayList();
    private final List<ThreadedMessageListener<?, ?>> threadedMessageListeners = new ArrayList();
    private final List<KafkaMessageProducer<?, ?>> messageProducers = new ArrayList();
    private final Map<String, ExpectedTopicConfiguration> topics = new HashMap<String, ExpectedTopicConfiguration>();
    private KafkaHealthCheck kafkaHealthCheck;

    private KafkaBundle(KafkaConfigurationProvider<C> configurationProvider, boolean healthCheckDisabled) {
        this.configurationProvider = configurationProvider;
        this.healthCheckDisabled = healthCheckDisabled;
    }

    public static InitialBuilder builder() {
        return new Builder();
    }

    public void initialize(Bootstrap<?> bootstrap) {
    }

    public void run(C configuration, Environment environment) {
        this.kafkaConfiguration = this.configurationProvider.apply(configuration);
        this.kafkaConfiguration.getTopics().forEach((k, v) -> this.topics.put((String)k, this.createTopicDescription((TopicConfig)v)));
        if (!this.kafkaConfiguration.isDisabled()) {
            if (this.healthCheckDisabled) {
                this.kafkaHealthCheck = new ExternalKafkaHealthCheck(this.kafkaConfiguration);
                environment.healthChecks().register(EXTERNAL_HEALTHCHECK_NAME, (HealthCheck)this.kafkaHealthCheck);
            } else {
                this.kafkaHealthCheck = new KafkaHealthCheck(this.kafkaConfiguration);
                environment.healthChecks().register(HEALTHCHECK_NAME, (HealthCheck)this.kafkaHealthCheck);
            }
        }
        this.topicProducerCounterSpec = new ProducerTopicMessageCounter();
        this.topicConsumerHistogram = new ConsumerTopicMessageHistogram();
        new KafkaConsumerMetrics(this.messageListeners);
        this.setupManagedThreadManager(environment);
    }

    public ExpectedTopicConfiguration getTopicConfiguration(String name) throws ConfigurationException {
        if (this.topics.get(name) == null) {
            throw new ConfigurationException(String.format("Topic with name '%s' seems not to be part of the read configuration. Please check the name and configuration.", name));
        }
        return this.topics.get(name);
    }

    public <K, V> List<MessageListener<K, V>> createMessageListener(MessageListenerRegistration<K, V> registration) {
        ComparisonResult comparisonResult;
        if (this.kafkaConfiguration.isDisabled()) {
            return Collections.emptyList();
        }
        this.checkInit();
        if (registration.isCheckTopicConfiguration() && !(comparisonResult = this.checkTopics(registration.getTopics())).ok()) {
            throw new MismatchedTopicConfigException(comparisonResult);
        }
        ListenerConfig listenerConfig = registration.getListenerConfig();
        if (listenerConfig == null && registration.getListenerConfigName() != null && (listenerConfig = this.kafkaConfiguration.getListenerConfig().get(registration.getListenerConfigName())) == null) {
            throw new ConfigurationException(String.format("Listener config with name '%s' cannot be found within the current configuration.", registration.getListenerConfigName()));
        }
        if (listenerConfig == null) {
            throw new ConfigurationException("No valid listener config given within the MessageHandlerRegistration");
        }
        if (registration.getStrategy() == null) {
            throw new IllegalStateException("A strategy is mandatory for message listeners.");
        }
        ArrayList<MessageListener<K, V>> listener = new ArrayList<MessageListener<K, V>>(listenerConfig.getInstances());
        for (int i = 0; i < listenerConfig.getInstances(); ++i) {
            registration.getStrategy().init(this.topicConsumerHistogram);
            MessageListener<K, V> instance = new MessageListener<K, V>(registration.getTopicsNames(), this.createConsumer(registration, i), listenerConfig, registration.getStrategy());
            listener.add(instance);
            Thread t = new Thread(instance);
            t.start();
            this.threadedMessageListeners.add(new ThreadedMessageListener(instance, t));
        }
        this.messageListeners.addAll(listener);
        return listener;
    }

    public <K, V> MessageProducer<K, V> registerProducer(ProducerRegistration<K, V> registration) throws ConfigurationException {
        ComparisonResult comparisonResult;
        if (this.kafkaConfiguration.isDisabled()) {
            return new MessageProducer<K, V>(){

                @Override
                public Future<RecordMetadata> send(K key, V value) {
                    return null;
                }

                @Override
                public Future<RecordMetadata> send(K key, V value, Headers headers) {
                    return null;
                }
            };
        }
        this.checkInit();
        if (registration.isCreateTopicIfMissing()) {
            this.createNotExistingTopics(Collections.singletonList(registration.getTopic()));
        }
        if (registration.isCheckTopicConfiguration() && !(comparisonResult = this.checkTopics(Collections.singletonList(registration.getTopic()))).ok()) {
            throw new MismatchedTopicConfigException(comparisonResult);
        }
        KafkaProducer<K, V> producer = this.createProducer(registration);
        Map.Entry entry = producer.metrics().entrySet().stream().findFirst().orElse(null);
        String clientId = entry != null ? (String)((MetricName)entry.getKey()).tags().get("client-id") : "";
        KafkaMessageProducer<K, V> messageProducer = new KafkaMessageProducer<K, V>(registration.getTopicName(), producer, this.topicProducerCounterSpec, clientId);
        this.messageProducers.add(messageProducer);
        return messageProducer;
    }

    private void createNotExistingTopics(Collection<ExpectedTopicConfiguration> topics) {
        if (this.kafkaConfiguration.isDisabled()) {
            return;
        }
        ComparisonResult comparisonResult = this.checkTopics(topics);
        if (!comparisonResult.getMissingTopics().isEmpty()) {
            this.createTopics(topics.stream().filter(t -> comparisonResult.getMissingTopics().contains(t.getTopicName())).collect(Collectors.toList()));
        }
    }

    private ComparisonResult checkTopics(Collection<ExpectedTopicConfiguration> topics) {
        if (this.kafkaConfiguration.isDisabled()) {
            return new ComparisonResult.ComparisonResultBuilder().build();
        }
        TopicComparer topicComparer = new TopicComparer();
        return topicComparer.compare(topics, this.kafkaConfiguration);
    }

    private void createTopics(Collection<ExpectedTopicConfiguration> topics) {
        try (AdminClient adminClient = AdminClient.create((Properties)KafkaProperties.forAdminClient(this.kafkaConfiguration));){
            List topicList = topics.stream().map(t -> {
                int partitions = 1;
                int replications = 1;
                if (!t.getPartitions().isSpecified()) {
                    LOGGER.warn("Partitions for topic '{}' is not specified. Using default value 1", (Object)t.getTopicName());
                } else {
                    partitions = t.getPartitions().count();
                }
                if (!t.getReplicationFactor().isSpecified()) {
                    LOGGER.warn("Replication factor for topic '{}' is not specified. Using default value 1", (Object)t.getTopicName());
                } else {
                    replications = t.getReplicationFactor().count();
                }
                return new NewTopic(t.getTopicName(), partitions, (short)replications).configs(t.getProps());
            }).collect(Collectors.toList());
            CreateTopicsResult result = adminClient.createTopics(topicList);
            result.all().get();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new EvaluationException("Exception during adminClient.createTopics", e);
        }
        catch (ExecutionException e) {
            throw new TopicCreationException("TopicConfig creation failed", e);
        }
    }

    private ExpectedTopicConfiguration createTopicDescription(TopicConfig c) {
        return TopicConfigurationBuilder.builder(c.getName()).withPartitionCount(c.getPartitions()).withReplicationFactor(c.getReplicationFactor()).withConfig(c.getConfig()).build();
    }

    public <K, V> KafkaConsumer<K, V> createConsumer(Deserializer<K> keyDeSerializer, Deserializer<V> valueDeSerializer, String consumerConfigName) {
        ConsumerConfig consumerConfig = this.getConsumerConfiguration(consumerConfigName);
        return this.createConsumer(keyDeSerializer, valueDeSerializer, consumerConfig, 0);
    }

    public <K, V> KafkaConsumer<K, V> createConsumer(Deserializer<K> keyDeSerializer, Deserializer<V> valueDeSerializer, ConsumerConfig consumerConfig, int instanceId) {
        KafkaProperties consumerProperties = KafkaProperties.forConsumer(this.kafkaConfiguration);
        if (consumerConfig != null) {
            consumerProperties.putAll(consumerConfig.getConfig());
            consumerProperties.put("client.id", consumerConfig.getClientId() + "-" + instanceId);
        }
        return new KafkaConsumer((Properties)consumerProperties, keyDeSerializer, valueDeSerializer);
    }

    public <K, V> KafkaProducer<K, V> createProducer(Serializer<K> keySerializer, Serializer<V> valueSerializer, ProducerConfig producerConfig) {
        KafkaProperties producerProperties = KafkaProperties.forProducer(this.kafkaConfiguration);
        if (producerConfig != null) {
            producerConfig.getConfig().forEach(producerProperties::put);
        }
        return new KafkaProducer((Properties)producerProperties, keySerializer, valueSerializer);
    }

    public <K, V> KafkaProducer<K, V> createProducer(Serializer<K> keySerializer, Serializer<V> valueSerializer, String producerConfigName) {
        ProducerConfig producerConfig = this.getProducerConfiguration(producerConfigName);
        if (producerConfig != null && producerConfig.getClientId() == null) {
            producerConfig.setClientId(producerConfigName);
        }
        return this.createProducer(keySerializer, valueSerializer, producerConfig);
    }

    private <K, V> KafkaProducer<K, V> createProducer(ProducerRegistration<K, V> registration) {
        ProducerConfig producerConfig = registration.getProducerConfig();
        if (producerConfig == null && registration.getProducerConfigName() != null) {
            producerConfig = this.getProducerConfiguration(registration.getProducerConfigName());
        }
        if (producerConfig != null && producerConfig.getClientId() == null) {
            producerConfig.setClientId(registration.getProducerConfigName());
        }
        return this.createProducer(registration.getKeySerializer(), registration.getValueSerializer(), producerConfig);
    }

    public ConsumerConfig getConsumerConfiguration(String name) {
        if (!this.kafkaConfiguration.getConsumers().containsKey(name)) {
            throw new ConfigurationException(String.format("Consumer config with name '%s' cannot be found within the current configuration.", name));
        }
        return this.kafkaConfiguration.getConsumers().get(name);
    }

    public ProducerConfig getProducerConfiguration(String name) {
        if (!this.kafkaConfiguration.getProducers().containsKey(name)) {
            throw new ConfigurationException(String.format("Producer config with name '%s' cannot be found within the current configuration.", name));
        }
        return this.kafkaConfiguration.getProducers().get(name);
    }

    private <K, V> KafkaConsumer<K, V> createConsumer(MessageListenerRegistration<K, V> registration, int instanceId) {
        ConsumerConfig consumerConfig = registration.getConsumerConfig();
        if (consumerConfig == null && registration.getConsumerConfigName() != null) {
            consumerConfig = this.getConsumerConfiguration(registration.getConsumerConfigName());
        }
        if (consumerConfig != null) {
            this.applyForcedConfigFromStrategy(registration, consumerConfig);
            registration.getStrategy().verifyConsumerConfig(consumerConfig.getConfig());
        }
        if (consumerConfig != null && consumerConfig.getClientId() == null) {
            consumerConfig.setClientId(registration.getConsumerConfigName());
        }
        return this.createConsumer(registration.getKeyDeserializer(), registration.getValueDeserializer(), consumerConfig, instanceId);
    }

    private void applyForcedConfigFromStrategy(MessageListenerRegistration<?, ?> registration, ConsumerConfig consumerConfig) {
        HashMap<String, String> newConfig = new HashMap<String, String>(consumerConfig.getConfig());
        Map<String, String> forcedConfig = registration.getStrategy().forcedConfigToApply();
        for (Map.Entry<String, String> configEntry : forcedConfig.entrySet()) {
            String key = configEntry.getKey();
            String newValue = configEntry.getValue();
            String oldValue = (String)newConfig.get(key);
            if (oldValue == null) {
                LOGGER.info("Setting in consumer config: '{}'='{}' (forced from strategy {})", new Object[]{key, newValue, registration.getStrategy().getClass().getSimpleName()});
            } else if (!newValue.equals(oldValue)) {
                LOGGER.warn("Overwriting in consumer config: '{}'='{}' with new value '{}' (forced from strategy {})", new Object[]{key, oldValue, newValue, registration.getStrategy().getClass().getSimpleName()});
            }
            newConfig.put(key, newValue);
        }
        consumerConfig.setConfig(newConfig);
    }

    private void checkInit() {
        if (this.kafkaConfiguration == null) {
            throw new IllegalStateException("KafkaConfiguration not yet initialized!");
        }
    }

    private void setupManagedThreadManager(Environment environment) {
        environment.lifecycle().manage((Managed)ManagedShutdownListener.onShutdown(() -> {
            this.shutdownConsumerThreads();
            this.stopProducers();
            this.shutdownKafkaHealthCheck();
        }));
    }

    private void stopProducers() {
        this.messageProducers.forEach(p -> {
            try {
                p.close();
            }
            catch (InterruptException ie) {
                LOGGER.error("Error closing producer", (Throwable)ie);
                Thread.currentThread().interrupt();
            }
        });
        this.topicProducerCounterSpec.unregister();
    }

    private void shutdownConsumerThreads() {
        this.threadedMessageListeners.forEach(l -> ((ThreadedMessageListener)l).messageListener.stopConsumer());
        this.threadedMessageListeners.forEach(l -> {
            try {
                ((ThreadedMessageListener)l).thread.join();
            }
            catch (InterruptedException e) {
                LOGGER.warn("Error while shutting down consumer threads", (Throwable)e);
                Thread.currentThread().interrupt();
            }
        });
        this.topicConsumerHistogram.unregister();
    }

    private void shutdownKafkaHealthCheck() {
        if (this.kafkaHealthCheck != null) {
            this.kafkaHealthCheck.shutdown();
        }
    }

    private static class ThreadedMessageListener<K, V> {
        private final MessageListener<K, V> messageListener;
        private final Thread thread;

        private ThreadedMessageListener(MessageListener<K, V> messageListener, Thread thread) {
            this.messageListener = messageListener;
            this.thread = thread;
        }
    }

    public static class Builder<T extends Configuration>
    implements InitialBuilder,
    FinalBuilder<T> {
        private KafkaConfigurationProvider<T> configurationProvider;
        private boolean healthCheckDisabled = false;

        private Builder() {
        }

        private Builder(KafkaConfigurationProvider<T> configurationProvider) {
            this.configurationProvider = configurationProvider;
        }

        @Override
        public FinalBuilder<T> withoutHealthCheck() {
            this.healthCheckDisabled = true;
            return this;
        }

        @Override
        public KafkaBundle<T> build() {
            return new KafkaBundle(this.configurationProvider, this.healthCheckDisabled);
        }

        @Override
        public <C extends Configuration> FinalBuilder<C> withConfigurationProvider(KafkaConfigurationProvider<C> configurationProvider) {
            return new Builder<C>(configurationProvider);
        }
    }

    public static interface FinalBuilder<T extends Configuration> {
        public FinalBuilder<T> withoutHealthCheck();

        public KafkaBundle<T> build();
    }

    public static interface InitialBuilder {
        public <C extends Configuration> FinalBuilder<C> withConfigurationProvider(@NotNull KafkaConfigurationProvider<C> var1);
    }
}

