/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.kstream.internals.KStreamFlatTransformValues;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class KStreamFlatTransformValuesTest
extends EasyMockSupport {
    private Integer inputKey;
    private Integer inputValue;
    private ValueTransformerWithKey<Integer, Integer, Iterable<String>> valueTransformer;
    private InternalProcessorContext<Integer, String> context;
    private KStreamFlatTransformValues.KStreamFlatTransformValuesProcessor<Integer, Integer, String> processor;

    @Before
    public void setUp() {
        this.inputKey = 1;
        this.inputValue = 10;
        this.valueTransformer = (ValueTransformerWithKey)this.mock(ValueTransformerWithKey.class);
        this.context = (InternalProcessorContext)this.strictMock(InternalProcessorContext.class);
        this.processor = new KStreamFlatTransformValues.KStreamFlatTransformValuesProcessor(this.valueTransformer);
    }

    @Test
    public void shouldInitializeFlatTransformValuesProcessor() {
        this.valueTransformer.init((ProcessorContext)EasyMock.isA(ForwardingDisabledProcessorContext.class));
        this.replayAll();
        this.processor.init(this.context);
        this.verifyAll();
    }

    @Test
    public void shouldTransformInputRecordToMultipleOutputValues() {
        List<String> outputValues = Arrays.asList("Hello", "Blue", "Planet");
        this.processor.init(this.context);
        EasyMock.reset((Object[])new Object[]{this.valueTransformer});
        EasyMock.expect((Object)this.valueTransformer.transform((Object)this.inputKey, (Object)this.inputValue)).andReturn(outputValues);
        for (String outputValue : outputValues) {
            this.context.forward(new Record((Object)this.inputKey, (Object)outputValue, 0L));
        }
        this.replayAll();
        this.processor.process(new Record((Object)this.inputKey, (Object)this.inputValue, 0L));
        this.verifyAll();
    }

    @Test
    public void shouldEmitNoRecordIfTransformReturnsEmptyList() {
        this.processor.init(this.context);
        EasyMock.reset((Object[])new Object[]{this.valueTransformer});
        EasyMock.expect((Object)this.valueTransformer.transform((Object)this.inputKey, (Object)this.inputValue)).andReturn(Collections.emptyList());
        this.replayAll();
        this.processor.process(new Record((Object)this.inputKey, (Object)this.inputValue, 0L));
        this.verifyAll();
    }

    @Test
    public void shouldEmitNoRecordIfTransformReturnsNull() {
        this.processor.init(this.context);
        EasyMock.reset((Object[])new Object[]{this.valueTransformer});
        EasyMock.expect((Object)this.valueTransformer.transform((Object)this.inputKey, (Object)this.inputValue)).andReturn(null);
        this.replayAll();
        this.processor.process(new Record((Object)this.inputKey, (Object)this.inputValue, 0L));
        this.verifyAll();
    }

    @Test
    public void shouldCloseFlatTransformValuesProcessor() {
        this.valueTransformer.close();
        this.replayAll();
        this.processor.close();
        this.verifyAll();
    }

    @Test
    public void shouldGetFlatTransformValuesProcessor() {
        ValueTransformerWithKeySupplier valueTransformerSupplier = (ValueTransformerWithKeySupplier)this.mock(ValueTransformerWithKeySupplier.class);
        KStreamFlatTransformValues processorSupplier = new KStreamFlatTransformValues(valueTransformerSupplier);
        EasyMock.expect((Object)valueTransformerSupplier.get()).andReturn(this.valueTransformer);
        this.replayAll();
        Processor processor = processorSupplier.get();
        this.verifyAll();
        Assert.assertTrue((boolean)(processor instanceof KStreamFlatTransformValues.KStreamFlatTransformValuesProcessor));
    }
}

