/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import kafka.utils.MockTime;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockKeyValueStoreBuilder;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(value={IntegrationTest.class})
public class RegexSourceIntegrationTest {
    private static final int NUM_BROKERS = 1;
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    private final MockTime mockTime;
    private static final String TOPIC_1 = "topic-1";
    private static final String TOPIC_2 = "topic-2";
    private static final String TOPIC_A = "topic-A";
    private static final String TOPIC_C = "topic-C";
    private static final String TOPIC_Y = "topic-Y";
    private static final String TOPIC_Z = "topic-Z";
    private static final String FA_TOPIC = "fa";
    private static final String FOO_TOPIC = "foo";
    private static final String PARTITIONED_TOPIC_1 = "partitioned-1";
    private static final String PARTITIONED_TOPIC_2 = "partitioned-2";
    private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";
    private static final String STRING_SERDE_CLASSNAME = Serdes.String().getClass().getName();
    private Properties streamsConfiguration;
    private static final String STREAM_TASKS_NOT_UPDATED = "Stream tasks not updated";
    private KafkaStreams streams;

    public RegexSourceIntegrationTest() {
        this.mockTime = RegexSourceIntegrationTest.CLUSTER.time;
    }

    @BeforeClass
    public static void startKafkaCluster() throws InterruptedException {
        CLUSTER.createTopics(TOPIC_1, TOPIC_2, TOPIC_A, TOPIC_C, TOPIC_Y, TOPIC_Z, FA_TOPIC, FOO_TOPIC, DEFAULT_OUTPUT_TOPIC);
        CLUSTER.createTopic(PARTITIONED_TOPIC_1, 2, 1);
        CLUSTER.createTopic(PARTITIONED_TOPIC_2, 2, 1);
    }

