/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopology;
import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyStreamsBuilder;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.utils.UniqueTopicSerdeScope;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;

public class NamedTopologyIntegrationTest {
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    private static final String INPUT_STREAM_1 = "input-stream-1";
    private static final String INPUT_STREAM_2 = "input-stream-2";
    private static final String INPUT_STREAM_3 = "input-stream-3";
    private static final String OUTPUT_STREAM_1 = "output-stream-1";
    private static final String OUTPUT_STREAM_2 = "output-stream-2";
    private static final String OUTPUT_STREAM_3 = "output-stream-3";
    private static final String SUM_OUTPUT = "sum";
    private static final String COUNT_OUTPUT = "count";
    private static final String DELAYED_INPUT_STREAM_1 = "delayed-input-stream-1";
    private static final String DELAYED_INPUT_STREAM_2 = "delayed-input-stream-2";
    private static final Materialized<Object, Long, KeyValueStore<Bytes, byte[]>> IN_MEMORY_STORE = Materialized.as((KeyValueBytesStoreSupplier)Stores.inMemoryKeyValueStore((String)"store"));
    private static final Materialized<Object, Long, KeyValueStore<Bytes, byte[]>> ROCKSDB_STORE = Materialized.as((KeyValueBytesStoreSupplier)Stores.persistentKeyValueStore((String)"store"));
    private static Properties producerConfig;
    private static Properties consumerConfig;
    @Rule
    public final TestName testName = new TestName();
    private String appId;
    private String changelog1;
    private String changelog2;
    private String changelog3;
    private static final List<KeyValue<String, Long>> STANDARD_INPUT_DATA;
    private static final List<KeyValue<String, Long>> COUNT_OUTPUT_DATA;
    private static final List<KeyValue<String, Long>> SUM_OUTPUT_DATA;
    private final KafkaClientSupplier clientSupplier = new DefaultKafkaClientSupplier();
    private final NamedTopologyStreamsBuilder topology1Builder = new NamedTopologyStreamsBuilder("topology-1");
    private final NamedTopologyStreamsBuilder topology2Builder = new NamedTopologyStreamsBuilder("topology-2");
    private final NamedTopologyStreamsBuilder topology3Builder = new NamedTopologyStreamsBuilder("topology-3");
    private final NamedTopologyStreamsBuilder topology1Builder2 = new NamedTopologyStreamsBuilder("topology-1");
    private final NamedTopologyStreamsBuilder topology2Builder2 = new NamedTopologyStreamsBuilder("topology-2");
    private final NamedTopologyStreamsBuilder topology3Builder2 = new NamedTopologyStreamsBuilder("topology-3");
    private Properties props;
    private Properties props2;
    private KafkaStreamsNamedTopologyWrapper streams;
    private KafkaStreamsNamedTopologyWrapper streams2;

