/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.apache.kafka.streams.state.internals.RocksDBStore;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
import org.rocksdb.Filter;
import org.rocksdb.Options;
import org.rocksdb.TableFormatConfig;

/**
 * Unit tests for {@link RocksDBStore}.
 *
 * <p>Each test gets a fresh store (named {@link #DB_NAME}) and a fresh
 * {@link InternalMockProcessorContext} rooted in a new temporary directory.
 * The context is configured with {@link MockRocksDbConfigSetter} so tests can
 * observe that the configured {@link RocksDBConfigSetter} is invoked.
 *
 * <p>{@link #getRocksDBStore()} is the single place where the store under test
 * is created; it is package-private and non-final, so a subclass may override
 * it to run the same tests against another store implementation.
 */
public class RocksDBStoreTest {
    /** Read by {@link TestingBloomFilterRocksDBConfigSetter} each time the store is opened. */
    private static boolean enableBloomFilters = false;
    static final String DB_NAME = "db-name";

    private File dir;
    private final Serializer<String> stringSerializer = new StringSerializer();
    private final Deserializer<String> stringDeserializer = new StringDeserializer();

    InternalMockProcessorContext context;
    RocksDBStore rocksDBStore;

    /**
     * Creates the store under test and a mock processor context backed by a
     * new temporary directory. The store is NOT initialized here; each test
     * decides whether to call {@code init} or {@code openDB}.
     */
    @Before
    public void setUp() {
        final Properties props = StreamsTestUtils.getStreamsConfig();
        props.put("rocksdb.config.setter", MockRocksDbConfigSetter.class);
        rocksDBStore = getRocksDBStore();
        dir = TestUtils.tempDirectory();
        context = new InternalMockProcessorContext(
            dir,
            Serdes.String(),
            Serdes.String(),
            new StreamsConfig(props));
    }

    /**
     * Factory for the store under test.
     *
     * @return a new, un-initialized store named {@link #DB_NAME}
     */
    RocksDBStore getRocksDBStore() {
        return new RocksDBStore(DB_NAME);
    }

    /** Closes whichever store instance the test ended up with. */
    @After
    public void tearDown() {
        rocksDBStore.close();
    }

    /**
     * While a restore is in progress the three level-0 triggers must all be
     * {@code 1 << 30}; once the restore ends they must be 10, 20 and 36.
     * The value 10 is what {@link MockRocksDbConfigSetter} configures; 20 and 36
     * are presumably the RocksDB defaults (not established by this file).
     */
    @Test
    public void shouldRespectBulkloadOptionsDuringInit() {
        rocksDBStore.init(context, rocksDBStore);

        final StateRestoreListener restoreListener = context.getRestoreListener(rocksDBStore.name());

        restoreListener.onRestoreStart(null, rocksDBStore.name(), 0L, 0L);

        Assert.assertThat(rocksDBStore.getOptions().level0FileNumCompactionTrigger(), CoreMatchers.equalTo(1 << 30));
        Assert.assertThat(rocksDBStore.getOptions().level0SlowdownWritesTrigger(), CoreMatchers.equalTo(1 << 30));
        Assert.assertThat(rocksDBStore.getOptions().level0StopWritesTrigger(), CoreMatchers.equalTo(1 << 30));

        restoreListener.onRestoreEnd(null, rocksDBStore.name(), 0L);

        Assert.assertThat(rocksDBStore.getOptions().level0FileNumCompactionTrigger(), CoreMatchers.equalTo(10));
        Assert.assertThat(rocksDBStore.getOptions().level0SlowdownWritesTrigger(), CoreMatchers.equalTo(20));
        Assert.assertThat(rocksDBStore.getOptions().level0StopWritesTrigger(), CoreMatchers.equalTo(36));
    }

