/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import kafka.tools.ConsoleConsumer;
import kafka.utils.MockTime;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.SessionWindowedDeserializer;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.UnlimitedWindows;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedSerdes;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.kstream.internals.UnlimitedWindow;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlySessionStore;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.Is;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(value={IntegrationTest.class})
public class KStreamAggregationIntegrationTest {
    private static final int NUM_BROKERS = 1;
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    private static volatile int testNo = 0;
    private final MockTime mockTime;
    private StreamsBuilder builder;
    private Properties streamsConfiguration;
    private KafkaStreams kafkaStreams;
    private String streamOneInput;
    private String userSessionsStream;
    private String outputTopic;
    private KGroupedStream<String, String> groupedStream;
    private Reducer<String> reducer;
    private Initializer<Integer> initializer;
    private Aggregator<String, String, Integer> aggregator;
    private KStream<Integer, String> stream;

    public KStreamAggregationIntegrationTest() {
        this.mockTime = KStreamAggregationIntegrationTest.CLUSTER.time;
        this.userSessionsStream = "user-sessions";
    }

    @Before
    public void before() throws InterruptedException {
        this.builder = new StreamsBuilder();
        this.createTopics();
        this.streamsConfiguration = new Properties();
        String applicationId = "kgrouped-stream-test-" + ++testNo;
        this.streamsConfiguration.put("application.id", applicationId);
        this.streamsConfiguration.put("bootstrap.servers", CLUSTER.bootstrapServers());
        this.streamsConfiguration.put("auto.offset.reset", "earliest");
        this.streamsConfiguration.put("state.dir", TestUtils.tempDirectory().getPath());
        this.streamsConfiguration.put("cache.max.bytes.buffering", (Object)0);
        this.streamsConfiguration.put("internal.leave.group.on.close", (Object)true);
        this.streamsConfiguration.put("commit.interval.ms", (Object)100);
        this.streamsConfiguration.put("default.key.serde", Serdes.String().getClass());
        this.streamsConfiguration.put("default.value.serde", Serdes.Integer().getClass());
        KeyValueMapper mapper = MockMapper.selectValueMapper();
        this.stream = this.builder.stream(this.streamOneInput, Consumed.with((Serde)Serdes.Integer(), (Serde)Serdes.String()));
        this.groupedStream = this.stream.groupBy(mapper, Grouped.with((Serde)Serdes.String(), (Serde)Serdes.String()));
        this.reducer = (value1, value2) -> value1 + ":" + value2;
        this.initializer = () -> 0;
        this.aggregator = (aggKey, value, aggregate) -> aggregate + value.length();
    }

    @After
    public void whenShuttingDown() throws IOException {
        if (this.kafkaStreams != null) {
            this.kafkaStreams.close();
        }
        IntegrationTestUtils.purgeLocalStreamsState(this.streamsConfiguration);
    }

    @Test
    public void shouldReduce() throws Exception {
        this.produceMessages(this.mockTime.milliseconds());
        this.groupedStream.reduce(this.reducer, Materialized.as((String)"reduce-by-key")).toStream().to(this.outputTopic, Produced.with((Serde)Serdes.String(), (Serde)Serdes.String()));
        this.startStreams();
        this.produceMessages(this.mockTime.milliseconds());
        List results = this.receiveMessages((Deserializer)new StringDeserializer(), (Deserializer)new StringDeserializer(), 10);
        Collections.sort(results, KStreamAggregationIntegrationTest::compare);
        MatcherAssert.assertThat(results, (Matcher)Is.is(Arrays.asList(KeyValue.pair((Object)"A", (Object)"A"), KeyValue.pair((Object)"A", (Object)"A:A"), KeyValue.pair((Object)"B", (Object)"B"), KeyValue.pair((Object)"B", (Object)"B:B"), KeyValue.pair((Object)"C", (Object)"C"), KeyValue.pair((Object)"C", (Object)"C:C"), KeyValue.pair((Object)"D", (Object)"D"), KeyValue.pair((Object)"D", (Object)"D:D"), KeyValue.pair((Object)"E", (Object)"E"), KeyValue.pair((Object)"E", (Object)"E:E"))));
    }

