/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilderTest;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ChangelogReader;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.ProcessorTopologyFactories;
import org.apache.kafka.streams.processor.internals.StandbyContextImpl;
import org.apache.kafka.streams.processor.internals.StandbyTask;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StoreChangelogReader;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.streams.state.internals.WindowKeySchema;
import org.apache.kafka.test.MockKeyValueStore;
import org.apache.kafka.test.MockKeyValueStoreBuilder;
import org.apache.kafka.test.MockRestoreConsumer;
import org.apache.kafka.test.MockStateRestoreListener;
import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class StandbyTaskTest {
    private final TaskId taskId = new TaskId(0, 1);
    private StandbyTask task;
    private final Serializer<Integer> intSerializer = new IntegerSerializer();
    private final String applicationId = "test-application";
    private final String storeName1 = "store1";
    private final String storeName2 = "store2";
    private final String storeChangelogTopicName1 = ProcessorStateManager.storeChangelogTopic((String)"test-application", (String)"store1");
    private final String storeChangelogTopicName2 = ProcessorStateManager.storeChangelogTopic((String)"test-application", (String)"store2");
    private final String globalStoreName = "ktable1";
    private final TopicPartition partition1 = new TopicPartition(this.storeChangelogTopicName1, 1);
    private final TopicPartition partition2 = new TopicPartition(this.storeChangelogTopicName2, 1);
    private final MockStateRestoreListener stateRestoreListener = new MockStateRestoreListener();
    private final Set<TopicPartition> topicPartitions = Collections.emptySet();
    private final ProcessorTopology topology = ProcessorTopologyFactories.withLocalStores(Arrays.asList(new MockKeyValueStoreBuilder("store1", false).build(), new MockKeyValueStoreBuilder("store2", true).build()), Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"store1", (Object)this.storeChangelogTopicName1), Utils.mkEntry((Object)"store2", (Object)this.storeChangelogTopicName2)}));
    private final TopicPartition globalTopicPartition = new TopicPartition("ktable1", 0);
    private final Set<TopicPartition> ktablePartitions = Utils.mkSet((Object[])new TopicPartition[]{this.globalTopicPartition});
    private final ProcessorTopology ktableTopology = ProcessorTopologyFactories.withLocalStores(Collections.singletonList(new MockKeyValueStoreBuilder(this.globalTopicPartition.topic(), true).withLoggingDisabled().build()), Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"ktable1", (Object)this.globalTopicPartition.topic())}));
    private File baseDir;
    private StateDirectory stateDirectory;
    private final MockConsumer<byte[], byte[]> consumer = new MockConsumer(OffsetResetStrategy.EARLIEST);
    private final MockRestoreConsumer<Integer, Integer> restoreStateConsumer = new MockRestoreConsumer(new IntegerSerializer(), new IntegerSerializer());
    private final StoreChangelogReader changelogReader = new StoreChangelogReader(this.restoreStateConsumer, Duration.ZERO, (StateRestoreListener)this.stateRestoreListener, new LogContext("standby-task-test "));
    private final byte[] recordValue = this.intSerializer.serialize(null, (Object)10);
    private final byte[] recordKey = this.intSerializer.serialize(null, (Object)1);

    private StreamsConfig createConfig(File baseDir) throws IOException {
        return new StreamsConfig((Map)Utils.mkProperties((Map)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"application.id", (Object)"test-application"), Utils.mkEntry((Object)"bootstrap.servers", (Object)"localhost:2171"), Utils.mkEntry((Object)"buffered.records.per.partition", (Object)"3"), Utils.mkEntry((Object)"state.dir", (Object)baseDir.getCanonicalPath()), Utils.mkEntry((Object)"default.timestamp.extractor", (Object)MockTimestampExtractor.class.getName())})));
    }

    @Before
    public void setup() throws Exception {
        this.restoreStateConsumer.reset();
        this.restoreStateConsumer.updatePartitions(this.storeChangelogTopicName1, Arrays.asList(new PartitionInfo(this.storeChangelogTopicName1, 0, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo(this.storeChangelogTopicName1, 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo(this.storeChangelogTopicName1, 2, Node.noNode(), new Node[0], new Node[0])));
        this.restoreStateConsumer.updatePartitions(this.storeChangelogTopicName2, Arrays.asList(new PartitionInfo(this.storeChangelogTopicName2, 0, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo(this.storeChangelogTopicName2, 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo(this.storeChangelogTopicName2, 2, Node.noNode(), new Node[0], new Node[0])));
        this.baseDir = TestUtils.tempDirectory();
        this.stateDirectory = new StateDirectory(this.createConfig(this.baseDir), (Time)new MockTime(), true);
    }

    @After
    public void cleanup() throws IOException {
        if (this.task != null && !this.task.isClosed()) {
            this.task.close(true, false);
            this.task = null;
        }
        Utils.delete((File)this.baseDir);
    }

    @Test
    public void testStorePartitions() throws IOException {
        StreamsConfig config = this.createConfig(this.baseDir);
        this.task = new StandbyTask(this.taskId, this.topicPartitions, this.topology, this.consumer, (ChangelogReader)this.changelogReader, config, null, this.stateDirectory);
        this.task.initializeStateStores();
        Assert.assertEquals((Object)Utils.mkSet((Object[])new TopicPartition[]{this.partition2, this.partition1}), new HashSet(this.task.checkpointedOffsets().keySet()));
    }

    @Test
    public void testUpdateNonInitializedStore() throws IOException {
        StreamsConfig config = this.createConfig(this.baseDir);
        this.task = new StandbyTask(this.taskId, this.topicPartitions, this.topology, this.consumer, (ChangelogReader)this.changelogReader, config, null, this.stateDirectory);
        this.restoreStateConsumer.assign(new ArrayList<TopicPartition>(this.task.checkpointedOffsets().keySet()));
        try {
            this.task.update(this.partition1, Collections.singletonList(new ConsumerRecord(this.partition1.topic(), this.partition1.partition(), 10L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)this.recordKey, (Object)this.recordValue)));
            Assert.fail((String)"expected an exception");
        }
        catch (NullPointerException npe) {
            MatcherAssert.assertThat((Object)npe.getMessage(), (Matcher)CoreMatchers.containsString((String)"stateRestoreCallback must not be null"));
        }
    }

    @Test
    public void testUpdate() throws IOException {
        StreamsConfig config = this.createConfig(this.baseDir);
        this.task = new StandbyTask(this.taskId, this.topicPartitions, this.topology, this.consumer, (ChangelogReader)this.changelogReader, config, null, this.stateDirectory);
        this.task.initializeStateStores();
        Set<TopicPartition> partition = Collections.singleton(this.partition2);
        this.restoreStateConsumer.assign(partition);
        for (ConsumerRecord record : Arrays.asList(new ConsumerRecord(this.partition2.topic(), this.partition2.partition(), 10L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)1, (Object)100), new ConsumerRecord(this.partition2.topic(), this.partition2.partition(), 20L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)2, (Object)100), new ConsumerRecord(this.partition2.topic(), this.partition2.partition(), 30L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)3, (Object)100))) {
            this.restoreStateConsumer.bufferRecord((ConsumerRecord<Integer, Integer>)record);
        }
        this.restoreStateConsumer.seekToBeginning(partition);
        this.task.update(this.partition2, this.restoreStateConsumer.poll(Duration.ofMillis(100L)).records(this.partition2));
        StandbyContextImpl context = (StandbyContextImpl)this.task.context();
        MockKeyValueStore store1 = (MockKeyValueStore)context.getStateMgr().getStore("store1");
        MockKeyValueStore store2 = (MockKeyValueStore)context.getStateMgr().getStore("store2");
        Assert.assertEquals(Collections.emptyList(), store1.keys);
        Assert.assertEquals(Arrays.asList(1, 2, 3), store2.keys);
    }

    @Test
    public void shouldRestoreToWindowedStores() throws IOException {
        String storeName = "windowed-store";
        String changelogName = "test-application-windowed-store-changelog";
        TopicPartition topicPartition = new TopicPartition("test-application-windowed-store-changelog", 1);
        List<TopicPartition> partitions = Collections.singletonList(topicPartition);
        this.consumer.assign(partitions);
        InternalTopologyBuilder internalTopologyBuilder = new InternalTopologyBuilder().setApplicationId("test-application");
        InternalStreamsBuilder builder = new InternalStreamsBuilder(internalTopologyBuilder);
        builder.stream(Collections.singleton("topic"), new ConsumedInternal()).groupByKey().windowedBy((Windows)TimeWindows.of((Duration)Duration.ofMillis(60000L)).grace(Duration.ofMillis(0L))).count(Materialized.as((String)"windowed-store").withRetention(Duration.ofMillis(120000L)));
        builder.buildAndOptimizeTopology();
        this.task = new StandbyTask(this.taskId, partitions, internalTopologyBuilder.build(Integer.valueOf(0)), this.consumer, (ChangelogReader)new StoreChangelogReader(this.restoreStateConsumer, Duration.ZERO, (StateRestoreListener)this.stateRestoreListener, new LogContext("standby-task-test ")), this.createConfig(this.baseDir), (StreamsMetricsImpl)new MockStreamsMetrics(new Metrics()), this.stateDirectory);
        this.task.initializeStateStores();
        this.consumer.commitSync(Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)topicPartition, (Object)new OffsetAndMetadata(35L))}));
        this.task.commit();
        List remaining1 = this.task.update(topicPartition, Arrays.asList(this.makeWindowedConsumerRecord("test-application-windowed-store-changelog", 10, 1, 0L, 60000L), this.makeWindowedConsumerRecord("test-application-windowed-store-changelog", 20, 2, 60000L, 120000L), this.makeWindowedConsumerRecord("test-application-windowed-store-changelog", 30, 3, 120000L, 180000L), this.makeWindowedConsumerRecord("test-application-windowed-store-changelog", 40, 4, 180000L, 240000L)));
        Assert.assertEquals(Arrays.asList(new KeyValue((Object)new Windowed((Object)1, (Window)new TimeWindow(0L, 60000L)), (Object)100L), new KeyValue((Object)new Windowed((Object)2, (Window)new TimeWindow(60000L, 120000L)), (Object)100L), new KeyValue((Object)new Windowed((Object)3, (Window)new TimeWindow(120000L, 180000L)), (Object)100L)), this.getWindowedStoreContents("windowed-store", this.task));
        this.consumer.commitSync(Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)topicPartition, (Object)new OffsetAndMetadata(45L))}));
        this.task.commit();
        List remaining2 = this.task.update(topicPartition, remaining1);
        Assert.assertEquals(Collections.emptyList(), (Object)remaining2);
        Assert.assertEquals(Arrays.asList(new KeyValue((Object)new Windowed((Object)2, (Window)new TimeWindow(60000L, 120000L)), (Object)100L), new KeyValue((Object)new Windowed((Object)3, (Window)new TimeWindow(120000L, 180000L)), (Object)100L), new KeyValue((Object)new Windowed((Object)4, (Window)new TimeWindow(180000L, 240000L)), (Object)100L)), this.getWindowedStoreContents("windowed-store", this.task));
    }

    private ConsumerRecord<byte[], byte[]> makeWindowedConsumerRecord(String changelogName, int offset, int key, long start, long end) {
        Windowed data = new Windowed((Object)key, (Window)new TimeWindow(start, end));
        Bytes wrap = Bytes.wrap((byte[])new IntegerSerializer().serialize(null, (Integer)data.key()));
        byte[] keyBytes = WindowKeySchema.toStoreKeyBinary((Windowed)new Windowed((Object)wrap, data.window()), (int)1).get();
        return new ConsumerRecord(changelogName, 1, (long)offset, start, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)keyBytes, (Object)new LongSerializer().serialize(null, Long.valueOf(100L)));
    }

    @Test
    public void shouldWriteCheckpointFile() throws IOException {
        String storeName = "checkpoint-file-store";
        String changelogName = "test-application-checkpoint-file-store-changelog";
        TopicPartition topicPartition = new TopicPartition("test-application-checkpoint-file-store-changelog", 1);
        List<TopicPartition> partitions = Collections.singletonList(topicPartition);
        InternalTopologyBuilder internalTopologyBuilder = new InternalTopologyBuilder().setApplicationId("test-application");
        InternalStreamsBuilder builder = new InternalStreamsBuilder(internalTopologyBuilder);
        builder.stream(Collections.singleton("topic"), new ConsumedInternal()).groupByKey().count(Materialized.as((String)"checkpoint-file-store"));
        builder.buildAndOptimizeTopology();
        this.consumer.assign(partitions);
        this.task = new StandbyTask(this.taskId, partitions, internalTopologyBuilder.build(Integer.valueOf(0)), this.consumer, (ChangelogReader)this.changelogReader, this.createConfig(this.baseDir), (StreamsMetricsImpl)new MockStreamsMetrics(new Metrics()), this.stateDirectory);
        this.task.initializeStateStores();
        this.consumer.commitSync(Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)topicPartition, (Object)new OffsetAndMetadata(20L))}));
        this.task.commit();
        this.task.update(topicPartition, Collections.singletonList(this.makeWindowedConsumerRecord("test-application-checkpoint-file-store-changelog", 10, 1, 0L, 60000L)));
        this.task.suspend();
        this.task.close(true, false);
        File taskDir = this.stateDirectory.directoryForTask(this.taskId);
        OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(taskDir, ".checkpoint"));
        Map offsets = checkpoint.read();
        Assert.assertEquals((long)1L, (long)offsets.size());
        Assert.assertEquals((Object)new Long(11L), offsets.get(topicPartition));
    }

    private List<KeyValue<Windowed<Integer>, Long>> getWindowedStoreContents(String storeName, StandbyTask task) {
        StandbyContextImpl context = (StandbyContextImpl)task.context();
        ArrayList<KeyValue<Windowed<Integer>, Long>> result = new ArrayList<KeyValue<Windowed<Integer>, Long>>();
        try (KeyValueIterator iterator = ((WindowStore)context.getStateMgr().getStore(storeName)).all();){
            while (iterator.hasNext()) {
                KeyValue next = (KeyValue)iterator.next();
                Integer deserializedKey = new IntegerDeserializer().deserialize(null, (byte[])((Windowed)next.key).key());
                result.add((KeyValue<Windowed<Integer>, Long>)new KeyValue((Object)new Windowed((Object)deserializedKey, ((Windowed)next.key).window()), next.value));
            }
        }
        return result;
    }

    @Test
    public void shouldRestoreToKTable() throws IOException {
        this.consumer.assign(Collections.singletonList(this.globalTopicPartition));
        this.consumer.commitSync(Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.globalTopicPartition, (Object)new OffsetAndMetadata(0L))}));
        this.task = new StandbyTask(this.taskId, this.ktablePartitions, this.ktableTopology, this.consumer, (ChangelogReader)this.changelogReader, this.createConfig(this.baseDir), null, this.stateDirectory);
        this.task.initializeStateStores();
        List remaining = this.task.update(this.globalTopicPartition, Arrays.asList(this.makeConsumerRecord(this.globalTopicPartition, 10L, 1), this.makeConsumerRecord(this.globalTopicPartition, 20L, 2), this.makeConsumerRecord(this.globalTopicPartition, 30L, 3), this.makeConsumerRecord(this.globalTopicPartition, 40L, 4), this.makeConsumerRecord(this.globalTopicPartition, 50L, 5)));
        Assert.assertEquals((long)5L, (long)remaining.size());
        this.consumer.commitSync(Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.globalTopicPartition, (Object)new OffsetAndMetadata(10L))}));
        this.task.commit();
        remaining = this.task.update(this.globalTopicPartition, remaining);
        Assert.assertEquals((long)5L, (long)remaining.size());
        this.consumer.commitSync(Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.globalTopicPartition, (Object)new OffsetAndMetadata(11L))}));
        this.task.commit();
        remaining = this.task.update(this.globalTopicPartition, remaining);
        Assert.assertEquals((long)4L, (long)remaining.size());
        this.consumer.commitSync(Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.globalTopicPartition, (Object)new OffsetAndMetadata(45L))}));
        this.task.commit();
        remaining = this.task.update(this.globalTopicPartition, remaining);
        Assert.assertEquals((long)1L, (long)remaining.size());
        this.consumer.commitSync(Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.globalTopicPartition, (Object)new OffsetAndMetadata(50L))}));
        this.task.commit();
        remaining = this.task.update(this.globalTopicPartition, remaining);
        Assert.assertEquals((long)1L, (long)remaining.size());
        this.consumer.commitSync(Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.globalTopicPartition, (Object)new OffsetAndMetadata(60L))}));
        this.task.commit();
        remaining = this.task.update(this.globalTopicPartition, remaining);
        Assert.assertEquals(Collections.emptyList(), (Object)remaining);
    }

    private ConsumerRecord<byte[], byte[]> makeConsumerRecord(TopicPartition topicPartition, long offset, int key) {
        IntegerSerializer integerSerializer = new IntegerSerializer();
        return new ConsumerRecord(topicPartition.topic(), topicPartition.partition(), offset, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)integerSerializer.serialize(null, Integer.valueOf(key)), (Object)integerSerializer.serialize(null, Integer.valueOf(100)));
    }

    @Test
    public void shouldInitializeStateStoreWithoutException() throws IOException {
        InternalStreamsBuilder builder = new InternalStreamsBuilder(new InternalTopologyBuilder());
        builder.stream(Collections.singleton("topic"), new ConsumedInternal()).groupByKey().count();
        this.initializeStandbyStores(builder);
    }

    @Test
    public void shouldInitializeWindowStoreWithoutException() throws IOException {
        InternalStreamsBuilder builder = new InternalStreamsBuilder(new InternalTopologyBuilder());
        builder.stream(Collections.singleton("topic"), new ConsumedInternal()).groupByKey().windowedBy((Windows)TimeWindows.of((Duration)Duration.ofMillis(100L))).count();
        this.initializeStandbyStores(builder);
    }

    private void initializeStandbyStores(InternalStreamsBuilder builder) throws IOException {
        StreamsConfig config = this.createConfig(this.baseDir);
        builder.buildAndOptimizeTopology();
        InternalTopologyBuilder internalTopologyBuilder = InternalStreamsBuilderTest.internalTopologyBuilder(builder);
        ProcessorTopology topology = internalTopologyBuilder.setApplicationId("test-application").build(Integer.valueOf(0));
        this.task = new StandbyTask(this.taskId, Collections.emptySet(), topology, this.consumer, (ChangelogReader)this.changelogReader, config, (StreamsMetricsImpl)new MockStreamsMetrics(new Metrics()), this.stateDirectory);
        this.task.initializeStateStores();
        Assert.assertTrue((boolean)this.task.hasStateStores());
    }

    @Test
    public void shouldCheckpointStoreOffsetsOnCommit() throws IOException {
        this.consumer.assign(Collections.singletonList(this.globalTopicPartition));
        HashMap<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        committedOffsets.put(new TopicPartition(this.globalTopicPartition.topic(), this.globalTopicPartition.partition()), new OffsetAndMetadata(100L));
        this.consumer.commitSync(committedOffsets);
        this.restoreStateConsumer.updatePartitions("ktable1", Collections.singletonList(new PartitionInfo("ktable1", 0, Node.noNode(), new Node[0], new Node[0])));
        TaskId taskId = new TaskId(0, 0);
        MockTime time = new MockTime();
        StreamsConfig config = this.createConfig(this.baseDir);
        this.task = new StandbyTask(taskId, this.ktablePartitions, this.ktableTopology, this.consumer, (ChangelogReader)this.changelogReader, config, null, this.stateDirectory);
        this.task.initializeStateStores();
        this.restoreStateConsumer.assign(new ArrayList<TopicPartition>(this.task.checkpointedOffsets().keySet()));
        byte[] serializedValue = Serdes.Integer().serializer().serialize("", (Object)1);
        this.task.update(this.globalTopicPartition, Collections.singletonList(new ConsumerRecord(this.globalTopicPartition.topic(), this.globalTopicPartition.partition(), 50L, (Object)serializedValue, (Object)serializedValue)));
        time.sleep(config.getLong("commit.interval.ms").longValue());
        this.task.commit();
        Map checkpoint = new OffsetCheckpoint(new File(this.stateDirectory.directoryForTask(taskId), ".checkpoint")).read();
        MatcherAssert.assertThat((Object)checkpoint, (Matcher)CoreMatchers.equalTo(Collections.singletonMap(this.globalTopicPartition, 51L)));
    }

    @Test
    public void shouldCloseStateMangerOnTaskCloseWhenCommitFailed() throws Exception {
        this.consumer.assign(Collections.singletonList(this.globalTopicPartition));
        HashMap<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        committedOffsets.put(new TopicPartition(this.globalTopicPartition.topic(), this.globalTopicPartition.partition()), new OffsetAndMetadata(100L));
        this.consumer.commitSync(committedOffsets);
        this.restoreStateConsumer.updatePartitions("ktable1", Collections.singletonList(new PartitionInfo("ktable1", 0, Node.noNode(), new Node[0], new Node[0])));
        StreamsConfig config = this.createConfig(this.baseDir);
        final AtomicBoolean closedStateManager = new AtomicBoolean(false);
        this.task = new StandbyTask(this.taskId, this.ktablePartitions, this.ktableTopology, (Consumer)this.consumer, (ChangelogReader)this.changelogReader, config, null, this.stateDirectory){

            public void commit() {
                throw new RuntimeException("KABOOM!");
            }

            void closeStateManager(boolean clean) throws ProcessorStateException {
                closedStateManager.set(true);
            }
        };
        this.task.initializeStateStores();
        try {
            this.task.close(true, false);
            Assert.fail((String)"should have thrown exception");
        }
        catch (Exception e) {
            this.task = null;
        }
        Assert.assertTrue((boolean)closedStateManager.get());
    }
}

