/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for {@code KStream#peek}: verifies that the peek action sees every record
 * piped through the topology and that a {@code null} action is rejected.
 */
public class KStreamPeekTest {
    /** Name of the single input topic used by every test in this class. */
    private final String topicName = "topic";
    private final ConsumerRecordFactory<Integer, String> recordFactory =
        new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer());
    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());

    /**
     * Pipes 32 records (keys 0..31, values "V" + key) through a topology of the form
     * {@code stream.peek(...).foreach(...)} and asserts that both the peek action and
     * the downstream foreach action observed exactly those records, in order.
     */
    @Test
    public void shouldObserveStreamElements() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<Integer, String> stream =
            builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
        final List<KeyValue<Integer, String>> peekObserved = new ArrayList<>();
        final List<KeyValue<Integer, String>> streamObserved = new ArrayList<>();
        stream.peek(collect(peekObserved)).foreach(collect(streamObserved));

        // try-with-resources closes the driver even when an assertion fails.
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final List<KeyValue<Integer, String>> expected = new ArrayList<>();
            for (int key = 0; key < 32; ++key) {
                final String value = "V" + key;
                driver.pipeInput(recordFactory.create(topicName, key, value));
                expected.add(new KeyValue<>(key, value));
            }
            Assert.assertEquals(expected, peekObserved);
            Assert.assertEquals(expected, streamObserved);
        }
    }

    /**
     * Asserts that calling {@code peek(null)} throws a {@link NullPointerException};
     * the test fails if the call returns normally.
     */
    @Test
    public void shouldNotAllowNullAction() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<Integer, String> stream =
            builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
        try {
            stream.peek(null);
            Assert.fail("expected null action to throw NPE");
        } catch (final NullPointerException expected) {
            // Intentionally ignored: the NPE is the outcome this test is checking for.
        }
    }

    /**
     * Builds an action that appends each (key, value) pair it receives to {@code into}.
     *
     * @param into list that accumulates the observed records, in arrival order
     * @param <K>  record key type
     * @param <V>  record value type
     * @return a {@link ForeachAction} that records every pair into {@code into}
     */
    private static <K, V> ForeachAction<K, V> collect(final List<KeyValue<K, V>> into) {
        return (key, value) -> into.add(new KeyValue<>(key, value));
    }
}

