/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.kstream.SessionWindowedDeserializer;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.internals.CacheFlushListener;
import org.apache.kafka.streams.state.internals.CachingSessionStore;
import org.apache.kafka.streams.state.internals.InMemorySessionStore;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class CachingInMemorySessionStoreTest {
    // NOTE(review): equals the 600L handed to ThreadCache in before() - presumably the cache capacity in bytes; confirm.
    private static final int MAX_CACHE_SIZE_BYTES = 600;
    // Timestamp of the record context installed in before().
    private static final Long DEFAULT_TIMESTAMP = 10L;
    // NOTE(review): equals the 100L passed to every CachingSessionStore constructor in this class - presumably the segment interval; confirm.
    private static final long SEGMENT_INTERVAL = 100L;
    // Topic name of the record context installed in before() and setUpCloseTests().
    private static final String TOPIC = "topic";
    // Namespace the close tests expect ThreadCache.flush/close to be invoked with.
    private static final String CACHE_NAMESPACE = "0_0-store-name";
    // Record keys "a", "aa" and "b" wrapped as Bytes.
    private final Bytes keyA = Bytes.wrap((byte[])"a".getBytes());
    private final Bytes keyAA = Bytes.wrap((byte[])"aa".getBytes());
    private final Bytes keyB = Bytes.wrap((byte[])"b".getBytes());
    // Store wrapped by cachingStore; an in-memory store from before(), a mock after setUpCloseTests().
    private SessionStore<Bytes, byte[]> underlyingStore;
    // Processor context the caching store is initialised with in before().
    private InternalMockProcessorContext context;
    // Store under test.
    private CachingSessionStore cachingStore;
    // Cache handed to the processor context; a real ThreadCache from before(), a mock after setUpCloseTests().
    private ThreadCache cache;

    /**
     * Creates the fixture for each test: an in-memory session store wrapped by a
     * {@link CachingSessionStore}, a {@link ThreadCache}, and a mock processor context
     * carrying a record context for {@code TOPIC}; the caching store is then initialised
     * against that context.
     */
    @Before
    public void before() {
        underlyingStore = new InMemorySessionStore("store-name", Long.MAX_VALUE, "metric-scope");
        cachingStore = new CachingSessionStore(underlyingStore, SEGMENT_INTERVAL);

        final StreamsMetricsImpl streamsMetrics = new MockStreamsMetrics(new Metrics());
        cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, streamsMetrics);

        context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
        final ProcessorRecordContext recordContext =
            new ProcessorRecordContext(DEFAULT_TIMESTAMP.longValue(), 0L, 0, TOPIC, new RecordHeaders());
        context.setRecordContext(recordContext);

        // The cast selects the StateStoreContext overload of init.
        cachingStore.init((StateStoreContext) context, cachingStore);
    }

    /** Closes the caching store once the test has finished. */
    @After
    public void after() {
        cachingStore.close();
    }

    /**
     * Verifies that calling the {@code init(ProcessorContext, StateStore)} overload on the
     * caching store invokes the same overload on the wrapped store.
     */
    @Test
    public void shouldDelegateDeprecatedInit() {
        final SessionStore<Bytes, byte[]> wrapped = EasyMock.mock(InMemorySessionStore.class);
        final CachingSessionStore caching = new CachingSessionStore(wrapped, SEGMENT_INTERVAL);
        EasyMock.expect(wrapped.name()).andStubReturn("store");

        // Record the expected delegation, then switch the mock to replay mode.
        wrapped.init((ProcessorContext) context, caching);
        EasyMock.expectLastCall();
        EasyMock.replay(wrapped);

        caching.init((ProcessorContext) context, caching);

        EasyMock.verify(wrapped);
    }

    /**
     * Verifies that calling the {@code init(StateStoreContext, StateStore)} overload on the
     * caching store invokes the same overload on the wrapped store.
     */
    @Test
    public void shouldDelegateInit() {
        final SessionStore<Bytes, byte[]> wrapped = EasyMock.mock(InMemorySessionStore.class);
        final CachingSessionStore caching = new CachingSessionStore(wrapped, SEGMENT_INTERVAL);
        EasyMock.expect(wrapped.name()).andStubReturn("store");

        // Record the expected delegation, then switch the mock to replay mode.
        wrapped.init((StateStoreContext) context, caching);
        EasyMock.expectLastCall();
        EasyMock.replay(wrapped);

        caching.init((StateStoreContext) context, caching);

        EasyMock.verify(wrapped);
    }

    /**
     * Puts one session each for keys "a", "aa" and "b", checks that the cache reports three
     * entries, and checks that single-key {@code findSessions} lookups for "a" and "b" each
     * return exactly their own session.
     */
    @Test
    public void shouldPutFetchFromCache() {
        final Windowed<Bytes> sessionA = new Windowed<>(keyA, new SessionWindow(0L, 0L));
        final Windowed<Bytes> sessionAA = new Windowed<>(keyAA, new SessionWindow(0L, 0L));
        final Windowed<Bytes> sessionB = new Windowed<>(keyB, new SessionWindow(0L, 0L));

        cachingStore.put(sessionA, "1".getBytes());
        cachingStore.put(sessionAA, "1".getBytes());
        cachingStore.put(sessionB, "1".getBytes());
        Assert.assertEquals(3L, cache.size());

        try (final KeyValueIterator<Windowed<Bytes>, byte[]> sessionsForA = cachingStore.findSessions(keyA, 0L, 0L);
             final KeyValueIterator<Windowed<Bytes>, byte[]> sessionsForB = cachingStore.findSessions(keyB, 0L, 0L)) {
            StreamsTestUtils.verifyWindowedKeyValue(sessionsForA.next(), sessionA, "1");
            StreamsTestUtils.verifyWindowedKeyValue(sessionsForB.next(), sessionB, "1");
            Assert.assertFalse(sessionsForA.hasNext());
            Assert.assertFalse(sessionsForB.hasNext());
        }
    }

    /** Runs the shared position check with a no-op flush listener installed. */
    @Test
    public void shouldMatchPositionAfterPutWithFlushListener() {
        cachingStore.setFlushListener(flushed -> { }, false);
        shouldMatchPositionAfterPut();
    }

    /** Runs the shared position check with the flush listener set to {@code null}. */
    @Test
    public void shouldMatchPositionAfterPutWithoutFlushListener() {
        cachingStore.setFlushListener(null, false);
        shouldMatchPositionAfterPut();
    }

    /**
     * Shared body of the position tests. Puts the same session twice under two successive
     * record contexts, installs a third record context, and asserts that both the caching
     * store and the underlying store report an empty position before {@code flush()} and the
     * position {@code {"": {0: 2}}} afterwards.
     */
    private void shouldMatchPositionAfterPut() {
        final Windowed<Bytes> session = new Windowed<>(keyA, new SessionWindow(0L, 0L));

        context.setRecordContext(new ProcessorRecordContext(0L, 1L, 0, "", new RecordHeaders()));
        cachingStore.put(session, "1".getBytes());
        context.setRecordContext(new ProcessorRecordContext(0L, 2L, 0, "", new RecordHeaders()));
        cachingStore.put(session, "1".getBytes());
        context.setRecordContext(new ProcessorRecordContext(0L, 3L, 0, "", new RecordHeaders()));

        // Before the flush neither store has a position.
        Assert.assertEquals(Position.emptyPosition(), cachingStore.getPosition());
        Assert.assertEquals(Position.emptyPosition(), underlyingStore.getPosition());

        cachingStore.flush();

        final Map<Integer, Long> partitionToValue = Utils.mkMap(Utils.mkEntry(0, 2L));
        final Map<String, Map<Integer, Long>> topicToPartitions = Utils.mkMap(Utils.mkEntry("", partitionToValue));
        final Position expected = Position.fromMap(topicToPartitions);
        Assert.assertEquals(expected, cachingStore.getPosition());
        Assert.assertEquals(expected, underlyingStore.getPosition());
    }

    /**
     * Puts one session each for keys "a", "aa" and "b", checks that the cache reports three
     * entries, and checks that forward range fetches return the sessions in the order
     * "a", "aa", "b".
     *
     * <p>Every iterator is opened in its own try-with-resources statement so it is closed
     * even when an assertion fails.
     */
    @Test
    public void shouldPutFetchAllKeysFromCache() {
        final Windowed<Bytes> sessionA = new Windowed<>(keyA, new SessionWindow(0L, 0L));
        final Windowed<Bytes> sessionAA = new Windowed<>(keyAA, new SessionWindow(0L, 0L));
        final Windowed<Bytes> sessionB = new Windowed<>(keyB, new SessionWindow(0L, 0L));

        cachingStore.put(sessionA, "1".getBytes());
        cachingStore.put(sessionAA, "1".getBytes());
        cachingStore.put(sessionB, "1".getBytes());
        Assert.assertEquals(3L, cache.size());

        // Closed range [a, b].
        try (final KeyValueIterator<Windowed<Bytes>, byte[]> all = cachingStore.fetch(keyA, keyB)) {
            StreamsTestUtils.verifyWindowedKeyValue(all.next(), sessionA, "1");
            StreamsTestUtils.verifyWindowedKeyValue(all.next(), sessionAA, "1");
            StreamsTestUtils.verifyWindowedKeyValue(all.next(), sessionB, "1");
            Assert.assertFalse(all.hasNext());
        }

        // NOTE(review): the (null, keyB) range is queried three times with identical bounds;
        // confirm whether other open-ended ranges were intended for the later two.
        try (final KeyValueIterator<Windowed<Bytes>, byte[]> all = cachingStore.fetch(null, keyB)) {
            StreamsTestUtils.verifyWindowedKeyValue(all.next(), sessionA, "1");
            StreamsTestUtils.verifyWindowedKeyValue(all.next(), sessionAA, "1");
            StreamsTestUtils.verifyWindowedKeyValue(all.next(), sessionB, "1");
            Assert.assertFalse(all.hasNext());
        }

        try (final KeyValueIterator<Windowed<Bytes>, byte[]> all = cachingStore.fetch(null, keyB)) {
            StreamsTestUtils.verifyWindowedKeyValue(all.next(), sessionA, "1");
            StreamsTestUtils.verifyWindowedKeyValue(all.next(), sessionAA, "1");
            StreamsTestUtils.verifyWindowedKeyValue(all.next(), sessionB, "1");
            Assert.assertFalse(all.hasNext());
        }

        try (final KeyValueIterator<Windowed<Bytes>, byte[]> all = cachingStore.fetch(null, keyB)) {
            StreamsTestUtils.verifyWindowedKeyValue(all.next(), sessionA, "1");
            StreamsTestUtils.verifyWindowedKeyValue(all.next(), sessionAA, "1");
            StreamsTestUtils.verifyWindowedKeyValue(all.next(), sessionB, "1");
            Assert.assertFalse(all.hasNext());
        }
    }

    /**
     * Puts one session each for keys "a", "aa" and "b", checks that the cache reports three
     * entries, and checks that backward range fetches return the sessions in the order
     * "b", "aa", "a".
     *
     * <p>Every iterator is opened in its own try-with-resources statement so it is closed
     * even when an assertion fails.
     */
    @Test
    public void shouldPutBackwardFetchAllKeysFromCache() {
        final Windowed<Bytes> sessionA = new Windowed<>(keyA, new SessionWindow(0L, 0L));
        final Windowed<Bytes> sessionAA = new Windowed<>(keyAA, new SessionWindow(0L, 0L));
        final Windowed<Bytes> sessionB = new Windowed<>(keyB, new SessionWindow(0L, 0L));

        cachingStore.put(sessionA, "1".getBytes());
        cachingStore.put(sessionAA, "1".getBytes());
        cachingStore.put(sessionB, "1".getBytes());
        Assert.assertEquals(3L, cache.size());

        // Closed range [a, b].
        try (final KeyValueIterator<Windowed<Bytes>, byte[]> all = cachingStore.backwardFetch(keyA, keyB)) {
            StreamsTestUtils.verifyWindowedKeyValue(all.next(), sessionB, "1");
            StreamsTestUtils.verifyWindowedKeyValue(all.next(), sessionAA, "1");
            StreamsTestUtils.verifyWindowedKeyValue(all.next(), sessionA, "1");
            Assert.assertFalse(all.hasNext());
        }

        // NOTE(review): the (null, keyB) range is queried twice with identical bounds;
        // confirm whether a different open-ended range was intended for the second one.
        try (final KeyValueIterator<Windowed<Bytes>, byte[]> all = cachingStore.backwardFetch(null, keyB)) {
            StreamsTestUtils.verifyWindowedKeyValue(all.next(), sessionB, "1");
            StreamsTestUtils.verifyWindowedKeyValue(all.next(), sessionAA, "1");
            StreamsTestUtils.verifyWindowedKeyValue(all.next(), sessionA, "1");
            Assert.assertFalse(all.hasNext());
        }

        try (final KeyValueIterator<Windowed<Bytes>, byte[]> all = cachingStore.backwardFetch(null, keyB)) {
            StreamsTestUtils.verifyWindowedKeyValue(all.next(), sessionB, "1");
            StreamsTestUtils.verifyWindowedKeyValue(all.next(), sessionAA, "1");
            StreamsTestUtils.verifyWindowedKeyValue(all.next(), sessionA, "1");
            Assert.assertFalse(all.hasNext());
        }

        // Both bounds null.
        try (final KeyValueIterator<Windowed<Bytes>, byte[]> all = cachingStore.backwardFetch(null, null)) {
            StreamsTestUtils.verifyWindowedKeyValue(all.next(), sessionB, "1");
            StreamsTestUtils.verifyWindowedKeyValue(all.next(), sessionAA, "1");
            StreamsTestUtils.verifyWindowedKeyValue(all.next(), sessionA, "1");
            Assert.assertFalse(all.hasNext());
        }
    }

    /**
     * Scripts the mocked cache to throw from {@code flush(CACHE_NAMESPACE)} and verifies that
     * closing the caching store throws a {@link RuntimeException} while {@code close()} is
     * still invoked on the wrapped store.
     */
    @Test
    public void shouldCloseWrappedStoreAndCacheAfterErrorDuringCacheFlush() {
        setUpCloseTests();

        // Cache mock: flushing the namespace fails.
        EasyMock.reset(cache);
        cache.flush(CACHE_NAMESPACE);
        EasyMock.expectLastCall().andThrow(new RuntimeException("Simulating an error on flush"));
        EasyMock.replay(cache);

        // Wrapped store mock: close() is expected.
        EasyMock.reset(underlyingStore);
        underlyingStore.close();
        EasyMock.replay(underlyingStore);

        Assert.assertThrows(RuntimeException.class, () -> cachingStore.close());

        EasyMock.verify(cache, underlyingStore);
    }

    /**
     * Scripts the mocked cache to accept {@code flush(CACHE_NAMESPACE)} but throw from
     * {@code close(CACHE_NAMESPACE)}, and verifies that closing the caching store throws a
     * {@link RuntimeException} while {@code close()} is still invoked on the wrapped store.
     */
    @Test
    public void shouldCloseWrappedStoreAfterErrorDuringCacheClose() {
        setUpCloseTests();

        // Cache mock: flush succeeds, closing the namespace fails.
        EasyMock.reset(cache);
        cache.flush(CACHE_NAMESPACE);
        cache.close(CACHE_NAMESPACE);
        EasyMock.expectLastCall().andThrow(new RuntimeException("Simulating an error on close"));
        EasyMock.replay(cache);

        // Wrapped store mock: close() is expected.
        EasyMock.reset(underlyingStore);
        underlyingStore.close();
        EasyMock.replay(underlyingStore);

        Assert.assertThrows(RuntimeException.class, () -> cachingStore.close());

        EasyMock.verify(cache, underlyingStore);
    }

    /**
     * Scripts the mocked wrapped store to throw from {@code close()} and verifies that closing
     * the caching store throws a {@link RuntimeException} while the cache still receives
     * {@code flush(CACHE_NAMESPACE)} and {@code close(CACHE_NAMESPACE)}.
     */
    @Test
    public void shouldCloseCacheAfterErrorDuringWrappedStoreClose() {
        setUpCloseTests();

        // Cache mock: both flush and close of the namespace are expected.
        EasyMock.reset(cache);
        cache.flush(CACHE_NAMESPACE);
        cache.close(CACHE_NAMESPACE);
        EasyMock.replay(cache);

        // Wrapped store mock: close() fails.
        EasyMock.reset(underlyingStore);
        underlyingStore.close();
        EasyMock.expectLastCall().andThrow(new RuntimeException("Simulating an error on close"));
        EasyMock.replay(underlyingStore);

        Assert.assertThrows(RuntimeException.class, () -> cachingStore.close());

        EasyMock.verify(cache, underlyingStore);
    }

    /**
     * Replaces the fixture built in {@code before()} with mocks for the close tests: a nice
     * mock {@link SessionStore} that reports the name "store-name" and is open, a nice mock
     * {@link ThreadCache}, and a new caching store initialised against a context that uses
     * the mocked cache.
     */
    private void setUpCloseTests() {
        underlyingStore = (SessionStore) EasyMock.createNiceMock(SessionStore.class);
        EasyMock.expect(underlyingStore.name()).andStubReturn("store-name");
        EasyMock.expect(underlyingStore.isOpen()).andStubReturn(true);
        EasyMock.replay(underlyingStore);

        cachingStore = new CachingSessionStore(underlyingStore, SEGMENT_INTERVAL);
        cache = EasyMock.niceMock(ThreadCache.class);

        // Named so it does not shadow the context field.
        final InternalMockProcessorContext closeTestContext =
            new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
        closeTestContext.setRecordContext(new ProcessorRecordContext(10L, 0L, 0, TOPIC, new RecordHeaders()));

        // The context type matches both init overloads (ProcessorContext and StateStoreContext),
        // so the cast is required to pick one; use the same overload as before().
        cachingStore.init((StateStoreContext) closeTestContext, cachingStore);
    }

    /**
     * Puts one session each for keys "a", "aa" and "b", checks that the cache reports three
     * entries, and checks the sessions returned in forward order by key-range
     * {@code findSessions} calls with closed, lower-open and fully open key bounds.
     *
     * <p>Every iterator is opened in its own try-with-resources statement so it is closed
     * even when an assertion fails.
     */
    @Test
    public void shouldPutFetchRangeFromCache() {
        final Windowed<Bytes> sessionA = new Windowed<>(keyA, new SessionWindow(0L, 0L));
        final Windowed<Bytes> sessionAA = new Windowed<>(keyAA, new SessionWindow(0L, 0L));
        final Windowed<Bytes> sessionB = new Windowed<>(keyB, new SessionWindow(0L, 0L));

        cachingStore.put(sessionA, "1".getBytes());
        cachingStore.put(sessionAA, "1".getBytes());
        cachingStore.put(sessionB, "1".getBytes());
        Assert.assertEquals(3L, cache.size());

        // Key range [aa, b].
        try (final KeyValueIterator<Windowed<Bytes>, byte[]> some = cachingStore.findSessions(keyAA, keyB, 0L, 0L)) {
            StreamsTestUtils.verifyWindowedKeyValue(some.next(), sessionAA, "1");
            StreamsTestUtils.verifyWindowedKeyValue(some.next(), sessionB, "1");
            Assert.assertFalse(some.hasNext());
        }

        // Key range (null, aa].
        try (final KeyValueIterator<Windowed<Bytes>, byte[]> some = cachingStore.findSessions(null, keyAA, 0L, 0L)) {
            StreamsTestUtils.verifyWindowedKeyValue(some.next(), sessionA, "1");
            StreamsTestUtils.verifyWindowedKeyValue(some.next(), sessionAA, "1");
            Assert.assertFalse(some.hasNext());
        }

        // NOTE(review): same bounds as the first query; confirm whether an open upper bound was intended.
        try (final KeyValueIterator<Windowed<Bytes>, byte[]> some = cachingStore.findSessions(keyAA, keyB, 0L, 0L)) {
            StreamsTestUtils.verifyWindowedKeyValue(some.next(), sessionAA, "1");
            StreamsTestUtils.verifyWindowedKeyValue(some.next(), sessionB, "1");
            Assert.assertFalse(some.hasNext());
        }

        // Both key bounds null.
        try (final KeyValueIterator<Windowed<Bytes>, byte[]> some = cachingStore.findSessions(null, null, 0L, 0L)) {
            StreamsTestUtils.verifyWindowedKeyValue(some.next(), sessionA, "1");
            StreamsTestUtils.verifyWindowedKeyValue(some.next(), sessionAA, "1");
            StreamsTestUtils.verifyWindowedKeyValue(some.next(), sessionB, "1");
            Assert.assertFalse(some.hasNext());
        }
    }

    /**
     * Puts one session each for keys "a", "aa" and "b", checks that the cache reports three
     * entries, and checks the sessions returned in backward order by key-range
     * {@code backwardFindSessions} calls with closed, lower-open and fully open key bounds.
     *
     * <p>Every iterator is opened in its own try-with-resources statement so it is closed
     * even when an assertion fails.
     */
    @Test
    public void shouldPutBackwardFetchRangeFromCache() {
        final Windowed<Bytes> sessionA = new Windowed<>(keyA, new SessionWindow(0L, 0L));
        final Windowed<Bytes> sessionAA = new Windowed<>(keyAA, new SessionWindow(0L, 0L));
        final Windowed<Bytes> sessionB = new Windowed<>(keyB, new SessionWindow(0L, 0L));

        cachingStore.put(sessionA, "1".getBytes());
        cachingStore.put(sessionAA, "1".getBytes());
        cachingStore.put(sessionB, "1".getBytes());
        Assert.assertEquals(3L, cache.size());

        // Key range [aa, b].
        try (final KeyValueIterator<Windowed<Bytes>, byte[]> some = cachingStore.backwardFindSessions(keyAA, keyB, 0L, 0L)) {
            StreamsTestUtils.verifyWindowedKeyValue(some.next(), sessionB, "1");
            StreamsTestUtils.verifyWindowedKeyValue(some.next(), sessionAA, "1");
            Assert.assertFalse(some.hasNext());
        }

        // Key range (null, aa].
        try (final KeyValueIterator<Windowed<Bytes>, byte[]> some = cachingStore.backwardFindSessions(null, keyAA, 0L, 0L)) {
            StreamsTestUtils.verifyWindowedKeyValue(some.next(), sessionAA, "1");
            StreamsTestUtils.verifyWindowedKeyValue(some.next(), sessionA, "1");
            Assert.assertFalse(some.hasNext());
        }

        // NOTE(review): same bounds as the first query; confirm whether an open upper bound was intended.
        try (final KeyValueIterator<Windowed<Bytes>, byte[]> some = cachingStore.backwardFindSessions(keyAA, keyB, 0L, 0L)) {
            StreamsTestUtils.verifyWindowedKeyValue(some.next(), sessionB, "1");
            StreamsTestUtils.verifyWindowedKeyValue(some.next(), sessionAA, "1");
            Assert.assertFalse(some.hasNext());
        }

        // Both key bounds null.
        try (final KeyValueIterator<Windowed<Bytes>, byte[]> some = cachingStore.backwardFindSessions(null, null, 0L, 0L)) {
            StreamsTestUtils.verifyWindowedKeyValue(some.next(), sessionB, "1");
            StreamsTestUtils.verifyWindowedKeyValue(some.next(), sessionAA, "1");
            StreamsTestUtils.verifyWindowedKeyValue(some.next(), sessionA, "1");
            Assert.assertFalse(some.hasNext());
        }
    }

    /**
     * Puts four sessions for key "a" plus one session for key "aa", then checks that
     * {@code fetch(keyA)} returns exactly the four "a" sessions in the order they are listed.
     */
    @Test
    public void shouldFetchAllSessionsWithSameRecordKey() {
        // Typed list: iterating a raw List as KeyValue does not compile.
        final List<KeyValue<Windowed<Bytes>, byte[]>> expected = Arrays.asList(
            KeyValue.pair(new Windowed<>(keyA, new SessionWindow(0L, 0L)), "1".getBytes()),
            KeyValue.pair(new Windowed<>(keyA, new SessionWindow(10L, 10L)), "2".getBytes()),
            KeyValue.pair(new Windowed<>(keyA, new SessionWindow(100L, 100L)), "3".getBytes()),
            KeyValue.pair(new Windowed<>(keyA, new SessionWindow(1000L, 1000L)), "4".getBytes())
        );
        for (final KeyValue<Windowed<Bytes>, byte[]> session : expected) {
            cachingStore.put(session.key, session.value);
        }

        // Session under a different key; it is not part of the expected result.
        cachingStore.put(new Windowed<>(keyAA, new SessionWindow(0L, 0L)), "5".getBytes());

        final List<KeyValue<Windowed<Bytes>, byte[]>> results = StreamsTestUtils.toList(cachingStore.fetch(keyA));
        StreamsTestUtils.verifyKeyValueList(expected, results);
    }

    /**
     * Puts four sessions for key "a" plus one session for key "aa", then checks that
     * {@code backwardFetch(keyA)} returns exactly the four "a" sessions in the reverse of
     * the order they are listed.
     */
    @Test
    public void shouldBackwardFetchAllSessionsWithSameRecordKey() {
        // Typed list: iterating a raw List as KeyValue does not compile.
        final List<KeyValue<Windowed<Bytes>, byte[]>> expected = Arrays.asList(
            KeyValue.pair(new Windowed<>(keyA, new SessionWindow(0L, 0L)), "1".getBytes()),
            KeyValue.pair(new Windowed<>(keyA, new SessionWindow(10L, 10L)), "2".getBytes()),
            KeyValue.pair(new Windowed<>(keyA, new SessionWindow(100L, 100L)), "3".getBytes()),
            KeyValue.pair(new Windowed<>(keyA, new SessionWindow(1000L, 1000L)), "4".getBytes())
        );
        for (final KeyValue<Windowed<Bytes>, byte[]> session : expected) {
            cachingStore.put(session.key, session.value);
        }

        // Session under a different key; it is not part of the expected result.
        cachingStore.put(new Windowed<>(keyAA, new SessionWindow(0L, 0L)), "5".getBytes());

        final List<KeyValue<Windowed<Bytes>, byte[]>> results =
            StreamsTestUtils.toList(cachingStore.backwardFetch(keyA));
        // Reverse the backward results so they can be compared against the forward-ordered list.
        Collections.reverse(results);
        StreamsTestUtils.verifyKeyValueList(expected, results);
    }

    /**
     * Adds sessions until the cache size stops tracking the number of added sessions, then
     * checks that the cache holds one entry fewer than was added and that the first added
     * session can still be found through the caching store.
     */
    @Test
    public void shouldFlushItemsToStoreOnEviction() {
        final List<KeyValue<Windowed<Bytes>, byte[]>> added = this.addSessionsUntilOverflow("a", "b", "c", "d");
        Assert.assertEquals((long) (added.size() - 1), this.cache.size());

        final KeyValue<Windowed<Bytes>, byte[]> firstAdded = added.get(0);
        try (KeyValueIterator<Windowed<Bytes>, byte[]> sessions = this.cachingStore.findSessions(firstAdded.key.key(), 0L, 0L)) {
            final KeyValue<Windowed<Bytes>, byte[]> found = sessions.next();
            Assert.assertEquals(firstAdded.key, found.key);
            Assert.assertArrayEquals(firstAdded.value, found.value);
        }
    }

    /**
     * Adds sessions with id "a" until the cache overflows and checks that a single
     * {@code findSessions} call over the whole time range returns every added session.
     */
    @Test
    public void shouldQueryItemsInCacheAndStore() {
        final List<KeyValue<Windowed<Bytes>, byte[]>> added = this.addSessionsUntilOverflow("a");
        final Bytes sessionKey = Bytes.wrap("a".getBytes(StandardCharsets.UTF_8));
        // Upper time bound: ten time units per added session.
        final long latestTime = (long) added.size() * 10L;

        final List<KeyValue<Windowed<Bytes>, byte[]>> actual =
            StreamsTestUtils.toList(this.cachingStore.findSessions(sessionKey, 0L, latestTime));
        StreamsTestUtils.verifyKeyValueList(added, actual);
    }

    /**
     * Puts one session each for {@code keyA} and {@code keyB}, removes the {@code keyA}
     * session, and checks that only the {@code keyB} session is still retrievable.
     */
    @Test
    public void shouldRemove() {
        final Windowed<Bytes> sessionA = new Windowed<>(this.keyA, new SessionWindow(0L, 0L));
        final Windowed<Bytes> sessionB = new Windowed<>(this.keyB, new SessionWindow(0L, 0L));
        this.cachingStore.put(sessionA, "2".getBytes());
        this.cachingStore.put(sessionB, "2".getBytes());

        this.cachingStore.remove(sessionA);

        try (KeyValueIterator<Windowed<Bytes>, byte[]> remaining = this.cachingStore.findSessions(this.keyA, 0L, 0L)) {
            Assert.assertFalse(remaining.hasNext());
            Assert.assertNull(this.cachingStore.fetchSession(this.keyA, 0L, 0L));
            MatcherAssert.assertThat(this.cachingStore.fetchSession(this.keyB, 0L, 0L), Matchers.equalTo("2".getBytes()));
        }
    }

    /**
     * Puts six {@code keyA} sessions at times 0, 100, ..., 500 (values "1".."6"), flushing the
     * store after the first three, and checks that {@code findSessions(keyA, 0, 500)} returns
     * all six keys in ascending time order.
     */
    @Test
    public void shouldFetchCorrectlyAcrossSegments() {
        final List<Windowed<Bytes>> sessions = new ArrayList<>();
        for (long time = 0L; time <= 500L; time += 100L) {
            sessions.add(new Windowed<>(this.keyA, new SessionWindow(time, time)));
        }

        int nextValue = 1;
        for (final Windowed<Bytes> session : sessions.subList(0, 3)) {
            this.cachingStore.put(session, Integer.toString(nextValue++).getBytes());
        }
        // The first three sessions are flushed before the last three are put.
        this.cachingStore.flush();
        for (final Windowed<Bytes> session : sessions.subList(3, 6)) {
            this.cachingStore.put(session, Integer.toString(nextValue++).getBytes());
        }

        try (KeyValueIterator<Windowed<Bytes>, byte[]> results = this.cachingStore.findSessions(this.keyA, 0L, 500L)) {
            for (final Windowed<Bytes> expectedKey : sessions) {
                Assert.assertEquals(expectedKey, results.next().key);
            }
            Assert.assertFalse(results.hasNext());
        }
    }

    /**
     * Puts six {@code keyA} sessions at times 0, 100, ..., 500 (values "1".."6"), flushing the
     * store after the first three, and checks that {@code backwardFindSessions(keyA, 0, 500)}
     * returns all six keys in descending time order.
     */
    @Test
    public void shouldBackwardFetchCorrectlyAcrossSegments() {
        final List<Windowed<Bytes>> sessions = new ArrayList<>();
        for (long time = 0L; time <= 500L; time += 100L) {
            sessions.add(new Windowed<>(this.keyA, new SessionWindow(time, time)));
        }

        int nextValue = 1;
        for (final Windowed<Bytes> session : sessions.subList(0, 3)) {
            this.cachingStore.put(session, Integer.toString(nextValue++).getBytes());
        }
        // The first three sessions are flushed before the last three are put.
        this.cachingStore.flush();
        for (final Windowed<Bytes> session : sessions.subList(3, 6)) {
            this.cachingStore.put(session, Integer.toString(nextValue++).getBytes());
        }

        try (KeyValueIterator<Windowed<Bytes>, byte[]> results = this.cachingStore.backwardFindSessions(this.keyA, 0L, 500L)) {
            // Walk the expectation from the latest session down to the earliest.
            for (int i = sessions.size() - 1; i >= 0; i--) {
                Assert.assertEquals(sessions.get(i), results.next().key);
            }
            Assert.assertFalse(results.hasNext());
        }
    }

    /**
     * Puts sessions for {@code keyA} and {@code keyAA} at several times and checks the key
     * order returned by the key-range {@code findSessions(keyA, keyAA, 0, 200)}.
     */
    @Test
    public void shouldFetchRangeCorrectlyAcrossSegments() {
        final Windowed<Bytes> a1 = new Windowed<>(this.keyA, new SessionWindow(0L, 0L));
        final Windowed<Bytes> aa1 = new Windowed<>(this.keyAA, new SessionWindow(0L, 0L));
        final Windowed<Bytes> a2 = new Windowed<>(this.keyA, new SessionWindow(100L, 100L));
        final Windowed<Bytes> a3 = new Windowed<>(this.keyA, new SessionWindow(200L, 200L));
        final Windowed<Bytes> aa3 = new Windowed<>(this.keyAA, new SessionWindow(200L, 200L));
        this.cachingStore.put(a1, "1".getBytes());
        this.cachingStore.put(aa1, "1".getBytes());
        this.cachingStore.put(a2, "2".getBytes());
        this.cachingStore.put(a3, "3".getBytes());
        this.cachingStore.put(aa3, "3".getBytes());

        final List<Windowed<Bytes>> keys = new ArrayList<>();
        // try-with-resources guarantees the iterator is closed even if iteration throws.
        try (KeyValueIterator<Windowed<Bytes>, byte[]> rangeResults = this.cachingStore.findSessions(this.keyA, this.keyAA, 0L, 200L)) {
            while (rangeResults.hasNext()) {
                keys.add(rangeResults.next().key);
            }
        }
        Assert.assertEquals(Arrays.asList(a1, aa1, a2, a3, aa3), keys);
    }

    /**
     * Puts sessions for {@code keyA} and {@code keyAA} at several times and checks the key
     * order returned by the key-range {@code backwardFindSessions(keyA, keyAA, 0, 200)}.
     */
    @Test
    public void shouldBackwardFetchRangeCorrectlyAcrossSegments() {
        final Windowed<Bytes> a1 = new Windowed<>(this.keyA, new SessionWindow(0L, 0L));
        final Windowed<Bytes> aa1 = new Windowed<>(this.keyAA, new SessionWindow(0L, 0L));
        final Windowed<Bytes> a2 = new Windowed<>(this.keyA, new SessionWindow(100L, 100L));
        final Windowed<Bytes> a3 = new Windowed<>(this.keyA, new SessionWindow(200L, 200L));
        final Windowed<Bytes> aa3 = new Windowed<>(this.keyAA, new SessionWindow(200L, 200L));
        this.cachingStore.put(a1, "1".getBytes());
        this.cachingStore.put(aa1, "1".getBytes());
        this.cachingStore.put(a2, "2".getBytes());
        this.cachingStore.put(a3, "3".getBytes());
        this.cachingStore.put(aa3, "3".getBytes());

        final List<Windowed<Bytes>> keys = new ArrayList<>();
        // try-with-resources guarantees the iterator is closed even if iteration throws.
        try (KeyValueIterator<Windowed<Bytes>, byte[]> rangeResults = this.cachingStore.backwardFindSessions(this.keyA, this.keyAA, 0L, 200L)) {
            while (rangeResults.hasNext()) {
                keys.add(rangeResults.next().key);
            }
        }
        Assert.assertEquals(Arrays.asList(aa3, a3, a2, aa1, a1), keys);
    }

    /**
     * Checks that {@code setFlushListener} returns {@code true} for a {@code null} listener,
     * first with the boolean flag set to {@code true} and then to {@code false}.
     */
    @Test
    public void shouldSetFlushListener() {
        for (final boolean flag : new boolean[] {true, false}) {
            Assert.assertTrue(this.cachingStore.setFlushListener(null, flag));
        }
    }

    /**
     * With a flush listener registered and the second {@code setFlushListener} argument set to
     * {@code true}, checks what the listener receives on each flush: a first put forwards
     * (new, null), an overwrite forwards (new, old), a removal forwards (null, old), and a
     * put/put/remove sequence between two flushes forwards nothing.
     *
     * <p>Values are encoded as UTF-8 explicitly because the listener decodes them with a
     * {@link StringDeserializer} and the result is compared against string literals.
     */
    @Test
    public void shouldForwardChangedValuesDuringFlush() {
        final Windowed<Bytes> a = new Windowed<>(this.keyA, new SessionWindow(2L, 4L));
        final Windowed<Bytes> b = new Windowed<>(this.keyA, new SessionWindow(1L, 2L));
        final Windowed<String> aDeserialized = new Windowed<>("a", new SessionWindow(2L, 4L));
        final Windowed<String> bDeserialized = new Windowed<>("a", new SessionWindow(1L, 2L));
        final CacheFlushListenerStub<Windowed<String>, String> flushListener = new CacheFlushListenerStub<>(
            new SessionWindowedDeserializer<>(new StringDeserializer()),
            new StringDeserializer());
        this.cachingStore.setFlushListener(flushListener, true);

        // First value for session b: no old value is forwarded.
        this.cachingStore.put(b, "1".getBytes(StandardCharsets.UTF_8));
        this.cachingStore.flush();
        Assert.assertEquals(
            Collections.singletonList(new KeyValueTimestamp<>(bDeserialized, new Change<>("1", null), DEFAULT_TIMESTAMP)),
            flushListener.forwarded);
        flushListener.forwarded.clear();

        // First value for session a: no old value is forwarded.
        this.cachingStore.put(a, "1".getBytes(StandardCharsets.UTF_8));
        this.cachingStore.flush();
        Assert.assertEquals(
            Collections.singletonList(new KeyValueTimestamp<>(aDeserialized, new Change<>("1", null), DEFAULT_TIMESTAMP)),
            flushListener.forwarded);
        flushListener.forwarded.clear();

        // Overwrite of session a: the previously flushed value is forwarded as the old value.
        this.cachingStore.put(a, "2".getBytes(StandardCharsets.UTF_8));
        this.cachingStore.flush();
        Assert.assertEquals(
            Collections.singletonList(new KeyValueTimestamp<>(aDeserialized, new Change<>("2", "1"), DEFAULT_TIMESTAMP)),
            flushListener.forwarded);
        flushListener.forwarded.clear();

        // Removal of session a: null new value, last flushed value as the old value.
        this.cachingStore.remove(a);
        this.cachingStore.flush();
        Assert.assertEquals(
            Collections.singletonList(new KeyValueTimestamp<>(aDeserialized, new Change<String>(null, "2"), DEFAULT_TIMESTAMP)),
            flushListener.forwarded);
        flushListener.forwarded.clear();

        // Put, overwrite and remove between two flushes: nothing is forwarded.
        this.cachingStore.put(a, "1".getBytes(StandardCharsets.UTF_8));
        this.cachingStore.put(a, "2".getBytes(StandardCharsets.UTF_8));
        this.cachingStore.remove(a);
        this.cachingStore.flush();
        Assert.assertEquals(Collections.emptyList(), flushListener.forwarded);
    }

    /**
     * With a flush listener registered and the second {@code setFlushListener} argument set to
     * {@code false}, checks that every forwarded change carries a {@code null} old value, and
     * that a put/put/remove sequence between two flushes forwards nothing.
     *
     * <p>Values are encoded as UTF-8 explicitly because the listener decodes them with a
     * {@link StringDeserializer} and the result is compared against string literals.
     */
    @Test
    public void shouldNotForwardChangedValuesDuringFlushWhenSendOldValuesDisabled() {
        final Windowed<Bytes> a = new Windowed<>(this.keyA, new SessionWindow(0L, 0L));
        final Windowed<String> aDeserialized = new Windowed<>("a", new SessionWindow(0L, 0L));
        final CacheFlushListenerStub<Windowed<String>, String> flushListener = new CacheFlushListenerStub<>(
            new SessionWindowedDeserializer<>(new StringDeserializer()),
            new StringDeserializer());
        this.cachingStore.setFlushListener(flushListener, false);

        // Three flushes: initial put, overwrite, removal.
        this.cachingStore.put(a, "1".getBytes(StandardCharsets.UTF_8));
        this.cachingStore.flush();
        this.cachingStore.put(a, "2".getBytes(StandardCharsets.UTF_8));
        this.cachingStore.flush();
        this.cachingStore.remove(a);
        this.cachingStore.flush();
        Assert.assertEquals(
            Arrays.asList(
                new KeyValueTimestamp<>(aDeserialized, new Change<>("1", null), DEFAULT_TIMESTAMP),
                new KeyValueTimestamp<>(aDeserialized, new Change<>("2", null), DEFAULT_TIMESTAMP),
                new KeyValueTimestamp<>(aDeserialized, new Change<String>(null, null), DEFAULT_TIMESTAMP)),
            flushListener.forwarded);
        flushListener.forwarded.clear();

        // Put, overwrite and remove between two flushes: nothing is forwarded.
        this.cachingStore.put(a, "1".getBytes(StandardCharsets.UTF_8));
        this.cachingStore.put(a, "2".getBytes(StandardCharsets.UTF_8));
        this.cachingStore.remove(a);
        this.cachingStore.flush();
        Assert.assertEquals(Collections.emptyList(), flushListener.forwarded);
    }

    /**
     * Checks that {@code findSessions(keyAA, 0, 10)} and the key-range form
     * {@code findSessions(keyAA, keyAA, 0, 10)} yield the same two entries and then both end.
     */
    @Test
    public void shouldReturnSameResultsForSingleKeyFindSessionsAndEqualKeyRangeFindSessions() {
        this.cachingStore.put(new Windowed<>(this.keyA, new SessionWindow(0L, 1L)), "1".getBytes());
        this.cachingStore.put(new Windowed<>(this.keyAA, new SessionWindow(2L, 3L)), "2".getBytes());
        this.cachingStore.put(new Windowed<>(this.keyAA, new SessionWindow(4L, 5L)), "3".getBytes());
        this.cachingStore.put(new Windowed<>(this.keyB, new SessionWindow(6L, 7L)), "4".getBytes());

        try (KeyValueIterator<Windowed<Bytes>, byte[]> bySingleKey = this.cachingStore.findSessions(this.keyAA, 0L, 10L);
             KeyValueIterator<Windowed<Bytes>, byte[]> byKeyRange = this.cachingStore.findSessions(this.keyAA, this.keyAA, 0L, 10L)) {
            // Two keyAA sessions were put, so two entries are compared pairwise.
            for (int i = 0; i < 2; i++) {
                Assert.assertEquals(bySingleKey.next(), byKeyRange.next());
            }
            Assert.assertFalse(bySingleKey.hasNext());
            Assert.assertFalse(byKeyRange.hasNext());
        }
    }

    /**
     * Checks that {@code backwardFindSessions(keyAA, 0, 10)} and the key-range form
     * {@code backwardFindSessions(keyAA, keyAA, 0, 10)} yield the same two entries and then
     * both end.
     */
    @Test
    public void shouldReturnSameResultsForSingleKeyFindSessionsBackwardsAndEqualKeyRangeFindSessions() {
        this.cachingStore.put(new Windowed<>(this.keyA, new SessionWindow(0L, 1L)), "1".getBytes());
        this.cachingStore.put(new Windowed<>(this.keyAA, new SessionWindow(2L, 3L)), "2".getBytes());
        this.cachingStore.put(new Windowed<>(this.keyAA, new SessionWindow(4L, 5L)), "3".getBytes());
        this.cachingStore.put(new Windowed<>(this.keyB, new SessionWindow(6L, 7L)), "4".getBytes());

        try (KeyValueIterator<Windowed<Bytes>, byte[]> bySingleKey = this.cachingStore.backwardFindSessions(this.keyAA, 0L, 10L);
             KeyValueIterator<Windowed<Bytes>, byte[]> byKeyRange = this.cachingStore.backwardFindSessions(this.keyAA, this.keyAA, 0L, 10L)) {
            // Two keyAA sessions were put, so two entries are compared pairwise.
            for (int i = 0; i < 2; i++) {
                Assert.assertEquals(bySingleKey.next(), byKeyRange.next());
            }
            Assert.assertFalse(bySingleKey.hasNext());
            Assert.assertFalse(byKeyRange.hasNext());
        }
    }

    /**
     * Checks that the cache reports one entry after a single put and zero entries after the
     * caching store has been closed.
     */
    @Test
    public void shouldClearNamespaceCacheOnClose() {
        final Windowed<Bytes> session = new Windowed<>(this.keyA, new SessionWindow(0L, 0L));
        this.cachingStore.put(session, "1".getBytes());
        Assert.assertEquals(1L, this.cache.size());

        this.cachingStore.close();

        Assert.assertEquals(0L, this.cache.size());
    }

    /** Checks that {@code fetch} on a closed store throws {@link InvalidStateStoreException}. */
    @Test
    public void shouldThrowIfTryingToFetchFromClosedCachingStore() {
        this.cachingStore.close();
        Assert.assertThrows(InvalidStateStoreException.class, () -> {
            this.cachingStore.fetch(this.keyA);
        });
    }

    /** Checks that {@code findSessions} on a closed store throws {@link InvalidStateStoreException}. */
    @Test
    public void shouldThrowIfTryingToFindMergeSessionFromClosedCachingStore() {
        this.cachingStore.close();
        Assert.assertThrows(InvalidStateStoreException.class, () -> {
            this.cachingStore.findSessions(this.keyA, 0L, Long.MAX_VALUE);
        });
    }

    /** Checks that {@code remove} on a closed store throws {@link InvalidStateStoreException}. */
    @Test
    public void shouldThrowIfTryingToRemoveFromClosedCachingStore() {
        this.cachingStore.close();
        Assert.assertThrows(InvalidStateStoreException.class, () -> {
            this.cachingStore.remove(new Windowed<>(this.keyA, new SessionWindow(0L, 0L)));
        });
    }

    /** Checks that {@code put} on a closed store throws {@link InvalidStateStoreException}. */
    @Test
    public void shouldThrowIfTryingToPutIntoClosedCachingStore() {
        this.cachingStore.close();
        Assert.assertThrows(InvalidStateStoreException.class, () -> {
            this.cachingStore.put(new Windowed<>(this.keyA, new SessionWindow(0L, 0L)), "1".getBytes());
        });
    }

    /** Checks that {@code findSessions} with a {@code null} key throws {@link NullPointerException}. */
    @Test
    public void shouldThrowNullPointerExceptionOnFindSessionsNullKey() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.cachingStore.findSessions(null, 1L, 2L);
        });
    }

    /** Checks that {@code fetch} with a {@code null} key throws {@link NullPointerException}. */
    @Test
    public void shouldThrowNullPointerExceptionOnFetchNullKey() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.cachingStore.fetch(null);
        });
    }

    /** Checks that {@code remove} with a {@code null} key throws {@link NullPointerException}. */
    @Test
    public void shouldThrowNullPointerExceptionOnRemoveNullKey() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.cachingStore.remove(null);
        });
    }

    /** Checks that {@code put} with a {@code null} key throws {@link NullPointerException}. */
    @Test
    public void shouldThrowNullPointerExceptionOnPutNullKey() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.cachingStore.put(null, "1".getBytes());
        });
    }

    /**
     * Queries {@code backwardFindSessions} with a key range built from the serialized integers
     * -1 and 1, and checks that the result is empty and that the store logged the
     * "invalid key range" message rather than throwing.
     */
    @Test
    public void shouldNotThrowInvalidRangeExceptionWhenBackwardWithNegativeFromKey() {
        final String expectedLogMessage = "Returning empty iterator for fetch with invalid key range: from > to. This may be due to range arguments set in the wrong order, or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. Note that the built-in numerical serdes do not follow this for negative numbers";
        final Bytes keyFrom = Bytes.wrap(Serdes.Integer().serializer().serialize("", -1));
        final Bytes keyTo = Bytes.wrap(Serdes.Integer().serializer().serialize("", 1));

        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(CachingSessionStore.class);
             KeyValueIterator<Windowed<Bytes>, byte[]> iterator = this.cachingStore.backwardFindSessions(keyFrom, keyTo, 0L, 10L)) {
            Assert.assertFalse(iterator.hasNext());

            final List<String> messages = appender.getMessages();
            MatcherAssert.assertThat(messages, CoreMatchers.hasItem(expectedLogMessage));
        }
    }

    /**
     * Queries {@code findSessions} with a key range built from the serialized integers -1 and
     * 1, and checks that the result is empty and that the store logged the
     * "invalid key range" message rather than throwing.
     */
    @Test
    public void shouldNotThrowInvalidRangeExceptionWithNegativeFromKey() {
        final String expectedLogMessage = "Returning empty iterator for fetch with invalid key range: from > to. This may be due to range arguments set in the wrong order, or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. Note that the built-in numerical serdes do not follow this for negative numbers";
        final Bytes keyFrom = Bytes.wrap(Serdes.Integer().serializer().serialize("", -1));
        final Bytes keyTo = Bytes.wrap(Serdes.Integer().serializer().serialize("", 1));

        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(CachingSessionStore.class);
             KeyValueIterator<Windowed<Bytes>, byte[]> iterator = this.cachingStore.findSessions(keyFrom, keyTo, 0L, 10L)) {
            Assert.assertFalse(iterator.hasNext());

            final List<String> messages = appender.getMessages();
            MatcherAssert.assertThat(messages, CoreMatchers.hasItem(expectedLogMessage));
        }
    }

    /**
     * Adds sessions, each under an id picked at random from {@code sessionIds}, for as long as
     * the cache size equals the number of sessions added so far.
     *
     * @param sessionIds candidate session ids to pick from
     * @return every session that was added, including the one after which the cache size no
     *         longer matched (presumably because the cache evicted an entry)
     */
    private List<KeyValue<Windowed<Bytes>, byte[]>> addSessionsUntilOverflow(String ... sessionIds) {
        final Random idPicker = new Random();
        final List<KeyValue<Windowed<Bytes>, byte[]>> addedSessions = new ArrayList<>();
        while (this.cache.size() == (long) addedSessions.size()) {
            final int pick = idPicker.nextInt(sessionIds.length);
            this.addSingleSession(sessionIds[pick], addedSessions);
        }
        return addedSessions;
    }

    /**
     * Puts one session with value "1" into the caching store and records it in
     * {@code allSessions}. The session window starts and ends at
     * {@code allSessions.size() * 10}, so each call uses a later timestamp than the previous.
     *
     * <p>The session id is encoded as UTF-8 explicitly, so lookups that build the key with
     * {@code StandardCharsets.UTF_8} match regardless of the platform default charset.
     *
     * @param sessionId   record key of the session, as a string
     * @param allSessions sessions added so far; the new session is appended to it
     */
    private void addSingleSession(String sessionId, List<KeyValue<Windowed<Bytes>, byte[]>> allSessions) {
        final long timestamp = allSessions.size() * 10L;
        final Bytes sessionKey = Bytes.wrap(sessionId.getBytes(StandardCharsets.UTF_8));
        final Windowed<Bytes> key = new Windowed<>(sessionKey, new SessionWindow(timestamp, timestamp));
        final byte[] value = "1".getBytes(StandardCharsets.UTF_8);
        this.cachingStore.put(key, value);
        allSessions.add(KeyValue.pair(key, value));
    }

    /**
     * Flush listener for tests: deserializes every record it is given and appends the result
     * to {@link #forwarded} so tests can assert on what was forwarded.
     *
     * @param <K> deserialized key type
     * @param <V> deserialized value type
     */
    public static class CacheFlushListenerStub<K, V>
    implements CacheFlushListener<byte[], byte[]> {
        final Deserializer<K> keyDeserializer;
        final Deserializer<V> valueDeserializer;
        /** Deserialized records in the order {@code apply} received them. */
        final List<KeyValueTimestamp<K, Change<V>>> forwarded = new LinkedList<KeyValueTimestamp<K, Change<V>>>();

        CacheFlushListenerStub(Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
            this.keyDeserializer = keyDeserializer;
            this.valueDeserializer = valueDeserializer;
        }

        /**
         * Deserializes the record's key and both sides of its change (new and old value) and
         * stores them, together with the record timestamp, in {@link #forwarded}.
         *
         * @param record serialized record handed to the listener
         */
        public void apply(Record<byte[], Change<byte[]>> record) {
            final Change<byte[]> rawChange = record.value();
            // Both deserializers are called with a null topic.
            final K key = this.keyDeserializer.deserialize(null, record.key());
            final V newValue = this.valueDeserializer.deserialize(null, rawChange.newValue);
            final V oldValue = this.valueDeserializer.deserialize(null, rawChange.oldValue);
            this.forwarded.add(new KeyValueTimestamp<K, Change<V>>(key, new Change<V>(newValue, oldValue), record.timestamp()));
        }
    }
}