    /**
     * Writes two million records through the regular {@code put} path, then
     * restores one additional record and verifies it is readable.
     * NOTE(review): the large record count is presumably what forces RocksDB
     * to have files on disk before the restore starts - confirm.
     */
    @Test
    public void shouldNotThrowExceptionOnRestoreWhenThereIsPreExistingRocksDbFiles() {
        rocksDBStore.init(context, rocksDBStore);

        // The value is identical for every record, so serialize it only once.
        final String message = "how can a 4 ounce bird carry a 2lb coconut";
        final byte[] messageBytes = stringSerializer.serialize(null, message);
        for (int intKey = 1; intKey <= 2000000; intKey++) {
            rocksDBStore.put(serializedKey("theKeyIs" + intKey), messageBytes);
        }

        final List<KeyValue<byte[], byte[]>> restoreBytes = new ArrayList<>();
        final byte[] restoredKey = "restoredKey".getBytes(StandardCharsets.UTF_8);
        final byte[] restoredValue = "restoredValue".getBytes(StandardCharsets.UTF_8);
        restoreBytes.add(KeyValue.pair(restoredKey, restoredValue));

        context.restore(DB_NAME, restoreBytes);

        Assert.assertThat(readValue("restoredKey"), CoreMatchers.equalTo("restoredValue"));
    }

    /** Opening the DB must invoke the config setter registered in {@link #setUp()}. */
    @Test
    public void shouldCallRocksDbConfigSetter() {
        MockRocksDbConfigSetter.called = false;

        rocksDBStore.openDB(context);

        Assert.assertTrue(MockRocksDbConfigSetter.called);
    }

    /** Opening the DB below a read-only state directory must fail with {@link ProcessorStateException}. */
    @Test
    public void shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir() {
        final File tmpDir = TestUtils.tempDirectory();
        final InternalMockProcessorContext tmpContext =
            new InternalMockProcessorContext(tmpDir, new StreamsConfig(StreamsTestUtils.getStreamsConfig()));

        Assert.assertTrue(tmpDir.setReadOnly());

        try {
            rocksDBStore.openDB(tmpContext);
            Assert.fail("Should have thrown ProcessorStateException");
        } catch (final ProcessorStateException expected) {
            // expected: the store cannot be created in a read-only directory
        }
    }

    /** {@code putAll} followed by {@code flush} must make every entry readable. */
    @Test
    public void shouldPutAll() {
        final List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();
        entries.add(new KeyValue<>(serializedKey("1"), stringSerializer.serialize(null, "a")));
        entries.add(new KeyValue<>(serializedKey("2"), stringSerializer.serialize(null, "b")));
        entries.add(new KeyValue<>(serializedKey("3"), stringSerializer.serialize(null, "c")));

        rocksDBStore.init(context, rocksDBStore);
        rocksDBStore.putAll(entries);
        rocksDBStore.flush();

        Assert.assertEquals("a", readValue("1"));
        Assert.assertEquals("b", readValue("2"));
        Assert.assertEquals("c", readValue("3"));
    }

    /** The bulk-load flag must be on between restore start and restore end, and off afterwards. */
    @Test
    public void shouldTogglePrepareForBulkloadSetting() {
        rocksDBStore.init(context, rocksDBStore);
        final RocksDBStore.RocksDBBatchingRestoreCallback restoreListener =
            (RocksDBStore.RocksDBBatchingRestoreCallback) rocksDBStore.batchingStateRestoreCallback;

        restoreListener.onRestoreStart(null, null, 0L, 0L);
        Assert.assertTrue("Should have set bulk loading to true", rocksDBStore.isPrepareForBulkload());

        restoreListener.onRestoreEnd(null, null, 0L);
        Assert.assertFalse("Should have set bulk loading to false", rocksDBStore.isPrepareForBulkload());
    }

    /**
     * Same toggle check as {@link #shouldTogglePrepareForBulkloadSetting()}, but
     * a restore has already written data into the store before the listener is
     * driven a second time.
     */
    @Test
    public void shouldTogglePrepareForBulkloadSettingWhenPrexistingSstFiles() {
        final List<KeyValue<byte[], byte[]>> entries = getKeyValueEntries();

        rocksDBStore.init(context, rocksDBStore);
        context.restore(rocksDBStore.name(), entries);

        final RocksDBStore.RocksDBBatchingRestoreCallback restoreListener =
            (RocksDBStore.RocksDBBatchingRestoreCallback) rocksDBStore.batchingStateRestoreCallback;

        restoreListener.onRestoreStart(null, null, 0L, 0L);
        // The assertion expects TRUE, so the message must say so as well.
        Assert.assertTrue("Should have set bulk loading to true", rocksDBStore.isPrepareForBulkload());

        restoreListener.onRestoreEnd(null, null, 0L);
        Assert.assertFalse("Should have set bulk loading to false", rocksDBStore.isPrepareForBulkload());
    }

