/*
 * Decompiled with CFR 0.152.
 */
package org.sdase.commons.server.kafka.topicana;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.sdase.commons.server.kafka.KafkaConfiguration;
import org.sdase.commons.server.kafka.KafkaProperties;
import org.sdase.commons.server.kafka.topicana.ComparisonResult;
import org.sdase.commons.server.kafka.topicana.EvaluationException;
import org.sdase.commons.server.kafka.topicana.ExpectedTopicConfiguration;

@Deprecated
public class TopicComparer {
    public ComparisonResult compare(Collection<ExpectedTopicConfiguration> expectedTopicConfiguration, KafkaConfiguration configuration) {
        try (AdminClient adminClient = AdminClient.create((Properties)KafkaProperties.forAdminClient(configuration));){
            ComparisonResult.ComparisonResultBuilder resultBuilder = new ComparisonResult.ComparisonResultBuilder();
            List<String> topicNames = expectedTopicConfiguration.stream().map(ExpectedTopicConfiguration::getTopicName).collect(Collectors.toList());
            Map<String, TopicDescription> topicDescriptions = this.getTopicDescriptions(resultBuilder, topicNames, adminClient);
            this.compareTopicDescriptions(expectedTopicConfiguration, resultBuilder, topicDescriptions);
            Map<String, Config> topicConfigs = this.getTopicConfigs(topicNames, topicDescriptions, adminClient);
            this.compareTopicConfigs(expectedTopicConfiguration, resultBuilder, topicDescriptions, topicConfigs);
            ComparisonResult comparisonResult = resultBuilder.build();
            return comparisonResult;
        }
    }

    private void compareTopicConfigs(Collection<ExpectedTopicConfiguration> expectedTopicConfiguration, ComparisonResult.ComparisonResultBuilder resultBuilder, Map<String, TopicDescription> topicDescriptions, Map<String, Config> topicConfigs) {
        expectedTopicConfiguration.stream().filter(exp -> topicDescriptions.containsKey(exp.getTopicName())).forEach(exp -> {
            Config config = (Config)topicConfigs.get(exp.getTopicName());
            exp.getProps().forEach((key, value) -> {
                ConfigEntry entry = config.get(key);
                if (entry == null) {
                    resultBuilder.addMismatchingConfiguration(exp.getTopicName(), (String)key, (String)value, null);
                } else if (!value.equals(entry.value())) {
                    resultBuilder.addMismatchingConfiguration(exp.getTopicName(), (String)key, (String)value, entry.value());
                }
            });
        });
    }

    private Map<String, Config> getTopicConfigs(List<String> topicNames, Map<String, TopicDescription> topicDescriptions, AdminClient adminClient) {
        return adminClient.describeConfigs((Collection)topicNames.stream().filter(topicDescriptions::containsKey).map(this::topicNameToResource).collect(Collectors.toList())).values().entrySet().stream().flatMap(tc -> {
            HashMap<String, Object> res = new HashMap<String, Object>();
            try {
                res.put(((ConfigResource)tc.getKey()).name(), ((KafkaFuture)tc.getValue()).get());
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new EvaluationException("Interrupted Exception during adminClient.describeConfigs", e);
            }
            catch (ExecutionException e) {
                throw new EvaluationException("Exception during adminClient.describeConfigs", e);
            }
            return res.entrySet().stream();
        }).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    private void compareTopicDescriptions(Collection<ExpectedTopicConfiguration> expectedTopicConfiguration, ComparisonResult.ComparisonResultBuilder resultBuilder, Map<String, TopicDescription> topicDescriptions) {
        expectedTopicConfiguration.stream().filter(exp -> topicDescriptions.containsKey(exp.getTopicName())).forEach(exp -> {
            TopicDescription topicDescription = (TopicDescription)topicDescriptions.get(exp.getTopicName());
            if (exp.getPartitions().isSpecified() && topicDescription.partitions().size() != exp.getPartitions().count()) {
                resultBuilder.addMismatchingPartitionCount(exp.getTopicName(), exp.getPartitions().count(), topicDescription.partitions().size());
            }
            if (!topicDescription.partitions().isEmpty()) {
                int repflicationFactor = ((TopicPartitionInfo)topicDescription.partitions().get(0)).replicas().size();
                if (exp.getReplicationFactor().isSpecified() && repflicationFactor != exp.getReplicationFactor().count()) {
                    resultBuilder.addMismatchingReplicationFactor(exp.getTopicName(), exp.getReplicationFactor().count(), repflicationFactor);
                }
            }
        });
    }

    private Map<String, TopicDescription> getTopicDescriptions(ComparisonResult.ComparisonResultBuilder resultBuilder, List<String> topicNames, AdminClient adminClient) {
        return adminClient.describeTopics(topicNames).values().entrySet().stream().flatMap(desc -> {
            try {
                TopicDescription topicDescription = (TopicDescription)((KafkaFuture)desc.getValue()).get();
                return Stream.of(topicDescription);
            }
            catch (ExecutionException e) {
                if (e.getCause() instanceof UnknownTopicOrPartitionException) {
                    resultBuilder.addMissingTopic((String)desc.getKey());
                    return Stream.empty();
                }
                throw (KafkaException)e.getCause();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new EvaluationException("InterruptedException during adminClient.describeTopics", e);
            }
        }).collect(Collectors.toMap(TopicDescription::name, i -> i));
    }

    private ConfigResource topicNameToResource(String topicName) {
        return new ConfigResource(ConfigResource.Type.TOPIC, topicName);
    }
}

