/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.CreateTopicsOptions;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.requests.CreateTopicsRequest;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
import org.apache.kafka.streams.processor.internals.InternalTopicManager;
import org.apache.kafka.streams.processor.internals.RepartitionTopicConfig;
import org.apache.kafka.streams.processor.internals.UnwindowedChangelogTopicConfig;
import org.apache.kafka.streams.processor.internals.WindowedChangelogTopicConfig;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.easymock.EasyMock;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class InternalTopicManagerTest {
    // Two brokers that make up the mock cluster handed to MockAdminClient in init().
    private final Node broker1 = new Node(0, "dummyHost-1", 1234);
    private final Node broker2 = new Node(1, "dummyHost-2", 1234);
    private final List<Node> cluster = new ArrayList<Node>(2) {
        {
            add(broker1);
            add(broker2);
        }
    };
    // Topic names shared by the test cases.
    private final String topic1 = "test_topic";
    private final String topic2 = "test_topic_2";
    private final String topic3 = "test_topic_3";
    private final String topic4 = "test_topic_4";
    private final String topic5 = "test_topic_5";
    // Replica list containing only broker1.
    private final List<Node> singleReplica = Collections.singletonList(broker1);
    private String threadName;
    private MockAdminClient mockAdminClient;
    private InternalTopicManager internalTopicManager;
    // Raw settings from which every StreamsConfig in this class is built.
    private final Map<String, Object> config = new HashMap<String, Object>() {
        {
            put("application.id", "app-id");
            put("bootstrap.servers", broker1.host() + ":" + broker1.port());
            put("replication.factor", 1);
            put(StreamsConfig.producerPrefix("batch.size"), 16384);
            put(StreamsConfig.consumerPrefix("max.poll.interval.ms"), 100);
            put("retry.backoff.ms", 10);
        }
    };

    /**
     * Records the running thread's name and creates a fresh {@link MockAdminClient} and
     * {@link InternalTopicManager} before each test.
     */
    @Before
    public void init() {
        threadName = Thread.currentThread().getName();
        mockAdminClient = new MockAdminClient(cluster, broker1);
        final StreamsConfig streamsConfig = new StreamsConfig(config);
        internalTopicManager = new InternalTopicManager(Time.SYSTEM, mockAdminClient, streamsConfig);
    }

    /**
     * Closes the mock admin client created in {@link #init()} after each test.
     */
    @After
    public void shutdown() {
        mockAdminClient.close();
    }

    /**
     * Sets up two repartition topics and checks that exactly those two topics are listed by the
     * mock admin client afterwards.
     */
    @Test
    public void shouldCreateTopics() throws Exception {
        final InternalTopicConfig firstConfig = setupRepartitionTopicConfig(topic1, 1);
        final InternalTopicConfig secondConfig = setupRepartitionTopicConfig(topic2, 1);

        internalTopicManager.setup(Utils.mkMap(
            Utils.mkEntry(topic1, firstConfig),
            Utils.mkEntry(topic2, secondConfig)
        ));

        final Set<String> topicsOnBroker = mockAdminClient.listTopics().names().get();
        MatcherAssert.assertThat(topicsOnBroker.size(), Matchers.is(2));
        MatcherAssert.assertThat(topicsOnBroker, Matchers.hasItem(topic1));
        MatcherAssert.assertThat(topicsOnBroker, Matchers.hasItem(topic2));
    }

    /**
     * Calls setup() with an empty map and checks that the mock admin client lists no topics.
     */
    @Test
    public void shouldNotCreateTopicsWithEmptyInput() throws Exception {
        internalTopicManager.setup(Collections.emptyMap());

        final Set<String> topicsOnBroker = mockAdminClient.listTopics().names().get();
        MatcherAssert.assertThat(topicsOnBroker, Matchers.empty());
    }

    /**
     * Expects a first createTopics request for both topics (one succeeds, one fails with
     * {@link TopicExistsException}) followed by a second request that contains only the failed
     * topic, and verifies the mock saw exactly those calls.
     */
    @Test
    public void shouldOnlyRetryNotSuccessfulFuturesDuringSetup() {
        final AdminClient admin = EasyMock.createMock(AdminClient.class);
        final StreamsConfig streamsConfig = new StreamsConfig(config);
        final InternalTopicManager topicManager = new InternalTopicManager(new MockTime(1L), admin, streamsConfig);

        final KafkaFutureImpl<CreateTopicsResult.TopicMetadataAndConfig> failedCreation = new KafkaFutureImpl<>();
        failedCreation.completeExceptionally(new TopicExistsException("exists"));
        final KafkaFutureImpl<CreateTopicsResult.TopicMetadataAndConfig> succeededCreation = new KafkaFutureImpl<>();
        succeededCreation.complete(
            new CreateTopicsResult.TopicMetadataAndConfig(Uuid.randomUuid(), 1, 1, new Config(Collections.emptyList())));

        final InternalTopicConfig firstConfig = setupRepartitionTopicConfig(topic1, 1);
        final InternalTopicConfig secondConfig = setupRepartitionTopicConfig(topic2, 1);
        final NewTopic firstNewTopic = newTopic(topic1, firstConfig, streamsConfig);
        final NewTopic secondNewTopic = newTopic(topic2, secondConfig, streamsConfig);

        // First attempt: both topics requested, only topic1 succeeds.
        EasyMock.expect(admin.createTopics(Utils.mkSet(firstNewTopic, secondNewTopic)))
            .andAnswer(() -> new MockCreateTopicsResult(Utils.mkMap(
                Utils.mkEntry(topic1, succeededCreation),
                Utils.mkEntry(topic2, failedCreation))));
        // Second attempt: only topic2 is requested again.
        EasyMock.expect(admin.createTopics(Utils.mkSet(secondNewTopic)))
            .andAnswer(() -> new MockCreateTopicsResult(Utils.mkMap(
                Utils.mkEntry(topic2, succeededCreation))));
        EasyMock.replay(admin);

        topicManager.setup(Utils.mkMap(
            Utils.mkEntry(topic1, firstConfig),
            Utils.mkEntry(topic2, secondConfig)));

        EasyMock.verify(admin);
    }

    /**
     * Runs the shared create-retry scenario with a {@link TimeoutException}.
     */
    @Test
    public void shouldRetryCreateTopicWhenCreationTimesOut() {
        final TimeoutException timedOut = new TimeoutException("timed out");
        shouldRetryCreateTopicWhenRetriableExceptionIsThrown(timedOut);
    }

    /**
     * Runs the shared create-retry scenario with a {@link TopicExistsException}.
     */
    @Test
    public void shouldRetryCreateTopicWhenTopicNotYetDeleted() {
        final TopicExistsException stillExists = new TopicExistsException("exists");
        shouldRetryCreateTopicWhenRetriableExceptionIsThrown(stillExists);
    }

    /**
     * Shared scenario for the create-retry tests: the first createTopics call for the topic fails
     * with the given exception, the second call succeeds, and setup() is expected to return
     * normally after exactly those two calls.
     *
     * <p>Previously the failing future was constructed but never handed to the mock (the first
     * answer already returned the successful future, and the second answer was keyed by a
     * different topic), so no retry was ever exercised and nothing was verified.
     *
     * @param retriableException the exception the first creation attempt completes with
     */
    private void shouldRetryCreateTopicWhenRetriableExceptionIsThrown(Exception retriableException) {
        final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);
        final StreamsConfig streamsConfig = new StreamsConfig(config);
        final InternalTopicManager topicManager = new InternalTopicManager(Time.SYSTEM, admin, streamsConfig);

        final KafkaFutureImpl<CreateTopicsResult.TopicMetadataAndConfig> createTopicFailFuture = new KafkaFutureImpl<>();
        createTopicFailFuture.completeExceptionally(retriableException);
        final KafkaFutureImpl<CreateTopicsResult.TopicMetadataAndConfig> createTopicSuccessfulFuture = new KafkaFutureImpl<>();
        createTopicSuccessfulFuture.complete(
            new CreateTopicsResult.TopicMetadataAndConfig(Uuid.randomUuid(), 1, 1, new Config(Collections.emptyList())));

        final InternalTopicConfig internalTopicConfig = setupRepartitionTopicConfig(topic1, 1);
        final NewTopic newTopic = newTopic(topic1, internalTopicConfig, streamsConfig);

        // Same request twice: the first answer fails with the retriable error, the second succeeds.
        EasyMock.expect(admin.createTopics(Utils.mkSet(newTopic)))
            .andAnswer(() -> new MockCreateTopicsResult(Utils.mkMap(
                Utils.mkEntry(topic1, createTopicFailFuture))))
            .andAnswer(() -> new MockCreateTopicsResult(Utils.mkMap(
                Utils.mkEntry(topic1, createTopicSuccessfulFuture))));
        EasyMock.replay(admin);

        topicManager.setup(Utils.mkMap(Utils.mkEntry(topic1, internalTopicConfig)));

        // Fails if the second (retry) createTopics call never happened.
        EasyMock.verify(admin);
    }

    /**
     * Uses an admin client whose createTopics() fails with the {@link UnsupportedVersionException}
     * raised when a request with replication factor -1 is built for request version 3, and checks
     * that the resulting {@link StreamsException} carries the upgrade hint.
     *
     * <p>The decompiled source called a synthetic {@code lambda$...$4} method that does not exist
     * at source level; it is replaced by the direct call it stood for.
     * NOTE(review): {@code makeReady} is inferred from the asserted message text and the
     * captured arguments (topicManager, topicConfig) -- confirm against the original test.
     */
    @Test
    public void shouldThrowInformativeExceptionForOlderBrokers() {
        final MockAdminClient admin = new MockAdminClient() {
            @Override
            public CreateTopicsResult createTopics(Collection<NewTopic> newTopics, CreateTopicsOptions options) {
                final CreateTopicsRequestData.CreatableTopic topicToBeCreated = new CreateTopicsRequestData.CreatableTopic();
                topicToBeCreated.setAssignments(new CreateTopicsRequestData.CreatableReplicaAssignmentCollection());
                topicToBeCreated.setNumPartitions(1);
                topicToBeCreated.setReplicationFactor((short) -1);
                final CreateTopicsRequestData.CreatableTopicCollection topicsToBeCreated =
                    new CreateTopicsRequestData.CreatableTopicCollection();
                topicsToBeCreated.add(topicToBeCreated);
                try {
                    // Building the request for version 3 is expected to throw for replication factor -1.
                    new CreateTopicsRequest.Builder(
                        new CreateTopicsRequestData()
                            .setTopics(topicsToBeCreated)
                            .setTimeoutMs(0)
                            .setValidateOnly(options.shouldValidateOnly()))
                        .build((short) 3);
                    throw new IllegalStateException("Building CreateTopicRequest should have thrown.");
                } catch (UnsupportedVersionException expected) {
                    // Surface the builder's exception through the per-topic future.
                    final KafkaFutureImpl<CreateTopicsResult.TopicMetadataAndConfig> future = new KafkaFutureImpl<>();
                    future.completeExceptionally(expected);
                    return new CreateTopicsResult(Collections.singletonMap(topic1, future)) { };
                }
            }
        };
        final StreamsConfig streamsConfig = new StreamsConfig(config);
        final InternalTopicManager topicManager = new InternalTopicManager(Time.SYSTEM, admin, streamsConfig);
        final InternalTopicConfig topicConfig = new RepartitionTopicConfig(topic1, Collections.emptyMap());
        topicConfig.setNumberOfPartitions(1);

        final StreamsException exception = Assert.assertThrows(
            StreamsException.class,
            () -> topicManager.makeReady(Collections.singletonMap(topic1, topicConfig)));

        MatcherAssert.assertThat(
            exception.getMessage(),
            Matchers.equalTo("Could not create topic test_topic, because brokers don't support configuration replication.factor=-1. You can change the replication.factor config or upgrade your brokers to version 2.4 or newer to avoid this error."));
    }

    /**
     * Pre-creates the topic in the mock admin client, then checks that setup() for the same topic
     * throws a {@link TimeoutException} whose message names half of max.poll.interval.ms as the
     * timeout and reports the TopicExistsException seen for the topic.
     */
    @Test
    public void shouldThrowTimeoutExceptionIfTopicExistsDuringSetup() {
        setupTopicInMockAdminClient(topic1, Collections.emptyMap());
        final InternalTopicConfig internalTopicConfig = setupRepartitionTopicConfig(topic1, 1);

        final TimeoutException exception = Assert.assertThrows(
            TimeoutException.class,
            () -> internalTopicManager.setup(Collections.singletonMap(topic1, internalTopicConfig)));

        final int expectedTimeoutMs = (Integer) config.get(StreamsConfig.consumerPrefix("max.poll.interval.ms")) / 2;
        final String expectedMessage = "Could not create internal topics within " + expectedTimeoutMs
            + " milliseconds. This can happen if the Kafka cluster is temporarily not available or a topic is marked for deletion and the broker did not complete its deletion within the timeout. The last errors seen per topic are: {"
            + topic1
            + "=org.apache.kafka.common.errors.TopicExistsException: Topic test_topic exists already.}";
        MatcherAssert.assertThat(exception.getMessage(), Matchers.is(expectedMessage));
    }

    /**
     * Stubs createTopics() so the topic's future fails with an {@link IllegalStateException} and
     * checks that setup() throws a {@link StreamsException}.
     */
    @Test
    public void shouldThrowWhenCreateTopicsThrowsUnexpectedException() {
        final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);
        final StreamsConfig streamsConfig = new StreamsConfig(config);
        final InternalTopicManager topicManager = new InternalTopicManager(Time.SYSTEM, admin, streamsConfig);
        final InternalTopicConfig internalTopicConfig = setupRepartitionTopicConfig(topic1, 1);

        final KafkaFutureImpl<CreateTopicsResult.TopicMetadataAndConfig> failedCreation = new KafkaFutureImpl<>();
        failedCreation.completeExceptionally(new IllegalStateException("Nobody expects the Spanish inquisition"));

        final NewTopic newTopic = newTopic(topic1, internalTopicConfig, streamsConfig);
        EasyMock.expect(admin.createTopics(Utils.mkSet(newTopic)))
            .andStubAnswer(() -> new MockCreateTopicsResult(Utils.mkMap(
                Utils.mkEntry(topic1, failedCreation))));
        EasyMock.replay(admin);

        Assert.assertThrows(
            StreamsException.class,
            () -> topicManager.setup(Utils.mkMap(Utils.mkEntry(topic1, internalTopicConfig))));
    }

    /**
     * Stubs createTopics() to return a result keyed by a different topic than the one requested
     * and checks that setup() throws an {@link IllegalStateException}.
     */
    @Test
    public void shouldThrowWhenCreateTopicsResultsDoNotContainTopic() {
        final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);
        final StreamsConfig streamsConfig = new StreamsConfig(config);
        final InternalTopicManager topicManager = new InternalTopicManager(Time.SYSTEM, admin, streamsConfig);
        final InternalTopicConfig internalTopicConfig = setupRepartitionTopicConfig(topic1, 1);
        final NewTopic newTopic = newTopic(topic1, internalTopicConfig, streamsConfig);

        // The result only mentions topic2 although topic1 was requested.
        EasyMock.expect(admin.createTopics(Utils.mkSet(newTopic)))
            .andStubAnswer(() -> new MockCreateTopicsResult(
                Collections.singletonMap(topic2, new KafkaFutureImpl<>())));
        EasyMock.replay(admin);

        Assert.assertThrows(
            IllegalStateException.class,
            () -> topicManager.setup(Collections.singletonMap(topic1, internalTopicConfig)));
    }

    /**
     * Stubs createTopics() so every attempt fails with a {@link TimeoutException} while a
     * {@link MockTime} auto-ticks by a third of max.poll.interval.ms, and checks that setup()
     * eventually throws a {@link TimeoutException}.
     */
    @Test
    public void shouldThrowTimeoutExceptionWhenCreateTopicExceedsTimeout() {
        final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);
        final int maxPollIntervalMs = (Integer) config.get(StreamsConfig.consumerPrefix("max.poll.interval.ms"));
        final MockTime time = new MockTime(maxPollIntervalMs / 3);
        final StreamsConfig streamsConfig = new StreamsConfig(config);
        final InternalTopicManager topicManager = new InternalTopicManager(time, admin, streamsConfig);

        final KafkaFutureImpl<CreateTopicsResult.TopicMetadataAndConfig> failedCreation = new KafkaFutureImpl<>();
        failedCreation.completeExceptionally(new TimeoutException());

        final InternalTopicConfig internalTopicConfig = setupRepartitionTopicConfig(topic1, 1);
        final NewTopic newTopic = newTopic(topic1, internalTopicConfig, streamsConfig);
        EasyMock.expect(admin.createTopics(Utils.mkSet(newTopic)))
            .andStubAnswer(() -> new MockCreateTopicsResult(Utils.mkMap(
                Utils.mkEntry(topic1, failedCreation))));
        EasyMock.replay(admin);

        Assert.assertThrows(
            TimeoutException.class,
            () -> topicManager.setup(Collections.singletonMap(topic1, internalTopicConfig)));
    }

    /**
     * Stubs createTopics() to return a future that is never completed and checks that setup()
     * throws a {@link TimeoutException} under an auto-ticking {@link MockTime}.
     */
    @Test
    public void shouldThrowTimeoutExceptionWhenFuturesNeverCompleteDuringSetup() {
        final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);
        final int maxPollIntervalMs = (Integer) config.get(StreamsConfig.consumerPrefix("max.poll.interval.ms"));
        final MockTime time = new MockTime(maxPollIntervalMs / 3);
        final StreamsConfig streamsConfig = new StreamsConfig(config);
        final InternalTopicManager topicManager = new InternalTopicManager(time, admin, streamsConfig);

        // Deliberately left incomplete.
        final KafkaFutureImpl<CreateTopicsResult.TopicMetadataAndConfig> pendingCreation = new KafkaFutureImpl<>();

        final InternalTopicConfig internalTopicConfig = setupRepartitionTopicConfig(topic1, 1);
        final NewTopic newTopic = newTopic(topic1, internalTopicConfig, streamsConfig);
        EasyMock.expect(admin.createTopics(Utils.mkSet(newTopic)))
            .andStubAnswer(() -> new MockCreateTopicsResult(Utils.mkMap(
                Utils.mkEntry(topic1, pendingCreation))));
        EasyMock.replay(admin);

        Assert.assertThrows(
            TimeoutException.class,
            () -> topicManager.setup(Collections.singletonMap(topic1, internalTopicConfig)));
    }

    /**
     * Runs the clean-up scenario (topic1 created, topic2 fails unexpectedly on retry) with a
     * successful deleteTopics answer for topic1, and checks that setup() throws a
     * {@link StreamsException} and that all expected admin calls were made.
     */
    @Test
    public void shouldCleanUpWhenUnexpectedExceptionIsThrownDuringSetup() {
        final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);
        final StreamsConfig streamsConfig = new StreamsConfig(config);
        final int maxPollIntervalMs = (Integer) config.get(StreamsConfig.consumerPrefix("max.poll.interval.ms"));
        final MockTime time = new MockTime(maxPollIntervalMs / 3);
        final InternalTopicManager topicManager = new InternalTopicManager(time, admin, streamsConfig);
        final InternalTopicConfig firstConfig = setupRepartitionTopicConfig(topic1, 1);
        final InternalTopicConfig secondConfig = setupRepartitionTopicConfig(topic2, 1);
        setupCleanUpScenario(admin, streamsConfig, firstConfig, secondConfig);

        final KafkaFutureImpl<Void> succeededDeletion = new KafkaFutureImpl<>();
        succeededDeletion.complete(null);
        EasyMock.expect(admin.deleteTopics(Utils.mkSet(topic1)))
            .andAnswer(() -> new MockDeleteTopicsResult(Utils.mkMap(
                Utils.mkEntry(topic1, succeededDeletion))));
        EasyMock.replay(admin);

        Assert.assertThrows(
            StreamsException.class,
            () -> topicManager.setup(Utils.mkMap(
                Utils.mkEntry(topic1, firstConfig),
                Utils.mkEntry(topic2, secondConfig))));

        EasyMock.verify(admin);
    }

    /**
     * First createTopics call creates topic1 and fails topic2 with {@link TopicExistsException};
     * the retry for topic2 returns a result keyed by an unrelated topic. Checks that setup()
     * throws {@link IllegalStateException} and that the expected deleteTopics call for topic1
     * was made.
     */
    @Test
    public void shouldCleanUpWhenCreateTopicsResultsDoNotContainTopic() {
        final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);
        final StreamsConfig streamsConfig = new StreamsConfig(config);
        final InternalTopicManager topicManager = new InternalTopicManager(Time.SYSTEM, admin, streamsConfig);
        final InternalTopicConfig firstConfig = setupRepartitionTopicConfig(topic1, 1);
        final InternalTopicConfig secondConfig = setupRepartitionTopicConfig(topic2, 1);

        final KafkaFutureImpl<CreateTopicsResult.TopicMetadataAndConfig> failedCreation = new KafkaFutureImpl<>();
        failedCreation.completeExceptionally(new TopicExistsException("exists"));
        final KafkaFutureImpl<CreateTopicsResult.TopicMetadataAndConfig> succeededCreation = new KafkaFutureImpl<>();
        succeededCreation.complete(
            new CreateTopicsResult.TopicMetadataAndConfig(Uuid.randomUuid(), 1, 1, new Config(Collections.emptyList())));

        final NewTopic firstNewTopic = newTopic(topic1, firstConfig, streamsConfig);
        final NewTopic secondNewTopic = newTopic(topic2, secondConfig, streamsConfig);
        EasyMock.expect(admin.createTopics(Utils.mkSet(firstNewTopic, secondNewTopic)))
            .andAnswer(() -> new MockCreateTopicsResult(Utils.mkMap(
                Utils.mkEntry(topic1, succeededCreation),
                Utils.mkEntry(topic2, failedCreation))));
        // The retry result does not mention topic2 at all.
        EasyMock.expect(admin.createTopics(Utils.mkSet(secondNewTopic)))
            .andAnswer(() -> new MockCreateTopicsResult(Utils.mkMap(
                Utils.mkEntry(topic3, succeededCreation))));

        final KafkaFutureImpl<Void> succeededDeletion = new KafkaFutureImpl<>();
        succeededDeletion.complete(null);
        EasyMock.expect(admin.deleteTopics(Utils.mkSet(topic1)))
            .andAnswer(() -> new MockDeleteTopicsResult(Utils.mkMap(
                Utils.mkEntry(topic1, succeededDeletion))));
        EasyMock.replay(admin);

        Assert.assertThrows(
            IllegalStateException.class,
            () -> topicManager.setup(Utils.mkMap(
                Utils.mkEntry(topic1, firstConfig),
                Utils.mkEntry(topic2, secondConfig))));

        EasyMock.verify(admin);
    }

    /**
     * First createTopics call creates topic1 and fails topic2 with {@link TopicExistsException};
     * every retry for topic2 returns a future that never completes. Checks that setup() throws
     * {@link TimeoutException} and that the expected deleteTopics call for topic1 was made.
     */
    @Test
    public void shouldCleanUpWhenCreateTopicsTimesOut() {
        final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);
        final StreamsConfig streamsConfig = new StreamsConfig(config);
        final int maxPollIntervalMs = (Integer) config.get(StreamsConfig.consumerPrefix("max.poll.interval.ms"));
        final MockTime time = new MockTime(maxPollIntervalMs / 3);
        final InternalTopicManager topicManager = new InternalTopicManager(time, admin, streamsConfig);
        final InternalTopicConfig firstConfig = setupRepartitionTopicConfig(topic1, 1);
        final InternalTopicConfig secondConfig = setupRepartitionTopicConfig(topic2, 1);

        final KafkaFutureImpl<CreateTopicsResult.TopicMetadataAndConfig> failedCreation = new KafkaFutureImpl<>();
        failedCreation.completeExceptionally(new TopicExistsException("exists"));
        final KafkaFutureImpl<CreateTopicsResult.TopicMetadataAndConfig> succeededCreation = new KafkaFutureImpl<>();
        succeededCreation.complete(
            new CreateTopicsResult.TopicMetadataAndConfig(Uuid.randomUuid(), 1, 1, new Config(Collections.emptyList())));

        final NewTopic firstNewTopic = newTopic(topic1, firstConfig, streamsConfig);
        final NewTopic secondNewTopic = newTopic(topic2, secondConfig, streamsConfig);
        EasyMock.expect(admin.createTopics(Utils.mkSet(firstNewTopic, secondNewTopic)))
            .andAnswer(() -> new MockCreateTopicsResult(Utils.mkMap(
                Utils.mkEntry(topic1, succeededCreation),
                Utils.mkEntry(topic2, failedCreation))));

        // Deliberately left incomplete so the retry can only end by timing out.
        final KafkaFutureImpl<CreateTopicsResult.TopicMetadataAndConfig> pendingCreation = new KafkaFutureImpl<>();
        EasyMock.expect(admin.createTopics(Utils.mkSet(secondNewTopic)))
            .andStubAnswer(() -> new MockCreateTopicsResult(Utils.mkMap(
                Utils.mkEntry(topic2, pendingCreation))));

        final KafkaFutureImpl<Void> succeededDeletion = new KafkaFutureImpl<>();
        succeededDeletion.complete(null);
        EasyMock.expect(admin.deleteTopics(Utils.mkSet(topic1)))
            .andAnswer(() -> new MockDeleteTopicsResult(Utils.mkMap(
                Utils.mkEntry(topic1, succeededDeletion))));
        EasyMock.replay(admin);

        Assert.assertThrows(
            TimeoutException.class,
            () -> topicManager.setup(Utils.mkMap(
                Utils.mkEntry(topic1, firstConfig),
                Utils.mkEntry(topic2, secondConfig))));

        EasyMock.verify(admin);
    }

    /**
     * Runs the shared delete-retry scenario with an {@link UnknownTopicOrPartitionException}.
     */
    @Test
    public void shouldRetryDeleteTopicWhenTopicUnknown() {
        final UnknownTopicOrPartitionException unknownTopic = new UnknownTopicOrPartitionException();
        shouldRetryDeleteTopicWhenRetriableException(unknownTopic);
    }

    /**
     * Runs the shared delete-retry scenario with a {@link LeaderNotAvailableException}.
     */
    @Test
    public void shouldRetryDeleteTopicWhenLeaderNotAvailable() {
        final LeaderNotAvailableException noLeader = new LeaderNotAvailableException("leader not available");
        shouldRetryDeleteTopicWhenRetriableException(noLeader);
    }

    /**
     * Runs the shared delete-retry scenario with a {@link TimeoutException}.
     */
    @Test
    public void shouldRetryDeleteTopicWhenFutureTimesOut() {
        final TimeoutException timedOut = new TimeoutException("timed out");
        shouldRetryDeleteTopicWhenRetriableException(timedOut);
    }

    /**
     * Shared scenario for the delete-retry tests: after the clean-up scenario forces a clean-up of
     * topic1, the first deleteTopics answer fails with the given exception and the second one
     * succeeds. setup() is expected to throw {@link StreamsException}, and the mock is verified
     * so that a missing second deleteTopics call (i.e. no retry) fails the test.
     *
     * <p>Previously {@code EasyMock.verify} was invoked with an empty array, which checks no mock
     * at all, so the retry was never actually asserted.
     *
     * @param retriableException the exception the first deletion attempt completes with
     */
    private void shouldRetryDeleteTopicWhenRetriableException(Exception retriableException) {
        final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);
        final StreamsConfig streamsConfig = new StreamsConfig(config);
        final InternalTopicManager topicManager = new InternalTopicManager(Time.SYSTEM, admin, streamsConfig);
        final InternalTopicConfig internalTopicConfig1 = setupRepartitionTopicConfig(topic1, 1);
        final InternalTopicConfig internalTopicConfig2 = setupRepartitionTopicConfig(topic2, 1);
        setupCleanUpScenario(admin, streamsConfig, internalTopicConfig1, internalTopicConfig2);

        final KafkaFutureImpl<Void> deleteTopicFailFuture = new KafkaFutureImpl<>();
        deleteTopicFailFuture.completeExceptionally(retriableException);
        final KafkaFutureImpl<Void> deleteTopicSuccessfulFuture = new KafkaFutureImpl<>();
        deleteTopicSuccessfulFuture.complete(null);

        // Same request twice: the first answer fails with the retriable error, the second succeeds.
        EasyMock.expect(admin.deleteTopics(Utils.mkSet(topic1)))
            .andAnswer(() -> new MockDeleteTopicsResult(Utils.mkMap(
                Utils.mkEntry(topic1, deleteTopicFailFuture))))
            .andAnswer(() -> new MockDeleteTopicsResult(Utils.mkMap(
                Utils.mkEntry(topic1, deleteTopicSuccessfulFuture))));
        EasyMock.replay(admin);

        Assert.assertThrows(
            StreamsException.class,
            () -> topicManager.setup(Utils.mkMap(
                Utils.mkEntry(topic1, internalTopicConfig1),
                Utils.mkEntry(topic2, internalTopicConfig2))));

        EasyMock.verify(admin);
    }

    /**
     * Runs the clean-up scenario with a deleteTopics stub whose future never completes and checks
     * that setup() throws a {@link TimeoutException} under an auto-ticking {@link MockTime}.
     */
    @Test
    public void shouldThrowTimeoutExceptionWhenFuturesNeverCompleteDuringCleanUp() {
        final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);
        final StreamsConfig streamsConfig = new StreamsConfig(config);
        final int maxPollIntervalMs = (Integer) config.get(StreamsConfig.consumerPrefix("max.poll.interval.ms"));
        final MockTime time = new MockTime(maxPollIntervalMs / 3);
        final InternalTopicManager topicManager = new InternalTopicManager(time, admin, streamsConfig);
        final InternalTopicConfig firstConfig = setupRepartitionTopicConfig(topic1, 1);
        final InternalTopicConfig secondConfig = setupRepartitionTopicConfig(topic2, 1);
        setupCleanUpScenario(admin, streamsConfig, firstConfig, secondConfig);

        // Deliberately left incomplete.
        final KafkaFutureImpl<Void> pendingDeletion = new KafkaFutureImpl<>();
        EasyMock.expect(admin.deleteTopics(Utils.mkSet(topic1)))
            .andStubAnswer(() -> new MockDeleteTopicsResult(Utils.mkMap(
                Utils.mkEntry(topic1, pendingDeletion))));
        EasyMock.replay(admin);

        Assert.assertThrows(
            TimeoutException.class,
            () -> topicManager.setup(Utils.mkMap(
                Utils.mkEntry(topic1, firstConfig),
                Utils.mkEntry(topic2, secondConfig))));
    }

    /**
     * Runs the clean-up scenario with a deleteTopics stub whose future fails with an
     * {@link IllegalStateException} and checks that setup() throws a {@link StreamsException}.
     */
    @Test
    public void shouldThrowWhenDeleteTopicsThrowsUnexpectedException() {
        final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);
        final StreamsConfig streamsConfig = new StreamsConfig(config);
        final InternalTopicManager topicManager = new InternalTopicManager(Time.SYSTEM, admin, streamsConfig);
        final InternalTopicConfig firstConfig = setupRepartitionTopicConfig(topic1, 1);
        final InternalTopicConfig secondConfig = setupRepartitionTopicConfig(topic2, 1);
        setupCleanUpScenario(admin, streamsConfig, firstConfig, secondConfig);

        final KafkaFutureImpl<Void> failedDeletion = new KafkaFutureImpl<>();
        failedDeletion.completeExceptionally(new IllegalStateException("Nobody expects the Spanish inquisition"));
        EasyMock.expect(admin.deleteTopics(Utils.mkSet(topic1)))
            .andStubAnswer(() -> new MockDeleteTopicsResult(Utils.mkMap(
                Utils.mkEntry(topic1, failedDeletion))));
        EasyMock.replay(admin);

        Assert.assertThrows(
            StreamsException.class,
            () -> topicManager.setup(Utils.mkMap(
                Utils.mkEntry(topic1, firstConfig),
                Utils.mkEntry(topic2, secondConfig))));
    }

    /**
     * Records the createTopics expectations shared by the clean-up tests: the first request (both
     * topics) creates topic1 and fails topic2 with {@link TopicExistsException}; the second
     * request (topic2 only) fails with an {@link IllegalStateException}. The caller still has to
     * add its deleteTopics expectation and switch the mock to replay mode.
     *
     * @param admin                mock admin client in record mode
     * @param streamsConfig        config used to build the expected {@link NewTopic}s
     * @param internalTopicConfig1 config of topic1
     * @param internalTopicConfig2 config of topic2
     */
    private void setupCleanUpScenario(AdminClient admin, StreamsConfig streamsConfig, InternalTopicConfig internalTopicConfig1, InternalTopicConfig internalTopicConfig2) {
        final KafkaFutureImpl<CreateTopicsResult.TopicMetadataAndConfig> topicExistsFailure = new KafkaFutureImpl<>();
        topicExistsFailure.completeExceptionally(new TopicExistsException("exists"));
        final KafkaFutureImpl<CreateTopicsResult.TopicMetadataAndConfig> unexpectedFailure = new KafkaFutureImpl<>();
        unexpectedFailure.completeExceptionally(new IllegalStateException("Nobody expects the Spanish inquisition"));
        final KafkaFutureImpl<CreateTopicsResult.TopicMetadataAndConfig> succeededCreation = new KafkaFutureImpl<>();
        succeededCreation.complete(
            new CreateTopicsResult.TopicMetadataAndConfig(Uuid.randomUuid(), 1, 1, new Config(Collections.emptyList())));

        final NewTopic firstNewTopic = newTopic(topic1, internalTopicConfig1, streamsConfig);
        final NewTopic secondNewTopic = newTopic(topic2, internalTopicConfig2, streamsConfig);

        EasyMock.expect(admin.createTopics(Utils.mkSet(firstNewTopic, secondNewTopic)))
            .andAnswer(() -> new MockCreateTopicsResult(Utils.mkMap(
                Utils.mkEntry(topic1, succeededCreation),
                Utils.mkEntry(topic2, topicExistsFailure))));
        EasyMock.expect(admin.createTopics(Utils.mkSet(secondNewTopic)))
            .andAnswer(() -> new MockCreateTopicsResult(Utils.mkMap(
                Utils.mkEntry(topic2, unexpectedFailure))));
    }

    /**
     * Registers "test_topic" with a single partition in the mock admin client and asserts that
     * {@code getNumPartitions} reports a partition count of 1 for it.
     */
    @Test
    public void shouldReturnCorrectPartitionCounts() {
        TopicPartitionInfo onlyPartition = new TopicPartitionInfo(0, this.broker1, this.singleReplica, Collections.emptyList());
        this.mockAdminClient.addTopic(false, "test_topic", Collections.singletonList(onlyPartition), null);

        Object reportedCounts = this.internalTopicManager.getNumPartitions(Collections.singleton("test_topic"), Collections.emptySet());
        Assert.assertEquals(Collections.singletonMap("test_topic", 1), reportedCounts);
    }

    /**
     * Calls {@code makeReady} once each for a repartition, an unwindowed-changelog and a
     * windowed-changelog topic config (one partition each) and then checks, through the mock
     * admin client, that:
     * <ul>
     *   <li>exactly the three topics are listed,</li>
     *   <li>each topic is described as non-internal with a single partition 0 led by
     *       {@code broker1} with replicas {@code singleReplica},</li>
     *   <li>the {@code cleanup.policy} entries are "delete", "compact" and "compact,delete"
     *       respectively.</li>
     * </ul>
     *
     * @throws Exception if waiting on one of the admin-client futures fails
     */
    @Test
    public void shouldCreateRequiredTopics() throws Exception {
        RepartitionTopicConfig repartitionConfig = new RepartitionTopicConfig("test_topic", Collections.emptyMap());
        repartitionConfig.setNumberOfPartitions(1);
        UnwindowedChangelogTopicConfig unwindowedConfig = new UnwindowedChangelogTopicConfig("test_topic_2", Collections.emptyMap());
        unwindowedConfig.setNumberOfPartitions(1);
        WindowedChangelogTopicConfig windowedConfig = new WindowedChangelogTopicConfig("test_topic_3", Collections.emptyMap());
        windowedConfig.setNumberOfPartitions(1);

        this.internalTopicManager.makeReady(Collections.singletonMap("test_topic", repartitionConfig));
        this.internalTopicManager.makeReady(Collections.singletonMap("test_topic_2", unwindowedConfig));
        this.internalTopicManager.makeReady(Collections.singletonMap("test_topic_3", windowedConfig));

        Assert.assertEquals((Object)Utils.mkSet((Object[])new String[]{"test_topic", "test_topic_2", "test_topic_3"}), (Object)this.mockAdminClient.listTopics().names().get());

        // Topic name paired with the cleanup policy expected for its topic type.
        String[][] expectedCleanupPolicies = {
            {"test_topic", "delete"},
            {"test_topic_2", "compact"},
            {"test_topic_3", "compact,delete"}
        };

        // First pass: topic descriptions. A plain singleton list replaces the former
        // double-brace initialised ArrayList; list equality is content-based either way.
        for (String[] expectation : expectedCleanupPolicies) {
            String topic = expectation[0];
            TopicDescription expectedDescription = new TopicDescription(topic, false, Collections.singletonList(new TopicPartitionInfo(0, this.broker1, this.singleReplica, Collections.emptyList())));
            Object actualDescription = ((KafkaFuture)this.mockAdminClient.describeTopics(Collections.singleton(topic)).topicNameValues().get(topic)).get();
            Assert.assertEquals((Object)expectedDescription, actualDescription);
        }

        // Second pass: cleanup policies (kept separate so assertion order matches the original).
        for (String[] expectation : expectedCleanupPolicies) {
            ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, expectation[0]);
            Config actualConfig = (Config)((KafkaFuture)this.mockAdminClient.describeConfigs(Collections.singleton(resource)).values().get(resource)).get();
            Assert.assertEquals((Object)new ConfigEntry("cleanup.policy", expectation[1]), (Object)actualConfig.get("cleanup.policy"));
        }
    }

    /**
     * Drives {@code makeReady} against an EasyMock admin client that is scripted as follows:
     * the first describe of both topics succeeds for "test_topic" and fails with
     * {@link UnknownTopicOrPartitionException} for "test_topic_2"; creating "test_topic_2" then
     * fails with {@link TopicExistsException}; a further describe of "test_topic_2" succeeds.
     * The test passes when {@code makeReady} returns and the mock verifies.
     */
    @Test
    public void shouldCompleteTopicValidationOnRetry() {
        AdminClient admin = (AdminClient)EasyMock.createNiceMock(AdminClient.class);
        InternalTopicManager topicManager = new InternalTopicManager(Time.SYSTEM, (Admin)admin, new StreamsConfig(this.config));

        // Canned futures returned by the scripted admin client.
        TopicPartitionInfo leaderPartition = new TopicPartitionInfo(0, this.broker1, Collections.singletonList(this.broker1), Collections.singletonList(this.broker1));
        KafkaFutureImpl describeSucceeded = new KafkaFutureImpl();
        describeSucceeded.complete((Object)new TopicDescription("test_topic", false, Collections.singletonList(leaderPartition), Collections.emptySet()));
        KafkaFutureImpl describeUnknownTopic = new KafkaFutureImpl();
        describeUnknownTopic.completeExceptionally((Throwable)new UnknownTopicOrPartitionException("KABOOM!"));
        KafkaFutureImpl createAlreadyExists = new KafkaFutureImpl();
        createAlreadyExists.completeExceptionally((Throwable)new TopicExistsException("KABOOM!"));

        // Step 1: describe both topics; only the first one is known.
        Collection bothTopics = (Collection)Utils.mkSet((Object[])new String[]{"test_topic", "test_topic_2"});
        EasyMock.expect((Object)admin.describeTopics(bothTopics))
            .andReturn((Object)new MockDescribeTopicsResult(Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"test_topic", (Object)describeSucceeded), Utils.mkEntry((Object)"test_topic_2", (Object)describeUnknownTopic)})))
            .once();

        // Step 2: creation of the missing topic reports that it already exists.
        Map expectedTopicConfigs = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"cleanup.policy", (Object)"compact"), Utils.mkEntry((Object)"message.timestamp.type", (Object)"CreateTime")});
        NewTopic expectedNewTopic = new NewTopic("test_topic_2", Optional.of(1), Optional.of((short)1)).configs(expectedTopicConfigs);
        EasyMock.expect((Object)admin.createTopics(Collections.singleton(expectedNewTopic)))
            .andReturn((Object)new MockCreateTopicsResult(Collections.singletonMap("test_topic_2", createAlreadyExists)))
            .once();

        // Step 3: describing the second topic again succeeds.
        EasyMock.expect((Object)admin.describeTopics(Collections.singleton("test_topic_2")))
            .andReturn((Object)new MockDescribeTopicsResult(Collections.singletonMap("test_topic_2", describeSucceeded)));
        EasyMock.replay((Object[])new Object[]{admin});

        UnwindowedChangelogTopicConfig firstConfig = new UnwindowedChangelogTopicConfig("test_topic", Collections.emptyMap());
        firstConfig.setNumberOfPartitions(1);
        UnwindowedChangelogTopicConfig secondConfig = new UnwindowedChangelogTopicConfig("test_topic_2", Collections.emptyMap());
        secondConfig.setNumberOfPartitions(1);
        Map requestedTopics = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"test_topic", (Object)firstConfig), Utils.mkEntry((Object)"test_topic_2", (Object)secondConfig)});

        topicManager.makeReady(requestedTopics);

        EasyMock.verify((Object[])new Object[]{admin});
    }

    /**
     * Registers "test_topic" with two partitions in the mock admin client and asserts that
     * {@code makeReady} throws a {@link StreamsException} when the same topic is requested
     * with one partition.
     */
    @Test
    public void shouldNotCreateTopicIfExistsWithDifferentPartitions() {
        // Existing broker-side topic with partitions 0 and 1.
        List<TopicPartitionInfo> existingPartitions = new ArrayList<>();
        existingPartitions.add(new TopicPartitionInfo(0, this.broker1, this.singleReplica, Collections.emptyList()));
        existingPartitions.add(new TopicPartitionInfo(1, this.broker1, this.singleReplica, Collections.emptyList()));
        this.mockAdminClient.addTopic(false, "test_topic", existingPartitions, null);

        RepartitionTopicConfig internalTopicConfig = new RepartitionTopicConfig("test_topic", Collections.emptyMap());
        internalTopicConfig.setNumberOfPartitions(1);

        // assertThrows replaces the try / fail / empty-catch idiom and fails with a clear
        // message when nothing (or a different exception type) is thrown.
        Assert.assertThrows(StreamsException.class, () -> this.internalTopicManager.makeReady(Collections.singletonMap("test_topic", internalTopicConfig)));
    }

    /**
     * Registers "test_topic" with one partition whose replica list is {@code cluster}, then
     * calls {@code makeReady} for that topic on a freshly built manager and expects no exception.
     */
    @Test
    public void shouldNotThrowExceptionIfExistsWithDifferentReplication() {
        TopicPartitionInfo existingPartition = new TopicPartitionInfo(0, this.broker1, this.cluster, Collections.emptyList());
        this.mockAdminClient.addTopic(false, "test_topic", Collections.singletonList(existingPartition), null);

        RepartitionTopicConfig requestedConfig = new RepartitionTopicConfig("test_topic", Collections.emptyMap());
        requestedConfig.setNumberOfPartitions(1);

        InternalTopicManager managerUnderTest = new InternalTopicManager(Time.SYSTEM, (Admin)this.mockAdminClient, new StreamsConfig(this.config));
        managerUnderTest.makeReady(Collections.singletonMap("test_topic", requestedConfig));
    }

    /**
     * Calls {@code makeReady} with an empty topic map; the test passes if no exception escapes.
     */
    @Test
    public void shouldNotThrowExceptionForEmptyTopicMap() {
        internalTopicManager.makeReady(Collections.emptyMap());
    }

    /**
     * Arms the mock admin client via {@code timeoutNextRequest(1)} and asserts that
     * {@code makeReady} throws a {@link StreamsException} whose cause is exactly a
     * {@link TimeoutException}.
     */
    @Test
    public void shouldExhaustRetriesOnTimeoutExceptionForMakeReady() {
        this.mockAdminClient.timeoutNextRequest(1);
        RepartitionTopicConfig internalTopicConfig = new RepartitionTopicConfig("test_topic", Collections.emptyMap());
        internalTopicConfig.setNumberOfPartitions(1);

        // assertThrows replaces the try / fail / catch idiom and hands back the exception
        // so its cause can still be inspected.
        StreamsException expected = Assert.assertThrows(StreamsException.class, () -> this.internalTopicManager.makeReady(Collections.singletonMap("test_topic", internalTopicConfig)));
        Assert.assertEquals(TimeoutException.class, expected.getCause().getClass());
    }

    /**
     * Requests two topics from {@code makeReady}: "test_topic", which is registered in the mock
     * admin client beforehand, and "internal-topic", which is not. The test expects
     * {@code makeReady} to return normally and the captured {@code InternalTopicManager} log
     * to contain the "unknown or not found" message for "internal-topic".
     */
    @Test
    public void shouldLogWhenTopicNotFoundAndNotThrowException() {
        this.mockAdminClient.addTopic(false, "test_topic", Collections.singletonList(new TopicPartitionInfo(0, this.broker1, this.cluster, Collections.emptyList())), null);

        RepartitionTopicConfig knownTopicConfig = new RepartitionTopicConfig("test_topic", Collections.emptyMap());
        knownTopicConfig.setNumberOfPartitions(1);
        RepartitionTopicConfig unknownTopicConfig = new RepartitionTopicConfig("internal-topic", Collections.emptyMap());
        unknownTopicConfig.setNumberOfPartitions(1);

        HashMap<String, RepartitionTopicConfig> requestedTopics = new HashMap<String, RepartitionTopicConfig>();
        requestedTopics.put("test_topic", knownTopicConfig);
        requestedTopics.put("internal-topic", unknownTopicConfig);

        // Raise the logger level before registering the appender, as the original did.
        LogCaptureAppender.setClassLoggerToDebug(InternalTopicManager.class);
        try (LogCaptureAppender logCapture = LogCaptureAppender.createAndRegister(InternalTopicManager.class)) {
            this.internalTopicManager.makeReady(requestedTopics);

            MatcherAssert.assertThat(
                logCapture.getMessages(),
                (Matcher)Matchers.hasItem((Object)("stream-thread [" + this.threadName + "] Topic internal-topic is unknown or not found, hence not existed yet.\nError message was: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Topic internal-topic not found.")));
        }
    }

    /**
     * Drives {@code makeReady} against an EasyMock admin client scripted so that describing
     * "test_topic" first fails with {@link LeaderNotAvailableException}, then with
     * {@link UnknownTopicOrPartitionException}, after which a {@code createTopics} call with
     * the expected repartition-topic settings succeeds. The mock is verified at the end.
     */
    @Test
    public void shouldCreateTopicWhenTopicLeaderNotAvailableAndThenTopicNotFound() {
        AdminClient admin = (AdminClient)EasyMock.createNiceMock(AdminClient.class);
        InternalTopicManager topicManager = new InternalTopicManager(Time.SYSTEM, (Admin)admin, new StreamsConfig(this.config));

        // Canned futures returned by the scripted admin client.
        KafkaFutureImpl leaderNotAvailable = new KafkaFutureImpl();
        leaderNotAvailable.completeExceptionally((Throwable)new LeaderNotAvailableException("Leader Not Available!"));
        KafkaFutureImpl unknownTopic = new KafkaFutureImpl();
        unknownTopic.completeExceptionally((Throwable)new UnknownTopicOrPartitionException("Unknown Topic!"));
        KafkaFutureImpl creationSucceeded = new KafkaFutureImpl();
        creationSucceeded.complete(EasyMock.createNiceMock(CreateTopicsResult.TopicMetadataAndConfig.class));

        // Two describe attempts, each failing in a different way.
        EasyMock.expect((Object)admin.describeTopics(Collections.singleton("test_topic")))
            .andReturn((Object)new MockDescribeTopicsResult(Collections.singletonMap("test_topic", leaderNotAvailable)))
            .once();
        EasyMock.expect((Object)admin.describeTopics(Collections.singleton("test_topic")))
            .andReturn((Object)new MockDescribeTopicsResult(Collections.singletonMap("test_topic", unknownTopic)))
            .once();

        // The topic that is expected to be created afterwards.
        Map expectedTopicConfigs = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"cleanup.policy", (Object)"delete"), Utils.mkEntry((Object)"message.timestamp.type", (Object)"CreateTime"), Utils.mkEntry((Object)"segment.bytes", (Object)"52428800"), Utils.mkEntry((Object)"retention.ms", (Object)"-1")});
        NewTopic expectedNewTopic = new NewTopic("test_topic", Optional.of(1), Optional.of((short)1)).configs(expectedTopicConfigs);
        EasyMock.expect((Object)admin.createTopics(Collections.singleton(expectedNewTopic)))
            .andReturn((Object)new MockCreateTopicsResult(Collections.singletonMap("test_topic", creationSucceeded)))
            .once();
        EasyMock.replay((Object[])new Object[]{admin});

        RepartitionTopicConfig requestedConfig = new RepartitionTopicConfig("test_topic", Collections.emptyMap());
        requestedConfig.setNumberOfPartitions(1);
        topicManager.makeReady(Collections.singletonMap("test_topic", requestedConfig));

        EasyMock.verify((Object[])new Object[]{admin});
    }

    /**
     * Drives {@code makeReady} against an EasyMock admin client scripted so that describing
     * "test_topic" first fails with {@link LeaderNotAvailableException} and then succeeds.
     * The test passes when {@code makeReady} returns and both describe calls are verified.
     */
    @Test
    public void shouldCompleteValidateWhenTopicLeaderNotAvailableAndThenDescribeSuccess() {
        AdminClient admin = (AdminClient)EasyMock.createNiceMock(AdminClient.class);
        InternalTopicManager topicManager = new InternalTopicManager(Time.SYSTEM, (Admin)admin, new StreamsConfig(this.config));

        // Canned futures: one failed describe, one successful describe.
        TopicPartitionInfo leaderPartition = new TopicPartitionInfo(0, this.broker1, Collections.singletonList(this.broker1), Collections.singletonList(this.broker1));
        KafkaFutureImpl leaderNotAvailable = new KafkaFutureImpl();
        leaderNotAvailable.completeExceptionally((Throwable)new LeaderNotAvailableException("Leader Not Available!"));
        KafkaFutureImpl describeSucceeded = new KafkaFutureImpl();
        describeSucceeded.complete((Object)new TopicDescription("test_topic", false, Collections.singletonList(leaderPartition), Collections.emptySet()));

        EasyMock.expect((Object)admin.describeTopics(Collections.singleton("test_topic")))
            .andReturn((Object)new MockDescribeTopicsResult(Collections.singletonMap("test_topic", leaderNotAvailable)))
            .once();
        EasyMock.expect((Object)admin.describeTopics(Collections.singleton("test_topic")))
            .andReturn((Object)new MockDescribeTopicsResult(Collections.singletonMap("test_topic", describeSucceeded)))
            .once();
        EasyMock.replay((Object[])new Object[]{admin});

        RepartitionTopicConfig requestedConfig = new RepartitionTopicConfig("test_topic", Collections.emptyMap());
        requestedConfig.setNumberOfPartitions(1);
        topicManager.makeReady(Collections.singletonMap("test_topic", requestedConfig));

        EasyMock.verify((Object[])new Object[]{admin});
    }

    /**
     * Scripts an EasyMock admin client so that every describe of "test_topic" fails with
     * {@link LeaderNotAvailableException}, then asserts that the action under test throws a
     * {@link TimeoutException} with no cause and the expected "Could not create topics" message.
     */
    @Test
    public void shouldThrowExceptionWhenKeepsTopicLeaderNotAvailable() {
        AdminClient admin = (AdminClient)EasyMock.createNiceMock(AdminClient.class);
        InternalTopicManager topicManager = new InternalTopicManager(Time.SYSTEM, (Admin)admin, new StreamsConfig(this.config));
        KafkaFutureImpl topicDescriptionFailFuture = new KafkaFutureImpl();
        topicDescriptionFailFuture.completeExceptionally((Throwable)new LeaderNotAvailableException("Leader Not Available!"));
        // anyTimes(): the same failing describe result is served however often it is requested.
        EasyMock.expect((Object)admin.describeTopics(Collections.singleton("test_topic"))).andReturn((Object)new MockDescribeTopicsResult(Collections.singletonMap("test_topic", topicDescriptionFailFuture))).anyTimes();
        EasyMock.replay((Object[])new Object[]{admin});
        RepartitionTopicConfig internalTopicConfig = new RepartitionTopicConfig("test_topic", Collections.emptyMap());
        internalTopicConfig.setNumberOfPartitions(1);
        // NOTE(review): lambda$...$33 is a decompiler-synthesized method whose body is not visible
        // here; presumably it calls topicManager.makeReady for "test_topic" — confirm against the
        // original source.
        TimeoutException exception = (TimeoutException)Assert.assertThrows(TimeoutException.class, () -> this.lambda$shouldThrowExceptionWhenKeepsTopicLeaderNotAvailable$33(topicManager, (InternalTopicConfig)internalTopicConfig));
        Assert.assertNull((Object)exception.getCause());
        MatcherAssert.assertThat((Object)exception.getMessage(), (Matcher)Matchers.equalTo((Object)"Could not create topics within 50 milliseconds. This can happen if the Kafka cluster is temporarily not available."));
        EasyMock.verify((Object[])new Object[]{admin});
    }

    /**
     * Registers "test_topic" in the mock admin client, marks it for deletion, and asserts that
     * the action under test throws a {@link TimeoutException} with no cause and the expected
     * "Could not create topics" message.
     */
    @Test
    public void shouldExhaustRetriesOnMarkedForDeletionTopic() {
        this.mockAdminClient.addTopic(false, "test_topic", Collections.singletonList(new TopicPartitionInfo(0, this.broker1, this.cluster, Collections.emptyList())), null);
        this.mockAdminClient.markTopicForDeletion("test_topic");
        RepartitionTopicConfig internalTopicConfig = new RepartitionTopicConfig("test_topic", Collections.emptyMap());
        internalTopicConfig.setNumberOfPartitions(1);
        // NOTE(review): lambda$...$34 is a decompiler-synthesized method whose body is not visible
        // here; presumably it calls this.internalTopicManager.makeReady for "test_topic" — confirm
        // against the original source.
        TimeoutException exception = (TimeoutException)Assert.assertThrows(TimeoutException.class, () -> this.lambda$shouldExhaustRetriesOnMarkedForDeletionTopic$34((InternalTopicConfig)internalTopicConfig));
        Assert.assertNull((Object)exception.getCause());
        MatcherAssert.assertThat((Object)exception.getMessage(), (Matcher)Matchers.equalTo((Object)"Could not create topics within 50 milliseconds. This can happen if the Kafka cluster is temporarily not available."));
    }

    /**
     * Registers "test_topic" and "test_topic_2" in the mock admin client using
     * {@code repartitionTopicConfig()}, validates matching one-partition repartition configs,
     * and expects no missing topics and no misconfigurations.
     */
    @Test
    public void shouldValidateSuccessfully() {
        for (String existingTopic : new String[]{"test_topic", "test_topic_2"}) {
            this.setupTopicInMockAdminClient(existingTopic, this.repartitionTopicConfig());
        }
        InternalTopicConfig firstConfig = this.setupRepartitionTopicConfig("test_topic", 1);
        InternalTopicConfig secondConfig = this.setupRepartitionTopicConfig("test_topic_2", 1);
        Map topicsToValidate = Utils.mkMap((Map.Entry[])new Map.Entry[]{
            Utils.mkEntry((Object)"test_topic", (Object)firstConfig),
            Utils.mkEntry((Object)"test_topic_2", (Object)secondConfig)});

        InternalTopicManager.ValidationResult outcome = this.internalTopicManager.validate(topicsToValidate);

        MatcherAssert.assertThat((Object)outcome.missingTopics(), (Matcher)Matchers.empty());
        MatcherAssert.assertThat((Object)outcome.misconfigurationsForTopics(), (Matcher)Matchers.anEmptyMap());
    }

    /**
     * Validates an empty topic map while "test_topic" exists in the mock admin client, and
     * expects no missing topics and no misconfigurations.
     */
    @Test
    public void shouldValidateSuccessfullyWithEmptyInternalTopics() {
        this.setupTopicInMockAdminClient("test_topic", this.repartitionTopicConfig());

        InternalTopicManager.ValidationResult outcome = this.internalTopicManager.validate(Collections.emptyMap());

        MatcherAssert.assertThat((Object)outcome.missingTopics(), (Matcher)Matchers.empty());
        MatcherAssert.assertThat((Object)outcome.misconfigurationsForTopics(), (Matcher)Matchers.anEmptyMap());
    }

    /**
     * Validates three repartition configs of which only "test_topic" exists in the mock admin
     * client, and expects exactly the two other topics to be reported as missing with no
     * misconfigurations.
     */
    @Test
    public void shouldReportMissingTopics() {
        // Single source of truth for the two topic names that are never registered.
        String missingTopic1 = "missingTopic1";
        String missingTopic2 = "missingTopic2";
        this.setupTopicInMockAdminClient("test_topic", this.repartitionTopicConfig());
        InternalTopicConfig internalTopicConfig1 = this.setupRepartitionTopicConfig("test_topic", 1);
        InternalTopicConfig internalTopicConfig2 = this.setupRepartitionTopicConfig(missingTopic1, 1);
        InternalTopicConfig internalTopicConfig3 = this.setupRepartitionTopicConfig(missingTopic2, 1);

        InternalTopicManager.ValidationResult validationResult = this.internalTopicManager.validate(Utils.mkMap((Map.Entry[])new Map.Entry[]{
            Utils.mkEntry((Object)"test_topic", (Object)internalTopicConfig1),
            Utils.mkEntry((Object)missingTopic1, (Object)internalTopicConfig2),
            Utils.mkEntry((Object)missingTopic2, (Object)internalTopicConfig3)}));

        Set missingTopics = validationResult.missingTopics();
        MatcherAssert.assertThat((Object)missingTopics.size(), (Matcher)Matchers.is((Object)2));
        MatcherAssert.assertThat((Object)missingTopics, (Matcher)Matchers.hasItem((Object)missingTopic1));
        MatcherAssert.assertThat((Object)missingTopics, (Matcher)Matchers.hasItem((Object)missingTopic2));
        MatcherAssert.assertThat((Object)validationResult.misconfigurationsForTopics(), (Matcher)Matchers.anEmptyMap());
    }

    /**
     * Registers three topics via {@code repartitionTopicConfig()} and validates repartition
     * configs requesting 2, 3 and 1 partitions respectively. Expects a single partition-count
     * message for each of the first two topics and nothing for the third.
     */
    @Test
    public void shouldReportMisconfigurationsOfPartitionCount() {
        for (String existingTopic : new String[]{"test_topic", "test_topic_2", "test_topic_3"}) {
            this.setupTopicInMockAdminClient(existingTopic, this.repartitionTopicConfig());
        }
        InternalTopicConfig twoPartitions = this.setupRepartitionTopicConfig("test_topic", 2);
        InternalTopicConfig threePartitions = this.setupRepartitionTopicConfig("test_topic_2", 3);
        InternalTopicConfig onePartition = this.setupRepartitionTopicConfig("test_topic_3", 1);
        Map topicsToValidate = Utils.mkMap((Map.Entry[])new Map.Entry[]{
            Utils.mkEntry((Object)"test_topic", (Object)twoPartitions),
            Utils.mkEntry((Object)"test_topic_2", (Object)threePartitions),
            Utils.mkEntry((Object)"test_topic_3", (Object)onePartition)});

        InternalTopicManager.ValidationResult outcome = this.internalTopicManager.validate(topicsToValidate);
        Map reported = outcome.misconfigurationsForTopics();

        MatcherAssert.assertThat((Object)outcome.missingTopics(), (Matcher)Matchers.empty());
        MatcherAssert.assertThat((Object)reported.size(), (Matcher)Matchers.is((Object)2));

        // Topic name paired with the single message expected for it.
        String[][] expectedMessages = {
            {"test_topic", "Internal topic test_topic requires 2 partitions, but the existing topic on the broker has 1 partitions."},
            {"test_topic_2", "Internal topic test_topic_2 requires 3 partitions, but the existing topic on the broker has 1 partitions."}
        };
        for (String[] expectation : expectedMessages) {
            String topic = expectation[0];
            MatcherAssert.assertThat((Object)reported, (Matcher)Matchers.hasKey((Object)topic));
            List messages = (List)reported.get(topic);
            MatcherAssert.assertThat((Object)messages.size(), (Matcher)Matchers.is((Object)1));
            MatcherAssert.assertThat(messages.get(0), (Matcher)Matchers.is((Object)expectation[1]));
        }
        MatcherAssert.assertThat((Object)reported, (Matcher)Matchers.not((Matcher)Matchers.hasKey((Object)"test_topic_3")));
    }

    /**
     * Registers three topics based on {@code unwindowedChangelogConfig()}: one with
     * {@code cleanup.policy} overridden to "delete", one overridden to "compact,delete", and one
     * unmodified. Validates unwindowed-changelog configs for all three and expects a single
     * "should not contain \"delete\"" message for each of the first two topics and nothing for
     * the third.
     */
    @Test
    public void shouldReportMisconfigurationsOfCleanupPolicyForUnwindowedChangelogTopics() {
        Map<String, String> deleteOnly = this.unwindowedChangelogConfig();
        deleteOnly.put("cleanup.policy", "delete");
        this.setupTopicInMockAdminClient("test_topic", deleteOnly);

        Map<String, String> compactAndDelete = this.unwindowedChangelogConfig();
        compactAndDelete.put("cleanup.policy", "compact,delete");
        this.setupTopicInMockAdminClient("test_topic_2", compactAndDelete);

        this.setupTopicInMockAdminClient("test_topic_3", this.unwindowedChangelogConfig());

        InternalTopicConfig firstConfig = this.setupUnwindowedChangelogTopicConfig("test_topic", 1);
        InternalTopicConfig secondConfig = this.setupUnwindowedChangelogTopicConfig("test_topic_2", 1);
        InternalTopicConfig thirdConfig = this.setupUnwindowedChangelogTopicConfig("test_topic_3", 1);
        Map topicsToValidate = Utils.mkMap((Map.Entry[])new Map.Entry[]{
            Utils.mkEntry((Object)"test_topic", (Object)firstConfig),
            Utils.mkEntry((Object)"test_topic_2", (Object)secondConfig),
            Utils.mkEntry((Object)"test_topic_3", (Object)thirdConfig)});

        InternalTopicManager.ValidationResult outcome = this.internalTopicManager.validate(topicsToValidate);
        Map reported = outcome.misconfigurationsForTopics();

        MatcherAssert.assertThat((Object)outcome.missingTopics(), (Matcher)Matchers.empty());
        MatcherAssert.assertThat((Object)reported.size(), (Matcher)Matchers.is((Object)2));

        // Topic name paired with the single message expected for it.
        String[][] expectedMessages = {
            {"test_topic", "Cleanup policy (cleanup.policy) of existing internal topic test_topic should not contain \"delete\"."},
            {"test_topic_2", "Cleanup policy (cleanup.policy) of existing internal topic test_topic_2 should not contain \"delete\"."}
        };
        for (String[] expectation : expectedMessages) {
            String topic = expectation[0];
            MatcherAssert.assertThat((Object)reported, (Matcher)Matchers.hasKey((Object)topic));
            List messages = (List)reported.get(topic);
            MatcherAssert.assertThat((Object)messages.size(), (Matcher)Matchers.is((Object)1));
            MatcherAssert.assertThat(messages.get(0), (Matcher)Matchers.is((Object)expectation[1]));
        }
        MatcherAssert.assertThat((Object)reported, (Matcher)Matchers.not((Matcher)Matchers.hasKey((Object)"test_topic_3")));
    }

    /**
     * Registers five topics based on {@code windowedChangelogConfig(...)} with these variations:
     * <ol>
     *   <li>"test_topic": retention {@code retentionMs}, unmodified;</li>
     *   <li>"test_topic_2": retention {@code shorterRetentionMs};</li>
     *   <li>"test_topic_3": retention {@code retentionMs}, {@code cleanup.policy} = "compact";</li>
     *   <li>"test_topic_4": retention {@code shorterRetentionMs}, {@code cleanup.policy} = "delete";</li>
     *   <li>"test_topic_5": retention {@code retentionMs}, {@code retention.bytes} = "1024".</li>
     * </ol>
     * Validates windowed-changelog configs requiring {@code retentionMs} for all five and expects
     * exactly one message each for topics 2, 4 and 5, and none for topics 1 and 3.
     */
    @Test
    public void shouldReportMisconfigurationsOfCleanupPolicyForWindowedChangelogTopics() {
        // Retention values are referenced everywhere below instead of repeating the numbers.
        long retentionMs = 1000L;
        long shorterRetentionMs = 900L;

        this.setupTopicInMockAdminClient("test_topic", this.windowedChangelogConfig(retentionMs));
        this.setupTopicInMockAdminClient("test_topic_2", this.windowedChangelogConfig(shorterRetentionMs));
        Map<String, String> windowedChangelogConfigOnlyCleanupPolicyCompact = this.windowedChangelogConfig(retentionMs);
        windowedChangelogConfigOnlyCleanupPolicyCompact.put("cleanup.policy", "compact");
        this.setupTopicInMockAdminClient("test_topic_3", windowedChangelogConfigOnlyCleanupPolicyCompact);
        Map<String, String> windowedChangelogConfigOnlyCleanupPolicyDelete = this.windowedChangelogConfig(shorterRetentionMs);
        windowedChangelogConfigOnlyCleanupPolicyDelete.put("cleanup.policy", "delete");
        this.setupTopicInMockAdminClient("test_topic_4", windowedChangelogConfigOnlyCleanupPolicyDelete);
        Map<String, String> windowedChangelogConfigWithRetentionBytes = this.windowedChangelogConfig(retentionMs);
        windowedChangelogConfigWithRetentionBytes.put("retention.bytes", "1024");
        this.setupTopicInMockAdminClient("test_topic_5", windowedChangelogConfigWithRetentionBytes);

        InternalTopicConfig internalTopicConfig1 = this.setupWindowedChangelogTopicConfig("test_topic", 1, retentionMs);
        InternalTopicConfig internalTopicConfig2 = this.setupWindowedChangelogTopicConfig("test_topic_2", 1, retentionMs);
        InternalTopicConfig internalTopicConfig3 = this.setupWindowedChangelogTopicConfig("test_topic_3", 1, retentionMs);
        InternalTopicConfig internalTopicConfig4 = this.setupWindowedChangelogTopicConfig("test_topic_4", 1, retentionMs);
        InternalTopicConfig internalTopicConfig5 = this.setupWindowedChangelogTopicConfig("test_topic_5", 1, retentionMs);

        InternalTopicManager.ValidationResult validationResult = this.internalTopicManager.validate(Utils.mkMap((Map.Entry[])new Map.Entry[]{
            Utils.mkEntry((Object)"test_topic", (Object)internalTopicConfig1),
            Utils.mkEntry((Object)"test_topic_2", (Object)internalTopicConfig2),
            Utils.mkEntry((Object)"test_topic_3", (Object)internalTopicConfig3),
            Utils.mkEntry((Object)"test_topic_4", (Object)internalTopicConfig4),
            Utils.mkEntry((Object)"test_topic_5", (Object)internalTopicConfig5)}));
        Map misconfigurationsForTopics = validationResult.misconfigurationsForTopics();

        MatcherAssert.assertThat((Object)validationResult.missingTopics(), (Matcher)Matchers.empty());
        MatcherAssert.assertThat((Object)misconfigurationsForTopics.size(), (Matcher)Matchers.is((Object)3));

        // Topic name paired with the single message expected for it.
        String[][] expectedMessages = {
            {"test_topic_2", "Retention time (retention.ms) of existing internal topic test_topic_2 is " + shorterRetentionMs + " but should be " + retentionMs + " or larger."},
            {"test_topic_4", "Retention time (retention.ms) of existing internal topic test_topic_4 is " + shorterRetentionMs + " but should be " + retentionMs + " or larger."},
            {"test_topic_5", "Retention byte (retention.bytes) of existing internal topic test_topic_5 is set but it should be unset."}
        };
        for (String[] expectation : expectedMessages) {
            String topic = expectation[0];
            MatcherAssert.assertThat((Object)misconfigurationsForTopics, (Matcher)Matchers.hasKey((Object)topic));
            List messages = (List)misconfigurationsForTopics.get(topic);
            MatcherAssert.assertThat((Object)messages.size(), (Matcher)Matchers.is((Object)1));
            MatcherAssert.assertThat(messages.get(0), (Matcher)Matchers.is((Object)expectation[1]));
        }
        MatcherAssert.assertThat((Object)misconfigurationsForTopics, (Matcher)Matchers.not((Matcher)Matchers.hasKey((Object)"test_topic")));
        MatcherAssert.assertThat((Object)misconfigurationsForTopics, (Matcher)Matchers.not((Matcher)Matchers.hasKey((Object)"test_topic_3")));
    }

    @Test
    public void shouldReportMisconfigurationsOfCleanupPolicyForRepartitionTopics() {
        long retentionMs = 1000L;
        this.setupTopicInMockAdminClient("test_topic", this.repartitionTopicConfig());
        Map<String, String> repartitionTopicConfigCleanupPolicyCompact = this.repartitionTopicConfig();
        repartitionTopicConfigCleanupPolicyCompact.put("cleanup.policy", "compact");
        this.setupTopicInMockAdminClient("test_topic_2", repartitionTopicConfigCleanupPolicyCompact);
        Map<String, String> repartitionTopicConfigCleanupPolicyCompactAndDelete = this.repartitionTopicConfig();
        repartitionTopicConfigCleanupPolicyCompactAndDelete.put("cleanup.policy", "compact,delete");
        this.setupTopicInMockAdminClient("test_topic_3", repartitionTopicConfigCleanupPolicyCompactAndDelete);
        Map<String, String> repartitionTopicConfigWithFiniteRetentionMs = this.repartitionTopicConfig();
        repartitionTopicConfigWithFiniteRetentionMs.put("retention.ms", String.valueOf(1000L));
        this.setupTopicInMockAdminClient("test_topic_4", repartitionTopicConfigWithFiniteRetentionMs);
        Map<String, String> repartitionTopicConfigWithRetentionBytesSet = this.repartitionTopicConfig();
        repartitionTopicConfigWithRetentionBytesSet.put("retention.bytes", "1024");
        this.setupTopicInMockAdminClient("test_topic_5", repartitionTopicConfigWithRetentionBytesSet);
        InternalTopicConfig internalTopicConfig1 = this.setupRepartitionTopicConfig("test_topic", 1);
        InternalTopicConfig internalTopicConfig2 = this.setupRepartitionTopicConfig("test_topic_2", 1);
        InternalTopicConfig internalTopicConfig3 = this.setupRepartitionTopicConfig("test_topic_3", 1);
        InternalTopicConfig internalTopicConfig4 = this.setupRepartitionTopicConfig("test_topic_4", 1);
        InternalTopicConfig internalTopicConfig5 = this.setupRepartitionTopicConfig("test_topic_5", 1);
        InternalTopicManager.ValidationResult validationResult = this.internalTopicManager.validate(Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"test_topic", (Object)internalTopicConfig1), Utils.mkEntry((Object)"test_topic_2", (Object)internalTopicConfig2), Utils.mkEntry((Object)"test_topic_3", (Object)internalTopicConfig3), Utils.mkEntry((Object)"test_topic_4", (Object)internalTopicConfig4), Utils.mkEntry((Object)"test_topic_5", (Object)internalTopicConfig5)}));
        Map misconfigurationsForTopics = validationResult.misconfigurationsForTopics();
        MatcherAssert.assertThat((Object)validationResult.missingTopics(), (Matcher)Matchers.empty());
        MatcherAssert.assertThat((Object)misconfigurationsForTopics.size(), (Matcher)Matchers.is((Object)4));
        MatcherAssert.assertThat((Object)misconfigurationsForTopics, (Matcher)Matchers.hasKey((Object)"test_topic_2"));
        MatcherAssert.assertThat((Object)((List)misconfigurationsForTopics.get("test_topic_2")).size(), (Matcher)Matchers.is((Object)1));
        MatcherAssert.assertThat(((List)misconfigurationsForTopics.get("test_topic_2")).get(0), (Matcher)Matchers.is((Object)"Cleanup policy (cleanup.policy) of existing internal topic test_topic_2 should not contain \"compact\"."));
        MatcherAssert.assertThat((Object)misconfigurationsForTopics, (Matcher)Matchers.hasKey((Object)"test_topic_3"));
        MatcherAssert.assertThat((Object)((List)misconfigurationsForTopics.get("test_topic_3")).size(), (Matcher)Matchers.is((Object)1));
        MatcherAssert.assertThat(((List)misconfigurationsForTopics.get("test_topic_3")).get(0), (Matcher)Matchers.is((Object)"Cleanup policy (cleanup.policy) of existing internal topic test_topic_3 should not contain \"compact\"."));
        MatcherAssert.assertThat((Object)misconfigurationsForTopics, (Matcher)Matchers.hasKey((Object)"test_topic_4"));
        MatcherAssert.assertThat((Object)((List)misconfigurationsForTopics.get("test_topic_4")).size(), (Matcher)Matchers.is((Object)1));
        MatcherAssert.assertThat(((List)misconfigurationsForTopics.get("test_topic_4")).get(0), (Matcher)Matchers.is((Object)"Retention time (retention.ms) of existing internal topic test_topic_4 is 1000 but should be -1."));
        MatcherAssert.assertThat((Object)misconfigurationsForTopics, (Matcher)Matchers.hasKey((Object)"test_topic_5"));
        MatcherAssert.assertThat((Object)((List)misconfigurationsForTopics.get("test_topic_5")).size(), (Matcher)Matchers.is((Object)1));
        MatcherAssert.assertThat(((List)misconfigurationsForTopics.get("test_topic_5")).get(0), (Matcher)Matchers.is((Object)"Retention byte (retention.bytes) of existing internal topic test_topic_5 is set but it should be unset."));
    }

    /**
     * Verifies that validation reports every misconfiguration of a topic rather than only the first one.
     *
     * <p>The topic is registered in the mock admin client with a retention time below the one requested
     * on the Streams side and with {@code retention.bytes} set. The test expects exactly two messages for
     * {@code test_topic}: the retention-time message first and the retention-bytes message second.
     */
    @Test
    public void shouldReportMultipleMisconfigurationsForSameTopic() {
        long retentionMs = 1000L;
        long shorterRetentionMs = 900L;

        // Broker side: retention shorter than requested, plus a retention.bytes value.
        Map<String, String> windowedChangelogConfig = this.windowedChangelogConfig(shorterRetentionMs);
        windowedChangelogConfig.put("retention.bytes", "1024");
        this.setupTopicInMockAdminClient("test_topic", windowedChangelogConfig);

        // Streams side: the longer retention is what the topic is validated against.
        InternalTopicConfig internalTopicConfig1 = this.setupWindowedChangelogTopicConfig("test_topic", 1, retentionMs);

        InternalTopicManager.ValidationResult validationResult =
            this.internalTopicManager.validate(Utils.mkMap(Utils.mkEntry("test_topic", internalTopicConfig1)));
        Map<String, List<String>> misconfigurationsForTopics = validationResult.misconfigurationsForTopics();

        MatcherAssert.assertThat(validationResult.missingTopics(), Matchers.empty());
        MatcherAssert.assertThat(misconfigurationsForTopics.size(), Matchers.is(1));
        MatcherAssert.assertThat(misconfigurationsForTopics, Matchers.hasKey("test_topic"));

        List<String> misconfigurations = misconfigurationsForTopics.get("test_topic");
        MatcherAssert.assertThat(misconfigurations.size(), Matchers.is(2));
        // Expected messages are derived from the same values used in the setup above.
        MatcherAssert.assertThat(
            misconfigurations.get(0),
            Matchers.is("Retention time (retention.ms) of existing internal topic test_topic is "
                + shorterRetentionMs + " but should be " + retentionMs + " or larger."));
        MatcherAssert.assertThat(
            misconfigurations.get(1),
            Matchers.is("Retention byte (retention.bytes) of existing internal topic test_topic is set but it should be unset."));
    }

    /**
     * Verifies that validating a repartition topic config whose number of partitions was never set
     * raises an {@link IllegalStateException}.
     */
    @Test
    public void shouldThrowWhenPartitionCountUnknown() {
        this.setupTopicInMockAdminClient("test_topic", this.repartitionTopicConfig());

        // Deliberately no setNumberOfPartitions(...) call on this config.
        InternalTopicConfig configWithoutPartitionCount = new RepartitionTopicConfig("test_topic", Collections.emptyMap());

        Assert.assertThrows(
            IllegalStateException.class,
            () -> this.lambda$shouldThrowWhenPartitionCountUnknown$35(configWithoutPartitionCount));
    }

    /**
     * Verifies that validating an already existing repartition topic through a second
     * {@link InternalTopicManager} built from the same config yields neither missing topics nor
     * misconfigurations.
     */
    @Test
    public void shouldNotThrowExceptionIfTopicExistsWithDifferentReplication() {
        this.setupTopicInMockAdminClient("test_topic", this.repartitionTopicConfig());

        InternalTopicManager secondManager =
            new InternalTopicManager(Time.SYSTEM, (Admin)this.mockAdminClient, new StreamsConfig(this.config));
        InternalTopicConfig repartitionConfig = this.setupRepartitionTopicConfig("test_topic", 1);
        Map<String, InternalTopicConfig> topicsToValidate = Collections.singletonMap("test_topic", repartitionConfig);

        InternalTopicManager.ValidationResult result = secondManager.validate(topicsToValidate);

        MatcherAssert.assertThat(result.missingTopics(), Matchers.empty());
        MatcherAssert.assertThat(result.misconfigurationsForTopics(), Matchers.anEmptyMap());
    }

    /**
     * Verifies that validation still succeeds after the mock admin client has been told to time out
     * its next two requests.
     */
    @Test
    public void shouldRetryWhenCallsThrowTimeoutExceptionDuringValidation() {
        this.setupTopicInMockAdminClient("test_topic", this.repartitionTopicConfig());
        this.mockAdminClient.timeoutNextRequest(2);

        InternalTopicConfig repartitionConfig = this.setupRepartitionTopicConfig("test_topic", 1);
        Map<String, InternalTopicConfig> topicsToValidate = Collections.singletonMap("test_topic", repartitionConfig);

        InternalTopicManager.ValidationResult result = this.internalTopicManager.validate(topicsToValidate);

        MatcherAssert.assertThat(result.missingTopics(), Matchers.empty());
        MatcherAssert.assertThat(result.misconfigurationsForTopics(), Matchers.anEmptyMap());
    }

    /**
     * Verifies that a {@link LeaderNotAvailableException} from {@code describeTopics} is retried
     * during validation.
     *
     * <p>The mocked admin client answers the first {@code describeTopics} call with a failed future
     * and the second with a successful one; only a single {@code describeConfigs} answer is recorded.
     */
    @Test
    public void shouldOnlyRetryDescribeTopicsWhenDescribeTopicsThrowsLeaderNotAvailableExceptionDuringValidation() {
        AdminClient adminMock = EasyMock.createNiceMock(AdminClient.class);
        InternalTopicManager managerUnderTest = new InternalTopicManager(Time.SYSTEM, adminMock, new StreamsConfig(this.config));

        // describeTopics: first attempt fails, second attempt succeeds.
        KafkaFutureImpl<TopicDescription> failedDescription = new KafkaFutureImpl<>();
        failedDescription.completeExceptionally(new LeaderNotAvailableException("Leader Not Available!"));
        KafkaFutureImpl<TopicDescription> successfulDescription = new KafkaFutureImpl<>();
        successfulDescription.complete(
            new TopicDescription(
                "test_topic",
                false,
                Collections.singletonList(new TopicPartitionInfo(0, this.broker1, this.cluster, Collections.emptyList()))));
        MockDescribeTopicsResult firstDescribeTopics =
            new MockDescribeTopicsResult(Utils.mkMap(Utils.mkEntry("test_topic", failedDescription)));
        MockDescribeTopicsResult secondDescribeTopics =
            new MockDescribeTopicsResult(Utils.mkMap(Utils.mkEntry("test_topic", successfulDescription)));
        EasyMock.expect(adminMock.describeTopics(Collections.singleton("test_topic")))
            .andReturn(firstDescribeTopics)
            .andReturn(secondDescribeTopics);

        // describeConfigs: a single successful answer.
        Set<ConfigEntry> brokerEntries = this.repartitionTopicConfig().entrySet().stream()
            .map(entry -> new ConfigEntry(entry.getKey(), entry.getValue()))
            .collect(Collectors.toSet());
        KafkaFutureImpl<Config> successfulConfig = new KafkaFutureImpl<>();
        successfulConfig.complete(new Config(brokerEntries));
        ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, "test_topic");
        EasyMock.expect(adminMock.describeConfigs(Collections.singleton(topicResource)))
            .andReturn(new MockDescribeConfigsResult(Utils.mkMap(Utils.mkEntry(topicResource, successfulConfig))));
        EasyMock.replay(adminMock);

        InternalTopicConfig repartitionConfig = this.setupRepartitionTopicConfig("test_topic", 1);
        InternalTopicManager.ValidationResult result =
            managerUnderTest.validate(Collections.singletonMap("test_topic", repartitionConfig));

        MatcherAssert.assertThat(result.missingTopics(), Matchers.empty());
        MatcherAssert.assertThat(result.misconfigurationsForTopics(), Matchers.anEmptyMap());
        EasyMock.verify(adminMock);
    }

    /**
     * Verifies that a {@link LeaderNotAvailableException} from {@code describeConfigs} is retried
     * during validation.
     *
     * <p>The mocked admin client records a single successful {@code describeTopics} answer, while
     * {@code describeConfigs} fails on the first call and succeeds on the second.
     */
    @Test
    public void shouldOnlyRetryDescribeConfigsWhenDescribeConfigsThrowsLeaderNotAvailableExceptionDuringValidation() {
        AdminClient adminMock = EasyMock.createNiceMock(AdminClient.class);
        InternalTopicManager managerUnderTest = new InternalTopicManager(Time.SYSTEM, adminMock, new StreamsConfig(this.config));

        // describeTopics: a single successful answer.
        KafkaFutureImpl<TopicDescription> successfulDescription = new KafkaFutureImpl<>();
        successfulDescription.complete(
            new TopicDescription(
                "test_topic",
                false,
                Collections.singletonList(new TopicPartitionInfo(0, this.broker1, this.cluster, Collections.emptyList()))));
        EasyMock.expect(adminMock.describeTopics(Collections.singleton("test_topic")))
            .andReturn(new MockDescribeTopicsResult(Utils.mkMap(Utils.mkEntry("test_topic", successfulDescription))));

        // describeConfigs: first attempt fails, second attempt succeeds.
        KafkaFutureImpl<Config> failedConfig = new KafkaFutureImpl<>();
        failedConfig.completeExceptionally(new LeaderNotAvailableException("Leader Not Available!"));
        Set<ConfigEntry> brokerEntries = this.repartitionTopicConfig().entrySet().stream()
            .map(entry -> new ConfigEntry(entry.getKey(), entry.getValue()))
            .collect(Collectors.toSet());
        KafkaFutureImpl<Config> successfulConfig = new KafkaFutureImpl<>();
        successfulConfig.complete(new Config(brokerEntries));
        ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, "test_topic");
        MockDescribeConfigsResult firstDescribeConfigs =
            new MockDescribeConfigsResult(Utils.mkMap(Utils.mkEntry(topicResource, failedConfig)));
        MockDescribeConfigsResult secondDescribeConfigs =
            new MockDescribeConfigsResult(Utils.mkMap(Utils.mkEntry(topicResource, successfulConfig)));
        EasyMock.expect(adminMock.describeConfigs(Collections.singleton(topicResource)))
            .andReturn(firstDescribeConfigs)
            .andReturn(secondDescribeConfigs);
        EasyMock.replay(adminMock);

        InternalTopicConfig repartitionConfig = this.setupRepartitionTopicConfig("test_topic", 1);
        InternalTopicManager.ValidationResult result =
            managerUnderTest.validate(Collections.singletonMap("test_topic", repartitionConfig));

        MatcherAssert.assertThat(result.missingTopics(), Matchers.empty());
        MatcherAssert.assertThat(result.misconfigurationsForTopics(), Matchers.anEmptyMap());
        EasyMock.verify(adminMock);
    }

    /**
     * Verifies that after a partial failure only the failed topic is described again.
     *
     * <p>The first {@code describeTopics} call covers both topics and fails for {@code test_topic_2}
     * only; the second recorded call covers {@code test_topic_2} alone. {@code describeConfigs} is
     * recorded once, for both topics, and succeeds.
     */
    @Test
    public void shouldOnlyRetryNotSuccessfulFuturesDuringValidation() {
        AdminClient adminMock = EasyMock.createNiceMock(AdminClient.class);
        InternalTopicManager managerUnderTest = new InternalTopicManager(Time.SYSTEM, adminMock, new StreamsConfig(this.config));

        KafkaFutureImpl<TopicDescription> failedDescription = new KafkaFutureImpl<>();
        failedDescription.completeExceptionally(new LeaderNotAvailableException("Leader Not Available!"));
        KafkaFutureImpl<TopicDescription> firstTopicDescription = new KafkaFutureImpl<>();
        firstTopicDescription.complete(
            new TopicDescription(
                "test_topic",
                false,
                Collections.singletonList(new TopicPartitionInfo(0, this.broker1, this.cluster, Collections.emptyList()))));
        KafkaFutureImpl<TopicDescription> secondTopicDescription = new KafkaFutureImpl<>();
        secondTopicDescription.complete(
            new TopicDescription(
                "test_topic_2",
                false,
                Collections.singletonList(new TopicPartitionInfo(0, this.broker1, this.cluster, Collections.emptyList()))));

        // Round one: both topics requested, test_topic_2 fails.
        EasyMock.expect(adminMock.describeTopics(Utils.mkSet("test_topic", "test_topic_2")))
            .andAnswer(() -> new MockDescribeTopicsResult(
                Utils.mkMap(
                    Utils.mkEntry("test_topic", firstTopicDescription),
                    Utils.mkEntry("test_topic_2", failedDescription))));
        // Round two: only the previously failed topic is requested.
        EasyMock.expect(adminMock.describeTopics(Utils.mkSet("test_topic_2")))
            .andAnswer(() -> new MockDescribeTopicsResult(
                Utils.mkMap(Utils.mkEntry("test_topic_2", secondTopicDescription))));

        Set<ConfigEntry> brokerEntries = this.repartitionTopicConfig().entrySet().stream()
            .map(entry -> new ConfigEntry(entry.getKey(), entry.getValue()))
            .collect(Collectors.toSet());
        KafkaFutureImpl<Config> successfulConfig = new KafkaFutureImpl<>();
        successfulConfig.complete(new Config(brokerEntries));
        ConfigResource firstResource = new ConfigResource(ConfigResource.Type.TOPIC, "test_topic");
        ConfigResource secondResource = new ConfigResource(ConfigResource.Type.TOPIC, "test_topic_2");
        EasyMock.expect(adminMock.describeConfigs(Utils.mkSet(firstResource, secondResource)))
            .andAnswer(() -> new MockDescribeConfigsResult(
                Utils.mkMap(
                    Utils.mkEntry(firstResource, successfulConfig),
                    Utils.mkEntry(secondResource, successfulConfig))));
        EasyMock.replay(adminMock);

        InternalTopicConfig firstTopicConfig = this.setupRepartitionTopicConfig("test_topic", 1);
        InternalTopicConfig secondTopicConfig = this.setupRepartitionTopicConfig("test_topic_2", 1);
        InternalTopicManager.ValidationResult result = managerUnderTest.validate(
            Utils.mkMap(
                Utils.mkEntry("test_topic", firstTopicConfig),
                Utils.mkEntry("test_topic_2", secondTopicConfig)));

        MatcherAssert.assertThat(result.missingTopics(), Matchers.empty());
        MatcherAssert.assertThat(result.misconfigurationsForTopics(), Matchers.anEmptyMap());
        EasyMock.verify(adminMock);
    }

    /**
     * Verifies that validation throws when the {@code describeTopics} future fails with an exception
     * other than the retriable ones exercised elsewhere in this class (here an
     * {@link IllegalStateException}).
     */
    @Test
    public void shouldThrowWhenDescribeTopicsThrowsUnexpectedExceptionDuringValidation() {
        AdminClient adminMock = EasyMock.createNiceMock(AdminClient.class);
        InternalTopicManager managerUnderTest = new InternalTopicManager(Time.SYSTEM, adminMock, new StreamsConfig(this.config));

        KafkaFutureImpl<TopicDescription> failedDescription = new KafkaFutureImpl<>();
        failedDescription.completeExceptionally(new IllegalStateException("Nobody expects the Spanish inquisition"));
        EasyMock.expect(adminMock.describeTopics(Collections.singleton("test_topic")))
            .andStubAnswer(() -> new MockDescribeTopicsResult(
                Utils.mkMap(Utils.mkEntry("test_topic", failedDescription))));
        EasyMock.replay(adminMock);

        InternalTopicConfig repartitionConfig = this.setupRepartitionTopicConfig("test_topic", 1);

        Assert.assertThrows(
            Throwable.class,
            () -> managerUnderTest.validate(Collections.singletonMap("test_topic", repartitionConfig)));
    }

    /**
     * Verifies that validation throws when the {@code describeConfigs} future fails with an
     * {@link IllegalStateException}. No {@code describeTopics} expectation is recorded on the nice mock.
     */
    @Test
    public void shouldThrowWhenDescribeConfigsThrowsUnexpectedExceptionDuringValidation() {
        AdminClient adminMock = EasyMock.createNiceMock(AdminClient.class);
        InternalTopicManager managerUnderTest = new InternalTopicManager(Time.SYSTEM, adminMock, new StreamsConfig(this.config));

        KafkaFutureImpl<Config> failedConfig = new KafkaFutureImpl<>();
        failedConfig.completeExceptionally(new IllegalStateException("Nobody expects the Spanish inquisition"));
        ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, "test_topic");
        EasyMock.expect(adminMock.describeConfigs(Collections.singleton(topicResource)))
            .andStubAnswer(() -> new MockDescribeConfigsResult(
                Utils.mkMap(Utils.mkEntry(topicResource, failedConfig))));
        EasyMock.replay(adminMock);

        InternalTopicConfig repartitionConfig = this.setupRepartitionTopicConfig("test_topic", 1);

        Assert.assertThrows(
            Throwable.class,
            () -> managerUnderTest.validate(Collections.singletonMap("test_topic", repartitionConfig)));
    }

    /**
     * Verifies that validation raises an {@link IllegalStateException} when the {@code describeTopics}
     * answer for {@code test_topic} is keyed by a different topic name ({@code test_topic_2}).
     */
    @Test
    public void shouldThrowWhenTopicDescriptionsDoNotContainTopicDuringValidation() {
        AdminClient adminMock = EasyMock.createNiceMock(AdminClient.class);
        InternalTopicManager managerUnderTest = new InternalTopicManager(Time.SYSTEM, adminMock, new StreamsConfig(this.config));

        KafkaFutureImpl<TopicDescription> successfulDescription = new KafkaFutureImpl<>();
        successfulDescription.complete(
            new TopicDescription(
                "test_topic",
                false,
                Collections.singletonList(new TopicPartitionInfo(0, this.broker1, this.cluster, Collections.emptyList()))));
        // The answer is keyed by the wrong topic name on purpose.
        EasyMock.expect(adminMock.describeTopics(Collections.singleton("test_topic")))
            .andStubAnswer(() -> new MockDescribeTopicsResult(
                Utils.mkMap(Utils.mkEntry("test_topic_2", successfulDescription))));

        KafkaFutureImpl<Config> successfulConfig = new KafkaFutureImpl<>();
        successfulConfig.complete(new Config(Collections.emptySet()));
        ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, "test_topic");
        EasyMock.expect(adminMock.describeConfigs(Collections.singleton(topicResource)))
            .andStubAnswer(() -> new MockDescribeConfigsResult(
                Utils.mkMap(Utils.mkEntry(topicResource, successfulConfig))));
        EasyMock.replay(adminMock);

        InternalTopicConfig repartitionConfig = this.setupRepartitionTopicConfig("test_topic", 1);

        Assert.assertThrows(
            IllegalStateException.class,
            () -> managerUnderTest.validate(Collections.singletonMap("test_topic", repartitionConfig)));
    }

    /**
     * Verifies that validation raises an {@link IllegalStateException} when the {@code describeConfigs}
     * answer for {@code test_topic} is keyed by the resource of a different topic ({@code test_topic_2}).
     */
    @Test
    public void shouldThrowWhenConfigDescriptionsDoNotContainTopicDuringValidation() {
        AdminClient adminMock = EasyMock.createNiceMock(AdminClient.class);
        InternalTopicManager managerUnderTest = new InternalTopicManager(Time.SYSTEM, adminMock, new StreamsConfig(this.config));

        KafkaFutureImpl<TopicDescription> successfulDescription = new KafkaFutureImpl<>();
        successfulDescription.complete(
            new TopicDescription(
                "test_topic",
                false,
                Collections.singletonList(new TopicPartitionInfo(0, this.broker1, this.cluster, Collections.emptyList()))));
        EasyMock.expect(adminMock.describeTopics(Collections.singleton("test_topic")))
            .andStubAnswer(() -> new MockDescribeTopicsResult(
                Utils.mkMap(Utils.mkEntry("test_topic", successfulDescription))));

        KafkaFutureImpl<Config> successfulConfig = new KafkaFutureImpl<>();
        successfulConfig.complete(new Config(Collections.emptySet()));
        ConfigResource requestedResource = new ConfigResource(ConfigResource.Type.TOPIC, "test_topic");
        ConfigResource unrelatedResource = new ConfigResource(ConfigResource.Type.TOPIC, "test_topic_2");
        // The answer is keyed by the wrong resource on purpose.
        EasyMock.expect(adminMock.describeConfigs(Collections.singleton(requestedResource)))
            .andStubAnswer(() -> new MockDescribeConfigsResult(
                Utils.mkMap(Utils.mkEntry(unrelatedResource, successfulConfig))));
        EasyMock.replay(adminMock);

        InternalTopicConfig repartitionConfig = this.setupRepartitionTopicConfig("test_topic", 1);

        Assert.assertThrows(
            IllegalStateException.class,
            () -> managerUnderTest.validate(Collections.singletonMap("test_topic", repartitionConfig)));
    }

    /**
     * Unwindowed changelog topic whose broker-side config lacks {@code cleanup.policy}; delegates to
     * the shared helper, which expects validation to throw.
     */
    @Test
    public void shouldThrowWhenConfigDescriptionsDoNotCleanupPolicyForUnwindowedConfigDuringValidation() {
        InternalTopicConfig streamsSideConfig = this.setupUnwindowedChangelogTopicConfig("test_topic", 1);
        Config brokerSideConfig = this.configWithoutKey(this.unwindowedChangelogConfig(), "cleanup.policy");
        this.shouldThrowWhenConfigDescriptionsDoNotContainConfigDuringValidation(streamsSideConfig, brokerSideConfig);
    }

    /**
     * Windowed changelog topic whose broker-side config lacks {@code cleanup.policy}; delegates to
     * the shared helper, which expects validation to throw.
     */
    @Test
    public void shouldThrowWhenConfigDescriptionsDoNotContainCleanupPolicyForWindowedConfigDuringValidation() {
        // Single source of truth for the retention used on both the Streams and the broker side.
        long retentionMs = 1000L;
        InternalTopicConfig streamsSideConfig = this.setupWindowedChangelogTopicConfig("test_topic", 1, retentionMs);
        Config brokerSideConfig = this.configWithoutKey(this.windowedChangelogConfig(retentionMs), "cleanup.policy");
        this.shouldThrowWhenConfigDescriptionsDoNotContainConfigDuringValidation(streamsSideConfig, brokerSideConfig);
    }

    /**
     * Windowed changelog topic whose broker-side config lacks {@code retention.ms}; delegates to the
     * shared helper, which expects validation to throw.
     */
    @Test
    public void shouldThrowWhenConfigDescriptionsDoNotContainRetentionMsForWindowedConfigDuringValidation() {
        // Single source of truth for the retention used on both the Streams and the broker side.
        long retentionMs = 1000L;
        InternalTopicConfig streamsSideConfig = this.setupWindowedChangelogTopicConfig("test_topic", 1, retentionMs);
        Config brokerSideConfig = this.configWithoutKey(this.windowedChangelogConfig(retentionMs), "retention.ms");
        this.shouldThrowWhenConfigDescriptionsDoNotContainConfigDuringValidation(streamsSideConfig, brokerSideConfig);
    }

    /**
     * Windowed changelog topic whose broker-side config lacks {@code retention.bytes}; delegates to
     * the shared helper, which expects validation to throw.
     */
    @Test
    public void shouldThrowWhenConfigDescriptionsDoNotContainRetentionBytesForWindowedConfigDuringValidation() {
        // Single source of truth for the retention used on both the Streams and the broker side.
        long retentionMs = 1000L;
        InternalTopicConfig streamsSideConfig = this.setupWindowedChangelogTopicConfig("test_topic", 1, retentionMs);
        Config brokerSideConfig = this.configWithoutKey(this.windowedChangelogConfig(retentionMs), "retention.bytes");
        this.shouldThrowWhenConfigDescriptionsDoNotContainConfigDuringValidation(streamsSideConfig, brokerSideConfig);
    }

    /**
     * Repartition topic whose broker-side config lacks {@code cleanup.policy}; delegates to the
     * shared helper, which expects validation to throw.
     */
    @Test
    public void shouldThrowWhenConfigDescriptionsDoNotContainCleanupPolicyForRepartitionConfigDuringValidation() {
        InternalTopicConfig streamsSideConfig = this.setupRepartitionTopicConfig("test_topic", 1);
        Config brokerSideConfig = this.configWithoutKey(this.repartitionTopicConfig(), "cleanup.policy");
        this.shouldThrowWhenConfigDescriptionsDoNotContainConfigDuringValidation(streamsSideConfig, brokerSideConfig);
    }

    /**
     * Repartition topic whose broker-side config lacks {@code retention.ms}; delegates to the shared
     * helper, which expects validation to throw.
     */
    @Test
    public void shouldThrowWhenConfigDescriptionsDoNotContainRetentionMsForRepartitionConfigDuringValidation() {
        InternalTopicConfig streamsSideConfig = this.setupRepartitionTopicConfig("test_topic", 1);
        Config brokerSideConfig = this.configWithoutKey(this.repartitionTopicConfig(), "retention.ms");
        this.shouldThrowWhenConfigDescriptionsDoNotContainConfigDuringValidation(streamsSideConfig, brokerSideConfig);
    }

    /**
     * Repartition topic whose broker-side config lacks {@code retention.bytes}; delegates to the
     * shared helper, which expects validation to throw.
     */
    @Test
    public void shouldThrowWhenConfigDescriptionsDoNotContainRetentionBytesForRepartitionConfigDuringValidation() {
        InternalTopicConfig streamsSideConfig = this.setupRepartitionTopicConfig("test_topic", 1);
        Config brokerSideConfig = this.configWithoutKey(this.repartitionTopicConfig(), "retention.bytes");
        this.shouldThrowWhenConfigDescriptionsDoNotContainConfigDuringValidation(streamsSideConfig, brokerSideConfig);
    }

    /**
     * Builds a {@link Config} from the given name/value pairs, leaving out the entry named {@code key}.
     *
     * @param config name/value pairs to convert into config entries
     * @param key    name of the entry to omit
     * @return a config holding one {@link ConfigEntry} per remaining pair
     */
    private Config configWithoutKey(Map<String, String> config, String key) {
        Set<ConfigEntry> retainedEntries = config.entrySet().stream()
            .filter(entry -> !entry.getKey().equals(key))
            .map(entry -> new ConfigEntry(entry.getKey(), entry.getValue()))
            .collect(Collectors.toSet());
        return new Config(retainedEntries);
    }

    /**
     * Shared body of the "config descriptions do not contain ..." tests.
     *
     * <p>Stubs a nice-mock admin client so that {@code describeTopics} succeeds for {@code test_topic}
     * and {@code describeConfigs} returns {@code brokerSideTopicConfig}, then asserts that validating
     * {@code streamsSideTopicConfig} raises an {@link IllegalStateException}.
     *
     * @param streamsSideTopicConfig topic config passed to {@code validate} for {@code test_topic}
     * @param brokerSideTopicConfig  config the mocked {@code describeConfigs} call completes with
     */
    private void shouldThrowWhenConfigDescriptionsDoNotContainConfigDuringValidation(InternalTopicConfig streamsSideTopicConfig, Config brokerSideTopicConfig) {
        AdminClient adminMock = EasyMock.createNiceMock(AdminClient.class);
        InternalTopicManager managerUnderTest = new InternalTopicManager(Time.SYSTEM, adminMock, new StreamsConfig(this.config));

        KafkaFutureImpl<TopicDescription> successfulDescription = new KafkaFutureImpl<>();
        successfulDescription.complete(
            new TopicDescription(
                "test_topic",
                false,
                Collections.singletonList(new TopicPartitionInfo(0, this.broker1, this.cluster, Collections.emptyList()))));
        EasyMock.expect(adminMock.describeTopics(Collections.singleton("test_topic")))
            .andStubAnswer(() -> new MockDescribeTopicsResult(
                Utils.mkMap(Utils.mkEntry("test_topic", successfulDescription))));

        KafkaFutureImpl<Config> brokerConfigFuture = new KafkaFutureImpl<>();
        brokerConfigFuture.complete(brokerSideTopicConfig);
        ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, "test_topic");
        EasyMock.expect(adminMock.describeConfigs(Collections.singleton(topicResource)))
            .andStubAnswer(() -> new MockDescribeConfigsResult(
                Utils.mkMap(Utils.mkEntry(topicResource, brokerConfigFuture))));
        EasyMock.replay(adminMock);

        Assert.assertThrows(
            IllegalStateException.class,
            () -> managerUnderTest.validate(Collections.singletonMap("test_topic", streamsSideTopicConfig)));
    }

    /**
     * Verifies that validation gives up with a {@link TimeoutException} when {@code describeTopics}
     * keeps answering with a future that failed with a {@link TimeoutException}.
     *
     * <p>The manager runs on a {@link MockTime} constructed from one third of the configured consumer
     * {@code max.poll.interval.ms}.
     */
    @Test
    public void shouldThrowTimeoutExceptionWhenTimeoutIsExceededDuringValidation() {
        AdminClient adminMock = EasyMock.createNiceMock(AdminClient.class);
        int maxPollIntervalMs = (Integer)this.config.get(StreamsConfig.consumerPrefix("max.poll.interval.ms"));
        MockTime mockTime = new MockTime((long)(maxPollIntervalMs / 3));
        InternalTopicManager managerUnderTest = new InternalTopicManager(mockTime, adminMock, new StreamsConfig(this.config));

        // describeTopics always times out.
        KafkaFutureImpl<TopicDescription> timedOutDescription = new KafkaFutureImpl<>();
        timedOutDescription.completeExceptionally(new TimeoutException());
        EasyMock.expect(adminMock.describeTopics(Collections.singleton("test_topic")))
            .andStubAnswer(() -> new MockDescribeTopicsResult(
                Utils.mkMap(Utils.mkEntry("test_topic", timedOutDescription))));

        // describeConfigs always succeeds.
        Set<ConfigEntry> brokerEntries = this.repartitionTopicConfig().entrySet().stream()
            .map(entry -> new ConfigEntry(entry.getKey(), entry.getValue()))
            .collect(Collectors.toSet());
        KafkaFutureImpl<Config> successfulConfig = new KafkaFutureImpl<>();
        successfulConfig.complete(new Config(brokerEntries));
        ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, "test_topic");
        EasyMock.expect(adminMock.describeConfigs(Collections.singleton(topicResource)))
            .andStubAnswer(() -> new MockDescribeConfigsResult(
                Utils.mkMap(Utils.mkEntry(topicResource, successfulConfig))));
        EasyMock.replay(adminMock);

        InternalTopicConfig repartitionConfig = this.setupRepartitionTopicConfig("test_topic", 1);

        Assert.assertThrows(
            TimeoutException.class,
            () -> managerUnderTest.validate(Collections.singletonMap("test_topic", repartitionConfig)));
    }

    /**
     * Verifies that validation gives up with a {@link TimeoutException} when the {@code describeTopics}
     * future is never completed.
     *
     * <p>The manager runs on a {@link MockTime} constructed from one third of the configured consumer
     * {@code max.poll.interval.ms}.
     */
    @Test
    public void shouldThrowTimeoutExceptionWhenFuturesNeverCompleteDuringValidation() {
        AdminClient adminMock = EasyMock.createNiceMock(AdminClient.class);
        int maxPollIntervalMs = (Integer)this.config.get(StreamsConfig.consumerPrefix("max.poll.interval.ms"));
        MockTime mockTime = new MockTime((long)(maxPollIntervalMs / 3));
        InternalTopicManager managerUnderTest = new InternalTopicManager(mockTime, adminMock, new StreamsConfig(this.config));

        // Intentionally left incomplete for the whole test.
        KafkaFutureImpl<TopicDescription> pendingDescription = new KafkaFutureImpl<>();
        EasyMock.expect(adminMock.describeTopics(Collections.singleton("test_topic")))
            .andStubAnswer(() -> new MockDescribeTopicsResult(
                Utils.mkMap(Utils.mkEntry("test_topic", pendingDescription))));

        Set<ConfigEntry> brokerEntries = this.repartitionTopicConfig().entrySet().stream()
            .map(entry -> new ConfigEntry(entry.getKey(), entry.getValue()))
            .collect(Collectors.toSet());
        KafkaFutureImpl<Config> successfulConfig = new KafkaFutureImpl<>();
        successfulConfig.complete(new Config(brokerEntries));
        ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, "test_topic");
        EasyMock.expect(adminMock.describeConfigs(Collections.singleton(topicResource)))
            .andStubAnswer(() -> new MockDescribeConfigsResult(
                Utils.mkMap(Utils.mkEntry(topicResource, successfulConfig))));
        EasyMock.replay(adminMock);

        InternalTopicConfig repartitionConfig = this.setupRepartitionTopicConfig("test_topic", 1);

        Assert.assertThrows(
            TimeoutException.class,
            () -> managerUnderTest.validate(Collections.singletonMap("test_topic", repartitionConfig)));
    }

    /**
     * Builds the {@link NewTopic} for the given internal topic config.
     *
     * @param topicName     name of the topic
     * @param topicConfig   supplies the partition count and the topic-level properties
     * @param streamsConfig supplies {@code replication.factor} and
     *                      {@code windowstore.changelog.additional.retention.ms}
     * @return the new-topic request with its configs set
     */
    private NewTopic newTopic(String topicName, InternalTopicConfig topicConfig, StreamsConfig streamsConfig) {
        NewTopic topic = new NewTopic(
            topicName,
            topicConfig.numberOfPartitions(),
            Optional.of(streamsConfig.getInt("replication.factor").shortValue()));
        long additionalRetentionMs = streamsConfig.getLong("windowstore.changelog.additional.retention.ms").longValue();
        return topic.configs(topicConfig.getProperties(Collections.emptyMap(), additionalRetentionMs));
    }

    /**
     * Broker-side config used for repartition topics in these tests: {@code cleanup.policy=delete},
     * {@code retention.ms=-1} and a {@code retention.bytes} entry whose value is {@code null}.
     *
     * @return a fresh map on every call
     */
    private Map<String, String> repartitionTopicConfig() {
        return Utils.mkMap(
            Utils.mkEntry("cleanup.policy", "delete"),
            Utils.mkEntry("retention.ms", "-1"),
            Utils.mkEntry("retention.bytes", null));
    }

    /**
     * Broker-side config used for unwindowed changelog topics in these tests:
     * {@code cleanup.policy=compact} only.
     *
     * @return a fresh map on every call
     */
    private Map<String, String> unwindowedChangelogConfig() {
        return Utils.mkMap(
            Utils.mkEntry("cleanup.policy", "compact"));
    }

    /**
     * Broker-side config used for windowed changelog topics in these tests:
     * {@code cleanup.policy=compact,delete}, the given {@code retention.ms} and a
     * {@code retention.bytes} entry whose value is {@code null}.
     *
     * @param retentionMs value stored (as a string) under {@code retention.ms}
     * @return a fresh map on every call
     */
    private Map<String, String> windowedChangelogConfig(long retentionMs) {
        String retentionMsValue = String.valueOf(retentionMs);
        return Utils.mkMap(
            Utils.mkEntry("cleanup.policy", "compact,delete"),
            Utils.mkEntry("retention.ms", retentionMsValue),
            Utils.mkEntry("retention.bytes", null));
    }

    /**
     * Registers a topic with one partition (id 0, {@code broker1}, {@code cluster}, no in-sync
     * replicas listed) and the given config in the mock admin client.
     *
     * @param topic       name of the topic to add
     * @param topicConfig broker-side config of the topic
     */
    private void setupTopicInMockAdminClient(String topic, Map<String, String> topicConfig) {
        TopicPartitionInfo partitionZero = new TopicPartitionInfo(0, this.broker1, this.cluster, Collections.emptyList());
        this.mockAdminClient.addTopic(false, topic, Collections.singletonList(partitionZero), topicConfig);
    }

    /**
     * Creates an {@link UnwindowedChangelogTopicConfig} without topic-level overrides and sets its
     * partition count.
     *
     * @param topicName      name of the topic
     * @param partitionCount number of partitions to set on the config
     * @return the prepared topic config
     */
    private InternalTopicConfig setupUnwindowedChangelogTopicConfig(String topicName, int partitionCount) {
        UnwindowedChangelogTopicConfig changelogConfig =
            new UnwindowedChangelogTopicConfig(topicName, Collections.emptyMap());
        changelogConfig.setNumberOfPartitions(partitionCount);
        return changelogConfig;
    }

    /**
     * Creates a {@link WindowedChangelogTopicConfig} whose only override is {@code retention.ms} and
     * sets its partition count.
     *
     * @param topicName      name of the topic
     * @param partitionCount number of partitions to set on the config
     * @param retentionMs    value stored (as a string) under {@code retention.ms}
     * @return the prepared topic config
     */
    private InternalTopicConfig setupWindowedChangelogTopicConfig(String topicName, int partitionCount, long retentionMs) {
        Map<String, String> overrides = Utils.mkMap(Utils.mkEntry("retention.ms", String.valueOf(retentionMs)));
        WindowedChangelogTopicConfig changelogConfig = new WindowedChangelogTopicConfig(topicName, overrides);
        changelogConfig.setNumberOfPartitions(partitionCount);
        return changelogConfig;
    }

    /**
     * Creates a {@link RepartitionTopicConfig} without topic-level overrides and sets its partition
     * count.
     *
     * @param topicName      name of the topic
     * @param partitionCount number of partitions to set on the config
     * @return the prepared topic config
     */
    private InternalTopicConfig setupRepartitionTopicConfig(String topicName, int partitionCount) {
        RepartitionTopicConfig repartitionConfig =
            new RepartitionTopicConfig(topicName, Collections.emptyMap());
        repartitionConfig.setNumberOfPartitions(partitionCount);
        return repartitionConfig;
    }

    /**
     * Decompiler-emitted lambda body: validates {@code test_topic} with the given config through the
     * shared {@code internalTopicManager}.
     */
    private /* synthetic */ void lambda$shouldThrowWhenPartitionCountUnknown$35(InternalTopicConfig internalTopicConfig) throws Throwable {
        Map<String, InternalTopicConfig> topicsToValidate = Collections.singletonMap("test_topic", internalTopicConfig);
        this.internalTopicManager.validate(topicsToValidate);
    }

    /**
     * Decompiler-emitted lambda body: calls {@code makeReady} for {@code test_topic} with the given
     * config on the shared {@code internalTopicManager}.
     */
    private /* synthetic */ void lambda$shouldExhaustRetriesOnMarkedForDeletionTopic$34(InternalTopicConfig internalTopicConfig) throws Throwable {
        Map<String, InternalTopicConfig> topicsToPrepare = Collections.singletonMap("test_topic", internalTopicConfig);
        this.internalTopicManager.makeReady(topicsToPrepare);
    }

    /**
     * Decompiler-emitted lambda body: calls {@code makeReady} for {@code test_topic} with the given
     * config on the supplied manager.
     */
    private /* synthetic */ void lambda$shouldThrowExceptionWhenKeepsTopicLeaderNotAvailable$33(InternalTopicManager topicManager, InternalTopicConfig internalTopicConfig) throws Throwable {
        Map<String, InternalTopicConfig> topicsToPrepare = Collections.singletonMap("test_topic", internalTopicConfig);
        topicManager.makeReady(topicsToPrepare);
    }

    /**
     * Decompiler-emitted lambda body: calls {@code makeReady} for {@code test_topic} with the given
     * config on the supplied manager.
     */
    private /* synthetic */ void lambda$shouldThrowInformativeExceptionForOlderBrokers$4(InternalTopicManager topicManager, InternalTopicConfig topicConfig) throws Throwable {
        Map<String, InternalTopicConfig> topicsToPrepare = Collections.singletonMap("test_topic", topicConfig);
        topicManager.makeReady(topicsToPrepare);
    }

    /**
     * {@link DescribeConfigsResult} that tests can instantiate directly from a map of per-resource
     * futures.
     */
    private static class MockDescribeConfigsResult extends DescribeConfigsResult {

        /**
         * @param configFutures future config per resource, handed to the superclass constructor
         */
        MockDescribeConfigsResult(Map<ConfigResource, KafkaFuture<Config>> configFutures) {
            super(configFutures);
        }
    }

    /**
     * {@link DescribeTopicsResult} that tests can instantiate directly from a map of per-topic-name
     * futures.
     */
    private static class MockDescribeTopicsResult extends DescribeTopicsResult {

        /**
         * @param nameFutures future description per topic name; passed as the second superclass
         *                    constructor argument, the first one being {@code null}
         */
        MockDescribeTopicsResult(Map<String, KafkaFuture<TopicDescription>> nameFutures) {
            super(null, nameFutures);
        }
    }

    /**
     * {@link DeleteTopicsResult} that tests can instantiate directly from a map of per-topic-name
     * futures.
     */
    private static class MockDeleteTopicsResult extends DeleteTopicsResult {

        /**
         * @param nameFutures deletion future per topic name; passed as the second superclass
         *                    constructor argument, the first one being {@code null}
         */
        MockDeleteTopicsResult(Map<String, KafkaFuture<Void>> nameFutures) {
            super(null, nameFutures);
        }
    }

    /**
     * {@link CreateTopicsResult} that tests can instantiate directly from a map of per-topic-name
     * futures.
     */
    private static class MockCreateTopicsResult extends CreateTopicsResult {

        /**
         * @param creationFutures future metadata-and-config per topic name, handed to the superclass
         *                        constructor
         */
        MockCreateTopicsResult(Map<String, KafkaFuture<CreateTopicsResult.TopicMetadataAndConfig>> creationFutures) {
            super(creationFutures);
        }
    }
}