    /** Every restored record must be readable through {@code get}. */
    @Test
    public void shouldRestoreAll() {
        final List<KeyValue<byte[], byte[]>> entries = getKeyValueEntries();

        rocksDBStore.init(context, rocksDBStore);
        context.restore(rocksDBStore.name(), entries);

        Assert.assertEquals("a", readValue("1"));
        Assert.assertEquals("b", readValue("2"));
        Assert.assertEquals("c", readValue("3"));
    }

    /** A second {@code putIfAbsent} for an existing key must not overwrite the first value. */
    @Test
    public void shouldPutOnlyIfAbsentValue() {
        rocksDBStore.init(context, rocksDBStore);
        final Bytes keyBytes = serializedKey("one");
        final byte[] valueBytes = stringSerializer.serialize(null, "A");
        final byte[] valueBytesUpdate = stringSerializer.serialize(null, "B");

        rocksDBStore.putIfAbsent(keyBytes, valueBytes);
        rocksDBStore.putIfAbsent(keyBytes, valueBytesUpdate);

        final String retrievedValue = stringDeserializer.deserialize(null, rocksDBStore.get(keyBytes));
        Assert.assertEquals("A", retrievedValue);
    }

    /** A restored record with a null value must remove that key from the store. */
    @Test
    public void shouldHandleDeletesOnRestoreAll() {
        final List<KeyValue<byte[], byte[]>> entries = getKeyValueEntries();
        entries.add(new KeyValue<>("1".getBytes(StandardCharsets.UTF_8), (byte[]) null));

        rocksDBStore.init(context, rocksDBStore);
        context.restore(rocksDBStore.name(), entries);

        Assert.assertThat(readAllKeys(), CoreMatchers.equalTo(Utils.mkSet("2", "3")));
    }

    /** Within one restore batch, a delete followed by a new put for the same key must leave the new value. */
    @Test
    public void shouldHandleDeletesAndPutbackOnRestoreAll() {
        final List<KeyValue<byte[], byte[]>> entries = new ArrayList<>();
        entries.add(new KeyValue<>("1".getBytes(StandardCharsets.UTF_8), "a".getBytes(StandardCharsets.UTF_8)));
        entries.add(new KeyValue<>("2".getBytes(StandardCharsets.UTF_8), "b".getBytes(StandardCharsets.UTF_8)));
        // a null value marks key "1" as deleted ...
        entries.add(new KeyValue<>("1".getBytes(StandardCharsets.UTF_8), (byte[]) null));
        entries.add(new KeyValue<>("3".getBytes(StandardCharsets.UTF_8), "c".getBytes(StandardCharsets.UTF_8)));
        // ... and it is put back with a different value later in the same batch
        entries.add(new KeyValue<>("1".getBytes(StandardCharsets.UTF_8), "restored".getBytes(StandardCharsets.UTF_8)));

        rocksDBStore.init(context, rocksDBStore);
        context.restore(rocksDBStore.name(), entries);

        Assert.assertThat(readAllKeys(), CoreMatchers.equalTo(Utils.mkSet("1", "2", "3")));

        Assert.assertEquals("restored", readValue("1"));
        Assert.assertEquals("b", readValue("2"));
        Assert.assertEquals("c", readValue("3"));
    }

    /** A delete arriving in a second restore batch must remove a key written by the first batch. */
    @Test
    public void shouldRestoreThenDeleteOnRestoreAll() {
        final List<KeyValue<byte[], byte[]>> entries = getKeyValueEntries();

        rocksDBStore.init(context, rocksDBStore);
        context.restore(rocksDBStore.name(), entries);

        Assert.assertEquals("a", readValue("1"));
        Assert.assertEquals("b", readValue("2"));
        Assert.assertEquals("c", readValue("3"));

        entries.clear();
        entries.add(new KeyValue<>("2".getBytes(StandardCharsets.UTF_8), "b".getBytes(StandardCharsets.UTF_8)));
        entries.add(new KeyValue<>("3".getBytes(StandardCharsets.UTF_8), "c".getBytes(StandardCharsets.UTF_8)));
        entries.add(new KeyValue<>("1".getBytes(StandardCharsets.UTF_8), (byte[]) null));

        context.restore(rocksDBStore.name(), entries);

        Assert.assertThat(readAllKeys(), CoreMatchers.equalTo(Utils.mkSet("2", "3")));
    }

