/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Properties;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class KStreamKTableLeftJoinTest {
    private final String streamTopic = "streamTopic";
    private final String tableTopic = "tableTopic";
    private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory((Serializer)new IntegerSerializer(), (Serializer)new StringSerializer());
    private TopologyTestDriver driver;
    private MockProcessor<Integer, String> processor;
    private final int[] expectedKeys = new int[]{0, 1, 2, 3};
    private StreamsBuilder builder;

    @Before
    public void setUp() {
        this.builder = new StreamsBuilder();
        MockProcessorSupplier supplier = new MockProcessorSupplier();
        Consumed consumed = Consumed.with((Serde)Serdes.Integer(), (Serde)Serdes.String());
        KStream stream = this.builder.stream("streamTopic", consumed);
        KTable table = this.builder.table("tableTopic", consumed);
        stream.leftJoin(table, MockValueJoiner.TOSTRING_JOINER).process(supplier, new String[0]);
        Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
        this.driver = new TopologyTestDriver(this.builder.build(), props, 0L);
        this.processor = supplier.theCapturedProcessor();
    }

    @After
    public void cleanup() {
        this.driver.close();
    }

    private void pushToStream(int messageCount, String valuePrefix) {
        for (int i = 0; i < messageCount; ++i) {
            this.driver.pipeInput(this.recordFactory.create("streamTopic", (Object)this.expectedKeys[i], (Object)(valuePrefix + this.expectedKeys[i])));
        }
    }

    private void pushToTable(int messageCount, String valuePrefix) {
        for (int i = 0; i < messageCount; ++i) {
            this.driver.pipeInput(this.recordFactory.create("tableTopic", (Object)this.expectedKeys[i], (Object)(valuePrefix + this.expectedKeys[i])));
        }
    }

    private void pushNullValueToTable(int messageCount) {
        for (int i = 0; i < messageCount; ++i) {
            this.driver.pipeInput(this.recordFactory.create("tableTopic", (Object)this.expectedKeys[i], (Object)null));
        }
    }

    @Test
    public void shouldRequireCopartitionedStreams() {
        Collection copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(this.builder.build()).copartitionGroups();
        Assert.assertEquals((long)1L, (long)copartitionGroups.size());
        Assert.assertEquals(new HashSet<String>(Arrays.asList("streamTopic", "tableTopic")), copartitionGroups.iterator().next());
    }

    @Test
    public void shouldJoinWithEmptyTableOnStreamUpdates() {
        this.pushToStream(2, "X");
        this.processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
    }

    @Test
    public void shouldNotJoinOnTableUpdates() {
        this.pushToStream(2, "X");
        this.processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
        this.pushToTable(2, "Y");
        this.processor.checkAndClearProcessResult(new String[0]);
        this.pushToStream(4, "X");
        this.processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
        this.pushToTable(4, "YY");
        this.processor.checkAndClearProcessResult(new String[0]);
        this.pushToStream(4, "X");
        this.processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
        this.pushToTable(4, "YYY");
        this.processor.checkAndClearProcessResult(new String[0]);
    }

    @Test
    public void shouldJoinRegardlessIfMatchFoundOnStreamUpdates() {
        this.pushToTable(2, "Y");
        this.processor.checkAndClearProcessResult(new String[0]);
        this.pushToStream(4, "X");
        this.processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
    }

    @Test
    public void shouldClearTableEntryOnNullValueUpdates() {
        this.pushToTable(4, "Y");
        this.processor.checkAndClearProcessResult(new String[0]);
        this.pushToStream(4, "X");
        this.processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+Y2", "3:X3+Y3");
        this.pushNullValueToTable(2);
        this.processor.checkAndClearProcessResult(new String[0]);
        this.pushToStream(4, "XX");
        this.processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+Y2", "3:XX3+Y3");
    }
}

