/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidator;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockMetricsReporter;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockStateRestoreListener;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

@Category(value={IntegrationTest.class})
public class KafkaStreamsTest {
    private static final int NUM_BROKERS = 1;
    private static final int NUM_THREADS = 2;
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    private final StreamsBuilder builder = new StreamsBuilder();
    private KafkaStreams globalStreams;
    private Properties props;
    @Rule
    public TestName testName = new TestName();

    @Before
    public void before() {
        this.props = new Properties();
        this.props.put("application.id", "appId");
        this.props.put("client.id", "clientId");
        this.props.put("bootstrap.servers", CLUSTER.bootstrapServers());
        this.props.put("metric.reporters", MockMetricsReporter.class.getName());
        this.props.put("state.dir", TestUtils.tempDirectory().getPath());
        this.props.put("num.stream.threads", (Object)2);
        this.props.put("internal.leave.group.on.close", (Object)true);
        this.globalStreams = new KafkaStreams(this.builder.build(), this.props);
    }

    @After
    public void cleanup() {
        if (this.globalStreams != null) {
            this.globalStreams.close();
        }
    }

    @Test
    public void testOsDefaultSocketBufferSizes() {
        this.props.put("send.buffer.bytes", (Object)-1);
        this.props.put("receive.buffer.bytes", (Object)-1);
        KafkaStreams streams = new KafkaStreams(this.builder.build(), this.props);
        streams.close();
    }

    @Test(expected=KafkaException.class)
    public void testInvalidSocketSendBufferSize() {
        this.props.put("send.buffer.bytes", (Object)-2);
        KafkaStreams streams = new KafkaStreams(this.builder.build(), this.props);
        streams.close();
    }

    @Test(expected=KafkaException.class)
    public void testInvalidSocketReceiveBufferSize() {
        this.props.put("receive.buffer.bytes", (Object)-2);
        KafkaStreams streams = new KafkaStreams(this.builder.build(), this.props);
        streams.close();
    }

    @Test
    public void stateShouldTransitToNotRunningIfCloseRightAfterCreated() {
        this.globalStreams.close();
        Assert.assertEquals((Object)KafkaStreams.State.NOT_RUNNING, (Object)this.globalStreams.state());
    }

    @Test
    public void stateShouldTransitToRunningIfNonDeadThreadsBackToRunning() throws InterruptedException {
        StateListenerStub stateListener = new StateListenerStub();
        this.globalStreams.setStateListener((KafkaStreams.StateListener)stateListener);
        Assert.assertEquals((long)0L, (long)stateListener.numChanges);
        Assert.assertEquals((Object)KafkaStreams.State.CREATED, (Object)this.globalStreams.state());
        this.globalStreams.start();
        TestUtils.waitForCondition(() -> stateListener.numChanges == 2, (String)"Streams never started.");
        Assert.assertEquals((Object)KafkaStreams.State.RUNNING, (Object)this.globalStreams.state());
        for (StreamThread thread : this.globalStreams.threads) {
            thread.stateListener().onChange((Thread)thread, (ThreadStateTransitionValidator)StreamThread.State.PARTITIONS_REVOKED, (ThreadStateTransitionValidator)StreamThread.State.RUNNING);
        }
        Assert.assertEquals((long)3L, (long)stateListener.numChanges);
        Assert.assertEquals((Object)KafkaStreams.State.REBALANCING, (Object)this.globalStreams.state());
        for (StreamThread thread : this.globalStreams.threads) {
            thread.stateListener().onChange((Thread)thread, (ThreadStateTransitionValidator)StreamThread.State.PARTITIONS_ASSIGNED, (ThreadStateTransitionValidator)StreamThread.State.PARTITIONS_REVOKED);
        }
        Assert.assertEquals((long)3L, (long)stateListener.numChanges);
        Assert.assertEquals((Object)KafkaStreams.State.REBALANCING, (Object)this.globalStreams.state());
        this.globalStreams.threads[1].stateListener().onChange((Thread)this.globalStreams.threads[1], (ThreadStateTransitionValidator)StreamThread.State.PENDING_SHUTDOWN, (ThreadStateTransitionValidator)StreamThread.State.PARTITIONS_ASSIGNED);
        this.globalStreams.threads[1].stateListener().onChange((Thread)this.globalStreams.threads[1], (ThreadStateTransitionValidator)StreamThread.State.DEAD, (ThreadStateTransitionValidator)StreamThread.State.PENDING_SHUTDOWN);
        Assert.assertEquals((long)3L, (long)stateListener.numChanges);
        Assert.assertEquals((Object)KafkaStreams.State.REBALANCING, (Object)this.globalStreams.state());
        for (StreamThread thread : this.globalStreams.threads) {
            if (thread == this.globalStreams.threads[1]) continue;
            thread.stateListener().onChange((Thread)thread, (ThreadStateTransitionValidator)StreamThread.State.RUNNING, (ThreadStateTransitionValidator)StreamThread.State.PARTITIONS_ASSIGNED);
        }
        Assert.assertEquals((long)4L, (long)stateListener.numChanges);
        Assert.assertEquals((Object)KafkaStreams.State.RUNNING, (Object)this.globalStreams.state());
        this.globalStreams.close();
        TestUtils.waitForCondition(() -> stateListener.numChanges == 6, (String)"Streams never closed.");
        Assert.assertEquals((Object)KafkaStreams.State.NOT_RUNNING, (Object)this.globalStreams.state());
    }

