/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.MockPredicate;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Test;

/**
 * Unit tests for {@link StreamsBuilder}.
 *
 * <p>The tests build small topologies through the DSL and then either inspect the resulting
 * {@link ProcessorTopology} (state stores, processor names) or pipe records through a
 * {@link TopologyTestDriver} and check what the processors and stores observed.
 *
 * <p>NOTE(review): many assertions reference generated node/store names with numeric suffixes
 * (for example {@code KSTREAM-JOIN-0000000005}). These presumably depend on the order in which
 * DSL calls are issued on the builder, so the call order inside each test should not be changed
 * without re-checking the expected names.
 */
public class StreamsBuilderTest {
    private static final String STREAM_TOPIC = "stream-topic";
    private static final String STREAM_TOPIC_TWO = "stream-topic-two";
    private static final String TABLE_TOPIC = "table-topic";

    /** Builder under test; a new one is created for each instance of this test class. */
    private final StreamsBuilder builder = new StreamsBuilder();

    /** Streams configuration obtained from the test utilities with String serdes for key and value. */
    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());

    /** Building with an empty {@link Properties} (no optimization setting) must not throw. */
    @Test
    public void shouldNotThrowNullPointerIfOptimizationsNotSpecified() {
        Properties emptyProperties = new Properties();
        StreamsBuilder freshBuilder = new StreamsBuilder();
        freshBuilder.build(emptyProperties);
    }

    /** A stream can be joined with a filtered table that has no explicit materialization. */
    @Test
    public void shouldAllowJoinUnmaterializedFilteredKTable() {
        KTable<Object, Object> filteredKTable = this.builder.table(TABLE_TOPIC).filter(MockPredicate.allGoodPredicate());
        this.builder.stream(STREAM_TOPIC).join(filteredKTable, MockValueJoiner.TOSTRING_JOINER);
        this.builder.build();

        ProcessorTopology topology = buildProcessorTopology();

        MatcherAssert.assertThat(topology.stateStores().size(), CoreMatchers.equalTo(1));
        MatcherAssert.assertThat(
            topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"),
            CoreMatchers.equalTo(Collections.singleton(topology.stateStores().get(0).name())));
        Assert.assertTrue(topology.processorConnectedStateStores("KTABLE-FILTER-0000000003").isEmpty());
    }

    /** A stream can be joined with a filtered table materialized as {@code "store"}. */
    @Test
    public void shouldAllowJoinMaterializedFilteredKTable() {
        KTable<Object, Object> filteredKTable =
            this.builder.table(TABLE_TOPIC).filter(MockPredicate.allGoodPredicate(), Materialized.as("store"));
        this.builder.stream(STREAM_TOPIC).join(filteredKTable, MockValueJoiner.TOSTRING_JOINER);
        this.builder.build();

        ProcessorTopology topology = buildProcessorTopology();

        MatcherAssert.assertThat(topology.stateStores().size(), CoreMatchers.equalTo(1));
        MatcherAssert.assertThat(
            topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"),
            CoreMatchers.equalTo(Collections.singleton("store")));
        MatcherAssert.assertThat(
            topology.processorConnectedStateStores("KTABLE-FILTER-0000000003"),
            CoreMatchers.equalTo(Collections.singleton("store")));
    }

    /** A stream can be joined with a value-mapped table that has no explicit materialization. */
    @Test
    public void shouldAllowJoinUnmaterializedMapValuedKTable() {
        KTable<Object, Object> mappedKTable = this.builder.table(TABLE_TOPIC).mapValues(MockMapper.noOpValueMapper());
        this.builder.stream(STREAM_TOPIC).join(mappedKTable, MockValueJoiner.TOSTRING_JOINER);
        this.builder.build();

        ProcessorTopology topology = buildProcessorTopology();

        MatcherAssert.assertThat(topology.stateStores().size(), CoreMatchers.equalTo(1));
        MatcherAssert.assertThat(
            topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"),
            CoreMatchers.equalTo(Collections.singleton(topology.stateStores().get(0).name())));
        Assert.assertTrue(topology.processorConnectedStateStores("KTABLE-MAPVALUES-0000000003").isEmpty());
    }

    /** A stream can be joined with a value-mapped table materialized as {@code "store"}. */
    @Test
    public void shouldAllowJoinMaterializedMapValuedKTable() {
        KTable<Object, Object> mappedKTable =
            this.builder.table(TABLE_TOPIC).mapValues(MockMapper.noOpValueMapper(), Materialized.as("store"));
        this.builder.stream(STREAM_TOPIC).join(mappedKTable, MockValueJoiner.TOSTRING_JOINER);
        this.builder.build();

        ProcessorTopology topology = buildProcessorTopology();

        MatcherAssert.assertThat(topology.stateStores().size(), CoreMatchers.equalTo(1));
        MatcherAssert.assertThat(
            topology.processorConnectedStateStores("KSTREAM-JOIN-0000000005"),
            CoreMatchers.equalTo(Collections.singleton("store")));
        MatcherAssert.assertThat(
            topology.processorConnectedStateStores("KTABLE-MAPVALUES-0000000003"),
            CoreMatchers.equalTo(Collections.singleton("store")));
    }

    /** A stream can be joined with the (unmaterialized) result of a table-table join. */
    @Test
    public void shouldAllowJoinUnmaterializedJoinedKTable() {
        KTable<Object, Object> table1 = this.builder.table("table-topic1");
        KTable<Object, Object> table2 = this.builder.table("table-topic2");
        // The table-table join is intentionally created inside the stream join call so that the
        // sequence of builder calls stays exactly as before.
        this.builder.stream(STREAM_TOPIC).join(table1.join(table2, MockValueJoiner.TOSTRING_JOINER), MockValueJoiner.TOSTRING_JOINER);
        this.builder.build();

        ProcessorTopology topology = buildProcessorTopology();

        MatcherAssert.assertThat(topology.stateStores().size(), CoreMatchers.equalTo(2));
        MatcherAssert.assertThat(
            topology.processorConnectedStateStores("KSTREAM-JOIN-0000000010"),
            CoreMatchers.equalTo(Utils.mkSet(topology.stateStores().get(0).name(), topology.stateStores().get(1).name())));
        Assert.assertTrue(topology.processorConnectedStateStores("KTABLE-MERGE-0000000007").isEmpty());
    }

    /** A stream can be joined with a table-table join result materialized as {@code "store"}. */
    @Test
    public void shouldAllowJoinMaterializedJoinedKTable() {
        KTable<Object, Object> table1 = this.builder.table("table-topic1");
        KTable<Object, Object> table2 = this.builder.table("table-topic2");
        // The table-table join is intentionally created inside the stream join call so that the
        // sequence of builder calls stays exactly as before.
        this.builder.stream(STREAM_TOPIC).join(table1.join(table2, MockValueJoiner.TOSTRING_JOINER, Materialized.as("store")), MockValueJoiner.TOSTRING_JOINER);
        this.builder.build();

        ProcessorTopology topology = buildProcessorTopology();

        MatcherAssert.assertThat(topology.stateStores().size(), CoreMatchers.equalTo(3));
        MatcherAssert.assertThat(
            topology.processorConnectedStateStores("KSTREAM-JOIN-0000000010"),
            CoreMatchers.equalTo(Collections.singleton("store")));
        MatcherAssert.assertThat(
            topology.processorConnectedStateStores("KTABLE-MERGE-0000000007"),
            CoreMatchers.equalTo(Collections.singleton("store")));
    }

    /** A stream can be joined directly with a source table; both nodes share the single store. */
    @Test
    public void shouldAllowJoinMaterializedSourceKTable() {
        KTable<Object, Object> table = this.builder.table(TABLE_TOPIC);
        this.builder.stream(STREAM_TOPIC).join(table, MockValueJoiner.TOSTRING_JOINER);
        this.builder.build();

        ProcessorTopology topology = buildProcessorTopology();

        MatcherAssert.assertThat(topology.stateStores().size(), CoreMatchers.equalTo(1));
        MatcherAssert.assertThat(
            topology.processorConnectedStateStores("KTABLE-SOURCE-0000000002"),
            CoreMatchers.equalTo(Collections.singleton(topology.stateStores().get(0).name())));
        MatcherAssert.assertThat(
            topology.processorConnectedStateStores("KSTREAM-JOIN-0000000004"),
            CoreMatchers.equalTo(Collections.singleton(topology.stateStores().get(0).name())));
    }

    /** A processor attached to a stream still sees the record when the stream also writes to a sink. */
    @Test
    public void shouldProcessingFromSinkTopic() {
        KStream<String, String> source = this.builder.stream("topic-source");
        source.to("topic-sink");
        MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
        source.process(processorSupplier);

        ConsumerRecordFactory<String, String> recordFactory = stringRecordFactory();
        try (TopologyTestDriver driver = new TopologyTestDriver(this.builder.build(), this.props)) {
            driver.pipeInput(recordFactory.create("topic-source", "A", "aa"));
        }

        Assert.assertEquals(Collections.singletonList("A:aa (ts: 0)"), processorSupplier.theCapturedProcessor().processed);
    }

    /** Processors on both the original stream and the {@code through} stream see the record. */
    @Test
    public void shouldProcessViaThroughTopic() {
        KStream<String, String> source = this.builder.stream("topic-source");
        KStream<String, String> through = source.through("topic-sink");
        MockProcessorSupplier<String, String> sourceProcessorSupplier = new MockProcessorSupplier<>();
        source.process(sourceProcessorSupplier);
        MockProcessorSupplier<String, String> throughProcessorSupplier = new MockProcessorSupplier<>();
        through.process(throughProcessorSupplier);

        ConsumerRecordFactory<String, String> recordFactory = stringRecordFactory();
        try (TopologyTestDriver driver = new TopologyTestDriver(this.builder.build(), this.props)) {
            driver.pipeInput(recordFactory.create("topic-source", "A", "aa"));
        }

        Assert.assertEquals(Collections.singletonList("A:aa (ts: 0)"), sourceProcessorSupplier.theCapturedProcessor().processed);
        Assert.assertEquals(Collections.singletonList("A:aa (ts: 0)"), throughProcessorSupplier.theCapturedProcessor().processed);
    }

    /** Records from two merged streams reach the downstream processor in the order they were piped. */
    @Test
    public void shouldMergeStreams() {
        String topic1 = "topic-1";
        String topic2 = "topic-2";
        KStream<String, String> source1 = this.builder.stream(topic1);
        KStream<String, String> source2 = this.builder.stream(topic2);
        KStream<String, String> merged = source1.merge(source2);
        MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
        merged.process(processorSupplier);

        ConsumerRecordFactory<String, String> recordFactory = stringRecordFactory();
        try (TopologyTestDriver driver = new TopologyTestDriver(this.builder.build(), this.props)) {
            driver.pipeInput(recordFactory.create(topic1, "A", "aa"));
            driver.pipeInput(recordFactory.create(topic2, "B", "bb"));
            driver.pipeInput(recordFactory.create(topic2, "C", "cc"));
            driver.pipeInput(recordFactory.create(topic1, "D", "dd"));
        }

        Assert.assertEquals(
            Arrays.asList("A:aa (ts: 0)", "B:bb (ts: 0)", "C:cc (ts: 0)", "D:dd (ts: 0)"),
            processorSupplier.theCapturedProcessor().processed);
    }

    /**
     * A table configured with Long/String serdes through {@link Materialized} can consume
     * Long-keyed records even though {@link #props} was created with String serdes.
     */
    @Test
    public void shouldUseSerdesDefinedInMaterializedToConsumeTable() {
        Map<Object, Object> results = new HashMap<>();
        String topic = "topic";
        ForeachAction<Object, Object> action = results::put;
        // NOTE(review): explicit type arguments on Materialized.as(...) look like they were lost in
        // decompilation; confirm this chain type-checks against the Kafka version in use.
        this.builder.table(topic, Materialized.as("store").withKeySerde(Serdes.Long()).withValueSerde(Serdes.String())).toStream().foreach(action);

        ConsumerRecordFactory<Long, String> recordFactory = new ConsumerRecordFactory<>(new LongSerializer(), new StringSerializer());
        try (TopologyTestDriver driver = new TopologyTestDriver(this.builder.build(), this.props)) {
            driver.pipeInput(recordFactory.create(topic, 1L, "value1"));
            driver.pipeInput(recordFactory.create(topic, 2L, "value2"));

            KeyValueStore<Long, String> store = driver.getKeyValueStore("store");
            MatcherAssert.assertThat(store.get(1L), CoreMatchers.equalTo("value1"));
            MatcherAssert.assertThat(store.get(2L), CoreMatchers.equalTo("value2"));
            MatcherAssert.assertThat(results.get(1L), CoreMatchers.equalTo((Object) "value1"));
            MatcherAssert.assertThat(results.get(2L), CoreMatchers.equalTo((Object) "value2"));
        }
    }

    /** Same as the table variant above, but for a global table. */
    @Test
    public void shouldUseSerdesDefinedInMaterializedToConsumeGlobalTable() {
        String topic = "topic";
        // NOTE(review): explicit type arguments on Materialized.as(...) look like they were lost in
        // decompilation; confirm this chain type-checks against the Kafka version in use.
        this.builder.globalTable(topic, Materialized.as("store").withKeySerde(Serdes.Long()).withValueSerde(Serdes.String()));

        ConsumerRecordFactory<Long, String> recordFactory = new ConsumerRecordFactory<>(new LongSerializer(), new StringSerializer());
        try (TopologyTestDriver driver = new TopologyTestDriver(this.builder.build(), this.props)) {
            driver.pipeInput(recordFactory.create(topic, 1L, "value1"));
            driver.pipeInput(recordFactory.create(topic, 2L, "value2"));

            KeyValueStore<Long, String> store = driver.getKeyValueStore("store");
            MatcherAssert.assertThat(store.get(1L), CoreMatchers.equalTo("value1"));
            MatcherAssert.assertThat(store.get(2L), CoreMatchers.equalTo("value2"));
        }
    }

    /** A table created with serdes only (no store name) yields a topology without state stores. */
    @Test
    public void shouldNotMaterializeStoresIfNotRequired() {
        String topic = "topic";
        this.builder.table(topic, Materialized.with(Serdes.Long(), Serdes.String()));

        ProcessorTopology topology = buildProcessorTopology();

        MatcherAssert.assertThat(topology.stateStores().size(), CoreMatchers.equalTo(0));
    }

    /**
     * With {@code topology.optimization=all} the source topic is reported as the changelog of the
     * table's store, logging is disabled for the store, and no changelog topic is registered.
     */
    @Test
    public void shouldReuseSourceTopicAsChangelogsWithOptimization20() {
        String topic = "topic";
        this.builder.table(topic, Materialized.as("store"));
        Properties optimizedProps = StreamsTestUtils.getStreamsConfig();
        optimizedProps.put("topology.optimization", "all");
        Topology topology = this.builder.build(optimizedProps);

        InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology);
        // Return value intentionally ignored; the same builder instance is inspected below.
        internalTopologyBuilder.rewriteTopology(new StreamsConfig(optimizedProps));

        MatcherAssert.assertThat(
            internalTopologyBuilder.build().storeToChangelogTopic(),
            CoreMatchers.equalTo(Collections.singletonMap("store", topic)));
        MatcherAssert.assertThat(
            internalTopologyBuilder.getStateStores().keySet(),
            CoreMatchers.equalTo(Collections.singleton("store")));
        MatcherAssert.assertThat(
            internalTopologyBuilder.getStateStores().get("store").loggingEnabled(),
            CoreMatchers.equalTo(false));
        MatcherAssert.assertThat(
            internalTopologyBuilder.topicGroups().get(0).stateChangelogTopics.isEmpty(),
            CoreMatchers.equalTo(true));
    }

    /**
     * Without the optimization setting the store gets its own {@code appId-store-changelog}
     * changelog topic and logging stays enabled.
     */
    @Test
    public void shouldNotReuseSourceTopicAsChangelogsByDefault() {
        String topic = "topic";
        this.builder.table(topic, Materialized.as("store"));

        InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(this.builder.build());
        internalTopologyBuilder.setApplicationId("appId");

        MatcherAssert.assertThat(
            internalTopologyBuilder.build().storeToChangelogTopic(),
            CoreMatchers.equalTo(Collections.singletonMap("store", "appId-store-changelog")));
        MatcherAssert.assertThat(
            internalTopologyBuilder.getStateStores().keySet(),
            CoreMatchers.equalTo(Collections.singleton("store")));
        MatcherAssert.assertThat(
            internalTopologyBuilder.getStateStores().get("store").loggingEnabled(),
            CoreMatchers.equalTo(true));
        MatcherAssert.assertThat(
            internalTopologyBuilder.topicGroups().get(0).stateChangelogTopics.keySet(),
            CoreMatchers.equalTo(Collections.singleton("appId-store-changelog")));
    }

    /** Creating a stream from an empty topic collection is expected to raise a {@link TopologyException}. */
    @Test(expected=TopologyException.class)
    public void shouldThrowExceptionWhenNoTopicPresent() {
        this.builder.stream(Collections.<String>emptyList());
        this.builder.build();
    }

    /** Creating a stream from {@code null} topic names is expected to raise a {@link NullPointerException}. */
    @Test(expected=NullPointerException.class)
    public void shouldThrowExceptionWhenTopicNamesAreNull() {
        this.builder.stream(Arrays.<String>asList(null, null));
        this.builder.build();
    }

    /** A name given through {@link Consumed} is used for the stream source processor. */
    @Test
    public void shouldUseSpecifiedNameForStreamSourceProcessor() {
        String expected = "source-node";
        this.builder.stream(STREAM_TOPIC, Consumed.as(expected));
        this.builder.stream(STREAM_TOPIC_TWO);
        this.builder.build();

        ProcessorTopology topology = buildProcessorTopology();

        assertSpecifiedNameForOperation(topology, expected, "KSTREAM-SOURCE-0000000001");
    }

    /** A name given through {@link Consumed} is used for the table source and its table-source processor. */
    @Test
    public void shouldUseSpecifiedNameForTableSourceProcessor() {
        String expected = "source-node";
        this.builder.table(STREAM_TOPIC, Consumed.as(expected));
        this.builder.table(STREAM_TOPIC_TWO);
        this.builder.build();

        ProcessorTopology topology = buildProcessorTopology();

        assertSpecifiedNameForOperation(
            topology,
            expected,
            expected + "-table-source",
            "KSTREAM-SOURCE-0000000004",
            "KTABLE-SOURCE-0000000005");
    }

    /**
     * Global tables created with and without a {@link Consumed} name; only the names of the
     * resulting global state stores are asserted here.
     */
    @Test
    public void shouldUseSpecifiedNameForGlobalTableSourceProcessor() {
        String expected = "source-processor";
        this.builder.globalTable(STREAM_TOPIC, Consumed.as(expected));
        this.builder.globalTable(STREAM_TOPIC_TWO);
        this.builder.build();

        ProcessorTopology topology = buildProcessorTopology();

        assertSpecifiedNameForStateStore(
            topology.globalStateStores(),
            "stream-topic-STATE-STORE-0000000000",
            "stream-topic-two-STATE-STORE-0000000003");
    }

    /** A name given through {@link Produced} is used for the sink processor. */
    @Test
    public void shouldUseSpecifiedNameForSinkProcessor() {
        String expected = "sink-processor";
        KStream<Object, Object> stream = this.builder.stream(STREAM_TOPIC);
        stream.to(STREAM_TOPIC_TWO, Produced.as(expected));
        stream.to(STREAM_TOPIC_TWO);
        this.builder.build();

        ProcessorTopology topology = buildProcessorTopology();

        assertSpecifiedNameForOperation(topology, "KSTREAM-SOURCE-0000000000", expected, "KSTREAM-SINK-0000000002");
    }

    /**
     * Rewrites the builder's internal topology using a {@link StreamsConfig} created from
     * {@link #props} and builds the resulting {@link ProcessorTopology}.
     *
     * @return the processor topology for everything registered on {@link #builder} so far
     */
    private ProcessorTopology buildProcessorTopology() {
        return this.builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(this.props)).build();
    }

    /**
     * Creates a record factory that serializes both key and value as strings.
     *
     * @return a new factory constructed with {@code 0L} as its third argument (presumably the
     *         start timestamp, matching the {@code (ts: 0)} in the expected output; confirm)
     */
    private static ConsumerRecordFactory<String, String> stringRecordFactory() {
        return new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer(), 0L);
    }

    /**
     * Asserts that the topology's processors have exactly the given names, in order.
     *
     * @param topology the topology whose processors are checked
     * @param expected the expected processor names, position by position
     */
    private void assertSpecifiedNameForOperation(ProcessorTopology topology, String ... expected) {
        List<? extends ProcessorNode> processors = topology.processors();
        Assert.assertEquals("Invalid number of expected processors", expected.length, processors.size());
        for (int i = 0; i < expected.length; ++i) {
            Assert.assertEquals(expected[i], processors.get(i).name());
        }
    }

    /**
     * Asserts that the given state stores have exactly the given names, in order.
     *
     * @param stores   the state stores to check
     * @param expected the expected store names, position by position
     */
    private void assertSpecifiedNameForStateStore(List<StateStore> stores, String ... expected) {
        Assert.assertEquals("Invalid number of expected state stores", expected.length, stores.size());
        for (int i = 0; i < expected.length; ++i) {
            Assert.assertEquals(expected[i], stores.get(i).name());
        }
    }
}