    /** {@code put} with a null key must throw {@link NullPointerException}. */
    @Test
    public void shouldThrowNullPointerExceptionOnNullPut() {
        rocksDBStore.init(context, rocksDBStore);
        try {
            rocksDBStore.put(null, stringSerializer.serialize(null, "someVal"));
            Assert.fail("Should have thrown NullPointerException on null put()");
        } catch (final NullPointerException expected) {
            // expected
        }
    }

    /**
     * {@code putAll} with an entry whose key is null must throw
     * {@link NullPointerException}. (This test previously called {@code put},
     * duplicating {@link #shouldThrowNullPointerExceptionOnNullPut()} and never
     * exercising {@code putAll}.)
     */
    @Test
    public void shouldThrowNullPointerExceptionOnNullPutAll() {
        rocksDBStore.init(context, rocksDBStore);
        final List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();
        entries.add(new KeyValue<Bytes, byte[]>(null, stringSerializer.serialize(null, "someVal")));
        try {
            rocksDBStore.putAll(entries);
            Assert.fail("Should have thrown NullPointerException on null putAll()");
        } catch (final NullPointerException expected) {
            // expected
        }
    }

    /** {@code get} with a null key must throw {@link NullPointerException}. */
    @Test
    public void shouldThrowNullPointerExceptionOnNullGet() {
        rocksDBStore.init(context, rocksDBStore);
        try {
            rocksDBStore.get(null);
            Assert.fail("Should have thrown NullPointerException on null get()");
        } catch (final NullPointerException expected) {
            // expected
        }
    }

    /** {@code delete} with a null key must throw {@link NullPointerException}. */
    @Test
    public void shouldThrowNullPointerExceptionOnDelete() {
        rocksDBStore.init(context, rocksDBStore);
        try {
            rocksDBStore.delete(null);
            Assert.fail("Should have thrown NullPointerException on deleting null key");
        } catch (final NullPointerException expected) {
            // expected
        }
    }

    /** {@code range} with a null lower bound must throw {@link NullPointerException}. */
    @Test
    public void shouldThrowNullPointerExceptionOnRange() {
        rocksDBStore.init(context, rocksDBStore);
        try {
            rocksDBStore.range(null, serializedKey("2"));
            Assert.fail("Should have thrown NullPointerException on range() with null key");
        } catch (final NullPointerException expected) {
            // expected
        }
    }

    /**
     * After the state directory has been deleted underneath an open store,
     * a put followed by a flush must surface a {@link ProcessorStateException}.
     */
    @Test(expected = ProcessorStateException.class)
    public void shouldThrowProcessorStateExceptionOnPutDeletedDir() throws IOException {
        rocksDBStore.init(context, rocksDBStore);
        Utils.delete(dir);
        rocksDBStore.put(serializedKey("anyKey"), stringSerializer.serialize(null, "anyValue"));
        rocksDBStore.flush();
    }