    @BeforeClass
    public static void initializeClusterAndStandardTopics() throws Exception {
        CLUSTER.start();
        CLUSTER.createTopic(INPUT_STREAM_1, 2, 1);
        CLUSTER.createTopic(INPUT_STREAM_2, 2, 1);
        CLUSTER.createTopic(INPUT_STREAM_3, 2, 1);
        CLUSTER.createTopic(DELAYED_INPUT_STREAM_1, 2, 1);
        CLUSTER.createTopic(DELAYED_INPUT_STREAM_2, 2, 1);
        producerConfig = TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), StringSerializer.class, LongSerializer.class);
        consumerConfig = TestUtils.consumerConfig((String)CLUSTER.bootstrapServers(), StringDeserializer.class, LongDeserializer.class);
        NamedTopologyIntegrationTest.produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
        NamedTopologyIntegrationTest.produceToInputTopics(INPUT_STREAM_2, STANDARD_INPUT_DATA);
        NamedTopologyIntegrationTest.produceToInputTopics(INPUT_STREAM_3, STANDARD_INPUT_DATA);
    }

    @AfterClass
    public static void closeCluster() {
        CLUSTER.stop();
    }

    private Properties configProps() {
        Properties streamsConfiguration = new Properties();
        streamsConfiguration.put("application.id", this.appId);
        streamsConfiguration.put("bootstrap.servers", CLUSTER.bootstrapServers());
        streamsConfiguration.put("state.dir", TestUtils.tempDirectory((String)this.appId).getPath());
        streamsConfiguration.put("default.key.serde", Serdes.String().getClass());
        streamsConfiguration.put("default.value.serde", Serdes.Long().getClass());
        streamsConfiguration.put("num.stream.threads", (Object)2);
        streamsConfiguration.put("commit.interval.ms", (Object)1000L);
        streamsConfiguration.put("auto.offset.reset", "earliest");
        streamsConfiguration.put("session.timeout.ms", (Object)10000);
        return streamsConfiguration;
    }

    @Before
    public void setup() throws Exception {
        this.appId = IntegrationTestUtils.safeUniqueTestName(NamedTopologyIntegrationTest.class, this.testName);
        this.changelog1 = this.appId + "-topology-1-store-changelog";
        this.changelog2 = this.appId + "-topology-2-store-changelog";
        this.changelog3 = this.appId + "-topology-3-store-changelog";
        this.props = this.configProps();
        this.props2 = this.configProps();
        CLUSTER.createTopic(OUTPUT_STREAM_1, 2, 1);
        CLUSTER.createTopic(OUTPUT_STREAM_2, 2, 1);
        CLUSTER.createTopic(OUTPUT_STREAM_3, 2, 1);
    }

    @After
    public void shutdown() throws Exception {
        if (this.streams != null) {
            this.streams.close(Duration.ofSeconds(30L));
        }
        if (this.streams2 != null) {
            this.streams2.close(Duration.ofSeconds(30L));
        }
        CLUSTER.deleteTopics(OUTPUT_STREAM_1, OUTPUT_STREAM_2, OUTPUT_STREAM_3);
    }

    @Test
    public void shouldPrefixAllInternalTopicNamesWithNamedTopology() throws Exception {
        String countTopologyName = "count-topology";
        String fkjTopologyName = "FKJ-topology";
        NamedTopologyStreamsBuilder countBuilder = new NamedTopologyStreamsBuilder("count-topology");
        countBuilder.stream(INPUT_STREAM_1).groupBy((k, v) -> k).count();
        NamedTopologyStreamsBuilder fkjBuilder = new NamedTopologyStreamsBuilder("FKJ-topology");
        UniqueTopicSerdeScope serdeScope = new UniqueTopicSerdeScope();
        KTable left = fkjBuilder.table(INPUT_STREAM_2, Consumed.with(serdeScope.decorateSerde(Serdes.String(), this.props, true), serdeScope.decorateSerde(Serdes.Long(), this.props, false)));
        KTable right = fkjBuilder.table(INPUT_STREAM_3, Consumed.with(serdeScope.decorateSerde(Serdes.String(), this.props, true), serdeScope.decorateSerde(Serdes.Long(), this.props, false)));
        left.join(right, Object::toString, (value1, value2) -> String.valueOf(value1 + value2), Materialized.with(null, serdeScope.decorateSerde(Serdes.String(), this.props, false)));
        this.streams = new KafkaStreamsNamedTopologyWrapper(this.buildNamedTopologies(fkjBuilder, countBuilder), this.props, this.clientSupplier);
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(Collections.singletonList(this.streams), Duration.ofSeconds(15L));
        String countTopicPrefix = this.appId + "-" + "count-topology";
        String fkjTopicPrefix = this.appId + "-" + "FKJ-topology";
        Set internalTopics = CLUSTER.getAllTopicsInCluster().stream().filter(t -> t.contains(this.appId)).filter(t -> t.endsWith("-repartition") || t.endsWith("-changelog") || t.endsWith("-topic")).collect(Collectors.toSet());
        MatcherAssert.assertThat(internalTopics, (Matcher)CoreMatchers.is((Object)Utils.mkSet((Object[])new String[]{countTopicPrefix + "-KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition", countTopicPrefix + "-KSTREAM-AGGREGATE-STATE-STORE-0000000002-changelog", fkjTopicPrefix + "-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic", fkjTopicPrefix + "-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000014-topic", fkjTopicPrefix + "-KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-0000000010-changelog", fkjTopicPrefix + "-" + INPUT_STREAM_2 + "-STATE-STORE-0000000000-changelog", fkjTopicPrefix + "-" + INPUT_STREAM_3 + "-STATE-STORE-0000000003-changelog"})));
    }

    @Test
    public void shouldProcessSingleNamedTopologyAndPrefixInternalTopics() throws Exception {
        this.topology1Builder.stream(INPUT_STREAM_1).selectKey((k, v) -> k).groupByKey().count(ROCKSDB_STORE).toStream().to(OUTPUT_STREAM_1);
        this.streams = new KafkaStreamsNamedTopologyWrapper(this.topology1Builder.buildNamedTopology(this.props), this.props, this.clientSupplier);
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(Collections.singletonList(this.streams), Duration.ofSeconds(15L));
        List results = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3);
        MatcherAssert.assertThat(results, (Matcher)CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
        Set<String> allTopics = CLUSTER.getAllTopicsInCluster();
        MatcherAssert.assertThat((Object)allTopics.contains(this.appId + "-topology-1-store-changelog"), (Matcher)CoreMatchers.is((Object)true));
        MatcherAssert.assertThat((Object)allTopics.contains(this.appId + "-topology-1-store-repartition"), (Matcher)CoreMatchers.is((Object)true));
    }

    @Test
    public void shouldProcessMultipleIdenticalNamedTopologiesWithInMemoryAndPersistentStateStores() throws Exception {
        this.topology1Builder.stream(INPUT_STREAM_1).groupBy((k, v) -> k).count(ROCKSDB_STORE).toStream().to(OUTPUT_STREAM_1);
        this.topology2Builder.stream(INPUT_STREAM_2).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2);
        this.topology3Builder.stream(INPUT_STREAM_3).groupBy((k, v) -> k).count(ROCKSDB_STORE).toStream().to(OUTPUT_STREAM_3);
        this.streams = new KafkaStreamsNamedTopologyWrapper(this.buildNamedTopologies(this.topology1Builder, this.topology2Builder, this.topology3Builder), this.props, this.clientSupplier);
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(Collections.singletonList(this.streams), Duration.ofSeconds(15L));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), (Matcher)CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 3), (Matcher)CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_3, 3), (Matcher)CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
        MatcherAssert.assertThat((Object)CLUSTER.getAllTopicsInCluster().containsAll(Arrays.asList(this.changelog1, this.changelog2, this.changelog3)), (Matcher)CoreMatchers.is((Object)true));
    }

    @Test
    public void shouldAddNamedTopologyToUnstartedApplicationWithEmptyInitialTopology() throws Exception {
        this.topology1Builder.stream(INPUT_STREAM_1).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
        this.streams = new KafkaStreamsNamedTopologyWrapper(this.props, this.clientSupplier);
        this.streams.addNamedTopology(this.topology1Builder.buildNamedTopology(this.props));
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(Collections.singletonList(this.streams), Duration.ofSeconds(15L));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), (Matcher)CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
    }

    @Test
    public void shouldAddNamedTopologyToRunningApplicationWithEmptyInitialTopology() throws Exception {
        this.topology1Builder.stream(INPUT_STREAM_1).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
        this.streams = new KafkaStreamsNamedTopologyWrapper(this.props, this.clientSupplier);
        this.streams.start();
        this.streams.addNamedTopology(this.topology1Builder.buildNamedTopology(this.props));
        IntegrationTestUtils.waitForApplicationState(Collections.singletonList(this.streams), KafkaStreams.State.RUNNING, Duration.ofSeconds(15L));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), (Matcher)CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
    }

    @Test
    public void shouldAddNamedTopologyToRunningApplicationWithSingleInitialNamedTopology() throws Exception {
        this.topology1Builder.stream(INPUT_STREAM_1).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
        this.topology2Builder.stream(INPUT_STREAM_2).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2);
        this.streams = new KafkaStreamsNamedTopologyWrapper(this.topology1Builder.buildNamedTopology(this.props), this.props, this.clientSupplier);
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(Collections.singletonList(this.streams), Duration.ofSeconds(15L));
        this.streams.addNamedTopology(this.topology2Builder.buildNamedTopology(this.props));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), (Matcher)CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 3), (Matcher)CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
    }

    @Test
    public void shouldAddNamedTopologyToRunningApplicationWithMultipleInitialNamedTopologies() throws Exception {
        this.topology1Builder.stream(INPUT_STREAM_1).groupBy((k, v) -> k).count(ROCKSDB_STORE).toStream().to(OUTPUT_STREAM_1);
        this.topology2Builder.stream(INPUT_STREAM_2).groupBy((k, v) -> k).count(ROCKSDB_STORE).toStream().to(OUTPUT_STREAM_2);
        this.topology3Builder.stream(INPUT_STREAM_3).groupBy((k, v) -> k).count(ROCKSDB_STORE).toStream().to(OUTPUT_STREAM_3);
        this.streams = new KafkaStreamsNamedTopologyWrapper(this.buildNamedTopologies(this.topology1Builder, this.topology2Builder), this.props, this.clientSupplier);
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(Collections.singletonList(this.streams), Duration.ofSeconds(15L));
        this.streams.addNamedTopology(this.topology3Builder.buildNamedTopology(this.props));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), (Matcher)CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 3), (Matcher)CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_3, 3), (Matcher)CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
    }

    @Test
    public void shouldAddNamedTopologyToRunningApplicationWithMultipleNodes() throws Exception {
        this.topology1Builder.stream(INPUT_STREAM_1).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
        this.topology1Builder2.stream(INPUT_STREAM_1).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
        this.topology2Builder.stream(INPUT_STREAM_2).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2);
        this.topology2Builder2.stream(INPUT_STREAM_2).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2);
        this.streams = new KafkaStreamsNamedTopologyWrapper(this.topology1Builder.buildNamedTopology(this.props), this.props, this.clientSupplier);
        this.streams2 = new KafkaStreamsNamedTopologyWrapper(this.topology1Builder2.buildNamedTopology(this.props2), this.props2, this.clientSupplier);
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(Arrays.asList(this.streams, this.streams2), Duration.ofSeconds(15L));
        this.streams.addNamedTopology(this.topology2Builder.buildNamedTopology(this.props));
        this.streams2.addNamedTopology(this.topology2Builder2.buildNamedTopology(this.props2));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), (Matcher)CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 3), (Matcher)CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
    }

    @Ignore
    @Test
    public void shouldRemoveOneNamedTopologyWhileAnotherContinuesProcessing() throws Exception {
        this.topology1Builder.stream(DELAYED_INPUT_STREAM_1).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
        this.topology2Builder.stream(DELAYED_INPUT_STREAM_2).map((k, v) -> {
            throw new IllegalStateException("Should not process any records for removed topology-2");
        });
        this.streams = new KafkaStreamsNamedTopologyWrapper(this.buildNamedTopologies(this.topology1Builder, this.topology2Builder), this.props, this.clientSupplier);
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(Collections.singletonList(this.streams), Duration.ofSeconds(15L));
        this.streams.removeNamedTopology("topology-2");
        NamedTopologyIntegrationTest.produceToInputTopics(DELAYED_INPUT_STREAM_1, STANDARD_INPUT_DATA);
        NamedTopologyIntegrationTest.produceToInputTopics(DELAYED_INPUT_STREAM_2, STANDARD_INPUT_DATA);
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), (Matcher)CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
    }

    @Ignore
    @Test
    public void shouldRemoveAndReplaceTopologicallyIncompatibleNamedTopology() throws Exception {
        CLUSTER.createTopics(SUM_OUTPUT, COUNT_OUTPUT);
        KStream inputStream1 = this.topology1Builder.stream(INPUT_STREAM_1);
        inputStream1.groupByKey().count().toStream().to(COUNT_OUTPUT);
        inputStream1.groupByKey().reduce(Long::sum).toStream().to(SUM_OUTPUT);
        this.streams = new KafkaStreamsNamedTopologyWrapper(this.buildNamedTopologies(this.topology1Builder), this.props, this.clientSupplier);
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(Collections.singletonList(this.streams), Duration.ofSeconds(15L));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 3), (Matcher)CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 3), (Matcher)CoreMatchers.equalTo(SUM_OUTPUT_DATA));
        this.streams.removeNamedTopology("topology-1");
        this.streams.cleanUpNamedTopology("topology-1");
        KStream inputStream2 = this.topology1Builder2.stream(DELAYED_INPUT_STREAM_1);
        inputStream2.groupByKey().reduce(Long::sum).toStream().to(SUM_OUTPUT);
        inputStream2.groupByKey().count().toStream().to(COUNT_OUTPUT);
        NamedTopologyIntegrationTest.produceToInputTopics(DELAYED_INPUT_STREAM_1, STANDARD_INPUT_DATA);
        this.streams.addNamedTopology(this.topology1Builder2.buildNamedTopology(this.props));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 3), (Matcher)CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 3), (Matcher)CoreMatchers.equalTo(SUM_OUTPUT_DATA));
        CLUSTER.deleteTopics(SUM_OUTPUT, COUNT_OUTPUT);
    }

    @Test
    public void shouldAllowPatternSubscriptionWithMultipleNamedTopologies() throws Exception {
        this.topology1Builder.stream(Pattern.compile(INPUT_STREAM_1)).groupBy((k, v) -> k).count().toStream().to(OUTPUT_STREAM_1);
        this.topology2Builder.stream(Pattern.compile(INPUT_STREAM_2)).groupBy((k, v) -> k).count().toStream().to(OUTPUT_STREAM_2);
        this.topology3Builder.stream(Pattern.compile(INPUT_STREAM_3)).groupBy((k, v) -> k).count().toStream().to(OUTPUT_STREAM_3);
        this.streams = new KafkaStreamsNamedTopologyWrapper(this.buildNamedTopologies(this.topology1Builder, this.topology2Builder, this.topology3Builder), this.props, this.clientSupplier);
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(Collections.singletonList(this.streams), Duration.ofSeconds(15L));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), (Matcher)CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 3), (Matcher)CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_3, 3), (Matcher)CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
    }

    @Test
    public void shouldAllowMixedCollectionAndPatternSubscriptionWithMultipleNamedTopologies() throws Exception {
        this.topology1Builder.stream(INPUT_STREAM_1).groupBy((k, v) -> k).count().toStream().to(OUTPUT_STREAM_1);
        this.topology2Builder.stream(Pattern.compile(INPUT_STREAM_2)).groupBy((k, v) -> k).count().toStream().to(OUTPUT_STREAM_2);
        this.topology3Builder.stream(Pattern.compile(INPUT_STREAM_3)).groupBy((k, v) -> k).count().toStream().to(OUTPUT_STREAM_3);
        this.streams = new KafkaStreamsNamedTopologyWrapper(this.buildNamedTopologies(this.topology1Builder, this.topology2Builder, this.topology3Builder), this.props, this.clientSupplier);
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(Collections.singletonList(this.streams), Duration.ofSeconds(15L));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), (Matcher)CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 3), (Matcher)CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_3, 3), (Matcher)CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
    }

    private static void produceToInputTopics(String topic, Collection<KeyValue<String, Long>> records) {
        IntegrationTestUtils.produceKeyValuesSynchronously(topic, records, producerConfig, (Time)NamedTopologyIntegrationTest.CLUSTER.time);
    }

    private List<NamedTopology> buildNamedTopologies(NamedTopologyStreamsBuilder ... builders) {
        ArrayList<NamedTopology> topologies = new ArrayList<NamedTopology>();
        for (NamedTopologyStreamsBuilder builder : builders) {
            topologies.add(builder.buildNamedTopology(this.props));
        }
        return topologies;
    }

    static {
        STANDARD_INPUT_DATA = Arrays.asList(KeyValue.pair((Object)"A", (Object)100L), KeyValue.pair((Object)"B", (Object)200L), KeyValue.pair((Object)"A", (Object)300L), KeyValue.pair((Object)"C", (Object)400L), KeyValue.pair((Object)"C", (Object)-50L));
        COUNT_OUTPUT_DATA = Arrays.asList(KeyValue.pair((Object)"B", (Object)1L), KeyValue.pair((Object)"A", (Object)2L), KeyValue.pair((Object)"C", (Object)2L));
        SUM_OUTPUT_DATA = Arrays.asList(KeyValue.pair((Object)"B", (Object)200L), KeyValue.pair((Object)"A", (Object)400L), KeyValue.pair((Object)"C", (Object)350L));
    }
}

