/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration.utils;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.utils.MockTime;
import kafka.utils.TestUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Starts a single Kafka broker through {@link TestUtils#createServer} whose log directory
 * lives in a JUnit {@link TemporaryFolder}, and offers helpers to create and delete topics
 * on that broker.
 *
 * <p>The broker is started by the constructor and must be stopped with {@link #stop()},
 * which also removes the log directory and the temporary folder.
 */
public class KafkaEmbedded {
    private static final Logger log = LoggerFactory.getLogger(KafkaEmbedded.class);

    /** Value reported by {@link #zookeeperConnect()} when the config has no {@code zookeeper.connect}. */
    private static final String DEFAULT_ZK_CONNECT = "127.0.0.1:2181";

    /** Defaults merged with the caller's overrides; this is what the broker was configured with. */
    private final Properties effectiveConfig;
    private final File logDir;
    private final TemporaryFolder tmpFolder = new TemporaryFolder();
    private final KafkaServer kafka;

    /**
     * Creates the temporary log directory, builds the effective broker configuration and
     * starts the broker.
     *
     * @param config broker settings that override the defaults set in
     *               {@link #effectiveConfigFrom(Properties)}; the log directory is always
     *               replaced by the temporary one created here
     * @param time   time instance handed to {@link TestUtils#createServer}
     * @throws IOException if the temporary folder or the log directory cannot be created
     */
    public KafkaEmbedded(Properties config, MockTime time) throws IOException {
        tmpFolder.create();
        // logDir must be assigned before effectiveConfigFrom() runs: that method reads it.
        logDir = tmpFolder.newFolder();
        effectiveConfig = effectiveConfigFrom(config);
        final boolean loggingEnabled = true;
        final KafkaConfig kafkaConfig = new KafkaConfig(effectiveConfig, loggingEnabled);
        log.debug("Starting embedded Kafka broker (with log.dirs={} and ZK ensemble at {}) ...", logDir, zookeeperConnect());
        kafka = TestUtils.createServer(kafkaConfig, time);
        log.debug("Startup of embedded Kafka broker at {} completed (with ZK ensemble at {}) ...", brokerList(), zookeeperConnect());
    }

    /**
     * Builds the broker configuration: built-in defaults first, then every entry of
     * {@code initialConfig} on top, and finally the log directory, which the caller
     * cannot override.
     *
     * @param initialConfig caller-supplied overrides
     * @return a new {@link Properties} holding the merged configuration
     */
    private Properties effectiveConfigFrom(Properties initialConfig) {
        final Properties merged = new Properties();
        merged.put(KafkaConfig$.MODULE$.BrokerIdProp(), 0);
        merged.put(KafkaConfig$.MODULE$.HostNameProp(), "localhost");
        merged.put(KafkaConfig$.MODULE$.PortProp(), "9092");
        merged.put(KafkaConfig$.MODULE$.NumPartitionsProp(), 1);
        merged.put(KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), true);
        merged.put(KafkaConfig$.MODULE$.MessageMaxBytesProp(), 1000000);
        merged.put(KafkaConfig$.MODULE$.ControlledShutdownEnableProp(), true);
        merged.put(KafkaConfig$.MODULE$.ZkSessionTimeoutMsProp(), 10000);
        merged.putAll(initialConfig);
        // Set last so that the temporary directory always wins over a caller-supplied value.
        merged.setProperty(KafkaConfig$.MODULE$.LogDirProp(), logDir.getAbsolutePath());
        return merged;
    }

    /**
     * Returns the broker address as {@code host:port}, using the configured host name and
     * the port the server reports as bound for the inter-broker listener (or for
     * {@code PLAINTEXT} when no inter-broker listener name is configured).
     *
     * @return the {@code host:port} string of this broker
     */
    public String brokerList() {
        final Object listenerConfig = effectiveConfig.get(KafkaConfig$.MODULE$.InterBrokerListenerNameProp());
        final String listenerName = listenerConfig != null ? listenerConfig.toString() : "PLAINTEXT";
        return kafka.config().hostName() + ":" + kafka.boundPort(new ListenerName(listenerName));
    }

    /**
     * Returns the {@code zookeeper.connect} value of the effective configuration, or
     * {@code 127.0.0.1:2181} when none is set.
     *
     * @return the ZooKeeper connection string
     */
    public String zookeeperConnect() {
        return effectiveConfig.getProperty("zookeeper.connect", DEFAULT_ZK_CONNECT);
    }

    /**
     * Shuts the broker down, waits for the shutdown to finish, and deletes the log
     * directory and the temporary folder.
     *
     * @throws RuntimeException wrapping the {@link IOException} if the log directory
     *                          cannot be deleted
     */
    public void stop() {
        // Resolve the address while the server is still running; the bound port is looked up
        // on the live server and should not be queried again once it has been shut down.
        final String brokerAddress = brokerList();
        final String zkConnect = zookeeperConnect();
        log.debug("Shutting down embedded Kafka broker at {} (with ZK ensemble at {}) ...", brokerAddress, zkConnect);
        kafka.shutdown();
        kafka.awaitShutdown();
        log.debug("Removing log dir at {} ...", logDir);
        try {
            Utils.delete(logDir);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        tmpFolder.delete();
        log.debug("Shutdown of embedded Kafka broker at {} completed (with ZK ensemble at {}) ...", brokerAddress, zkConnect);
    }

    /**
     * Creates a topic with one partition, a replication factor of one and no extra config.
     *
     * @param topic name of the topic to create
     */
    public void createTopic(String topic) {
        createTopic(topic, 1, 1, Collections.<String, String>emptyMap());
    }

    /**
     * Creates a topic with the given partition count and replication factor and no extra config.
     *
     * @param topic       name of the topic to create
     * @param partitions  number of partitions
     * @param replication replication factor
     */
    public void createTopic(String topic, int partitions, int replication) {
        createTopic(topic, partitions, replication, Collections.<String, String>emptyMap());
    }

    /**
     * Creates a topic through an {@link AdminClient} and blocks until the request completes.
     *
     * @param topic       name of the topic to create
     * @param partitions  number of partitions
     * @param replication replication factor (narrowed to {@code short})
     * @param topicConfig topic-level configuration entries
     * @throws RuntimeException if the calling thread is interrupted while waiting (the
     *                          interrupt flag is restored first) or if the request fails
     */
    public void createTopic(String topic, int partitions, int replication, Map<String, String> topicConfig) {
        log.debug("Creating topic { name: {}, partitions: {}, replication: {}, config: {} }", topic, partitions, replication, topicConfig);
        final NewTopic newTopic = new NewTopic(topic, partitions, (short) replication);
        newTopic.configs(topicConfig);
        try (AdminClient adminClient = createAdminClient()) {
            adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers further up can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Creates an {@link AdminClient} that bootstraps from {@link #brokerList()}. When the
     * configured listeners mention {@code SSL}, the truststore location and password from
     * the effective configuration are passed on and the security protocol is set to SSL.
     *
     * <p>The caller owns the returned client and is responsible for closing it.
     *
     * @return a new admin client connected to this broker
     */
    public AdminClient createAdminClient() {
        final Properties adminClientConfig = new Properties();
        adminClientConfig.put("bootstrap.servers", brokerList());
        final Object listeners = effectiveConfig.get(KafkaConfig$.MODULE$.ListenersProp());
        if (listeners != null && listeners.toString().contains("SSL")) {
            adminClientConfig.put("ssl.truststore.location", effectiveConfig.get("ssl.truststore.location"));
            // The password is expected to be stored as a Password object in the broker config.
            adminClientConfig.put("ssl.truststore.password", ((Password) effectiveConfig.get("ssl.truststore.password")).value());
            adminClientConfig.put("security.protocol", "SSL");
        }
        return AdminClient.create(adminClientConfig);
    }

    /**
     * Deletes a topic through an {@link AdminClient} and blocks until the request completes.
     * A failure caused by {@link UnknownTopicOrPartitionException} is ignored, so deleting a
     * topic that does not exist is not an error.
     *
     * @param topic name of the topic to delete
     * @throws RuntimeException if the calling thread is interrupted while waiting (the
     *                          interrupt flag is restored first) or if the request fails for
     *                          any reason other than an unknown topic
     */
    public void deleteTopic(String topic) {
        log.debug("Deleting topic { name: {} }", topic);
        try (AdminClient adminClient = createAdminClient()) {
            adminClient.deleteTopics(Collections.singletonList(topic)).all().get();
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers further up can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Exposes the underlying server instance.
     *
     * @return the {@link KafkaServer} started by this object
     */
    public KafkaServer kafkaServer() {
        return kafka;
    }
}