    @Test
    public void stateShouldTransitToErrorIfAllThreadsDead() throws InterruptedException {
        StateListenerStub stateListener = new StateListenerStub();
        this.globalStreams.setStateListener((KafkaStreams.StateListener)stateListener);
        Assert.assertEquals((long)0L, (long)stateListener.numChanges);
        Assert.assertEquals((Object)KafkaStreams.State.CREATED, (Object)this.globalStreams.state());
        this.globalStreams.start();
        TestUtils.waitForCondition(() -> stateListener.numChanges == 2, (String)"Streams never started.");
        Assert.assertEquals((Object)KafkaStreams.State.RUNNING, (Object)this.globalStreams.state());
        for (StreamThread thread : this.globalStreams.threads) {
            thread.stateListener().onChange((Thread)thread, (ThreadStateTransitionValidator)StreamThread.State.PARTITIONS_REVOKED, (ThreadStateTransitionValidator)StreamThread.State.RUNNING);
        }
        Assert.assertEquals((long)3L, (long)stateListener.numChanges);
        Assert.assertEquals((Object)KafkaStreams.State.REBALANCING, (Object)this.globalStreams.state());
        this.globalStreams.threads[1].stateListener().onChange((Thread)this.globalStreams.threads[1], (ThreadStateTransitionValidator)StreamThread.State.PENDING_SHUTDOWN, (ThreadStateTransitionValidator)StreamThread.State.PARTITIONS_REVOKED);
        this.globalStreams.threads[1].stateListener().onChange((Thread)this.globalStreams.threads[1], (ThreadStateTransitionValidator)StreamThread.State.DEAD, (ThreadStateTransitionValidator)StreamThread.State.PENDING_SHUTDOWN);
        Assert.assertEquals((long)3L, (long)stateListener.numChanges);
        Assert.assertEquals((Object)KafkaStreams.State.REBALANCING, (Object)this.globalStreams.state());
        for (StreamThread thread : this.globalStreams.threads) {
            if (thread == this.globalStreams.threads[1]) continue;
            thread.stateListener().onChange((Thread)thread, (ThreadStateTransitionValidator)StreamThread.State.PENDING_SHUTDOWN, (ThreadStateTransitionValidator)StreamThread.State.PARTITIONS_REVOKED);
            thread.stateListener().onChange((Thread)thread, (ThreadStateTransitionValidator)StreamThread.State.DEAD, (ThreadStateTransitionValidator)StreamThread.State.PENDING_SHUTDOWN);
        }
        Assert.assertEquals((long)4L, (long)stateListener.numChanges);
        Assert.assertEquals((Object)KafkaStreams.State.ERROR, (Object)this.globalStreams.state());
        this.globalStreams.close();
        TestUtils.waitForCondition(() -> stateListener.numChanges == 6, (String)"Streams never closed.");
        Assert.assertEquals((Object)KafkaStreams.State.NOT_RUNNING, (Object)this.globalStreams.state());
    }