    @Before
    public void setUp() throws Exception {
        CLUSTER.deleteAndRecreateTopics(DEFAULT_OUTPUT_TOPIC);
        Properties properties = new Properties();
        properties.put("cache.max.bytes.buffering", (Object)0);
        properties.put("commit.interval.ms", (Object)100);
        properties.put("metadata.max.age.ms", "1000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("internal.leave.group.on.close", (Object)true);
        this.streamsConfiguration = StreamsTestUtils.getStreamsConfig("regex-source-integration-test", CLUSTER.bootstrapServers(), STRING_SERDE_CLASSNAME, STRING_SERDE_CLASSNAME, properties);
    }

    @After
    public void tearDown() throws IOException {
        if (this.streams != null) {
            this.streams.close();
        }
        IntegrationTestUtils.purgeLocalStreamsState(this.streamsConfiguration);
    }

    @Test
    public void testRegexMatchesTopicsAWhenCreated() throws Exception {
        Serde stringSerde = Serdes.String();
        List<String> expectedFirstAssignment = Collections.singletonList("TEST-TOPIC-1");
        List<String> expectedSecondAssignment = Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2");
        CLUSTER.createTopic("TEST-TOPIC-1");
        StreamsBuilder builder = new StreamsBuilder();
        KStream pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
        pattern1Stream.to(DEFAULT_OUTPUT_TOPIC, Produced.with((Serde)stringSerde, (Serde)stringSerde));
        final CopyOnWriteArrayList assignedTopics = new CopyOnWriteArrayList();
        this.streams = new KafkaStreams(builder.build(), this.streamsConfiguration, (KafkaClientSupplier)new DefaultKafkaClientSupplier(){

            public Consumer<byte[], byte[]> getConsumer(Map<String, Object> config) {
                return new KafkaConsumer<byte[], byte[]>(config, (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer()){

                    public void subscribe(Pattern topics, ConsumerRebalanceListener listener) {
                        super.subscribe(topics, (ConsumerRebalanceListener)new TheConsumerRebalanceListener(assignedTopics, listener));
                    }
                };
            }
        });
        this.streams.start();
        TestUtils.waitForCondition(() -> assignedTopics.equals(expectedFirstAssignment), (String)STREAM_TASKS_NOT_UPDATED);
        CLUSTER.createTopic("TEST-TOPIC-2");
        TestUtils.waitForCondition(() -> assignedTopics.equals(expectedSecondAssignment), (String)STREAM_TASKS_NOT_UPDATED);
    }

    @Test
    public void testRegexMatchesTopicsAWhenDeleted() throws Exception {
        Serde stringSerde = Serdes.String();
        List<String> expectedFirstAssignment = Arrays.asList("TEST-TOPIC-A", "TEST-TOPIC-B");
        List<String> expectedSecondAssignment = Collections.singletonList("TEST-TOPIC-B");
        CLUSTER.createTopics("TEST-TOPIC-A", "TEST-TOPIC-B");
        StreamsBuilder builder = new StreamsBuilder();
        KStream pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-[A-Z]"));
        pattern1Stream.to(DEFAULT_OUTPUT_TOPIC, Produced.with((Serde)stringSerde, (Serde)stringSerde));
        final CopyOnWriteArrayList assignedTopics = new CopyOnWriteArrayList();
        this.streams = new KafkaStreams(builder.build(), this.streamsConfiguration, (KafkaClientSupplier)new DefaultKafkaClientSupplier(){

            public Consumer<byte[], byte[]> getConsumer(Map<String, Object> config) {
                return new KafkaConsumer<byte[], byte[]>(config, (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer()){

                    public void subscribe(Pattern topics, ConsumerRebalanceListener listener) {
                        super.subscribe(topics, (ConsumerRebalanceListener)new TheConsumerRebalanceListener(assignedTopics, listener));
                    }
                };
            }
        });
        this.streams.start();
        TestUtils.waitForCondition(() -> assignedTopics.equals(expectedFirstAssignment), (String)STREAM_TASKS_NOT_UPDATED);
        CLUSTER.deleteTopic("TEST-TOPIC-A");
        TestUtils.waitForCondition(() -> assignedTopics.equals(expectedSecondAssignment), (String)STREAM_TASKS_NOT_UPDATED);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void shouldAddStateStoreToRegexDefinedSource() throws InterruptedException {
        MockProcessorSupplier processorSupplier = new MockProcessorSupplier();
        MockKeyValueStoreBuilder storeBuilder = new MockKeyValueStoreBuilder("testStateStore", false);
        long thirtySecondTimeout = 30000L;
        TopologyWrapper topology = new TopologyWrapper();
        topology.addSource("ingest", Pattern.compile("topic-\\d+"));
        topology.addProcessor("my-processor", processorSupplier, new String[]{"ingest"});
        topology.addStateStore((StoreBuilder)storeBuilder, new String[]{"my-processor"});
        this.streams = new KafkaStreams((Topology)topology, this.streamsConfiguration);
        try {
            this.streams.start();
            TestCondition stateStoreNameBoundToSourceTopic = () -> {
                Map stateStoreToSourceTopic = topology.getInternalBuilder().stateStoreNameToSourceTopics();
                List topicNamesList = (List)stateStoreToSourceTopic.get("testStateStore");
                return topicNamesList != null && !topicNamesList.isEmpty() && ((String)topicNamesList.get(0)).equals(TOPIC_1);
            };
            TestUtils.waitForCondition((TestCondition)stateStoreNameBoundToSourceTopic, (long)30000L, (String)"Did not find topic: [topic-1] connected to state store: [testStateStore]");
        }
        finally {
            this.streams.close();
        }
    }

    @Test
    public void testShouldReadFromRegexAndNamedTopics() throws Exception {
        String topic1TestMessage = "topic-1 test";
        String topic2TestMessage = "topic-2 test";
        String topicATestMessage = "topic-A test";
        String topicCTestMessage = "topic-C test";
        String topicYTestMessage = "topic-Y test";
        String topicZTestMessage = "topic-Z test";
        Serde stringSerde = Serdes.String();
        StreamsBuilder builder = new StreamsBuilder();
        KStream pattern1Stream = builder.stream(Pattern.compile("topic-\\d"));
        KStream pattern2Stream = builder.stream(Pattern.compile("topic-[A-D]"));
        KStream namedTopicsStream = builder.stream(Arrays.asList(TOPIC_Y, TOPIC_Z));
        pattern1Stream.to(DEFAULT_OUTPUT_TOPIC, Produced.with((Serde)stringSerde, (Serde)stringSerde));
        pattern2Stream.to(DEFAULT_OUTPUT_TOPIC, Produced.with((Serde)stringSerde, (Serde)stringSerde));
        namedTopicsStream.to(DEFAULT_OUTPUT_TOPIC, Produced.with((Serde)stringSerde, (Serde)stringSerde));
        this.streams = new KafkaStreams(builder.build(), this.streamsConfiguration);
        this.streams.start();
        Properties producerConfig = TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);
        IntegrationTestUtils.produceValuesSynchronously(TOPIC_1, Collections.singleton("topic-1 test"), producerConfig, (Time)this.mockTime);
        IntegrationTestUtils.produceValuesSynchronously(TOPIC_2, Collections.singleton("topic-2 test"), producerConfig, (Time)this.mockTime);
        IntegrationTestUtils.produceValuesSynchronously(TOPIC_A, Collections.singleton("topic-A test"), producerConfig, (Time)this.mockTime);
        IntegrationTestUtils.produceValuesSynchronously(TOPIC_C, Collections.singleton("topic-C test"), producerConfig, (Time)this.mockTime);
        IntegrationTestUtils.produceValuesSynchronously(TOPIC_Y, Collections.singleton("topic-Y test"), producerConfig, (Time)this.mockTime);
        IntegrationTestUtils.produceValuesSynchronously(TOPIC_Z, Collections.singleton("topic-Z test"), producerConfig, (Time)this.mockTime);
        Properties consumerConfig = TestUtils.consumerConfig((String)CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class);
        List<String> expectedReceivedValues = Arrays.asList("topic-A test", "topic-1 test", "topic-2 test", "topic-C test", "topic-Y test", "topic-Z test");
        List receivedKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, DEFAULT_OUTPUT_TOPIC, 6);
        ArrayList<Object> actualValues = new ArrayList<Object>(6);
        for (KeyValue receivedKeyValue : receivedKeyValues) {
            actualValues.add(receivedKeyValue.value);
        }
        Collections.sort(actualValues);
        Collections.sort(expectedReceivedValues);
        Assert.assertThat(actualValues, (Matcher)CoreMatchers.equalTo(expectedReceivedValues));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testMultipleConsumersCanReadFromPartitionedTopic() throws Exception {
        KafkaStreams partitionedStreamsLeader = null;
        KafkaStreams partitionedStreamsFollower = null;
        try {
            Serde stringSerde = Serdes.String();
            StreamsBuilder builderLeader = new StreamsBuilder();
            StreamsBuilder builderFollower = new StreamsBuilder();
            List<String> expectedAssignment = Arrays.asList(PARTITIONED_TOPIC_1, PARTITIONED_TOPIC_2);
            KStream partitionedStreamLeader = builderLeader.stream(Pattern.compile("partitioned-\\d"));
            KStream partitionedStreamFollower = builderFollower.stream(Pattern.compile("partitioned-\\d"));
            partitionedStreamLeader.to(DEFAULT_OUTPUT_TOPIC, Produced.with((Serde)stringSerde, (Serde)stringSerde));
            partitionedStreamFollower.to(DEFAULT_OUTPUT_TOPIC, Produced.with((Serde)stringSerde, (Serde)stringSerde));
            final ArrayList leaderAssignment = new ArrayList();
            final ArrayList followerAssignment = new ArrayList();
            partitionedStreamsLeader = new KafkaStreams(builderLeader.build(), this.streamsConfiguration, (KafkaClientSupplier)new DefaultKafkaClientSupplier(){

                public Consumer<byte[], byte[]> getConsumer(Map<String, Object> config) {
                    return new KafkaConsumer<byte[], byte[]>(config, (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer()){

                        public void subscribe(Pattern topics, ConsumerRebalanceListener listener) {
                            super.subscribe(topics, (ConsumerRebalanceListener)new TheConsumerRebalanceListener(leaderAssignment, listener));
                        }
                    };
                }
            });
            partitionedStreamsFollower = new KafkaStreams(builderFollower.build(), this.streamsConfiguration, (KafkaClientSupplier)new DefaultKafkaClientSupplier(){

                public Consumer<byte[], byte[]> getConsumer(Map<String, Object> config) {
                    return new KafkaConsumer<byte[], byte[]>(config, (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer()){

                        public void subscribe(Pattern topics, ConsumerRebalanceListener listener) {
                            super.subscribe(topics, (ConsumerRebalanceListener)new TheConsumerRebalanceListener(followerAssignment, listener));
                        }
                    };
                }
            });
            partitionedStreamsLeader.start();
            partitionedStreamsFollower.start();
            TestUtils.waitForCondition(() -> followerAssignment.equals(expectedAssignment) && leaderAssignment.equals(expectedAssignment), (String)"topic assignment not completed");
        }
        finally {
            if (partitionedStreamsLeader != null) {
                partitionedStreamsLeader.close();
            }
            if (partitionedStreamsFollower != null) {
                partitionedStreamsFollower.close();
            }
        }
    }

    @Test
    public void testNoMessagesSentExceptionFromOverlappingPatterns() throws Exception {
        String fMessage = "fMessage";
        String fooMessage = "fooMessage";
        Serde stringSerde = Serdes.String();
        StreamsBuilder builder = new StreamsBuilder();
        KStream pattern1Stream = builder.stream(Pattern.compile("foo.*"));
        KStream pattern2Stream = builder.stream(Pattern.compile("f.*"));
        pattern1Stream.to(DEFAULT_OUTPUT_TOPIC, Produced.with((Serde)stringSerde, (Serde)stringSerde));
        pattern2Stream.to(DEFAULT_OUTPUT_TOPIC, Produced.with((Serde)stringSerde, (Serde)stringSerde));
        AtomicBoolean expectError = new AtomicBoolean(false);
        this.streams = new KafkaStreams(builder.build(), this.streamsConfiguration);
        this.streams.setStateListener((newState, oldState) -> {
            if (newState == KafkaStreams.State.ERROR) {
                expectError.set(true);
            }
        });
        this.streams.start();
        Properties producerConfig = TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);
        IntegrationTestUtils.produceValuesSynchronously(FA_TOPIC, Collections.singleton("fMessage"), producerConfig, (Time)this.mockTime);
        IntegrationTestUtils.produceValuesSynchronously(FOO_TOPIC, Collections.singleton("fooMessage"), producerConfig, (Time)this.mockTime);
        Properties consumerConfig = TestUtils.consumerConfig((String)CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class);
        try {
            IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, DEFAULT_OUTPUT_TOPIC, 2, 5000L);
            throw new IllegalStateException("This should not happen: an assertion error should have been thrown before this.");
        }
        catch (AssertionError assertionError) {
            Assert.assertThat((Object)expectError.get(), (Matcher)CoreMatchers.is((Object)true));
            return;
        }
    }

    private static class TheConsumerRebalanceListener
    implements ConsumerRebalanceListener {
        private final List<String> assignedTopics;
        private final ConsumerRebalanceListener listener;

        TheConsumerRebalanceListener(List<String> assignedTopics, ConsumerRebalanceListener listener) {
            this.assignedTopics = assignedTopics;
            this.listener = listener;
        }

        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            this.assignedTopics.clear();
            this.listener.onPartitionsRevoked(partitions);
        }

        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            for (TopicPartition partition : partitions) {
                this.assignedTopics.add(partition.topic());
            }
            Collections.sort(this.assignedTopics);
            this.listener.onPartitionsAssigned(partitions);
        }
    }
}

