/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.test;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.reflect.Method;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.time.Duration;
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import kafka.testkit.KafkaClusterTestKit;
import kafka.testkit.TestKitNodes;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.ReflectionUtils;

/**
 * An {@link EmbeddedKafkaBroker} backed by a {@link KafkaClusterTestKit} whose nodes run as
 * combined broker/controller nodes.
 *
 * <p>The cluster is created and started by {@link #afterPropertiesSet()} and shut down by
 * {@link #destroy()}. Once started, the bootstrap servers are published as system properties
 * (see {@link #brokerListProperty(String)}).
 *
 * <p>NOTE(review): the class uses no synchronization beyond the one-shot {@code initialized}
 * flag; callers should presumably configure and drive an instance from a single thread.
 */
public class EmbeddedKafkaKraftBroker implements EmbeddedKafkaBroker {

    private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(EmbeddedKafkaKraftBroker.class));

    /**
     * Name of the system property consulted for the broker-list property name when
     * {@link #brokerListProperty(String)} has been set to {@code null}.
     */
    public static final String BROKER_LIST_PROPERTY = "spring.embedded.kafka.brokers.property";

    /** Default timeout, in seconds, for admin operations. */
    public static final int DEFAULT_ADMIN_TIMEOUT = 10;

    /** System property that always receives the bootstrap servers once the cluster is up. */
    private static final String EMBEDDED_BROKERS_PROPERTY = "spring.embedded.kafka.brokers";

    /** Client configuration key holding the bootstrap servers. */
    private static final String BOOTSTRAP_SERVERS_KEY = "bootstrap.servers";

    private static final String BROKER_NOT_STARTED = "Broker must be started before this method can be called";

    private static final String CLUSTER_NAME = "embedded Kafka cluster";

    /** Maximum number of polls performed while waiting for a partition assignment. */
    private static final int MAX_ASSIGNMENT_POLLS = 600;

    /** Timeout of each individual poll performed while waiting for a partition assignment. */
    private static final Duration ASSIGNMENT_POLL_TIMEOUT = Duration.ofMillis(100L);

    /** Whether the class that marks Kafka 3.9+ is on the classpath. */
    private static final boolean IS_KAFKA_39_OR_LATER = ClassUtils.isPresent(
            "org.apache.kafka.server.config.AbstractKafkaConfig", EmbeddedKafkaKraftBroker.class.getClassLoader());

    /** The {@code setConfigProp(String, Object)} builder method; only looked up on Kafka 3.9+. */
    private static final Method SET_CONFIG_METHOD = IS_KAFKA_39_OR_LATER
            ? ReflectionUtils.findMethod(KafkaClusterTestKit.Builder.class, "setConfigProp", String.class, Object.class)
            : null;

    private final int count;

    private final Set<String> topics;

    private final int partitionsPerTopic;

    private final Properties brokerProperties = new Properties();

    /** Guards {@link #afterPropertiesSet()} so that start-up is attempted only once. */
    private final AtomicBoolean initialized = new AtomicBoolean();

    private KafkaClusterTestKit cluster;

    // NOTE(review): written by the constructor and kafkaPorts(int...) but never read in this class.
    private int[] kafkaPorts;

    private Duration adminTimeout = Duration.ofSeconds(DEFAULT_ADMIN_TIMEOUT);

    private String brokerListProperty = "spring.kafka.bootstrap-servers";

    /**
     * Create a broker definition; nothing is started until {@link #afterPropertiesSet()}.
     * @param count the number of broker (and controller) nodes; also used as the replication
     * factor of topics created from plain names.
     * @param partitions the number of partitions for topics created from plain names.
     * @param topics names of topics to create on start-up; may be {@code null}.
     */
    public EmbeddedKafkaKraftBroker(int count, int partitions, String... topics) {
        this.count = count;
        this.kafkaPorts = new int[this.count];
        this.topics = topics != null ? new HashSet<>(Arrays.asList(topics)) : new HashSet<>();
        this.partitionsPerTopic = partitions;
    }

    /**
     * Add broker configuration properties; they are applied when the cluster is built.
     * @param properties the properties to add.
     * @return this broker.
     */
    @Override
    public EmbeddedKafkaBroker brokerProperties(Map<String, String> properties) {
        this.brokerProperties.putAll(properties);
        return this;
    }

    /**
     * Add a single broker configuration property; it is applied when the cluster is built.
     * @param property the property name.
     * @param value the property value.
     * @return this broker.
     */
    public EmbeddedKafkaBroker brokerProperty(String property, Object value) {
        this.brokerProperties.put(property, value);
        return this;
    }

    /**
     * Record the requested ports, one per node.
     * <p>NOTE(review): the recorded ports are not consulted anywhere in this class.
     * @param ports the ports; the length must equal the node count.
     * @return this broker.
     * @throws IllegalArgumentException if the number of ports differs from the node count.
     */
    @Override
    public EmbeddedKafkaKraftBroker kafkaPorts(int... ports) {
        Assert.isTrue(ports.length == this.count, "A port must be provided for each instance ["
                + this.count + "], provided: " + Arrays.toString(ports) + ", use 0 for a random port");
        this.kafkaPorts = Arrays.copyOf(ports, ports.length);
        return this;
    }

    /**
     * Set the name of the system property that receives the bootstrap servers on start-up.
     * When {@code null}, the name is read from the {@value #BROKER_LIST_PROPERTY} system
     * property at start-up instead.
     * @param brokerListProperty the system property name, or {@code null}.
     * @return this broker.
     */
    @Override
    public EmbeddedKafkaBroker brokerListProperty(String brokerListProperty) {
        this.brokerListProperty = brokerListProperty;
        return this;
    }

    /**
     * Set the timeout for admin operations.
     * @param adminTimeout the timeout in seconds.
     * @return this broker.
     */
    @Override
    public EmbeddedKafkaBroker adminTimeout(int adminTimeout) {
        this.adminTimeout = Duration.ofSeconds(adminTimeout);
        return this;
    }

    /**
     * Set the timeout for admin operations.
     * @param adminTimeout the timeout in seconds.
     */
    public void setAdminTimeout(int adminTimeout) {
        this.adminTimeout = Duration.ofSeconds(adminTimeout);
    }

    /**
     * Start the embedded cluster. Only the first invocation has any effect; the flag is not
     * reset by {@link #destroy()} or by a failed start.
     */
    @Override
    public void afterPropertiesSet() {
        if (this.initialized.compareAndSet(false, true)) {
            overrideExitMethods();
            addDefaultBrokerPropsIfAbsent();
            start();
        }
    }

    /**
     * Build, format and start the cluster, create the configured topics and publish the
     * bootstrap servers as system properties. Does nothing if a cluster already exists.
     * @throws IllegalStateException if the cluster cannot be built or started.
     */
    private void start() {
        if (this.cluster != null) {
            return;
        }
        try {
            TestKitNodes nodes = new TestKitNodes.Builder()
                    .setCombined(true)
                    .setNumBrokerNodes(this.count)
                    .setNumControllerNodes(this.count)
                    .build();
            KafkaClusterTestKit.Builder clusterBuilder = new KafkaClusterTestKit.Builder(nodes);
            this.brokerProperties.forEach((key, value) -> setConfigProperty(clusterBuilder, (String) key, value));
            this.cluster = clusterBuilder.build();
        }
        catch (Exception ex) {
            throw new IllegalStateException("Failed to create embedded cluster", ex);
        }
        try {
            this.cluster.format();
            this.cluster.startup();
            this.cluster.waitForReadyBrokers();
        }
        catch (Exception ex) {
            // Do not leak a half-started cluster: afterPropertiesSet() never retries, so
            // nothing else would close it.
            Utils.closeQuietly(this.cluster, CLUSTER_NAME);
            this.cluster = null;
            throw new IllegalStateException("Failed to start test Kafka cluster", ex);
        }
        createKafkaTopics(this.topics);
        if (this.brokerListProperty == null) {
            this.brokerListProperty = System.getProperty(BROKER_LIST_PROPERTY);
        }
        String brokers = getBrokersAsString();
        if (this.brokerListProperty != null) {
            System.setProperty(this.brokerListProperty, brokers);
        }
        System.setProperty(EMBEDDED_BROKERS_PROPERTY, brokers);
    }

    /**
     * Apply one configuration property to the cluster builder.
     * <p>On Kafka 3.9+ the {@code (String, Object)} overload is invoked reflectively; otherwise
     * the builder is called directly with the value rendered as a string, so that non-string
     * values supplied through {@link #brokerProperty(String, Object)} do not fail with a
     * {@link ClassCastException}.
     */
    private static void setConfigProperty(KafkaClusterTestKit.Builder clusterBuilder, String key, Object value) {
        if (IS_KAFKA_39_OR_LATER) {
            ReflectionUtils.invokeMethod(SET_CONFIG_METHOD, clusterBuilder, key, value);
        }
        else {
            clusterBuilder.setConfigProp(key, String.valueOf(value));
        }
    }

    /**
     * Shut down the embedded cluster, if any.
     * @throws IllegalStateException if closing the cluster fails; the cluster reference is
     * retained in that case.
     */
    @Override
    public void destroy() {
        AtomicReference<Throwable> shutdownFailure = new AtomicReference<>();
        Utils.closeQuietly(this.cluster, CLUSTER_NAME, shutdownFailure);
        Throwable failure = shutdownFailure.get();
        if (failure != null) {
            throw new IllegalStateException("Failed to shut down embedded Kafka cluster", failure);
        }
        this.cluster = null;
    }

    /** Supply defaults for broker properties that the user has not set explicitly. */
    private void addDefaultBrokerPropsIfAbsent() {
        this.brokerProperties.putIfAbsent("delete.topic.enable", "true");
        this.brokerProperties.putIfAbsent("group.initial.rebalance.delay.ms", "0");
        this.brokerProperties.putIfAbsent("offsets.topic.replication.factor", "" + this.count);
        this.brokerProperties.putIfAbsent("num.partitions", "" + this.partitionsPerTopic);
    }

    /**
     * Point {@code log.dir} of the given properties at a freshly created temporary directory.
     * <p>NOTE(review): this method is not referenced anywhere in this class.
     * @throws UncheckedIOException if the temporary directory cannot be created.
     */
    private void logDir(Properties brokerConfigProperties) {
        try {
            brokerConfigProperties.put("log.dir",
                    Files.createTempDirectory("spring.kafka." + UUID.randomUUID()).toString());
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Replace Kafka's exit and halt procedures with ones that only log the call -
     * presumably so that a broker-initiated exit does not terminate the test JVM.
     */
    private void overrideExitMethods() {
        Exit.setExitProcedure((statusCode, message) -> logExitCall("exit", statusCode, message));
        Exit.setHaltProcedure((statusCode, message) -> logExitCall("halt", statusCode, message));
    }

    /**
     * Log an intercepted exit/halt call: at debug level with a stack trace when debug logging
     * is enabled, otherwise at warn level without one.
     */
    private static void logExitCall(String procedure, int statusCode, String message) {
        String text = String.format("Exit.%s(%d, %s) called", procedure, statusCode, message);
        if (LOGGER.isDebugEnabled()) {
            // The exception exists only to capture the caller's stack trace.
            LOGGER.debug(new RuntimeException(), text);
        }
        else {
            LOGGER.warn(text);
        }
    }

    /**
     * Create the named topics with the default partition count and a replication factor equal
     * to the node count, then record them as embedded topics.
     * @param topicsToAdd the topic names.
     * @throws IllegalArgumentException if the broker has not been started.
     * @throws KafkaException if topic creation fails or times out.
     */
    @Override
    public void addTopics(String... topicsToAdd) {
        Assert.notNull(this.cluster, BROKER_NOT_STARTED);
        Set<String> requested = new HashSet<>(Arrays.asList(topicsToAdd));
        createKafkaTopics(requested);
        this.topics.addAll(requested);
    }

    /**
     * Record and create the given topics.
     * @param topicsToAdd the topic definitions.
     * @throws IllegalArgumentException if the broker has not been started, a topic name is
     * already registered, or a requested replication factor exceeds the node count.
     * @throws KafkaException if topic creation fails or times out.
     */
    @Override
    public void addTopics(NewTopic... topicsToAdd) {
        Assert.notNull(this.cluster, BROKER_NOT_STARTED);
        registerNewTopics(topicsToAdd);
        doWithAdmin(admin -> createTopics(admin, Arrays.asList(topicsToAdd)));
    }

    /**
     * Validate the given topic definitions and, only if all of them are acceptable, record
     * their names as embedded topics. A rejected batch leaves the recorded topics untouched.
     * @throws IllegalArgumentException if a replication factor is unsupported or a name is
     * already registered (or repeated within the batch).
     */
    private void registerNewTopics(NewTopic... topicsToAdd) {
        Set<String> names = new HashSet<>();
        for (NewTopic topic : topicsToAdd) {
            boolean replicationSupported = topic.replicationFactor() <= this.count
                    && (topic.replicasAssignments() == null || topic.replicasAssignments().size() <= this.count);
            Assert.isTrue(replicationSupported,
                    () -> "Embedded kafka does not support the requested replication factor: " + topic);
            Assert.isTrue(!this.topics.contains(topic.name()) && names.add(topic.name()),
                    () -> "topic already exists: " + topic);
        }
        this.topics.addAll(names);
    }

    /** Create topics for the given names using the default partition count and node count. */
    private void createKafkaTopics(Set<String> topicsToCreate) {
        doWithAdmin(admin -> createTopics(admin, toNewTopics(topicsToCreate)));
    }

    /** Map topic names to definitions with the default partition count and node count. */
    private List<NewTopic> toNewTopics(Set<String> topicNames) {
        return topicNames.stream()
                .map(name -> new NewTopic(name, this.partitionsPerTopic, (short) this.count))
                .collect(Collectors.toList());
    }

    /**
     * Create the topics and wait, up to the admin timeout, for all of them to complete.
     * @throws KafkaException wrapping any failure; if the wait is interrupted, the thread's
     * interrupt status is restored before the exception is thrown.
     */
    private void createTopics(AdminClient admin, List<NewTopic> newTopics) {
        CreateTopicsResult result = admin.createTopics(newTopics);
        try {
            result.all().get(this.adminTimeout.getSeconds(), TimeUnit.SECONDS);
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new KafkaException(ex);
        }
        catch (Exception ex) {
            throw new KafkaException(ex);
        }
    }

    /**
     * Record the named topics as embedded topics and create them, reporting the outcome per
     * topic instead of throwing.
     * @param topicsToAdd the topic names.
     * @return a map from topic name to the creation failure, or to {@code null} on success.
     * @throws IllegalArgumentException if the broker has not been started.
     */
    @Override
    public Map<String, Exception> addTopicsWithResults(String... topicsToAdd) {
        Assert.notNull(this.cluster, BROKER_NOT_STARTED);
        Set<String> requested = new HashSet<>(Arrays.asList(topicsToAdd));
        this.topics.addAll(requested);
        return createKafkaTopicsWithResults(requested);
    }

    /**
     * Record and create the given topics, reporting the outcome per topic instead of throwing.
     * @param topicsToAdd the topic definitions.
     * @return a map from topic name to the creation failure, or to {@code null} on success.
     * @throws IllegalArgumentException if the broker has not been started, a topic name is
     * already registered, or a requested replication factor exceeds the node count.
     */
    @Override
    public Map<String, Exception> addTopicsWithResults(NewTopic... topicsToAdd) {
        Assert.notNull(this.cluster, BROKER_NOT_STARTED);
        registerNewTopics(topicsToAdd);
        return doWithAdminFunction(admin -> createTopicsWithResults(admin, Arrays.asList(topicsToAdd)));
    }

    /** Create topics for the given names and report the per-topic outcome. */
    private Map<String, Exception> createKafkaTopicsWithResults(Set<String> topicsToCreate) {
        return doWithAdminFunction(admin -> createTopicsWithResults(admin, toNewTopics(topicsToCreate)));
    }

    /**
     * Create the topics and wait for each one individually, up to the admin timeout.
     * <p>If a wait is interrupted, the thread's interrupt status is restored; the remaining
     * waits will then presumably fail fast with an {@link InterruptedException} as well.
     * @return a map from topic name to the failure, or to {@code null} on success.
     */
    private Map<String, Exception> createTopicsWithResults(AdminClient admin, List<NewTopic> newTopics) {
        CreateTopicsResult createTopics = admin.createTopics(newTopics);
        // HashMap is required here: successful topics are recorded with a null value.
        Map<String, Exception> results = new HashMap<>();
        for (Map.Entry<String, KafkaFuture<Void>> entry : createTopics.values().entrySet()) {
            Exception failure = null;
            try {
                entry.getValue().get(this.adminTimeout.getSeconds(), TimeUnit.SECONDS);
            }
            catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                failure = ex;
            }
            catch (ExecutionException | TimeoutException ex) {
                failure = ex;
            }
            results.put(entry.getKey(), failure);
        }
        return results;
    }

    /**
     * Run the callback with an {@link AdminClient} connected to the embedded cluster; the
     * client is closed when the callback returns or throws.
     * @param callback the callback.
     * @throws IllegalArgumentException if the broker has not been started.
     */
    public void doWithAdmin(java.util.function.Consumer<AdminClient> callback) {
        try (AdminClient admin = AdminClient.create(adminConfigs())) {
            callback.accept(admin);
        }
    }

    /**
     * Run the callback with an {@link AdminClient} connected to the embedded cluster and
     * return its result; the client is closed when the callback returns or throws.
     * @param callback the callback.
     * @param <T> the result type.
     * @return the callback's result.
     * @throws IllegalArgumentException if the broker has not been started.
     */
    public <T> T doWithAdminFunction(Function<AdminClient, T> callback) {
        try (AdminClient admin = AdminClient.create(adminConfigs())) {
            return callback.apply(admin);
        }
    }

    /** Build the minimal admin client configuration: just the bootstrap servers. */
    private Map<String, Object> adminConfigs() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(BOOTSTRAP_SERVERS_KEY, getBrokersAsString());
        return configs;
    }

    /**
     * Return a copy of the recorded embedded topic names.
     * @return the topic names.
     */
    @Override
    public Set<String> getTopics() {
        return new HashSet<>(this.topics);
    }

    /**
     * Return the partition count used for topics created from plain names.
     * @return the partition count.
     */
    @Override
    public int getPartitionsPerTopic() {
        return this.partitionsPerTopic;
    }

    /**
     * Return the bootstrap servers of the running cluster, as reported by its client properties.
     * @return the bootstrap servers.
     * @throws IllegalArgumentException if the broker has not been started.
     */
    @Override
    public String getBrokersAsString() {
        Assert.notNull(this.cluster, BROKER_NOT_STARTED);
        return (String) this.cluster.clientProperties().get(BOOTSTRAP_SERVERS_KEY);
    }

    /**
     * Return the underlying cluster.
     * @return the cluster, or {@code null} when none is currently held.
     */
    public KafkaClusterTestKit getCluster() {
        return this.cluster;
    }

    /**
     * Subscribe the consumer to every recorded embedded topic, seeking to the beginning.
     * @param consumer the consumer.
     */
    @Override
    public void consumeFromAllEmbeddedTopics(Consumer<?, ?> consumer) {
        consumeFromEmbeddedTopics(consumer, this.topics.toArray(new String[0]));
    }

    /**
     * Subscribe the consumer to every recorded embedded topic.
     * @param consumer the consumer.
     * @param seekToEnd {@code true} to seek to the end instead of the beginning.
     */
    @Override
    public void consumeFromAllEmbeddedTopics(Consumer<?, ?> consumer, boolean seekToEnd) {
        consumeFromEmbeddedTopics(consumer, seekToEnd, this.topics.toArray(new String[0]));
    }

    /**
     * Subscribe the consumer to a single embedded topic, seeking to the beginning.
     * @param consumer the consumer.
     * @param topic the topic.
     */
    @Override
    public void consumeFromAnEmbeddedTopic(Consumer<?, ?> consumer, String topic) {
        consumeFromEmbeddedTopics(consumer, topic);
    }

    /**
     * Subscribe the consumer to a single embedded topic.
     * @param consumer the consumer.
     * @param seekToEnd {@code true} to seek to the end instead of the beginning.
     * @param topic the topic.
     */
    @Override
    public void consumeFromAnEmbeddedTopic(Consumer<?, ?> consumer, boolean seekToEnd, String topic) {
        consumeFromEmbeddedTopics(consumer, seekToEnd, topic);
    }

    /**
     * Subscribe the consumer to the given embedded topics, seeking to the beginning.
     * @param consumer the consumer.
     * @param topicsToConsume the topics.
     */
    @Override
    public void consumeFromEmbeddedTopics(Consumer<?, ?> consumer, String... topicsToConsume) {
        consumeFromEmbeddedTopics(consumer, false, topicsToConsume);
    }

    /**
     * Subscribe the consumer to the given embedded topics, poll until partitions have been
     * assigned, and then seek the assigned partitions to the beginning or the end.
     * @param consumer the consumer.
     * @param seekToEnd {@code true} to seek to the end instead of the beginning.
     * @param topicsToConsume the topics; each must be a recorded embedded topic.
     * @throws IllegalStateException if a topic is not a recorded embedded topic, or no
     * partitions are assigned within the polling budget.
     */
    @Override
    public void consumeFromEmbeddedTopics(Consumer<?, ?> consumer, boolean seekToEnd, String... topicsToConsume) {
        List<String> notEmbedded = Arrays.stream(topicsToConsume)
                .filter(topic -> !this.topics.contains(topic))
                .collect(Collectors.toList());
        if (!notEmbedded.isEmpty()) {
            throw new IllegalStateException("topic(s):'" + notEmbedded + "' are not in embedded topic list");
        }
        final AtomicReference<Collection<TopicPartition>> assigned = new AtomicReference<>();
        consumer.subscribe(Arrays.asList(topicsToConsume), new ConsumerRebalanceListener() {

            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                assigned.set(partitions);
                LOGGER.debug(() -> "partitions assigned: " + partitions);
            }
        });
        // The rebalance listener is only invoked from within poll(), so keep polling until it
        // has recorded an assignment or the budget is exhausted.
        for (int attempt = 0; assigned.get() == null && attempt < MAX_ASSIGNMENT_POLLS; attempt++) {
            consumer.poll(ASSIGNMENT_POLL_TIMEOUT);
        }
        Collection<TopicPartition> assignedPartitions = assigned.get();
        if (assignedPartitions == null) {
            throw new IllegalStateException("Failed to be assigned partitions from the embedded topics");
        }
        LOGGER.debug(() -> "Partitions assigned " + assignedPartitions + "; re-seeking to "
                + (seekToEnd ? "end; " : "beginning"));
        if (seekToEnd) {
            consumer.seekToEnd(assignedPartitions);
            // Query each position right away - presumably to force the lazy seek to be
            // evaluated now rather than on the next poll.
            assignedPartitions.forEach(consumer::position);
        }
        else {
            consumer.seekToBeginning(assignedPartitions);
        }
        LOGGER.debug("Subscription Initiated");
    }
}