    @Test
    public void shouldCleanupResourcesOnCloseWithoutPreviousStart() throws Exception {
        this.builder.globalTable("anyTopic");
        List<Node> nodes = Collections.singletonList(new Node(0, "localhost", 8121));
        Cluster cluster = new Cluster("mockClusterId", nodes, Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), nodes.get(0));
        MockClientSupplier clientSupplier = new MockClientSupplier();
        clientSupplier.setClusterForAdminClient(cluster);
        KafkaStreams streams = new KafkaStreams(this.builder.build(), this.props, (KafkaClientSupplier)clientSupplier);
        streams.close();
        TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.NOT_RUNNING, (String)"Streams never stopped.");
        Assert.assertTrue((boolean)clientSupplier.consumer.closed());
        Assert.assertTrue((boolean)clientSupplier.restoreConsumer.closed());
        for (MockProducer p : clientSupplier.producers) {
            Assert.assertTrue((boolean)p.closed());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testStateThreadClose() throws Exception {
        this.builder.globalTable("anyTopic");
        try (KafkaStreams streams = new KafkaStreams(this.builder.build(), this.props);){
            Field threadsField = streams.getClass().getDeclaredField("threads");
            threadsField.setAccessible(true);
            StreamThread[] threads = (StreamThread[])threadsField.get(streams);
            Assert.assertEquals((long)2L, (long)threads.length);
            Assert.assertEquals((Object)streams.state(), (Object)KafkaStreams.State.CREATED);
            streams.start();
            TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, (String)"Streams never started.");
            for (int i = 0; i < 2; ++i) {
                StreamThread tmpThread = threads[i];
                tmpThread.shutdown();
                TestUtils.waitForCondition(() -> tmpThread.state() == StreamThread.State.DEAD, (String)"Thread never stopped.");
                threads[i].join();
            }
            TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.ERROR, (String)"Streams never stopped.");
        }
        TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.NOT_RUNNING, (String)"Streams never stopped.");
        Field globalThreadField = streams.getClass().getDeclaredField("globalStreamThread");
        globalThreadField.setAccessible(true);
        GlobalStreamThread globalStreamThread = (GlobalStreamThread)globalThreadField.get(streams);
        Assert.assertNull((Object)globalStreamThread);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testStateGlobalThreadClose() throws Exception {
        this.builder.globalTable("anyTopic");
        try (KafkaStreams streams = new KafkaStreams(this.builder.build(), this.props);){
            streams.start();
            TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, (String)"Streams never started.");
            Field globalThreadField = streams.getClass().getDeclaredField("globalStreamThread");
            globalThreadField.setAccessible(true);
            GlobalStreamThread globalStreamThread = (GlobalStreamThread)globalThreadField.get(streams);
            globalStreamThread.shutdown();
            TestUtils.waitForCondition(() -> globalStreamThread.state() == GlobalStreamThread.State.DEAD, (String)"Thread never stopped.");
            globalStreamThread.join();
            Assert.assertEquals((Object)streams.state(), (Object)KafkaStreams.State.ERROR);
        }
        Assert.assertEquals((Object)streams.state(), (Object)KafkaStreams.State.NOT_RUNNING);
    }

    @Test
    public void globalThreadShouldTimeoutWhenBrokerConnectionCannotBeEstablished() {
        Properties props = new Properties();
        props.put("application.id", "appId");
        props.put("bootstrap.servers", "localhost:1");
        props.put("metric.reporters", MockMetricsReporter.class.getName());
        props.put("state.dir", TestUtils.tempDirectory().getPath());
        props.put("num.stream.threads", (Object)2);
        props.put("default.api.timeout.ms", (Object)200);
        this.builder.globalTable("anyTopic");
        try (KafkaStreams streams = new KafkaStreams(this.builder.build(), props);){
            streams.start();
            Assert.fail((String)"expected start() to time out and throw an exception.");
        }
        catch (StreamsException streamsException) {
            // empty catch block
        }
    }

    @Test
    public void testLocalThreadCloseWithoutConnectingToBroker() {
        Properties props = new Properties();
        props.setProperty("application.id", "appId");
        props.setProperty("bootstrap.servers", "localhost:1");
        props.setProperty("metric.reporters", MockMetricsReporter.class.getName());
        props.setProperty("state.dir", TestUtils.tempDirectory().getPath());
        props.put("num.stream.threads", (Object)2);
        this.builder.table("anyTopic");
        try (KafkaStreams streams = new KafkaStreams(this.builder.build(), props);){
            streams.start();
        }
    }

    @Test
    public void testInitializesAndDestroysMetricsReporters() {
        int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
        try (KafkaStreams streams = new KafkaStreams(this.builder.build(), this.props);){
            int newInitCount = MockMetricsReporter.INIT_COUNT.get();
            int initDiff = newInitCount - oldInitCount;
            Assert.assertTrue((String)"some reporters should be initialized by calling on construction", (initDiff > 0 ? 1 : 0) != 0);
            streams.start();
            int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get();
            streams.close();
            Assert.assertEquals((long)(oldCloseCount + initDiff), (long)MockMetricsReporter.CLOSE_COUNT.get());
        }
    }

    @Test
    public void testCloseIsIdempotent() {
        this.globalStreams.close();
        int closeCount = MockMetricsReporter.CLOSE_COUNT.get();
        this.globalStreams.close();
        Assert.assertEquals((String)"subsequent close() calls should do nothing", (long)closeCount, (long)MockMetricsReporter.CLOSE_COUNT.get());
    }

    @Test
    public void testCannotStartOnceClosed() {
        this.globalStreams.start();
        this.globalStreams.close();
        try {
            this.globalStreams.start();
            Assert.fail((String)"Should have throw IllegalStateException");
        }
        catch (IllegalStateException illegalStateException) {
        }
        finally {
            this.globalStreams.close();
        }
    }

    @Test
    public void testCannotStartTwice() {
        this.globalStreams.start();
        try {
            this.globalStreams.start();
            Assert.fail((String)"Should throw an IllegalStateException");
        }
        catch (IllegalStateException illegalStateException) {
        }
        finally {
            this.globalStreams.close();
        }
    }

    @Test
    public void shouldNotSetGlobalRestoreListenerAfterStarting() {
        this.globalStreams.start();
        try {
            this.globalStreams.setGlobalStateRestoreListener((StateRestoreListener)new MockStateRestoreListener());
            Assert.fail((String)"Should throw an IllegalStateException");
        }
        catch (IllegalStateException illegalStateException) {
        }
        finally {
            this.globalStreams.close();
        }
    }

    @Test
    public void shouldThrowExceptionSettingUncaughtExceptionHandlerNotInCreateState() {
        this.globalStreams.start();
        try {
            this.globalStreams.setUncaughtExceptionHandler(null);
            Assert.fail((String)"Should throw IllegalStateException");
        }
        catch (IllegalStateException illegalStateException) {
            // empty catch block
        }
    }

    @Test
    public void shouldThrowExceptionSettingStateListenerNotInCreateState() {
        this.globalStreams.start();
        try {
            this.globalStreams.setStateListener(null);
            Assert.fail((String)"Should throw IllegalStateException");
        }
        catch (IllegalStateException illegalStateException) {
            // empty catch block
        }
    }

    @Test
    public void testIllegalMetricsConfig() {
        this.props.setProperty("metrics.recording.level", "illegalConfig");
        try {
            new KafkaStreams(this.builder.build(), this.props);
            Assert.fail((String)"Should have throw ConfigException");
        }
        catch (ConfigException configException) {
            // empty catch block
        }
    }

    @Test
    public void testLegalMetricsConfig() {
        this.props.setProperty("metrics.recording.level", Sensor.RecordingLevel.INFO.toString());
        new KafkaStreams(this.builder.build(), this.props).close();
        this.props.setProperty("metrics.recording.level", Sensor.RecordingLevel.DEBUG.toString());
        new KafkaStreams(this.builder.build(), this.props).close();
    }

    @Test(expected=IllegalStateException.class)
    public void shouldNotGetAllTasksWhenNotRunning() {
        this.globalStreams.allMetadata();
    }

    @Test(expected=IllegalStateException.class)
    public void shouldNotGetAllTasksWithStoreWhenNotRunning() {
        this.globalStreams.allMetadataForStore("store");
    }

    @Test(expected=IllegalStateException.class)
    public void shouldNotGetTaskWithKeyAndSerializerWhenNotRunning() {
        this.globalStreams.metadataForKey("store", (Object)"key", Serdes.String().serializer());
    }

    @Test(expected=IllegalStateException.class)
    public void shouldNotGetTaskWithKeyAndPartitionerWhenNotRunning() {
        this.globalStreams.metadataForKey("store", (Object)"key", (topic, key, value, numPartitions) -> 0);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void shouldReturnFalseOnCloseWhenThreadsHaventTerminated() throws Exception {
        AtomicBoolean keepRunning = new AtomicBoolean(true);
        KafkaStreams streams = null;
        try {
            StreamsBuilder builder = new StreamsBuilder();
            CountDownLatch latch = new CountDownLatch(1);
            String topic = "input";
            CLUSTER.createTopics("input");
            builder.stream("input", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String())).foreach((key, value) -> {
                try {
                    latch.countDown();
                    while (keepRunning.get()) {
                        Thread.sleep(10L);
                    }
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
            });
            streams = new KafkaStreams(builder.build(), this.props);
            streams.start();
            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp("input", Collections.singletonList(new KeyValue((Object)"A", (Object)"A")), TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, (Properties)new Properties()), System.currentTimeMillis());
            Assert.assertTrue((String)"Timed out waiting to receive single message", (boolean)latch.await(30L, TimeUnit.SECONDS));
            Assert.assertFalse((boolean)streams.close(Duration.ofMillis(10L)));
        }
        finally {
            keepRunning.set(false);
            if (streams != null) {
                streams.close();
            }
        }
    }

    @Test
    public void shouldReturnThreadMetadata() {
        this.globalStreams.start();
        Set threadMetadata = this.globalStreams.localThreadsMetadata();
        Assert.assertNotNull((Object)threadMetadata);
        Assert.assertEquals((long)2L, (long)threadMetadata.size());
        for (ThreadMetadata metadata : threadMetadata) {
            Assert.assertTrue((String)("#threadState() was: " + metadata.threadState() + "; expected either RUNNING, STARTING, PARTITIONS_REVOKED, PARTITIONS_ASSIGNED, or CREATED"), (boolean)Arrays.asList("RUNNING", "STARTING", "PARTITIONS_REVOKED", "PARTITIONS_ASSIGNED", "CREATED").contains(metadata.threadState()));
            Assert.assertEquals((long)0L, (long)metadata.standbyTasks().size());
            Assert.assertEquals((long)0L, (long)metadata.activeTasks().size());
            String threadName = metadata.threadName();
            Assert.assertTrue((boolean)threadName.startsWith("clientId-StreamThread-"));
            Assert.assertEquals((Object)(threadName + "-consumer"), (Object)metadata.consumerClientId());
            Assert.assertEquals((Object)(threadName + "-restore-consumer"), (Object)metadata.restoreConsumerClientId());
            Assert.assertEquals(Collections.singleton(threadName + "-producer"), (Object)metadata.producerClientIds());
            Assert.assertEquals((Object)"clientId-admin", (Object)metadata.adminClientId());
        }
    }

    @Test
    public void shouldAllowCleanupBeforeStartAndAfterClose() {
        try {
            this.globalStreams.cleanUp();
            this.globalStreams.start();
        }
        finally {
            this.globalStreams.close();
        }
        this.globalStreams.cleanUp();
    }

    @Test
    public void shouldThrowOnCleanupWhileRunning() throws InterruptedException {
        this.globalStreams.start();
        TestUtils.waitForCondition(() -> this.globalStreams.state() == KafkaStreams.State.RUNNING, (String)"Streams never started.");
        try {
            this.globalStreams.cleanUp();
            Assert.fail((String)"Should have thrown IllegalStateException");
        }
        catch (IllegalStateException expected) {
            Assert.assertEquals((Object)"Cannot clean up while running.", (Object)expected.getMessage());
        }
    }

    @Test
    public void shouldCleanupOldStateDirs() throws InterruptedException {
        this.props.setProperty("state.cleanup.delay.ms", "1");
        String topic = "topic";
        CLUSTER.createTopic("topic");
        StreamsBuilder builder = new StreamsBuilder();
        builder.table("topic", Materialized.as((String)"store"));
        try (KafkaStreams streams = new KafkaStreams(builder.build(), this.props);){
            CountDownLatch latch = new CountDownLatch(1);
            streams.setStateListener((newState, oldState) -> {
                if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
                    latch.countDown();
                }
            });
            String appDir = this.props.getProperty("state.dir") + File.separator + this.props.getProperty("application.id");
            File oldTaskDir = new File(appDir, "10_1");
            Assert.assertTrue((boolean)oldTaskDir.mkdirs());
            streams.start();
            latch.await(30L, TimeUnit.SECONDS);
            this.verifyCleanupStateDir(appDir, oldTaskDir);
            Assert.assertTrue((boolean)oldTaskDir.mkdirs());
            this.verifyCleanupStateDir(appDir, oldTaskDir);
        }
    }

    @Test
    public void shouldThrowOnNegativeTimeoutForClose() {
        try (KafkaStreams streams = new KafkaStreams(this.builder.build(), this.props);){
            streams.close(Duration.ofMillis(-1L));
            Assert.fail((String)"should not accept negative close parameter");
        }
        catch (IllegalArgumentException illegalArgumentException) {
            // empty catch block
        }
    }

    @Test
    public void shouldNotBlockInCloseForZeroDuration() throws InterruptedException {
        KafkaStreams streams = new KafkaStreams(this.builder.build(), this.props);
        Thread th = new Thread(() -> streams.close(Duration.ofMillis(0L)));
        th.start();
        try {
            th.join(30000L);
            Assert.assertFalse((boolean)th.isAlive());
        }
        finally {
            streams.close();
        }
    }

    @Test
    public void statelessTopologyShouldNotCreateStateDirectory() throws Exception {
        String inputTopic = this.testName.getMethodName() + "-input";
        String outputTopic = this.testName.getMethodName() + "-output";
        CLUSTER.createTopics(inputTopic, outputTopic);
        Topology topology = new Topology();
        topology.addSource("source", Serdes.String().deserializer(), Serdes.String().deserializer(), new String[]{inputTopic}).addProcessor("process", () -> new AbstractProcessor<String, String>(){

            public void process(String key, String value) {
                if (value.length() % 2 == 0) {
                    this.context().forward((Object)key, (Object)(key + value));
                }
            }
        }, new String[]{"source"}).addSink("sink", outputTopic, (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), new String[]{"process"});
        this.startStreamsAndCheckDirExists(topology, Collections.singleton(inputTopic), outputTopic, false);
    }

    @Test
    public void inMemoryStatefulTopologyShouldNotCreateStateDirectory() throws Exception {
        String inputTopic = this.testName.getMethodName() + "-input";
        String outputTopic = this.testName.getMethodName() + "-output";
        String globalTopicName = this.testName.getMethodName() + "-global";
        String storeName = this.testName.getMethodName() + "-counts";
        String globalStoreName = this.testName.getMethodName() + "-globalStore";
        Topology topology = this.getStatefulTopology(inputTopic, outputTopic, globalTopicName, storeName, globalStoreName, false);
        this.startStreamsAndCheckDirExists(topology, Arrays.asList(inputTopic, globalTopicName), outputTopic, false);
    }

    @Test
    public void statefulTopologyShouldCreateStateDirectory() throws Exception {
        String inputTopic = this.testName.getMethodName() + "-input";
        String outputTopic = this.testName.getMethodName() + "-output";
        String globalTopicName = this.testName.getMethodName() + "-global";
        String storeName = this.testName.getMethodName() + "-counts";
        String globalStoreName = this.testName.getMethodName() + "-globalStore";
        Topology topology = this.getStatefulTopology(inputTopic, outputTopic, globalTopicName, storeName, globalStoreName, true);
        this.startStreamsAndCheckDirExists(topology, Arrays.asList(inputTopic, globalTopicName), outputTopic, true);
    }

    private Topology getStatefulTopology(String inputTopic, String outputTopic, String globalTopicName, final String storeName, String globalStoreName, boolean isPersistentStore) throws Exception {
        CLUSTER.createTopics(inputTopic, outputTopic, globalTopicName);
        StoreBuilder storeBuilder = Stores.keyValueStoreBuilder((KeyValueBytesStoreSupplier)(isPersistentStore ? Stores.persistentKeyValueStore((String)storeName) : Stores.inMemoryKeyValueStore((String)storeName)), (Serde)Serdes.String(), (Serde)Serdes.Long());
        Topology topology = new Topology();
        topology.addSource("source", Serdes.String().deserializer(), Serdes.String().deserializer(), new String[]{inputTopic}).addProcessor("process", () -> new AbstractProcessor<String, String>(){

            public void process(String key, String value) {
                KeyValueStore kvStore = (KeyValueStore)this.context().getStateStore(storeName);
                kvStore.put((Object)key, (Object)5L);
                this.context().forward((Object)key, (Object)"5");
                this.context().commit();
            }
        }, new String[]{"source"}).addStateStore(storeBuilder, new String[]{"process"}).addSink("sink", outputTopic, (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), new String[]{"process"});
        StoreBuilder globalStoreBuilder = Stores.keyValueStoreBuilder((KeyValueBytesStoreSupplier)(isPersistentStore ? Stores.persistentKeyValueStore((String)globalStoreName) : Stores.inMemoryKeyValueStore((String)globalStoreName)), (Serde)Serdes.String(), (Serde)Serdes.String()).withLoggingDisabled();
        topology.addGlobalStore(globalStoreBuilder, "global", Serdes.String().deserializer(), Serdes.String().deserializer(), globalTopicName, globalTopicName + "-processor", new MockProcessorSupplier());
        return topology;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void startStreamsAndCheckDirExists(Topology topology, Collection<String> inputTopics, String outputTopic, boolean shouldFilesExist) throws Exception {
        File baseDir = new File(TestUtils.IO_TMP_DIR + File.separator + "kafka-" + TestUtils.randomString((int)5));
        Path basePath = baseDir.toPath();
        if (!baseDir.exists()) {
            Files.createDirectory(basePath, new FileAttribute[0]);
        }
        Properties localProps = new Properties();
        localProps.putAll((Map<?, ?>)this.props);
        localProps.put("state.dir", baseDir.getAbsolutePath());
        KafkaStreams streams = new KafkaStreams(topology, localProps);
        streams.start();
        for (String topic : inputTopics) {
            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(topic, Collections.singletonList(new KeyValue((Object)"A", (Object)"A")), TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, (Properties)new Properties()), System.currentTimeMillis());
        }
        IntegrationTestUtils.readKeyValues(outputTopic, TestUtils.consumerConfig((String)CLUSTER.bootstrapServers(), (String)(outputTopic + "-group"), StringDeserializer.class, StringDeserializer.class), 5000L, 1);
        try {
            List files = Files.find(basePath, 999, (p, bfa) -> !p.equals(basePath), new FileVisitOption[0]).collect(Collectors.toList());
            if (shouldFilesExist && files.isEmpty()) {
                Assert.fail((String)("Files should have existed, but it didn't: " + files));
            }
            if (!shouldFilesExist && !files.isEmpty()) {
                Assert.fail((String)("Files should not have existed, but it did: " + files));
            }
        }
        catch (IOException e) {
            Assert.fail((String)("Couldn't read the state directory : " + baseDir.getPath()));
        }
        finally {
            streams.close();
            streams.cleanUp();
            Utils.delete((File)baseDir);
        }
    }

    private void verifyCleanupStateDir(String appDir, File oldTaskDir) throws InterruptedException {
        File taskDir = new File(appDir, "0_0");
        TestUtils.waitForCondition(() -> !oldTaskDir.exists() && taskDir.exists(), (String)"cleanup has not successfully run");
        Assert.assertTrue((boolean)taskDir.exists());
    }

    public static class StateListenerStub
    implements KafkaStreams.StateListener {
        int numChanges = 0;
        KafkaStreams.State oldState;
        KafkaStreams.State newState;
        public Map<KafkaStreams.State, Long> mapStates = new HashMap<KafkaStreams.State, Long>();

        public void onChange(KafkaStreams.State newState, KafkaStreams.State oldState) {
            long prevCount = this.mapStates.containsKey(newState) ? this.mapStates.get(newState) : 0L;
            ++this.numChanges;
            this.oldState = oldState;
            this.newState = newState;
            this.mapStates.put(newState, prevCount + 1L);
        }
    }
}

