/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.IsEqual;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Exercises {@code KStream#transform} and {@code KStream#flatTransform} with a
 * transformer that reads and updates a persistent key-value state store.
 *
 * <p>Each test wires a transformer into the topology built in {@link #before()},
 * attaches a terminal {@code foreach} that records every output pair in
 * {@link #results}, and then lets {@link #verifyResult(List)} pipe a fixed input
 * sequence through a {@link TopologyTestDriver} and compare the recorded output.
 */
@Category(value={IntegrationTest.class})
public class KStreamTransformIntegrationTest {
    private StreamsBuilder builder;
    /** Name of the input topic the stream under test consumes from. */
    private final String topic = "stream";
    /** Name of the state store registered on the builder and used by the transformers. */
    private final String stateStoreName = "myTransformState";
    /** Output pairs collected by {@link #action}, in the order they were emitted. */
    private final List<KeyValue<Integer, Integer>> results = new ArrayList<>();
    /** Terminal action that appends each received key/value pair to {@link #results}. */
    private final ForeachAction<Integer, Integer> action =
        (key, value) -> this.results.add(KeyValue.pair(key, value));
    private KStream<Integer, Integer> stream;

    /**
     * Creates a fresh builder, registers the Integer/Integer persistent key-value
     * store under {@link #stateStoreName}, and opens the input stream on {@link #topic}.
     */
    @Before
    public void before() throws InterruptedException {
        this.builder = new StreamsBuilder();
        final StoreBuilder<KeyValueStore<Integer, Integer>> keyValueStoreBuilder =
            Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore(this.stateStoreName),
                Serdes.Integer(),
                Serdes.Integer());
        this.builder.addStateStore(keyValueStoreBuilder);
        this.stream = this.builder.stream(this.topic, Consumed.with(Serdes.Integer(), Serdes.Integer()));
    }

    /**
     * Pipes the fixed input records (1,1), (2,2), (3,3), (1,4), (2,5), (3,6) through the
     * topology built so far and asserts that the pairs collected in {@link #results}
     * equal {@code expected}.
     *
     * @param expected the output pairs the topology is expected to emit, in order
     */
    private void verifyResult(final List<KeyValue<Integer, Integer>> expected) {
        final ConsumerRecordFactory<Integer, Integer> recordFactory =
            new ConsumerRecordFactory<>(new IntegerSerializer(), new IntegerSerializer());
        final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.Integer());
        // The driver is closed before the assertion runs; results were already collected while piping.
        try (TopologyTestDriver driver = new TopologyTestDriver(this.builder.build(), props)) {
            driver.pipeInput(recordFactory.create(
                this.topic,
                Arrays.asList(
                    new KeyValue<>(1, 1),
                    new KeyValue<>(2, 2),
                    new KeyValue<>(3, 3),
                    new KeyValue<>(1, 4),
                    new KeyValue<>(2, 5),
                    new KeyValue<>(3, 6))));
        }
        MatcherAssert.assertThat(this.results, IsEqual.equalTo(expected));
    }

    /**
     * Verifies {@code flatTransform}: for every input record the transformer emits three
     * pairs {@code (key + i, value + counter)} for {@code i = 0..2}, where {@code counter}
     * starts at the value stored for the key (0 if absent) and is incremented after each
     * emitted pair; the final counter is written back to the store.
     */
    @Test
    public void shouldFlatTransform() throws Exception {
        this.stream.flatTransform(() -> new Transformer<Integer, Integer, Iterable<KeyValue<Integer, Integer>>>() {
            private KeyValueStore<Integer, Integer> state;

            @SuppressWarnings("unchecked") // the store was registered with Integer serdes in before()
            @Override
            public void init(final ProcessorContext context) {
                this.state = (KeyValueStore<Integer, Integer>) context.getStateStore(stateStoreName);
            }

            @Override
            public Iterable<KeyValue<Integer, Integer>> transform(final Integer key, final Integer value) {
                final List<KeyValue<Integer, Integer>> result = new ArrayList<>();
                // Seed the per-key counter on first sight of the key.
                this.state.putIfAbsent(key, 0);
                int outputValue = this.state.get(key);
                for (int i = 0; i < 3; i++) {
                    result.add(new KeyValue<>(key + i, value + outputValue++));
                }
                this.state.put(key, outputValue);
                return result;
            }

            @Override
            public void close() {
            }
        }, this.stateStoreName).foreach(this.action);

        final List<KeyValue<Integer, Integer>> expected = Arrays.asList(
            KeyValue.pair(1, 1),
            KeyValue.pair(2, 2),
            KeyValue.pair(3, 3),
            KeyValue.pair(2, 2),
            KeyValue.pair(3, 3),
            KeyValue.pair(4, 4),
            KeyValue.pair(3, 3),
            KeyValue.pair(4, 4),
            KeyValue.pair(5, 5),
            KeyValue.pair(1, 7),
            KeyValue.pair(2, 8),
            KeyValue.pair(3, 9),
            KeyValue.pair(2, 8),
            KeyValue.pair(3, 9),
            KeyValue.pair(4, 10),
            KeyValue.pair(3, 9),
            KeyValue.pair(4, 10),
            KeyValue.pair(5, 11));
        this.verifyResult(expected);
    }

    /**
     * Verifies {@code transform}: for every input record the transformer emits the single
     * pair {@code (key + 1, value + counter)}, where {@code counter} is the value stored
     * for the key (0 if absent), and then stores {@code counter + 1} for that key.
     */
    @Test
    public void shouldTransform() throws Exception {
        this.stream.transform(() -> new Transformer<Integer, Integer, KeyValue<Integer, Integer>>() {
            private KeyValueStore<Integer, Integer> state;

            @SuppressWarnings("unchecked") // the store was registered with Integer serdes in before()
            @Override
            public void init(final ProcessorContext context) {
                this.state = (KeyValueStore<Integer, Integer>) context.getStateStore(stateStoreName);
            }

            @Override
            public KeyValue<Integer, Integer> transform(final Integer key, final Integer value) {
                // Seed the per-key counter on first sight of the key.
                this.state.putIfAbsent(key, 0);
                int outputValue = this.state.get(key);
                final KeyValue<Integer, Integer> result = new KeyValue<>(key + 1, value + outputValue++);
                this.state.put(key, outputValue);
                return result;
            }

            @Override
            public void close() {
            }
        }, this.stateStoreName).foreach(this.action);

        final List<KeyValue<Integer, Integer>> expected = Arrays.asList(
            KeyValue.pair(2, 1),
            KeyValue.pair(3, 2),
            KeyValue.pair(4, 3),
            KeyValue.pair(2, 5),
            KeyValue.pair(3, 6),
            KeyValue.pair(4, 7));
        this.verifyResult(expected);
    }
}

