/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Merger;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.SessionWindowedKStream;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.internals.ChangeLoggingSessionBytesStore;
import org.apache.kafka.streams.state.internals.MeteredSessionStore;
import org.apache.kafka.streams.state.internals.RocksDBTimeOrderedSessionStore;
import org.apache.kafka.streams.state.internals.WrappedStateStore;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.IsInstanceOf;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class SessionWindowedKStreamImplTest {
    private static final String TOPIC = "input";
    private final StreamsBuilder builder = new StreamsBuilder();
    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
    private final Merger<String, String> sessionMerger = (aggKey, aggOne, aggTwo) -> aggOne + "+" + aggTwo;
    private SessionWindowedKStream<String, String> stream;
    @Parameterized.Parameter
    public EmitStrategy.StrategyType type;
    private boolean emitFinal;

    @Parameterized.Parameters(name="{0}")
    public static Collection<Object[]> data() {
        return Arrays.asList({EmitStrategy.StrategyType.ON_WINDOW_UPDATE}, {EmitStrategy.StrategyType.ON_WINDOW_CLOSE});
    }

    @Before
    public void before() {
        EmitStrategy emitStrategy = EmitStrategy.StrategyType.forType((EmitStrategy.StrategyType)this.type);
        this.emitFinal = this.type.equals((Object)EmitStrategy.StrategyType.ON_WINDOW_CLOSE);
        this.props.setProperty("__emit.interval.ms.kstreams.windowed.aggregation__", "0");
        KStream stream = this.builder.stream(TOPIC, Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String()));
        this.stream = stream.groupByKey(Grouped.with((Serde)Serdes.String(), (Serde)Serdes.String())).windowedBy(SessionWindows.ofInactivityGapWithNoGrace((Duration)Duration.ofMillis(500L))).emitStrategy(emitStrategy);
    }

    @Test
    public void shouldCountSessionWindowedWithCachingDisabled() {
        this.props.put("statestore.cache.max.bytes", (Object)0);
        this.shouldCountSessionWindowed();
    }

    @Test
    public void shouldCountSessionWindowedWithCachingEnabled() {
        this.shouldCountSessionWindowed();
    }

    private void shouldCountSessionWindowed() {
        MockApiProcessorSupplier supplier = new MockApiProcessorSupplier();
        this.stream.count().toStream().process(supplier, new String[0]);
        try (TopologyTestDriver driver = new TopologyTestDriver(this.builder.build(), this.props);){
            this.processData(driver);
        }
        ArrayList processed = supplier.theCapturedProcessor().processed();
        if (this.emitFinal) {
            Assert.assertEquals(Collections.singletonList(new KeyValueTimestamp<Windowed, Long>(new Windowed((Object)"1", (Window)new SessionWindow(10L, 15L)), 2L, 15L)), processed);
        } else {
            Assert.assertEquals(Arrays.asList(new KeyValueTimestamp<Windowed, Long>(new Windowed((Object)"1", (Window)new SessionWindow(10L, 10L)), 1L, 10L), new KeyValueTimestamp<Windowed, Object>(new Windowed((Object)"1", (Window)new SessionWindow(10L, 10L)), null, 10L), new KeyValueTimestamp<Windowed, Long>(new Windowed((Object)"1", (Window)new SessionWindow(10L, 15L)), 2L, 15L), new KeyValueTimestamp<Windowed, Long>(new Windowed((Object)"1", (Window)new SessionWindow(600L, 600L)), 1L, 600L), new KeyValueTimestamp<Windowed, Long>(new Windowed((Object)"2", (Window)new SessionWindow(600L, 600L)), 1L, 600L), new KeyValueTimestamp<Windowed, Object>(new Windowed((Object)"2", (Window)new SessionWindow(600L, 600L)), null, 600L), new KeyValueTimestamp<Windowed, Long>(new Windowed((Object)"2", (Window)new SessionWindow(599L, 600L)), 2L, 600L)), processed);
        }
    }

    @Test
    public void shouldReduceWindowed() {
        MockApiProcessorSupplier supplier = new MockApiProcessorSupplier();
        this.stream.reduce(MockReducer.STRING_ADDER).toStream().process(supplier, new String[0]);
        try (TopologyTestDriver driver = new TopologyTestDriver(this.builder.build(), this.props);){
            this.processData(driver);
        }
        ArrayList processed = supplier.theCapturedProcessor().processed();
        if (this.emitFinal) {
            Assert.assertEquals(Collections.singletonList(new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"1", (Window)new SessionWindow(10L, 15L)), "1+2", 15L)), processed);
        } else {
            Assert.assertEquals(Arrays.asList(new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"1", (Window)new SessionWindow(10L, 10L)), "1", 10L), new KeyValueTimestamp<Windowed, Object>(new Windowed((Object)"1", (Window)new SessionWindow(10L, 10L)), null, 10L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"1", (Window)new SessionWindow(10L, 15L)), "1+2", 15L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"1", (Window)new SessionWindow(600L, 600L)), "3", 600L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"2", (Window)new SessionWindow(600L, 600L)), "1", 600L), new KeyValueTimestamp<Windowed, Object>(new Windowed((Object)"2", (Window)new SessionWindow(600L, 600L)), null, 600L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"2", (Window)new SessionWindow(599L, 600L)), "1+2", 600L)), processed);
        }
    }

    @Test
    public void shouldAggregateSessionWindowed() {
        MockApiProcessorSupplier supplier = new MockApiProcessorSupplier();
        this.stream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, this.sessionMerger, Materialized.with((Serde)Serdes.String(), (Serde)Serdes.String())).toStream().process(supplier, new String[0]);
        try (TopologyTestDriver driver = new TopologyTestDriver(this.builder.build(), this.props);){
            this.processData(driver);
        }
        ArrayList processed = supplier.theCapturedProcessor().processed();
        if (this.emitFinal) {
            Assert.assertEquals(Collections.singletonList(new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"1", (Window)new SessionWindow(10L, 15L)), "0+0+1+2", 15L)), processed);
        } else {
            Assert.assertEquals(Arrays.asList(new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"1", (Window)new SessionWindow(10L, 10L)), "0+1", 10L), new KeyValueTimestamp<Windowed, Object>(new Windowed((Object)"1", (Window)new SessionWindow(10L, 10L)), null, 10L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"1", (Window)new SessionWindow(10L, 15L)), "0+0+1+2", 15L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"1", (Window)new SessionWindow(600L, 600L)), "0+3", 600L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"2", (Window)new SessionWindow(600L, 600L)), "0+1", 600L), new KeyValueTimestamp<Windowed, Object>(new Windowed((Object)"2", (Window)new SessionWindow(600L, 600L)), null, 600L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"2", (Window)new SessionWindow(599L, 600L)), "0+0+1+2", 600L)), processed);
        }
    }

    @Test
    public void shouldMaterializeCount() {
        this.stream.count(Materialized.as((String)"count-store"));
        try (TopologyTestDriver driver = new TopologyTestDriver(this.builder.build(), this.props);){
            this.processData(driver);
            SessionStore store = driver.getSessionStore("count-store");
            List data = StreamsTestUtils.toList(store.fetch((Object)"1", (Object)"2"));
            if (!this.emitFinal) {
                MatcherAssert.assertThat(data, (Matcher)CoreMatchers.equalTo(Arrays.asList(KeyValue.pair((Object)new Windowed((Object)"1", (Window)new SessionWindow(10L, 15L)), (Object)2L), KeyValue.pair((Object)new Windowed((Object)"1", (Window)new SessionWindow(600L, 600L)), (Object)1L), KeyValue.pair((Object)new Windowed((Object)"2", (Window)new SessionWindow(599L, 600L)), (Object)2L))));
            } else {
                MatcherAssert.assertThat(data, (Matcher)CoreMatchers.equalTo(Arrays.asList(KeyValue.pair((Object)new Windowed((Object)"1", (Window)new SessionWindow(600L, 600L)), (Object)1L), KeyValue.pair((Object)new Windowed((Object)"2", (Window)new SessionWindow(599L, 600L)), (Object)2L))));
            }
        }
    }

    @Test
    public void shouldMaterializeReduced() {
        this.stream.reduce(MockReducer.STRING_ADDER, Materialized.as((String)"reduced"));
        try (TopologyTestDriver driver = new TopologyTestDriver(this.builder.build(), this.props);){
            this.processData(driver);
            SessionStore sessionStore = driver.getSessionStore("reduced");
            List data = StreamsTestUtils.toList(sessionStore.fetch((Object)"1", (Object)"2"));
            if (!this.emitFinal) {
                MatcherAssert.assertThat(data, (Matcher)CoreMatchers.equalTo(Arrays.asList(KeyValue.pair((Object)new Windowed((Object)"1", (Window)new SessionWindow(10L, 15L)), (Object)"1+2"), KeyValue.pair((Object)new Windowed((Object)"1", (Window)new SessionWindow(600L, 600L)), (Object)"3"), KeyValue.pair((Object)new Windowed((Object)"2", (Window)new SessionWindow(599L, 600L)), (Object)"1+2"))));
            } else {
                MatcherAssert.assertThat(data, (Matcher)CoreMatchers.equalTo(Arrays.asList(KeyValue.pair((Object)new Windowed((Object)"1", (Window)new SessionWindow(600L, 600L)), (Object)"3"), KeyValue.pair((Object)new Windowed((Object)"2", (Window)new SessionWindow(599L, 600L)), (Object)"1+2"))));
            }
        }
    }

    @Test
    public void shouldMaterializeAggregated() {
        this.stream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, this.sessionMerger, Materialized.as((String)"aggregated").withValueSerde(Serdes.String()));
        try (TopologyTestDriver driver = new TopologyTestDriver(this.builder.build(), this.props);){
            this.processData(driver);
            SessionStore sessionStore = driver.getSessionStore("aggregated");
            List data = StreamsTestUtils.toList(sessionStore.fetch((Object)"1", (Object)"2"));
            if (!this.emitFinal) {
                MatcherAssert.assertThat(data, (Matcher)CoreMatchers.equalTo(Arrays.asList(KeyValue.pair((Object)new Windowed((Object)"1", (Window)new SessionWindow(10L, 15L)), (Object)"0+0+1+2"), KeyValue.pair((Object)new Windowed((Object)"1", (Window)new SessionWindow(600L, 600L)), (Object)"0+3"), KeyValue.pair((Object)new Windowed((Object)"2", (Window)new SessionWindow(599L, 600L)), (Object)"0+0+1+2"))));
            } else {
                MatcherAssert.assertThat(data, (Matcher)CoreMatchers.equalTo(Arrays.asList(KeyValue.pair((Object)new Windowed((Object)"1", (Window)new SessionWindow(600L, 600L)), (Object)"0+3"), KeyValue.pair((Object)new Windowed((Object)"2", (Window)new SessionWindow(599L, 600L)), (Object)"0+0+1+2"))));
            }
        }
    }

    @Test
    public void shouldThrowNullPointerOnAggregateIfInitializerIsNull() {
        Assert.assertThrows(NullPointerException.class, () -> this.stream.aggregate(null, MockAggregator.TOSTRING_ADDER, this.sessionMerger));
    }

    @Test
    public void shouldThrowNullPointerOnAggregateIfAggregatorIsNull() {
        Assert.assertThrows(NullPointerException.class, () -> this.stream.aggregate(MockInitializer.STRING_INIT, null, this.sessionMerger));
    }

    @Test
    public void shouldThrowNullPointerOnAggregateIfMergerIsNull() {
        Assert.assertThrows(NullPointerException.class, () -> this.stream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, null));
    }

    @Test
    public void shouldThrowNullPointerOnReduceIfReducerIsNull() {
        Assert.assertThrows(NullPointerException.class, () -> this.stream.reduce(null));
    }

    @Test
    public void shouldThrowNullPointerOnMaterializedAggregateIfInitializerIsNull() {
        Assert.assertThrows(NullPointerException.class, () -> this.stream.aggregate(null, MockAggregator.TOSTRING_ADDER, this.sessionMerger, Materialized.as((String)"store")));
    }

    @Test
    public void shouldThrowNullPointerOnMaterializedAggregateIfAggregatorIsNull() {
        Assert.assertThrows(NullPointerException.class, () -> this.stream.aggregate(MockInitializer.STRING_INIT, null, this.sessionMerger, Materialized.as((String)"store")));
    }

    @Test
    public void shouldThrowNullPointerOnMaterializedAggregateIfMergerIsNull() {
        Assert.assertThrows(NullPointerException.class, () -> this.stream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, null, Materialized.as((String)"store")));
    }

    @Test
    public void shouldThrowNullPointerOnMaterializedAggregateIfMaterializedIsNull() {
        Assert.assertThrows(NullPointerException.class, () -> this.stream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, this.sessionMerger, (Materialized)null));
    }

    @Test
    public void shouldThrowNullPointerOnMaterializedReduceIfReducerIsNull() {
        Assert.assertThrows(NullPointerException.class, () -> this.stream.reduce(null, Materialized.as((String)"store")));
    }

    @Test
    public void shouldThrowNullPointerOnMaterializedReduceIfMaterializedIsNull() {
        Assert.assertThrows(NullPointerException.class, () -> this.stream.reduce(MockReducer.STRING_ADDER, (Materialized)null));
    }

    @Test
    public void shouldThrowNullPointerOnMaterializedReduceIfNamedIsNull() {
        Assert.assertThrows(NullPointerException.class, () -> this.stream.reduce(MockReducer.STRING_ADDER, (Named)null));
    }

    @Test
    public void shouldThrowNullPointerOnCountIfMaterializedIsNull() {
        Assert.assertThrows(NullPointerException.class, () -> this.stream.count((Materialized)null));
    }

    @Test
    public void shouldNotEnableCachingWithEmitFinal() {
        if (!this.emitFinal) {
            return;
        }
        this.stream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, this.sessionMerger, Materialized.as((String)"aggregated").withValueSerde(Serdes.String()));
        try (TopologyTestDriver driver = new TopologyTestDriver(this.builder.build(), this.props);){
            SessionStore store = driver.getSessionStore("aggregated");
            WrappedStateStore changeLogging = (WrappedStateStore)((WrappedStateStore)store).wrapped();
            MatcherAssert.assertThat((Object)store, (Matcher)IsInstanceOf.instanceOf(MeteredSessionStore.class));
            MatcherAssert.assertThat((Object)changeLogging, (Matcher)IsInstanceOf.instanceOf(ChangeLoggingSessionBytesStore.class));
            MatcherAssert.assertThat((Object)changeLogging.wrapped(), (Matcher)IsInstanceOf.instanceOf(RocksDBTimeOrderedSessionStore.class));
        }
    }

    private void processData(TopologyTestDriver driver) {
        TestInputTopic inputTopic = driver.createInputTopic(TOPIC, (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
        inputTopic.pipeInput((Object)"1", (Object)"1", 10L);
        inputTopic.pipeInput((Object)"1", (Object)"2", 15L);
        inputTopic.pipeInput((Object)"1", (Object)"3", 600L);
        inputTopic.pipeInput((Object)"2", (Object)"1", 600L);
        inputTopic.pipeInput((Object)"2", (Object)"2", 599L);
    }
}

