/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.util.Map;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.internals.MeteredTimestampedWindowStore;
import org.apache.kafka.streams.state.internals.SerdeThatDoesntHandleNull;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.streams.state.internals.ValueAndTimestampSerde;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockRecordCollector;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class MeteredTimestampedWindowStoreTest {
    private static final String STORE_NAME = "mocked-store";
    private static final String STORE_TYPE = "scope";
    private static final String CHANGELOG_TOPIC = "changelog-topic";
    private static final String KEY = "key";
    private static final Bytes KEY_BYTES = Bytes.wrap((byte[])"key".getBytes());
    private static final long TIMESTAMP = 97L;
    private static final ValueAndTimestamp<String> VALUE_AND_TIMESTAMP = ValueAndTimestamp.make((Object)"value", (long)97L);
    private static final byte[] VALUE_AND_TIMESTAMP_BYTES = "\u0000\u0000\u0000\u0000\u0000\u0000\u0000avalue".getBytes();
    private static final int WINDOW_SIZE_MS = 10;
    private InternalMockProcessorContext context;
    private final TaskId taskId = new TaskId(0, 0, "My-Topology");
    private final WindowStore<Bytes, byte[]> innerStoreMock = (WindowStore)EasyMock.createNiceMock(WindowStore.class);
    private final Metrics metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG));
    private MeteredTimestampedWindowStore<String, String> store = new MeteredTimestampedWindowStore(this.innerStoreMock, 10L, "scope", (Time)new MockTime(), Serdes.String(), (Serde)new ValueAndTimestampSerde((Serde)new SerdeThatDoesntHandleNull()));

    public MeteredTimestampedWindowStoreTest() {
        EasyMock.expect((Object)this.innerStoreMock.name()).andStubReturn((Object)STORE_NAME);
    }

    @Before
    public void setUp() {
        StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(this.metrics, "test", "latest", (Time)new MockTime());
        this.context = new InternalMockProcessorContext(TestUtils.tempDirectory(), Serdes.String(), Serdes.Long(), streamsMetrics, new StreamsConfig((Map)StreamsTestUtils.getStreamsConfig()), MockRecordCollector::new, new ThreadCache(new LogContext("testCache "), 0L, streamsMetrics), Time.SYSTEM, this.taskId);
    }

    @Test
    public void shouldDelegateDeprecatedInit() {
        WindowStore inner = (WindowStore)EasyMock.mock(WindowStore.class);
        MeteredTimestampedWindowStore outer = new MeteredTimestampedWindowStore(inner, 10L, STORE_TYPE, (Time)new MockTime(), Serdes.String(), (Serde)new ValueAndTimestampSerde((Serde)new SerdeThatDoesntHandleNull()));
        EasyMock.expect((Object)inner.name()).andStubReturn((Object)"store");
        inner.init((ProcessorContext)this.context, (StateStore)outer);
        EasyMock.expectLastCall();
        EasyMock.replay((Object[])new Object[]{inner});
        outer.init((ProcessorContext)this.context, (StateStore)outer);
        EasyMock.verify((Object[])new Object[]{inner});
    }

    @Test
    public void shouldDelegateInit() {
        WindowStore inner = (WindowStore)EasyMock.mock(WindowStore.class);
        MeteredTimestampedWindowStore outer = new MeteredTimestampedWindowStore(inner, 10L, STORE_TYPE, (Time)new MockTime(), Serdes.String(), (Serde)new ValueAndTimestampSerde((Serde)new SerdeThatDoesntHandleNull()));
        EasyMock.expect((Object)inner.name()).andStubReturn((Object)"store");
        inner.init((StateStoreContext)this.context, (StateStore)outer);
        EasyMock.expectLastCall();
        EasyMock.replay((Object[])new Object[]{inner});
        outer.init((StateStoreContext)this.context, (StateStore)outer);
        EasyMock.verify((Object[])new Object[]{inner});
    }

    @Test
    public void shouldPassChangelogTopicNameToStateStoreSerde() {
        this.context.addChangelogForStore(STORE_NAME, CHANGELOG_TOPIC);
        this.doShouldPassChangelogTopicNameToStateStoreSerde(CHANGELOG_TOPIC);
    }

    @Test
    public void shouldPassDefaultChangelogTopicNameToStateStoreSerdeIfLoggingDisabled() {
        String defaultChangelogTopicName = ProcessorStateManager.storeChangelogTopic((String)this.context.applicationId(), (String)STORE_NAME, (String)this.taskId.topologyName());
        this.doShouldPassChangelogTopicNameToStateStoreSerde(defaultChangelogTopicName);
    }

    private void doShouldPassChangelogTopicNameToStateStoreSerde(String topic) {
        Serde keySerde = (Serde)EasyMock.niceMock(Serde.class);
        Serializer keySerializer = (Serializer)EasyMock.mock(Serializer.class);
        Serde valueSerde = (Serde)EasyMock.niceMock(Serde.class);
        Deserializer valueDeserializer = (Deserializer)EasyMock.mock(Deserializer.class);
        Serializer valueSerializer = (Serializer)EasyMock.mock(Serializer.class);
        EasyMock.expect((Object)keySerde.serializer()).andStubReturn((Object)keySerializer);
        EasyMock.expect((Object)keySerializer.serialize(topic, (Object)KEY)).andStubReturn((Object)KEY.getBytes());
        EasyMock.expect((Object)valueSerde.deserializer()).andStubReturn((Object)valueDeserializer);
        EasyMock.expect((Object)valueDeserializer.deserialize(topic, VALUE_AND_TIMESTAMP_BYTES)).andStubReturn(VALUE_AND_TIMESTAMP);
        EasyMock.expect((Object)valueSerde.serializer()).andStubReturn((Object)valueSerializer);
        EasyMock.expect((Object)valueSerializer.serialize(topic, VALUE_AND_TIMESTAMP)).andStubReturn((Object)VALUE_AND_TIMESTAMP_BYTES);
        EasyMock.expect((Object)this.innerStoreMock.fetch((Object)KEY_BYTES, 97L)).andStubReturn((Object)VALUE_AND_TIMESTAMP_BYTES);
        EasyMock.replay((Object[])new Object[]{this.innerStoreMock, keySerializer, keySerde, valueDeserializer, valueSerializer, valueSerde});
        this.store = new MeteredTimestampedWindowStore(this.innerStoreMock, 10L, STORE_TYPE, (Time)new MockTime(), keySerde, valueSerde);
        this.store.init((StateStoreContext)this.context, this.store);
        this.store.fetch((Object)KEY, 97L);
        this.store.put((Object)KEY, VALUE_AND_TIMESTAMP, 97L);
        EasyMock.verify((Object[])new Object[]{keySerializer, valueDeserializer, valueSerializer});
    }

    @Test
    public void shouldCloseUnderlyingStore() {
        this.innerStoreMock.close();
        EasyMock.expectLastCall();
        EasyMock.replay((Object[])new Object[]{this.innerStoreMock});
        this.store.init((StateStoreContext)this.context, this.store);
        this.store.close();
        EasyMock.verify((Object[])new Object[]{this.innerStoreMock});
    }

    @Test
    public void shouldNotExceptionIfFetchReturnsNull() {
        EasyMock.expect((Object)this.innerStoreMock.fetch((Object)Bytes.wrap((byte[])"a".getBytes()), 0L)).andReturn(null);
        EasyMock.replay((Object[])new Object[]{this.innerStoreMock});
        this.store.init((StateStoreContext)this.context, this.store);
        Assert.assertNull((Object)this.store.fetch((Object)"a", 0L));
    }

    @Test
    public void shouldNotThrowExceptionIfSerdesCorrectlySetFromProcessorContext() {
        EasyMock.expect((Object)this.innerStoreMock.name()).andStubReturn((Object)STORE_NAME);
        EasyMock.replay((Object[])new Object[]{this.innerStoreMock});
        MeteredTimestampedWindowStore store = new MeteredTimestampedWindowStore(this.innerStoreMock, 10L, STORE_TYPE, (Time)new MockTime(), null, null);
        store.init((StateStoreContext)this.context, this.innerStoreMock);
        try {
            store.put((Object)KEY, (Object)ValueAndTimestamp.make((Object)42L, (long)60000L), 60000L);
        }
        catch (StreamsException exception) {
            if (exception.getCause() instanceof ClassCastException) {
                Assert.fail((String)"Serdes are not correctly set from processor context.");
            }
            throw exception;
        }
    }

    @Test
    public void shouldNotThrowExceptionIfSerdesCorrectlySetFromConstructorParameters() {
        EasyMock.expect((Object)this.innerStoreMock.name()).andStubReturn((Object)STORE_NAME);
        EasyMock.replay((Object[])new Object[]{this.innerStoreMock});
        MeteredTimestampedWindowStore store = new MeteredTimestampedWindowStore(this.innerStoreMock, 10L, STORE_TYPE, (Time)new MockTime(), Serdes.String(), (Serde)new ValueAndTimestampSerde(Serdes.Long()));
        store.init((StateStoreContext)this.context, this.innerStoreMock);
        try {
            store.put((Object)KEY, (Object)ValueAndTimestamp.make((Object)42L, (long)60000L), 60000L);
        }
        catch (StreamsException exception) {
            if (exception.getCause() instanceof ClassCastException) {
                Assert.fail((String)"Serdes are not correctly set from constructor parameters.");
            }
            throw exception;
        }
    }
}

