/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import kafka.utils.MockTime;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(value={IntegrationTest.class})
public class RepartitionOptimizingIntegrationTest {
    // Broker count for the embedded cluster.
    // NOTE(review): not referenced in the visible code; CLUSTER below is built with a literal 1,
    // presumably this constant after inlining — confirm.
    private static final int NUM_BROKERS = 1;
    // Topic the test records are produced to and that the topology consumes from.
    private static final String INPUT_TOPIC = "input";
    // Output topic for the per-key count() results.
    private static final String COUNT_TOPIC = "outputTopic_0";
    // Output topic for the per-key aggregate() results.
    private static final String AGGREGATION_TOPIC = "outputTopic_1";
    // Output topic for the per-key reduce() results.
    private static final String REDUCE_TOPIC = "outputTopic_2";
    // Output topic for the stream-stream join results.
    private static final String JOINED_TOPIC = "joinedOutputTopic";
    // Expected repartition-topic counts.
    // NOTE(review): the two test methods pass the literals 1 and 4 rather than these names — presumably inlined.
    private static final int ONE_REPARTITION_TOPIC = 1;
    private static final int FOUR_REPARTITION_TOPICS = 4;
    // Matches a "Sink: ..." line of a topology description whose text ends in "-repartition";
    // used to count repartition topics in the described topology.
    private final Pattern repartitionTopicPattern = Pattern.compile("Sink: .*-repartition");
    // Streams configuration; rebuilt in setUp() before every test.
    private Properties streamsConfiguration;
    // Embedded Kafka cluster shared by all tests in this class (JUnit class rule).
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    // The cluster's time source; assigned in the constructor and passed along when producing input records.
    private final MockTime mockTime;
    /**
     * Exact text that {@code topology.describe().toString()} is compared against when the test
     * runs with the optimization setting {@code "all"}. It contains a single
     * {@code "-repartition"} sink, and that one topic feeds every downstream aggregation and the join.
     */
    private static final String EXPECTED_OPTIMIZED_TOPOLOGY = "Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [input])\n      --> KSTREAM-MAP-0000000001\n    Processor: KSTREAM-MAP-0000000001 (stores: [])\n      --> KSTREAM-FILTER-0000000002, KSTREAM-FILTER-0000000040\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: KSTREAM-FILTER-0000000002 (stores: [])\n      --> KSTREAM-MAPVALUES-0000000003\n      <-- KSTREAM-MAP-0000000001\n    Processor: KSTREAM-FILTER-0000000040 (stores: [])\n      --> KSTREAM-SINK-0000000039\n      <-- KSTREAM-MAP-0000000001\n    Processor: KSTREAM-MAPVALUES-0000000003 (stores: [])\n      --> KSTREAM-PROCESSOR-0000000004\n      <-- KSTREAM-FILTER-0000000002\n    Processor: KSTREAM-PROCESSOR-0000000004 (stores: [])\n      --> none\n      <-- KSTREAM-MAPVALUES-0000000003\n    Sink: KSTREAM-SINK-0000000039 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000006-repartition)\n      <-- KSTREAM-FILTER-0000000040\n\n  Sub-topology: 1\n    Source: KSTREAM-SOURCE-0000000041 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000006-repartition])\n      --> KSTREAM-FILTER-0000000020, KSTREAM-AGGREGATE-0000000007, KSTREAM-AGGREGATE-0000000014, KSTREAM-FILTER-0000000029\n    Processor: KSTREAM-AGGREGATE-0000000007 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000006])\n      --> KTABLE-TOSTREAM-0000000011\n      <-- KSTREAM-SOURCE-0000000041\n    Processor: KTABLE-TOSTREAM-0000000011 (stores: [])\n      --> KSTREAM-SINK-0000000012, KSTREAM-WINDOWED-0000000034\n      <-- KSTREAM-AGGREGATE-0000000007\n    Processor: KSTREAM-FILTER-0000000020 (stores: [])\n      --> KSTREAM-PEEK-0000000021\n      <-- KSTREAM-SOURCE-0000000041\n    Processor: KSTREAM-FILTER-0000000029 (stores: [])\n      --> KSTREAM-WINDOWED-0000000033\n      <-- KSTREAM-SOURCE-0000000041\n    Processor: KSTREAM-PEEK-0000000021 (stores: [])\n      --> KSTREAM-REDUCE-0000000023\n      <-- KSTREAM-FILTER-0000000020\n    Processor: KSTREAM-WINDOWED-0000000033 (stores: 
[KSTREAM-JOINTHIS-0000000035-store])\n      --> KSTREAM-JOINTHIS-0000000035\n      <-- KSTREAM-FILTER-0000000029\n    Processor: KSTREAM-WINDOWED-0000000034 (stores: [KSTREAM-JOINOTHER-0000000036-store])\n      --> KSTREAM-JOINOTHER-0000000036\n      <-- KTABLE-TOSTREAM-0000000011\n    Processor: KSTREAM-AGGREGATE-0000000014 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000013])\n      --> KTABLE-TOSTREAM-0000000018\n      <-- KSTREAM-SOURCE-0000000041\n    Processor: KSTREAM-JOINOTHER-0000000036 (stores: [KSTREAM-JOINTHIS-0000000035-store])\n      --> KSTREAM-MERGE-0000000037\n      <-- KSTREAM-WINDOWED-0000000034\n    Processor: KSTREAM-JOINTHIS-0000000035 (stores: [KSTREAM-JOINOTHER-0000000036-store])\n      --> KSTREAM-MERGE-0000000037\n      <-- KSTREAM-WINDOWED-0000000033\n    Processor: KSTREAM-REDUCE-0000000023 (stores: [KSTREAM-REDUCE-STATE-STORE-0000000022])\n      --> KTABLE-TOSTREAM-0000000027\n      <-- KSTREAM-PEEK-0000000021\n    Processor: KSTREAM-MERGE-0000000037 (stores: [])\n      --> KSTREAM-SINK-0000000038\n      <-- KSTREAM-JOINTHIS-0000000035, KSTREAM-JOINOTHER-0000000036\n    Processor: KTABLE-TOSTREAM-0000000018 (stores: [])\n      --> KSTREAM-SINK-0000000019\n      <-- KSTREAM-AGGREGATE-0000000014\n    Processor: KTABLE-TOSTREAM-0000000027 (stores: [])\n      --> KSTREAM-SINK-0000000028\n      <-- KSTREAM-REDUCE-0000000023\n    Sink: KSTREAM-SINK-0000000012 (topic: outputTopic_0)\n      <-- KTABLE-TOSTREAM-0000000011\n    Sink: KSTREAM-SINK-0000000019 (topic: outputTopic_1)\n      <-- KTABLE-TOSTREAM-0000000018\n    Sink: KSTREAM-SINK-0000000028 (topic: outputTopic_2)\n      <-- KTABLE-TOSTREAM-0000000027\n    Sink: KSTREAM-SINK-0000000038 (topic: joinedOutputTopic)\n      <-- KSTREAM-MERGE-0000000037\n\n";
    /**
     * Exact text that {@code topology.describe().toString()} is compared against when the test
     * runs with any optimization setting other than {@code "all"}. It contains four
     * {@code "-repartition"} sinks: one each for the count, the aggregate, the reduce and the join input.
     */
    private static final String EXPECTED_UNOPTIMIZED_TOPOLOGY = "Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [input])\n      --> KSTREAM-MAP-0000000001\n    Processor: KSTREAM-MAP-0000000001 (stores: [])\n      --> KSTREAM-FILTER-0000000020, KSTREAM-FILTER-0000000002, KSTREAM-FILTER-0000000009, KSTREAM-FILTER-0000000016, KSTREAM-FILTER-0000000029\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: KSTREAM-FILTER-0000000020 (stores: [])\n      --> KSTREAM-PEEK-0000000021\n      <-- KSTREAM-MAP-0000000001\n    Processor: KSTREAM-FILTER-0000000002 (stores: [])\n      --> KSTREAM-MAPVALUES-0000000003\n      <-- KSTREAM-MAP-0000000001\n    Processor: KSTREAM-FILTER-0000000029 (stores: [])\n      --> KSTREAM-FILTER-0000000031\n      <-- KSTREAM-MAP-0000000001\n    Processor: KSTREAM-PEEK-0000000021 (stores: [])\n      --> KSTREAM-FILTER-0000000025\n      <-- KSTREAM-FILTER-0000000020\n    Processor: KSTREAM-FILTER-0000000009 (stores: [])\n      --> KSTREAM-SINK-0000000008\n      <-- KSTREAM-MAP-0000000001\n    Processor: KSTREAM-FILTER-0000000016 (stores: [])\n      --> KSTREAM-SINK-0000000015\n      <-- KSTREAM-MAP-0000000001\n    Processor: KSTREAM-FILTER-0000000025 (stores: [])\n      --> KSTREAM-SINK-0000000024\n      <-- KSTREAM-PEEK-0000000021\n    Processor: KSTREAM-FILTER-0000000031 (stores: [])\n      --> KSTREAM-SINK-0000000030\n      <-- KSTREAM-FILTER-0000000029\n    Processor: KSTREAM-MAPVALUES-0000000003 (stores: [])\n      --> KSTREAM-PROCESSOR-0000000004\n      <-- KSTREAM-FILTER-0000000002\n    Processor: KSTREAM-PROCESSOR-0000000004 (stores: [])\n      --> none\n      <-- KSTREAM-MAPVALUES-0000000003\n    Sink: KSTREAM-SINK-0000000008 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000006-repartition)\n      <-- KSTREAM-FILTER-0000000009\n    Sink: KSTREAM-SINK-0000000015 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000013-repartition)\n      <-- KSTREAM-FILTER-0000000016\n    Sink: KSTREAM-SINK-0000000024 (topic: 