    /**
     * Writes data with bloom filters disabled, closes the store, re-opens the
     * same store with bloom filters enabled and verifies the data written
     * earlier is still readable.
     */
    @Test
    public void shouldHandleToggleOfEnablingBloomFilters() {
        // Replace the fixture from setUp() with one that uses the bloom-filter config setter.
        final Properties props = StreamsTestUtils.getStreamsConfig();
        props.put("rocksdb.config.setter", TestingBloomFilterRocksDBConfigSetter.class);
        rocksDBStore = getRocksDBStore();
        dir = TestUtils.tempDirectory();
        context = new InternalMockProcessorContext(
            dir,
            Serdes.String(),
            Serdes.String(),
            new StreamsConfig(props));

        enableBloomFilters = false;
        rocksDBStore.init(context, rocksDBStore);

        final List<String> expectedValues = new ArrayList<>();
        expectedValues.add("a");
        expectedValues.add("b");
        expectedValues.add("c");

        final List<KeyValue<byte[], byte[]>> keyValues = getKeyValueEntries();
        for (final KeyValue<byte[], byte[]> keyValue : keyValues) {
            rocksDBStore.put(new Bytes(keyValue.key), keyValue.value);
        }

        int expectedIndex = 0;
        for (final KeyValue<byte[], byte[]> keyValue : keyValues) {
            final byte[] valBytes = rocksDBStore.get(new Bytes(keyValue.key));
            Assert.assertThat(new String(valBytes, StandardCharsets.UTF_8), CoreMatchers.is(expectedValues.get(expectedIndex++)));
        }
        Assert.assertFalse(TestingBloomFilterRocksDBConfigSetter.bloomFiltersSet);
        rocksDBStore.close();

        // Second pass: same store instance and directory, bloom filters now enabled.
        expectedIndex = 0;
        enableBloomFilters = true;
        rocksDBStore.init(context, rocksDBStore);

        for (final KeyValue<byte[], byte[]> keyValue : keyValues) {
            final byte[] valBytes = rocksDBStore.get(new Bytes(keyValue.key));
            Assert.assertThat(new String(valBytes, StandardCharsets.UTF_8), CoreMatchers.is(expectedValues.get(expectedIndex++)));
        }

        Assert.assertTrue(TestingBloomFilterRocksDBConfigSetter.bloomFiltersSet);
    }

    /**
     * Builds the canonical fixture used by several tests.
     *
     * @return a new mutable list holding {@code 1->a}, {@code 2->b}, {@code 3->c} as UTF-8 bytes
     */
    private List<KeyValue<byte[], byte[]>> getKeyValueEntries() {
        final List<KeyValue<byte[], byte[]>> entries = new ArrayList<>();
        entries.add(new KeyValue<>("1".getBytes(StandardCharsets.UTF_8), "a".getBytes(StandardCharsets.UTF_8)));
        entries.add(new KeyValue<>("2".getBytes(StandardCharsets.UTF_8), "b".getBytes(StandardCharsets.UTF_8)));
        entries.add(new KeyValue<>("3".getBytes(StandardCharsets.UTF_8), "c".getBytes(StandardCharsets.UTF_8)));
        return entries;
    }

    /** Serializes {@code key} with the string serializer and wraps it as a store key. */
    private Bytes serializedKey(final String key) {
        return new Bytes(stringSerializer.serialize(null, key));
    }

    /** Looks up {@code key} in the store and deserializes the stored bytes back into a string. */
    private String readValue(final String key) {
        return stringDeserializer.deserialize(null, rocksDBStore.get(serializedKey(key)));
    }

    /**
     * Collects every key currently in the store, deserialized as a string.
     * The iterator is closed before returning so it is never left open.
     */
    private HashSet<String> readAllKeys() {
        final HashSet<String> keys = new HashSet<>();
        try (KeyValueIterator<Bytes, byte[]> iterator = rocksDBStore.all()) {
            while (iterator.hasNext()) {
                keys.add(stringDeserializer.deserialize(null, iterator.next().key.get()));
            }
        }
        return keys;
    }

    /**
     * Config setter that installs a bloom filter only when the enclosing
     * test's {@code enableBloomFilters} flag is set, and records in
     * {@link #bloomFiltersSet} which branch was taken on the last call.
     */
    public static class TestingBloomFilterRocksDBConfigSetter implements RocksDBConfigSetter {

        static boolean bloomFiltersSet;

        @Override
        public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
            final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
            tableConfig.setBlockCacheSize(50 * 1024 * 1024L);
            tableConfig.setBlockSize(4096L);
            if (enableBloomFilters) {
                tableConfig.setFilter(new BloomFilter());
                options.optimizeFiltersForHits();
                bloomFiltersSet = true;
            } else {
                options.setOptimizeFiltersForHits(false);
                bloomFiltersSet = false;
            }

            options.setTableFormatConfig(tableConfig);
        }
    }

    /**
     * Config setter that records that it was invoked and sets the level-0
     * file-number compaction trigger to 10.
     */
    public static class MockRocksDbConfigSetter implements RocksDBConfigSetter {
        static boolean called;

        @Override
        public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
            called = true;

            options.setLevel0FileNumCompactionTrigger(10);
        }
    }
}