    private static <K extends Comparable, V extends Comparable> int compare(KeyValue<K, V> o1, KeyValue<K, V> o2) {
        int keyComparison = ((Comparable)o1.key).compareTo(o2.key);
        if (keyComparison == 0) {
            return ((Comparable)o1.value).compareTo(o2.value);
        }
        return keyComparison;
    }

    @Test
    public void shouldReduceWindowed() throws Exception {
        String[] allRecords;
        long firstBatchTimestamp = this.mockTime.milliseconds();
        this.mockTime.sleep(1000L);
        this.produceMessages(firstBatchTimestamp);
        long secondBatchTimestamp = this.mockTime.milliseconds();
        this.produceMessages(secondBatchTimestamp);
        this.produceMessages(secondBatchTimestamp);
        Serde windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class);
        this.groupedStream.windowedBy((Windows)TimeWindows.of((Duration)Duration.ofMillis(500L))).reduce(this.reducer).toStream().to(this.outputTopic, Produced.with((Serde)windowedSerde, (Serde)Serdes.String()));
        this.startStreams();
        List windowedOutput = this.receiveMessages((Deserializer)new TimeWindowedDeserializer(), (Deserializer)new StringDeserializer(), String.class, 15);
        String resultFromConsoleConsumer = this.readWindowedKeyedMessagesViaConsoleConsumer((Deserializer)new TimeWindowedDeserializer(), (Deserializer)new StringDeserializer(), String.class, 15, false);
        Comparator<KeyValue> comparator = Comparator.comparing(o -> (String)((Windowed)o.key).key()).thenComparing(o -> (String)o.value);
        Collections.sort(windowedOutput, comparator);
        long firstBatchWindow = firstBatchTimestamp / 500L * 500L;
        long secondBatchWindow = secondBatchTimestamp / 500L * 500L;
        List<KeyValue> expectResult = Arrays.asList(new KeyValue((Object)new Windowed((Object)"A", (Window)new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), (Object)"A"), new KeyValue((Object)new Windowed((Object)"A", (Window)new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), (Object)"A"), new KeyValue((Object)new Windowed((Object)"A", (Window)new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), (Object)"A:A"), new KeyValue((Object)new Windowed((Object)"B", (Window)new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), (Object)"B"), new KeyValue((Object)new Windowed((Object)"B", (Window)new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), (Object)"B"), new KeyValue((Object)new Windowed((Object)"B", (Window)new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), (Object)"B:B"), new KeyValue((Object)new Windowed((Object)"C", (Window)new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), (Object)"C"), new KeyValue((Object)new Windowed((Object)"C", (Window)new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), (Object)"C"), new KeyValue((Object)new Windowed((Object)"C", (Window)new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), (Object)"C:C"), new KeyValue((Object)new Windowed((Object)"D", (Window)new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), (Object)"D"), new KeyValue((Object)new Windowed((Object)"D", (Window)new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), (Object)"D"), new KeyValue((Object)new Windowed((Object)"D", (Window)new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), (Object)"D:D"), new KeyValue((Object)new Windowed((Object)"E", (Window)new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), (Object)"E"), new KeyValue((Object)new Windowed((Object)"E", (Window)new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), (Object)"E"), new KeyValue((Object)new Windowed((Object)"E", (Window)new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), (Object)"E:E"));
        MatcherAssert.assertThat(windowedOutput, (Matcher)Is.is(expectResult));
        HashSet<String> expectResultString = new HashSet<String>(expectResult.size());
        for (KeyValue eachRecord : expectResult) {
            expectResultString.add(eachRecord.toString());
        }
        for (String record : allRecords = resultFromConsoleConsumer.split("\n")) {
            Assert.assertTrue((boolean)expectResultString.contains("KeyValue(" + record + ")"));
        }
    }

    @Test
    public void shouldAggregate() throws Exception {
        this.produceMessages(this.mockTime.milliseconds());
        this.groupedStream.aggregate(this.initializer, this.aggregator, Materialized.as((String)"aggregate-by-selected-key")).toStream().to(this.outputTopic, Produced.with((Serde)Serdes.String(), (Serde)Serdes.Integer()));
        this.startStreams();
        this.produceMessages(this.mockTime.milliseconds());
        List results = this.receiveMessages((Deserializer)new StringDeserializer(), (Deserializer)new IntegerDeserializer(), 10);
        Collections.sort(results, KStreamAggregationIntegrationTest::compare);
        MatcherAssert.assertThat(results, (Matcher)Is.is(Arrays.asList(KeyValue.pair((Object)"A", (Object)1), KeyValue.pair((Object)"A", (Object)2), KeyValue.pair((Object)"B", (Object)1), KeyValue.pair((Object)"B", (Object)2), KeyValue.pair((Object)"C", (Object)1), KeyValue.pair((Object)"C", (Object)2), KeyValue.pair((Object)"D", (Object)1), KeyValue.pair((Object)"D", (Object)2), KeyValue.pair((Object)"E", (Object)1), KeyValue.pair((Object)"E", (Object)2))));
    }

    @Test
    public void shouldAggregateWindowed() throws Exception {
        String[] allRecords;
        long firstTimestamp = this.mockTime.milliseconds();
        this.mockTime.sleep(1000L);
        this.produceMessages(firstTimestamp);
        long secondTimestamp = this.mockTime.milliseconds();
        this.produceMessages(secondTimestamp);
        this.produceMessages(secondTimestamp);
        Serde windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class);
        this.groupedStream.windowedBy((Windows)TimeWindows.of((Duration)Duration.ofMillis(500L))).aggregate(this.initializer, this.aggregator, Materialized.with(null, (Serde)Serdes.Integer())).toStream().to(this.outputTopic, Produced.with((Serde)windowedSerde, (Serde)Serdes.Integer()));
        this.startStreams();
        List windowedMessages = this.receiveMessagesWithTimestamp((Deserializer)new TimeWindowedDeserializer(), (Deserializer)new IntegerDeserializer(), String.class, 15);
        String resultFromConsoleConsumer = this.readWindowedKeyedMessagesViaConsoleConsumer((Deserializer)new TimeWindowedDeserializer(), (Deserializer)new IntegerDeserializer(), String.class, 15, true);
        Comparator<KeyValue> comparator = Comparator.comparing(o -> (String)((Windowed)o.key).key()).thenComparingInt(o -> (Integer)((KeyValue)o.value).key);
        Collections.sort(windowedMessages, comparator);
        long firstWindow = firstTimestamp / 500L * 500L;
        long secondWindow = secondTimestamp / 500L * 500L;
        List<KeyValue> expectResult = Arrays.asList(new KeyValue((Object)new Windowed((Object)"A", (Window)new TimeWindow(firstWindow, Long.MAX_VALUE)), (Object)KeyValue.pair((Object)1, (Object)firstTimestamp)), new KeyValue((Object)new Windowed((Object)"A", (Window)new TimeWindow(secondWindow, Long.MAX_VALUE)), (Object)KeyValue.pair((Object)1, (Object)secondTimestamp)), new KeyValue((Object)new Windowed((Object)"A", (Window)new TimeWindow(secondWindow, Long.MAX_VALUE)), (Object)KeyValue.pair((Object)2, (Object)secondTimestamp)), new KeyValue((Object)new Windowed((Object)"B", (Window)new TimeWindow(firstWindow, Long.MAX_VALUE)), (Object)KeyValue.pair((Object)1, (Object)firstTimestamp)), new KeyValue((Object)new Windowed((Object)"B", (Window)new TimeWindow(secondWindow, Long.MAX_VALUE)), (Object)KeyValue.pair((Object)1, (Object)secondTimestamp)), new KeyValue((Object)new Windowed((Object)"B", (Window)new TimeWindow(secondWindow, Long.MAX_VALUE)), (Object)KeyValue.pair((Object)2, (Object)secondTimestamp)), new KeyValue((Object)new Windowed((Object)"C", (Window)new TimeWindow(firstWindow, Long.MAX_VALUE)), (Object)KeyValue.pair((Object)1, (Object)firstTimestamp)), new KeyValue((Object)new Windowed((Object)"C", (Window)new TimeWindow(secondWindow, Long.MAX_VALUE)), (Object)KeyValue.pair((Object)1, (Object)secondTimestamp)), new KeyValue((Object)new Windowed((Object)"C", (Window)new TimeWindow(secondWindow, Long.MAX_VALUE)), (Object)KeyValue.pair((Object)2, (Object)secondTimestamp)), new KeyValue((Object)new Windowed((Object)"D", (Window)new TimeWindow(firstWindow, Long.MAX_VALUE)), (Object)KeyValue.pair((Object)1, (Object)firstTimestamp)), new KeyValue((Object)new Windowed((Object)"D", (Window)new TimeWindow(secondWindow, Long.MAX_VALUE)), (Object)KeyValue.pair((Object)1, (Object)secondTimestamp)), new KeyValue((Object)new Windowed((Object)"D", (Window)new TimeWindow(secondWindow, Long.MAX_VALUE)), (Object)KeyValue.pair((Object)2, (Object)secondTimestamp)), new KeyValue((Object)new Windowed((Object)"E", (Window)new TimeWindow(firstWindow, Long.MAX_VALUE)), (Object)KeyValue.pair((Object)1, (Object)firstTimestamp)), new KeyValue((Object)new Windowed((Object)"E", (Window)new TimeWindow(secondWindow, Long.MAX_VALUE)), (Object)KeyValue.pair((Object)1, (Object)secondTimestamp)), new KeyValue((Object)new Windowed((Object)"E", (Window)new TimeWindow(secondWindow, Long.MAX_VALUE)), (Object)KeyValue.pair((Object)2, (Object)secondTimestamp)));
        MatcherAssert.assertThat(windowedMessages, (Matcher)Is.is(expectResult));
        HashSet<String> expectResultString = new HashSet<String>(expectResult.size());
        for (KeyValue eachRecord : expectResult) {
            expectResultString.add("CreateTime:" + ((KeyValue)eachRecord.value).value + ", " + ((Windowed)eachRecord.key).toString() + ", " + ((KeyValue)eachRecord.value).key);
        }
        for (String record : allRecords = resultFromConsoleConsumer.split("\n")) {
            Assert.assertTrue((boolean)expectResultString.contains(record));
        }
    }

    private void shouldCountHelper() throws Exception {
        this.startStreams();
        this.produceMessages(this.mockTime.milliseconds());
        List results = this.receiveMessages((Deserializer)new StringDeserializer(), (Deserializer)new LongDeserializer(), 10);
        Collections.sort(results, KStreamAggregationIntegrationTest::compare);
        MatcherAssert.assertThat(results, (Matcher)Is.is(Arrays.asList(KeyValue.pair((Object)"A", (Object)1L), KeyValue.pair((Object)"A", (Object)2L), KeyValue.pair((Object)"B", (Object)1L), KeyValue.pair((Object)"B", (Object)2L), KeyValue.pair((Object)"C", (Object)1L), KeyValue.pair((Object)"C", (Object)2L), KeyValue.pair((Object)"D", (Object)1L), KeyValue.pair((Object)"D", (Object)2L), KeyValue.pair((Object)"E", (Object)1L), KeyValue.pair((Object)"E", (Object)2L))));
    }

    @Test
    public void shouldCount() throws Exception {
        this.produceMessages(this.mockTime.milliseconds());
        this.groupedStream.count(Materialized.as((String)"count-by-key")).toStream().to(this.outputTopic, Produced.with((Serde)Serdes.String(), (Serde)Serdes.Long()));
        this.shouldCountHelper();
    }

    @Test
    public void shouldCountWithInternalStore() throws Exception {
        this.produceMessages(this.mockTime.milliseconds());
        this.groupedStream.count().toStream().to(this.outputTopic, Produced.with((Serde)Serdes.String(), (Serde)Serdes.Long()));
        this.shouldCountHelper();
    }

    @Test
    public void shouldGroupByKey() throws Exception {
        long timestamp = this.mockTime.milliseconds();
        this.produceMessages(timestamp);
        this.produceMessages(timestamp);
        this.stream.groupByKey(Grouped.with((Serde)Serdes.Integer(), (Serde)Serdes.String())).windowedBy((Windows)TimeWindows.of((Duration)Duration.ofMillis(500L))).count().toStream((windowedKey, value) -> windowedKey.key() + "@" + windowedKey.window().start()).to(this.outputTopic, Produced.with((Serde)Serdes.String(), (Serde)Serdes.Long()));
        this.startStreams();
        List results = this.receiveMessages((Deserializer)new StringDeserializer(), (Deserializer)new LongDeserializer(), 10);
        Collections.sort(results, KStreamAggregationIntegrationTest::compare);
        long window = timestamp / 500L * 500L;
        MatcherAssert.assertThat(results, (Matcher)Is.is(Arrays.asList(KeyValue.pair((Object)("1@" + window), (Object)1L), KeyValue.pair((Object)("1@" + window), (Object)2L), KeyValue.pair((Object)("2@" + window), (Object)1L), KeyValue.pair((Object)("2@" + window), (Object)2L), KeyValue.pair((Object)("3@" + window), (Object)1L), KeyValue.pair((Object)("3@" + window), (Object)2L), KeyValue.pair((Object)("4@" + window), (Object)1L), KeyValue.pair((Object)("4@" + window), (Object)2L), KeyValue.pair((Object)("5@" + window), (Object)1L), KeyValue.pair((Object)("5@" + window), (Object)2L))));
    }

    @Test
    public void shouldCountSessionWindows() throws Exception {
        long sessionGap = 300000L;
        long t1 = this.mockTime.milliseconds() - TimeUnit.MILLISECONDS.convert(1L, TimeUnit.HOURS);
        List t1Messages = Arrays.asList(new KeyValue((Object)"bob", (Object)"start"), new KeyValue((Object)"penny", (Object)"start"), new KeyValue((Object)"jo", (Object)"pause"), new KeyValue((Object)"emily", (Object)"pause"));
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.userSessionsStream, t1Messages, TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, (Properties)new Properties()), t1);
        long t2 = t1 + 150000L;
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.userSessionsStream, Collections.singletonList(new KeyValue((Object)"emily", (Object)"resume")), TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, (Properties)new Properties()), t2);
        long t3 = t1 + 300000L + 1L;
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.userSessionsStream, Arrays.asList(new KeyValue((Object)"bob", (Object)"pause"), new KeyValue((Object)"penny", (Object)"stop")), TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, (Properties)new Properties()), t3);
        long t4 = t3 + 150000L;
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.userSessionsStream, Arrays.asList(new KeyValue((Object)"bob", (Object)"resume"), new KeyValue((Object)"jo", (Object)"resume")), TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, (Properties)new Properties()), t4);
        final HashMap results = new HashMap();
        final CountDownLatch latch = new CountDownLatch(11);
        this.builder.stream(this.userSessionsStream, Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String())).groupByKey(Grouped.with((Serde)Serdes.String(), (Serde)Serdes.String())).windowedBy(SessionWindows.with((Duration)Duration.ofMillis(300000L))).count().toStream().transform(() -> new Transformer<Windowed<String>, Long, KeyValue<Object, Object>>(){
            private ProcessorContext context;

            public void init(ProcessorContext context) {
                this.context = context;
            }

            public KeyValue<Object, Object> transform(Windowed<String> key, Long value) {
                results.put(key, KeyValue.pair((Object)value, (Object)this.context.timestamp()));
                latch.countDown();
                return null;
            }

            public void close() {
            }
        }, new String[0]);
        this.startStreams();
        latch.await(30L, TimeUnit.SECONDS);
        MatcherAssert.assertThat(results.get(new Windowed((Object)"bob", (Window)new SessionWindow(t1, t1))), (Matcher)CoreMatchers.equalTo((Object)KeyValue.pair((Object)1L, (Object)t1)));
        MatcherAssert.assertThat(results.get(new Windowed((Object)"penny", (Window)new SessionWindow(t1, t1))), (Matcher)CoreMatchers.equalTo((Object)KeyValue.pair((Object)1L, (Object)t1)));
        MatcherAssert.assertThat(results.get(new Windowed((Object)"jo", (Window)new SessionWindow(t1, t1))), (Matcher)CoreMatchers.equalTo((Object)KeyValue.pair((Object)1L, (Object)t1)));
        MatcherAssert.assertThat(results.get(new Windowed((Object)"jo", (Window)new SessionWindow(t4, t4))), (Matcher)CoreMatchers.equalTo((Object)KeyValue.pair((Object)1L, (Object)t4)));
        MatcherAssert.assertThat(results.get(new Windowed((Object)"emily", (Window)new SessionWindow(t1, t2))), (Matcher)CoreMatchers.equalTo((Object)KeyValue.pair((Object)2L, (Object)t2)));
        MatcherAssert.assertThat(results.get(new Windowed((Object)"bob", (Window)new SessionWindow(t3, t4))), (Matcher)CoreMatchers.equalTo((Object)KeyValue.pair((Object)2L, (Object)t4)));
        MatcherAssert.assertThat(results.get(new Windowed((Object)"penny", (Window)new SessionWindow(t3, t3))), (Matcher)CoreMatchers.equalTo((Object)KeyValue.pair((Object)1L, (Object)t3)));
    }

    @Test
    public void shouldReduceSessionWindows() throws Exception {
        long sessionGap = 1000L;
        long t1 = this.mockTime.milliseconds();
        List t1Messages = Arrays.asList(new KeyValue((Object)"bob", (Object)"start"), new KeyValue((Object)"penny", (Object)"start"), new KeyValue((Object)"jo", (Object)"pause"), new KeyValue((Object)"emily", (Object)"pause"));
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.userSessionsStream, t1Messages, TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, (Properties)new Properties()), t1);
        long t2 = t1 + 500L;
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.userSessionsStream, Collections.singletonList(new KeyValue((Object)"emily", (Object)"resume")), TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, (Properties)new Properties()), t2);
        long t3 = t1 + 1000L + 1L;
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.userSessionsStream, Arrays.asList(new KeyValue((Object)"bob", (Object)"pause"), new KeyValue((Object)"penny", (Object)"stop")), TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, (Properties)new Properties()), t3);
        long t4 = t3 + 500L;
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.userSessionsStream, Arrays.asList(new KeyValue((Object)"bob", (Object)"resume"), new KeyValue((Object)"jo", (Object)"resume")), TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, (Properties)new Properties()), t4);
        HashMap results = new HashMap();
        CountDownLatch latch = new CountDownLatch(11);
        String userSessionsStore = "UserSessionsStore";
        this.builder.stream(this.userSessionsStream, Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String())).groupByKey(Grouped.with((Serde)Serdes.String(), (Serde)Serdes.String())).windowedBy(SessionWindows.with((Duration)Duration.ofMillis(1000L))).reduce((value1, value2) -> value1 + ":" + value2, Materialized.as((String)"UserSessionsStore")).toStream().foreach((key, value) -> {
            results.put(key, value);
            latch.countDown();
        });
        this.startStreams();
        latch.await(30L, TimeUnit.SECONDS);
        ReadOnlySessionStore sessionStore = (ReadOnlySessionStore)this.kafkaStreams.store("UserSessionsStore", QueryableStoreTypes.sessionStore());
        MatcherAssert.assertThat(results.get(new Windowed((Object)"bob", (Window)new SessionWindow(t1, t1))), (Matcher)CoreMatchers.equalTo((Object)"start"));
        MatcherAssert.assertThat(results.get(new Windowed((Object)"penny", (Window)new SessionWindow(t1, t1))), (Matcher)CoreMatchers.equalTo((Object)"start"));
        MatcherAssert.assertThat(results.get(new Windowed((Object)"jo", (Window)new SessionWindow(t1, t1))), (Matcher)CoreMatchers.equalTo((Object)"pause"));
        MatcherAssert.assertThat(results.get(new Windowed((Object)"jo", (Window)new SessionWindow(t4, t4))), (Matcher)CoreMatchers.equalTo((Object)"resume"));
        MatcherAssert.assertThat(results.get(new Windowed((Object)"emily", (Window)new SessionWindow(t1, t2))), (Matcher)CoreMatchers.equalTo((Object)"pause:resume"));
        MatcherAssert.assertThat(results.get(new Windowed((Object)"bob", (Window)new SessionWindow(t3, t4))), (Matcher)CoreMatchers.equalTo((Object)"pause:resume"));
        MatcherAssert.assertThat(results.get(new Windowed((Object)"penny", (Window)new SessionWindow(t3, t3))), (Matcher)CoreMatchers.equalTo((Object)"stop"));
        KeyValueIterator bob = sessionStore.fetch((Object)"bob");
        MatcherAssert.assertThat((Object)bob.next(), (Matcher)CoreMatchers.equalTo((Object)KeyValue.pair((Object)new Windowed((Object)"bob", (Window)new SessionWindow(t1, t1)), (Object)"start")));
        MatcherAssert.assertThat((Object)bob.next(), (Matcher)CoreMatchers.equalTo((Object)KeyValue.pair((Object)new Windowed((Object)"bob", (Window)new SessionWindow(t3, t4)), (Object)"pause:resume")));
        Assert.assertFalse((boolean)bob.hasNext());
    }

    @Test
    public void shouldCountUnlimitedWindows() throws Exception {
        long startTime = this.mockTime.milliseconds() - TimeUnit.MILLISECONDS.convert(1L, TimeUnit.HOURS) + 1L;
        long incrementTime = Duration.ofDays(1L).toMillis();
        long t1 = this.mockTime.milliseconds() - TimeUnit.MILLISECONDS.convert(1L, TimeUnit.HOURS);
        List t1Messages = Arrays.asList(new KeyValue((Object)"bob", (Object)"start"), new KeyValue((Object)"penny", (Object)"start"), new KeyValue((Object)"jo", (Object)"pause"), new KeyValue((Object)"emily", (Object)"pause"));
        Properties producerConfig = TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, (Properties)new Properties());
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.userSessionsStream, t1Messages, producerConfig, t1);
        long t2 = t1 + incrementTime;
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.userSessionsStream, Collections.singletonList(new KeyValue((Object)"emily", (Object)"resume")), producerConfig, t2);
        long t3 = t2 + incrementTime;
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.userSessionsStream, Arrays.asList(new KeyValue((Object)"bob", (Object)"pause"), new KeyValue((Object)"penny", (Object)"stop")), producerConfig, t3);
        long t4 = t3 + incrementTime;
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.userSessionsStream, Arrays.asList(new KeyValue((Object)"bob", (Object)"resume"), new KeyValue((Object)"jo", (Object)"resume")), producerConfig, t4);
        final HashMap results = new HashMap();
        final CountDownLatch latch = new CountDownLatch(5);
        this.builder.stream(this.userSessionsStream, Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String())).groupByKey(Grouped.with((Serde)Serdes.String(), (Serde)Serdes.String())).windowedBy((Windows)UnlimitedWindows.of().startOn(Instant.ofEpochMilli(startTime))).count().toStream().transform(() -> new Transformer<Windowed<String>, Long, KeyValue<Object, Object>>(){
            private ProcessorContext context;

            public void init(ProcessorContext context) {
                this.context = context;
            }

            public KeyValue<Object, Object> transform(Windowed<String> key, Long value) {
                results.put(key, KeyValue.pair((Object)value, (Object)this.context.timestamp()));
                latch.countDown();
                return null;
            }

            public void close() {
            }
        }, new String[0]);
        this.startStreams();
        Assert.assertTrue((boolean)latch.await(30L, TimeUnit.SECONDS));
        MatcherAssert.assertThat(results.get(new Windowed((Object)"bob", (Window)new UnlimitedWindow(startTime))), (Matcher)CoreMatchers.equalTo((Object)KeyValue.pair((Object)2L, (Object)t4)));
        MatcherAssert.assertThat(results.get(new Windowed((Object)"penny", (Window)new UnlimitedWindow(startTime))), (Matcher)CoreMatchers.equalTo((Object)KeyValue.pair((Object)1L, (Object)t3)));
        MatcherAssert.assertThat(results.get(new Windowed((Object)"jo", (Window)new UnlimitedWindow(startTime))), (Matcher)CoreMatchers.equalTo((Object)KeyValue.pair((Object)1L, (Object)t4)));
        MatcherAssert.assertThat(results.get(new Windowed((Object)"emily", (Window)new UnlimitedWindow(startTime))), (Matcher)CoreMatchers.equalTo((Object)KeyValue.pair((Object)1L, (Object)t2)));
    }

    private void produceMessages(long timestamp) throws Exception {
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.streamOneInput, Arrays.asList(new KeyValue((Object)1, (Object)"A"), new KeyValue((Object)2, (Object)"B"), new KeyValue((Object)3, (Object)"C"), new KeyValue((Object)4, (Object)"D"), new KeyValue((Object)5, (Object)"E")), TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), IntegerSerializer.class, StringSerializer.class, (Properties)new Properties()), timestamp);
    }

    private void createTopics() throws InterruptedException {
        this.streamOneInput = "stream-one-" + testNo;
        this.outputTopic = "output-" + testNo;
        this.userSessionsStream = this.userSessionsStream + "-" + testNo;
        CLUSTER.createTopic(this.streamOneInput, 3, 1);
        CLUSTER.createTopics(this.userSessionsStream, this.outputTopic);
    }

    private void startStreams() {
        this.kafkaStreams = new KafkaStreams(this.builder.build(), this.streamsConfiguration);
        this.kafkaStreams.start();
    }

    private <K, V> List<KeyValue<K, V>> receiveMessages(Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, int numMessages) throws InterruptedException {
        return this.receiveMessages(keyDeserializer, valueDeserializer, null, numMessages);
    }

    private <K, V> List<KeyValue<K, V>> receiveMessages(Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, Class innerClass, int numMessages) throws InterruptedException {
        Properties consumerProperties = new Properties();
        consumerProperties.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        consumerProperties.setProperty("group.id", "kgroupedstream-test-" + testNo);
        consumerProperties.setProperty("auto.offset.reset", "earliest");
        consumerProperties.setProperty("key.deserializer", keyDeserializer.getClass().getName());
        consumerProperties.setProperty("value.deserializer", valueDeserializer.getClass().getName());
        if (keyDeserializer instanceof TimeWindowedDeserializer || keyDeserializer instanceof SessionWindowedDeserializer) {
            consumerProperties.setProperty("default.windowed.key.serde.inner", Serdes.serdeFrom((Class)innerClass).getClass().getName());
        }
        return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerProperties, this.outputTopic, numMessages, 60000L);
    }

    private <K, V> List<KeyValue<K, KeyValue<V, Long>>> receiveMessagesWithTimestamp(Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, Class innerClass, int numMessages) throws InterruptedException {
        Properties consumerProperties = new Properties();
        consumerProperties.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        consumerProperties.setProperty("group.id", "kgroupedstream-test-" + testNo);
        consumerProperties.setProperty("auto.offset.reset", "earliest");
        consumerProperties.setProperty("key.deserializer", keyDeserializer.getClass().getName());
        consumerProperties.setProperty("value.deserializer", valueDeserializer.getClass().getName());
        if (keyDeserializer instanceof TimeWindowedDeserializer || keyDeserializer instanceof SessionWindowedDeserializer) {
            consumerProperties.setProperty("default.windowed.key.serde.inner", Serdes.serdeFrom((Class)innerClass).getClass().getName());
        }
        return IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(consumerProperties, this.outputTopic, numMessages, 60000L);
    }

    private <K, V> String readWindowedKeyedMessagesViaConsoleConsumer(Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, Class innerClass, int numMessages, boolean printTimestamp) {
        ByteArrayOutputStream newConsole = new ByteArrayOutputStream();
        PrintStream originalStream = System.out;
        try (PrintStream newStream = new PrintStream(newConsole);){
            System.setOut(newStream);
            String keySeparator = ", ";
            String[] args = new String[]{"--bootstrap-server", CLUSTER.bootstrapServers(), "--from-beginning", "--property", "print.key=true", "--property", "print.timestamp=" + printTimestamp, "--topic", this.outputTopic, "--max-messages", String.valueOf(numMessages), "--property", "key.deserializer=" + keyDeserializer.getClass().getName(), "--property", "value.deserializer=" + valueDeserializer.getClass().getName(), "--property", "key.separator=, ", "--property", "key.deserializer.default.windowed.key.serde.inner=" + Serdes.serdeFrom((Class)innerClass).getClass().getName()};
            ConsoleConsumer.messageCount_$eq((int)0);
            ConsoleConsumer.run((ConsoleConsumer.ConsumerConfig)new ConsoleConsumer.ConsumerConfig(args));
            newStream.flush();
            System.setOut(originalStream);
            String string = newConsole.toString();
            return string;
        }
    }
}