KSTREAM-REDUCE-STATE-STORE-0000000022-repartition)\n      <-- KSTREAM-FILTER-0000000025\n    Sink: KSTREAM-SINK-0000000030 (topic: KSTREAM-FILTER-0000000029-repartition)\n      <-- KSTREAM-FILTER-0000000031\n\n  Sub-topology: 1\n    Source: KSTREAM-SOURCE-0000000010 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000006-repartition])\n      --> KSTREAM-AGGREGATE-0000000007\n    Processor: KSTREAM-AGGREGATE-0000000007 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000006])\n      --> KTABLE-TOSTREAM-0000000011\n      <-- KSTREAM-SOURCE-0000000010\n    Processor: KTABLE-TOSTREAM-0000000011 (stores: [])\n      --> KSTREAM-SINK-0000000012, KSTREAM-WINDOWED-0000000034\n      <-- KSTREAM-AGGREGATE-0000000007\n    Source: KSTREAM-SOURCE-0000000032 (topics: [KSTREAM-FILTER-0000000029-repartition])\n      --> KSTREAM-WINDOWED-0000000033\n    Processor: KSTREAM-WINDOWED-0000000033 (stores: [KSTREAM-JOINTHIS-0000000035-store])\n      --> KSTREAM-JOINTHIS-0000000035\n      <-- KSTREAM-SOURCE-0000000032\n    Processor: KSTREAM-WINDOWED-0000000034 (stores: [KSTREAM-JOINOTHER-0000000036-store])\n      --> KSTREAM-JOINOTHER-0000000036\n      <-- KTABLE-TOSTREAM-0000000011\n    Processor: KSTREAM-JOINOTHER-0000000036 (stores: [KSTREAM-JOINTHIS-0000000035-store])\n      --> KSTREAM-MERGE-0000000037\n      <-- KSTREAM-WINDOWED-0000000034\n    Processor: KSTREAM-JOINTHIS-0000000035 (stores: [KSTREAM-JOINOTHER-0000000036-store])\n      --> KSTREAM-MERGE-0000000037\n      <-- KSTREAM-WINDOWED-0000000033\n    Processor: KSTREAM-MERGE-0000000037 (stores: [])\n      --> KSTREAM-SINK-0000000038\n      <-- KSTREAM-JOINTHIS-0000000035, KSTREAM-JOINOTHER-0000000036\n    Sink: KSTREAM-SINK-0000000012 (topic: outputTopic_0)\n      <-- KTABLE-TOSTREAM-0000000011\n    Sink: KSTREAM-SINK-0000000038 (topic: joinedOutputTopic)\n      <-- KSTREAM-MERGE-0000000037\n\n  Sub-topology: 2\n    Source: KSTREAM-SOURCE-0000000017 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000013-repartition])\n      --> 
KSTREAM-AGGREGATE-0000000014\n    Processor: KSTREAM-AGGREGATE-0000000014 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000013])\n      --> KTABLE-TOSTREAM-0000000018\n      <-- KSTREAM-SOURCE-0000000017\n    Processor: KTABLE-TOSTREAM-0000000018 (stores: [])\n      --> KSTREAM-SINK-0000000019\n      <-- KSTREAM-AGGREGATE-0000000014\n    Sink: KSTREAM-SINK-0000000019 (topic: outputTopic_1)\n      <-- KTABLE-TOSTREAM-0000000018\n\n  Sub-topology: 3\n    Source: KSTREAM-SOURCE-0000000026 (topics: [KSTREAM-REDUCE-STATE-STORE-0000000022-repartition])\n      --> KSTREAM-REDUCE-0000000023\n    Processor: KSTREAM-REDUCE-0000000023 (stores: [KSTREAM-REDUCE-STATE-STORE-0000000022])\n      --> KTABLE-TOSTREAM-0000000027\n      <-- KSTREAM-SOURCE-0000000026\n    Processor: KTABLE-TOSTREAM-0000000027 (stores: [])\n      --> KSTREAM-SINK-0000000028\n      <-- KSTREAM-REDUCE-0000000023\n    Sink: KSTREAM-SINK-0000000028 (topic: outputTopic_2)\n      <-- KTABLE-TOSTREAM-0000000027\n\n";

    /**
     * Creates the test instance and captures the embedded cluster's time source so that
     * input records can later be produced against it.
     */
    public RepartitionOptimizingIntegrationTest() {
        mockTime = CLUSTER.time;
    }

    /**
     * Prepares a fresh environment for each test: builds the streams configuration, creates the
     * input and output topics on the embedded cluster, and purges local streams state.
     *
     * @throws Exception if topic creation or state cleanup fails
     */
    @Before
    public void setUp() throws Exception {
        final Properties overrides = new Properties();
        overrides.put("cache.max.bytes.buffering", 10240);
        overrides.put("commit.interval.ms", 5000);

        // String serde is used for both the default key serde and the default value serde.
        final String stringSerdeClassName = Serdes.String().getClass().getName();
        streamsConfiguration = StreamsTestUtils.getStreamsConfig(
            "maybe-optimized-test-app",
            CLUSTER.bootstrapServers(),
            stringSerdeClassName,
            stringSerdeClassName,
            overrides);

        CLUSTER.createTopics(INPUT_TOPIC, COUNT_TOPIC, AGGREGATION_TOPIC, REDUCE_TOPIC, JOINED_TOPIC);
        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
    }

    /**
     * Removes every topic from the embedded cluster after each test, waiting up to 30 seconds.
     *
     * @throws Exception if the topics cannot be deleted
     */
    @After
    public void tearDown() throws Exception {
        final long deletionTimeoutMs = 30_000L;
        CLUSTER.deleteAllTopicsAndWait(deletionTimeoutMs);
    }

    /**
     * Runs the scenario with optimization set to {@code "all"} and expects a single repartition topic.
     */
    @Test
    public void shouldSendCorrectRecords_OPTIMIZED() throws Exception {
        runIntegrationTest("all", ONE_REPARTITION_TOPIC);
    }

    /**
     * Runs the scenario with optimization set to {@code "none"} and expects four repartition topics.
     */
    @Test
    public void shouldSendCorrectResults_NO_OPTIMIZATION() throws Exception {
        runIntegrationTest("none", FOUR_REPARTITION_TOPICS);
    }

    /**
     * Builds the test topology, checks its description against the expected text for the given
     * optimization setting, then runs it against the embedded cluster and verifies the records
     * written to each output topic.
     *
     * <p>The topology upper-cases every key and then hangs four consumers off that one stream:
     * a count, an aggregate (sum of value lengths), a reduce (values joined with {@code ":"}) and
     * a windowed join of the {@code "A"} records with the count stream. A side branch collects the
     * upper-cased values of the {@code "B"} records through {@link SimpleProcessor}.
     *
     * @param optimizationConfig value written to the {@code topology.optimization} setting;
     *                           {@code "all"} selects the optimized expected topology, anything
     *                           else the unoptimized one
     * @param expectedNumberRepartitionTopics number of repartition sinks the described topology must contain
     * @throws Exception if producing input, waiting for output, or any assertion fails
     */
    private void runIntegrationTest(String optimizationConfig, int expectedNumberRepartitionTopics) throws Exception {
        final Initializer<Integer> initializer = () -> 0;
        final Aggregator<String, String, Integer> aggregator = (k, v, agg) -> agg + v.length();
        final Reducer<String> reducer = (v1, v2) -> v1 + ":" + v2;
        final List<String> processorValueCollector = new ArrayList<>();

        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> sourceStream =
            builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String()));

        // Locale.ROOT keeps the key/value casing independent of the machine's default locale.
        final KStream<String, String> mappedStream =
            sourceStream.map((k, v) -> KeyValue.pair(k.toUpperCase(Locale.ROOT), v));

        mappedStream.filter((k, v) -> k.equals("B"))
            .mapValues(v -> v.toUpperCase(Locale.ROOT))
            .process(() -> new SimpleProcessor(processorValueCollector));

        final KStream<String, Long> countStream = mappedStream.groupByKey()
            .count(Materialized.with(Serdes.String(), Serdes.Long()))
            .toStream();
        countStream.to(COUNT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));

        mappedStream.groupByKey()
            .aggregate(initializer, aggregator, Materialized.with(Serdes.String(), Serdes.Integer()))
            .toStream()
            .to(AGGREGATION_TOPIC, Produced.with(Serdes.String(), Serdes.Integer()));

        // Extra operators between the map and the grouping: the repartition happens further downstream.
        mappedStream.filter((k, v) -> true)
            .peek((k, v) -> System.out.println(k + ":" + v))
            .groupByKey()
            .reduce(reducer, Materialized.with(Serdes.String(), Serdes.String()))
            .toStream()
            .to(REDUCE_TOPIC, Produced.with(Serdes.String(), Serdes.String()));

        mappedStream.filter((k, v) -> k.equals("A"))
            .join(countStream,
                  (v1, v2) -> v1 + ":" + v2.toString(),
                  JoinWindows.of(Duration.ofMillis(5000L)),
                  Joined.with(Serdes.String(), Serdes.String(), Serdes.Long()))
            .to(JOINED_TOPIC);

        streamsConfiguration.setProperty("topology.optimization", optimizationConfig);

        final Properties producerConfig =
            TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);
        IntegrationTestUtils.produceKeyValuesSynchronously(INPUT_TOPIC, getKeyValues(), producerConfig, mockTime);

        final Properties consumerConfig1 =
            TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, LongDeserializer.class);
        final Properties consumerConfig2 =
            TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, IntegerDeserializer.class);
        final Properties consumerConfig3 =
            TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class);

        final Topology topology = builder.build(streamsConfiguration);
        final String topologyString = topology.describe().toString();

        if (optimizationConfig.equals("all")) {
            Assert.assertEquals(EXPECTED_OPTIMIZED_TOPOLOGY, topologyString);
        } else {
            Assert.assertEquals(EXPECTED_UNOPTIMIZED_TOPOLOGY, topologyString);
        }
        Assert.assertEquals(expectedNumberRepartitionTopics, getCountOfRepartitionTopicsFound(topologyString));

        final KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration);
        try {
            streams.start();

            final List<KeyValue<String, Long>> expectedCountKeyValues =
                Arrays.asList(KeyValue.pair("A", 3L), KeyValue.pair("B", 3L), KeyValue.pair("C", 3L));
            IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig1, COUNT_TOPIC, expectedCountKeyValues);

            // "foo" + "bar" + "baz" => 3 + 3 + 3 characters per key.
            final List<KeyValue<String, Integer>> expectedAggKeyValues =
                Arrays.asList(KeyValue.pair("A", 9), KeyValue.pair("B", 9), KeyValue.pair("C", 9));
            IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig2, AGGREGATION_TOPIC, expectedAggKeyValues);

            final List<KeyValue<String, String>> expectedReduceKeyValues = Arrays.asList(
                KeyValue.pair("A", "foo:bar:baz"),
                KeyValue.pair("B", "foo:bar:baz"),
                KeyValue.pair("C", "foo:bar:baz"));
            IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig3, REDUCE_TOPIC, expectedReduceKeyValues);

            final List<KeyValue<String, String>> expectedJoinKeyValues = Arrays.asList(
                KeyValue.pair("A", "foo:3"),
                KeyValue.pair("A", "bar:3"),
                KeyValue.pair("A", "baz:3"));
            IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig3, JOINED_TOPIC, expectedJoinKeyValues);

            final List<String> expectedCollectedProcessorValues = Arrays.asList("FOO", "BAR", "BAZ");
            // Actual value first, so a failure message reports expected/actual the right way round.
            MatcherAssert.assertThat(processorValueCollector.size(), CoreMatchers.equalTo(3));
            MatcherAssert.assertThat(processorValueCollector, CoreMatchers.equalTo(expectedCollectedProcessorValues));
        } finally {
            // Always stop the streams instance, even when a wait or assertion above throws,
            // so a failing test does not leave a running application behind for the next one.
            streams.close(Duration.ofSeconds(5L));
        }
    }

    /**
     * Counts how many times the repartition-sink pattern occurs in a topology description.
     *
     * @param topologyString the textual topology description to scan
     * @return the number of matches of {@code repartitionTopicPattern} found in the text
     */
    private int getCountOfRepartitionTopicsFound(String topologyString) {
        int occurrences = 0;
        final Matcher sinkMatcher = repartitionTopicPattern.matcher(topologyString);
        while (sinkMatcher.find()) {
            occurrences++;
        }
        return occurrences;
    }

    /**
     * Builds the input records for the test: every combination of the keys {@code a}, {@code b},
     * {@code c} with the values {@code foo}, {@code bar}, {@code baz}, ordered by key and then by value.
     *
     * @return a new mutable list holding the nine key/value pairs
     */
    private List<KeyValue<String, String>> getKeyValues() {
        final String[] keys = {"a", "b", "c"};
        final String[] values = {"foo", "bar", "baz"};
        final List<KeyValue<String, String>> keyValueList = new ArrayList<>(keys.length * values.length);
        for (final String key : keys) {
            for (final String value : values) {
                // Let inference yield KeyValue<String, String> directly; widening the arguments to
                // Object and casting the result back is an inconvertible-types cast.
                keyValueList.add(KeyValue.pair(key, value));
            }
        }
        return keyValueList;
    }

    private static class SimpleProcessor
    extends AbstractProcessor<String, String> {
        final List<String> valueList;

        SimpleProcessor(List<String> valueList) {
            this.valueList = valueList;
        }

        public void process(String key, String value) {
            this.valueList.add(value);
        }
    }
}

