/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Properties;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Assert;
import org.junit.Test;

public class KStreamKStreamLeftJoinTest {
    private static final String[] EMPTY = new String[0];
    private final String topic1 = "topic1";
    private final String topic2 = "topic2";
    private final Consumed<Integer, String> consumed = Consumed.with((Serde)Serdes.Integer(), (Serde)Serdes.String());
    private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory((Serializer)new IntegerSerializer(), (Serializer)new StringSerializer(), 0L);
    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());

    @Test
    public void testLeftJoin() {
        StreamsBuilder builder = new StreamsBuilder();
        int[] expectedKeys = new int[]{0, 1, 2, 3};
        MockProcessorSupplier supplier = new MockProcessorSupplier();
        KStream stream1 = builder.stream("topic1", this.consumed);
        KStream stream2 = builder.stream("topic2", this.consumed);
        KStream joined = stream1.leftJoin(stream2, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of((Duration)Duration.ofMillis(100L)), Joined.with((Serde)Serdes.Integer(), (Serde)Serdes.String(), (Serde)Serdes.String()));
        joined.process(supplier, new String[0]);
        Collection copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
        Assert.assertEquals((long)1L, (long)copartitionGroups.size());
        Assert.assertEquals(new HashSet<String>(Arrays.asList("topic1", "topic2")), copartitionGroups.iterator().next());
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props);){
            int i;
            MockProcessor processor = supplier.theCapturedProcessor();
            for (i = 0; i < 2; ++i) {
                driver.pipeInput(this.recordFactory.create("topic1", (Object)expectedKeys[i], (Object)("A" + expectedKeys[i])));
            }
            processor.checkAndClearProcessResult("0:A0+null (ts: 0)", "1:A1+null (ts: 0)");
            for (i = 0; i < 2; ++i) {
                driver.pipeInput(this.recordFactory.create("topic2", (Object)expectedKeys[i], (Object)("a" + expectedKeys[i])));
            }
            processor.checkAndClearProcessResult("0:A0+a0 (ts: 0)", "1:A1+a1 (ts: 0)");
            for (i = 0; i < 3; ++i) {
                driver.pipeInput(this.recordFactory.create("topic1", (Object)expectedKeys[i], (Object)("B" + expectedKeys[i])));
            }
            processor.checkAndClearProcessResult("0:B0+a0 (ts: 0)", "1:B1+a1 (ts: 0)", "2:B2+null (ts: 0)");
            for (int expectedKey : expectedKeys) {
                driver.pipeInput(this.recordFactory.create("topic2", (Object)expectedKey, (Object)("b" + expectedKey)));
            }
            processor.checkAndClearProcessResult("0:A0+b0 (ts: 0)", "0:B0+b0 (ts: 0)", "1:A1+b1 (ts: 0)", "1:B1+b1 (ts: 0)", "2:B2+b2 (ts: 0)");
            for (int expectedKey : expectedKeys) {
                driver.pipeInput(this.recordFactory.create("topic1", (Object)expectedKey, (Object)("C" + expectedKey)));
            }
            processor.checkAndClearProcessResult("0:C0+a0 (ts: 0)", "0:C0+b0 (ts: 0)", "1:C1+a1 (ts: 0)", "1:C1+b1 (ts: 0)", "2:C2+b2 (ts: 0)", "3:C3+b3 (ts: 0)");
        }
    }

    @Test
    public void testWindowing() {
        StreamsBuilder builder = new StreamsBuilder();
        int[] expectedKeys = new int[]{0, 1, 2, 3};
        MockProcessorSupplier supplier = new MockProcessorSupplier();
        KStream stream1 = builder.stream("topic1", this.consumed);
        KStream stream2 = builder.stream("topic2", this.consumed);
        KStream joined = stream1.leftJoin(stream2, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of((Duration)Duration.ofMillis(100L)), Joined.with((Serde)Serdes.Integer(), (Serde)Serdes.String(), (Serde)Serdes.String()));
        joined.process(supplier, new String[0]);
        Collection copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
        Assert.assertEquals((long)1L, (long)copartitionGroups.size());
        Assert.assertEquals(new HashSet<String>(Arrays.asList("topic1", "topic2")), copartitionGroups.iterator().next());
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props);){
            MockProcessor<Integer, String> processor = supplier.theCapturedProcessor();
            long time = 0L;
            for (int i = 0; i < 2; ++i) {
                driver.pipeInput(this.recordFactory.create("topic1", (Object)expectedKeys[i], (Object)("A" + expectedKeys[i]), 0L));
            }
            processor.checkAndClearProcessResult("0:A0+null (ts: 0)", "1:A1+null (ts: 0)");
            for (int expectedKey : expectedKeys) {
                driver.pipeInput(this.recordFactory.create("topic2", (Object)expectedKey, (Object)("a" + expectedKey), 0L));
            }
            processor.checkAndClearProcessResult("0:A0+a0 (ts: 0)", "1:A1+a1 (ts: 0)");
            this.testUpperWindowBound(expectedKeys, driver, processor);
            this.testLowerWindowBound(expectedKeys, driver, processor);
        }
    }

    private void testUpperWindowBound(int[] expectedKeys, TopologyTestDriver driver, MockProcessor<Integer, String> processor) {
        long time = 1000L;
        for (int i = 0; i < expectedKeys.length; ++i) {
            driver.pipeInput(this.recordFactory.create("topic2", (Object)expectedKeys[i], (Object)("b" + expectedKeys[i]), time + (long)i));
        }
        processor.checkAndClearProcessResult(EMPTY);
        time = 1100L;
        for (int expectedKey : expectedKeys) {
            driver.pipeInput(this.recordFactory.create("topic1", (Object)expectedKey, (Object)("B" + expectedKey), time));
        }
        processor.checkAndClearProcessResult("0:B0+b0 (ts: 1100)", "1:B1+b1 (ts: 1100)", "2:B2+b2 (ts: 1100)", "3:B3+b3 (ts: 1100)");
        ++time;
        for (int expectedKey : expectedKeys) {
            driver.pipeInput(this.recordFactory.create("topic1", (Object)expectedKey, (Object)("C" + expectedKey), time));
        }
        processor.checkAndClearProcessResult("0:C0+null (ts: 1101)", "1:C1+b1 (ts: 1101)", "2:C2+b2 (ts: 1101)", "3:C3+b3 (ts: 1101)");
        ++time;
        for (int expectedKey : expectedKeys) {
            driver.pipeInput(this.recordFactory.create("topic1", (Object)expectedKey, (Object)("D" + expectedKey), time));
        }
        processor.checkAndClearProcessResult("0:D0+null (ts: 1102)", "1:D1+null (ts: 1102)", "2:D2+b2 (ts: 1102)", "3:D3+b3 (ts: 1102)");
        ++time;
        for (int expectedKey : expectedKeys) {
            driver.pipeInput(this.recordFactory.create("topic1", (Object)expectedKey, (Object)("E" + expectedKey), time));
        }
        processor.checkAndClearProcessResult("0:E0+null (ts: 1103)", "1:E1+null (ts: 1103)", "2:E2+null (ts: 1103)", "3:E3+b3 (ts: 1103)");
        ++time;
        for (int expectedKey : expectedKeys) {
            driver.pipeInput(this.recordFactory.create("topic1", (Object)expectedKey, (Object)("F" + expectedKey), time));
        }
        processor.checkAndClearProcessResult("0:F0+null (ts: 1104)", "1:F1+null (ts: 1104)", "2:F2+null (ts: 1104)", "3:F3+null (ts: 1104)");
    }

    private void testLowerWindowBound(int[] expectedKeys, TopologyTestDriver driver, MockProcessor<Integer, String> processor) {
        long time = 899L;
        for (int expectedKey : expectedKeys) {
            driver.pipeInput(this.recordFactory.create("topic1", (Object)expectedKey, (Object)("G" + expectedKey), time));
        }
        processor.checkAndClearProcessResult("0:G0+null (ts: 899)", "1:G1+null (ts: 899)", "2:G2+null (ts: 899)", "3:G3+null (ts: 899)");
        ++time;
        for (int expectedKey : expectedKeys) {
            driver.pipeInput(this.recordFactory.create("topic1", (Object)expectedKey, (Object)("H" + expectedKey), time));
        }
        processor.checkAndClearProcessResult("0:H0+b0 (ts: 1000)", "1:H1+null (ts: 900)", "2:H2+null (ts: 900)", "3:H3+null (ts: 900)");
        ++time;
        for (int expectedKey : expectedKeys) {
            driver.pipeInput(this.recordFactory.create("topic1", (Object)expectedKey, (Object)("I" + expectedKey), time));
        }
        processor.checkAndClearProcessResult("0:I0+b0 (ts: 1000)", "1:I1+b1 (ts: 1001)", "2:I2+null (ts: 901)", "3:I3+null (ts: 901)");
        ++time;
        for (int expectedKey : expectedKeys) {
            driver.pipeInput(this.recordFactory.create("topic1", (Object)expectedKey, (Object)("J" + expectedKey), time));
        }
        processor.checkAndClearProcessResult("0:J0+b0 (ts: 1000)", "1:J1+b1 (ts: 1001)", "2:J2+b2 (ts: 1002)", "3:J3+null (ts: 902)");
        ++time;
        for (int expectedKey : expectedKeys) {
            driver.pipeInput(this.recordFactory.create("topic1", (Object)expectedKey, (Object)("K" + expectedKey), time));
        }
        processor.checkAndClearProcessResult("0:K0+b0 (ts: 1000)", "1:K1+b1 (ts: 1001)", "2:K2+b2 (ts: 1002)", "3:K3+b3 (ts: 1003)");
    }
}

