/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.BuiltInDslStoreSuppliers;
import org.apache.kafka.streams.state.DslKeyValueParams;
import org.apache.kafka.streams.state.DslStoreSuppliers;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.internals.WrappedStateStore;
import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.MockApiProcessor;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

public class KStreamKStreamOuterJoinTest {
    // Input topic names; the tests in this class stream from topics with these names.
    private final String topic1 = "topic1";
    private final String topic2 = "topic2";
    // Integer-key / String-value serdes handed to StreamsBuilder.stream(...) for both inputs.
    private final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
    // Streams configuration shared by every test; beforeClass() adds one internal setting to it.
    private static final Properties PROPS = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());

    /**
     * Stores {@code 0} under the internal
     * {@code __emit.interval.ms.kstreams.outer.join.spurious.results.fix__} key of the shared
     * {@code PROPS} once, before any test of this class runs.
     */
    @BeforeClass
    public static void beforeClass() {
        final String emitIntervalConfig = "__emit.interval.ms.kstreams.outer.join.spurious.results.fix__";
        final Long noInterval = 0L;
        PROPS.put(emitIntervalConfig, noInterval);
    }

    /**
     * Outer join configured through the older {@code JoinWindows.of(...).grace(...)} API
     * (the "OldApi" of the test name). The test expects exactly two state stores in the
     * driver and expects {@code X+null} / {@code null+X} results to be produced right away,
     * alongside the real join results.
     */
    @SuppressWarnings("deprecation") // JoinWindows.of(...) is exercised on purpose by this test
    @Test
    public void testOuterJoinDuplicatesWithFixDisabledOldApi() {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        final MockApiProcessorSupplier<Integer, String, Void, Void> capture = new MockApiProcessorSupplier<>();
        final KStream<Integer, String> left = streamsBuilder.stream("topic1", this.consumed);
        final KStream<Integer, String> right = streamsBuilder.stream("topic2", this.consumed);
        final KStream<Integer, String> outerJoined = left.outerJoin(
            right,
            MockValueJoiner.TOSTRING_JOINER,
            JoinWindows.of(Duration.ofMillis(100L)).grace(Duration.ofMillis(10L)),
            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
        outerJoined.process(capture);

        // Unlike the other tests, the topology itself is also built with PROPS here.
        try (TopologyTestDriver testDriver = new TopologyTestDriver(streamsBuilder.build(PROPS), PROPS)) {
            final TestInputTopic<Integer, String> leftInput = testDriver.createInputTopic(
                "topic1", new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            final TestInputTopic<Integer, String> rightInput = testDriver.createInputTopic(
                "topic2", new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            final MockApiProcessor<Integer, String, Void, Void> results = capture.theCapturedProcessor();

            // Only the two join stores are expected.
            Assert.assertEquals(2L, testDriver.getAllStateStores().size());

            leftInput.pipeInput(0, "A0", 0L);
            leftInput.pipeInput(0, "A0-0", 0L);
            rightInput.pipeInput(0, "a0", 0L);
            rightInput.pipeInput(1, "b1", 0L);

            // Both left records are expected as X+null first, then joined with a0;
            // the unmatched right record is expected as null+b1.
            results.checkAndClearProcessResult(
                new KeyValueTimestamp<Integer, String>(0, "A0+null", 0L),
                new KeyValueTimestamp<Integer, String>(0, "A0-0+null", 0L),
                new KeyValueTimestamp<Integer, String>(0, "A0+a0", 0L),
                new KeyValueTimestamp<Integer, String>(0, "A0-0+a0", 0L),
                new KeyValueTimestamp<Integer, String>(1, "null+b1", 0L));
        }
    }

    /**
     * Outer join with a 100 ms window and 10 ms grace. The test expects non-joined records to
     * be reported only after records with much larger timestamps have been piped, and expects
     * every left/right combination for a key that has duplicates on both sides.
     */
    @Test
    public void testOuterJoinDuplicates() {
        final StreamsBuilder topology = new StreamsBuilder();
        final MockApiProcessorSupplier<Integer, String, Void, Void> sink = new MockApiProcessorSupplier<>();
        final KStream<Integer, String> leftStream = topology.stream("topic1", this.consumed);
        final KStream<Integer, String> rightStream = topology.stream("topic2", this.consumed);
        final JoinWindows windows =
            JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(100L), Duration.ofMillis(10L));
        final StreamJoined<Integer, String, String> serdes =
            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String());
        final KStream<Integer, String> result =
            leftStream.outerJoin(rightStream, MockValueJoiner.TOSTRING_JOINER, windows, serdes);
        result.process(sink);

        try (TopologyTestDriver driver = new TopologyTestDriver(topology.build(), PROPS)) {
            final TestInputTopic<Integer, String> leftTopic = driver.createInputTopic(
                "topic1", new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            final TestInputTopic<Integer, String> rightTopic = driver.createInputTopic(
                "topic2", new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            final MockApiProcessor<Integer, String, Void, Void> output = sink.theCapturedProcessor();

            // Duplicates on the left for key 0 and on the right for key 1; the keys never match.
            leftTopic.pipeInput(0, "A0", 0L);
            leftTopic.pipeInput(0, "A0-0", 0L);
            rightTopic.pipeInput(1, "a1", 0L);
            rightTopic.pipeInput(1, "a1-0", 0L);
            rightTopic.pipeInput(1, "a0", 111L);
            // 211 lies beyond 0 + 100 (window) + 10 (grace); the timestamp-0 records are
            // expected as non-joined results once this record has been piped.
            rightTopic.pipeInput(3, "dummy", 211L);
            output.checkAndClearProcessResult(
                new KeyValueTimestamp<Integer, String>(1, "null+a1", 0L),
                new KeyValueTimestamp<Integer, String>(1, "null+a1-0", 0L),
                new KeyValueTimestamp<Integer, String>(0, "A0+null", 0L),
                new KeyValueTimestamp<Integer, String>(0, "A0-0+null", 0L));

            // Duplicates on both sides for key 2: all four combinations are expected.
            leftTopic.pipeInput(2, "A2", 200L);
            leftTopic.pipeInput(2, "A2-0", 200L);
            rightTopic.pipeInput(2, "a2", 201L);
            rightTopic.pipeInput(2, "a2-0", 201L);
            output.checkAndClearProcessResult(
                new KeyValueTimestamp<Integer, String>(2, "A2+a2", 201L),
                new KeyValueTimestamp<Integer, String>(2, "A2-0+a2", 201L),
                new KeyValueTimestamp<Integer, String>(2, "A2+a2-0", 201L),
                new KeyValueTimestamp<Integer, String>(2, "A2-0+a2-0", 201L));

            // A far-future record: the remaining unmatched right records are expected now.
            rightTopic.pipeInput(3, "dummy", 1500L);
            output.checkAndClearProcessResult(
                new KeyValueTimestamp<Integer, String>(1, "null+a0", 111L),
                new KeyValueTimestamp<Integer, String>(3, "null+dummy", 211L));
        }
    }

    /**
     * Left-side records that never find a partner are expected to be reported as
     * {@code X+null} once a later record with a much larger timestamp is piped into the
     * LEFT topic (100 ms window, no grace).
     */
    @Test
    public void testLeftExpiredNonJoinedRecordsAreEmittedByTheLeftProcessor() {
        final StreamsBuilder builder = new StreamsBuilder();
        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
        final KStream<Integer, String> stream1 = builder.stream("topic1", this.consumed);
        final KStream<Integer, String> stream2 = builder.stream("topic2", this.consumed);
        final KStream<Integer, String> joined = stream1.outerJoin(
            stream2,
            MockValueJoiner.TOSTRING_JOINER,
            JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)),
            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
        joined.process(supplier);

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
            final TestInputTopic<Integer, String> inputTopic1 = driver.createInputTopic(
                "topic1", new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            final TestInputTopic<Integer, String> inputTopic2 = driver.createInputTopic(
                "topic2", new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();

            // Base timestamp; every record below is positioned relative to it.
            final long windowStart = 0L;

            // Only left records so far: no result of any kind is expected.
            inputTopic1.pipeInput(0, "A0", windowStart + 1L);
            inputTopic1.pipeInput(1, "A1", windowStart + 2L);
            inputTopic1.pipeInput(0, "A0-0", windowStart + 3L);
            processor.checkAndClearProcessResult();

            // Key 1 finds its partner; still no X+null expected.
            inputTopic2.pipeInput(1, "a1", windowStart + 3L);
            processor.checkAndClearProcessResult(
                new KeyValueTimestamp<Integer, String>(1, "A1+a1", windowStart + 3L));

            // A much later LEFT record: the two unmatched key-0 records are expected now.
            inputTopic1.pipeInput(2, "dummy", windowStart + 401L);
            processor.checkAndClearProcessResult(
                new KeyValueTimestamp<Integer, String>(0, "A0+null", windowStart + 1L),
                new KeyValueTimestamp<Integer, String>(0, "A0-0+null", windowStart + 3L));

            // The dummy itself joins with its right-side counterpart.
            inputTopic2.pipeInput(2, "dummy", windowStart + 401L);
            processor.checkAndClearProcessResult(
                new KeyValueTimestamp<Integer, String>(2, "dummy+dummy", windowStart + 401L));
        }
    }

    /**
     * Left-side records that never find a partner are expected to be reported as
     * {@code X+null} once a later record with a much larger timestamp is piped into the
     * RIGHT topic (100 ms window, zero grace).
     */
    @Test
    public void testLeftExpiredNonJoinedRecordsAreEmittedByTheRightProcessor() {
        final StreamsBuilder builder = new StreamsBuilder();
        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
        final KStream<Integer, String> stream1 = builder.stream("topic1", this.consumed);
        final KStream<Integer, String> stream2 = builder.stream("topic2", this.consumed);
        final KStream<Integer, String> joined = stream1.outerJoin(
            stream2,
            MockValueJoiner.TOSTRING_JOINER,
            JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(100L), Duration.ofMillis(0L)),
            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
        joined.process(supplier);

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
            final TestInputTopic<Integer, String> inputTopic1 = driver.createInputTopic(
                "topic1", new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            final TestInputTopic<Integer, String> inputTopic2 = driver.createInputTopic(
                "topic2", new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();

            // Base timestamp; every record below is positioned relative to it.
            final long windowStart = 0L;

            // Only left records so far: no result of any kind is expected.
            inputTopic1.pipeInput(0, "A0", windowStart + 1L);
            inputTopic1.pipeInput(1, "A1", windowStart + 2L);
            inputTopic1.pipeInput(0, "A0-0", windowStart + 3L);
            processor.checkAndClearProcessResult();

            // Key 1 finds its partner; still no X+null expected.
            inputTopic2.pipeInput(1, "a1", windowStart + 3L);
            processor.checkAndClearProcessResult(
                new KeyValueTimestamp<Integer, String>(1, "A1+a1", windowStart + 3L));

            // A much later RIGHT record: the two unmatched key-0 left records are expected now.
            inputTopic2.pipeInput(2, "dummy", windowStart + 401L);
            processor.checkAndClearProcessResult(
                new KeyValueTimestamp<Integer, String>(0, "A0+null", windowStart + 1L),
                new KeyValueTimestamp<Integer, String>(0, "A0-0+null", windowStart + 3L));

            // The dummy itself joins with its left-side counterpart.
            inputTopic1.pipeInput(2, "dummy", windowStart + 402L);
            processor.checkAndClearProcessResult(
                new KeyValueTimestamp<Integer, String>(2, "dummy+dummy", windowStart + 402L));
        }
    }

    /**
     * Right-side records that never find a partner are expected to be reported as
     * {@code null+X} once a later record with a much larger timestamp is piped into the
     * LEFT topic (100 ms window, no grace).
     */
    @Test
    public void testRightExpiredNonJoinedRecordsAreEmittedByTheLeftProcessor() {
        final StreamsBuilder builder = new StreamsBuilder();
        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
        final KStream<Integer, String> stream1 = builder.stream("topic1", this.consumed);
        final KStream<Integer, String> stream2 = builder.stream("topic2", this.consumed);
        final KStream<Integer, String> joined = stream1.outerJoin(
            stream2,
            MockValueJoiner.TOSTRING_JOINER,
            JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)),
            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
        joined.process(supplier);

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
            final TestInputTopic<Integer, String> inputTopic1 = driver.createInputTopic(
                "topic1", new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            final TestInputTopic<Integer, String> inputTopic2 = driver.createInputTopic(
                "topic2", new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();

            // Base timestamp; every record below is positioned relative to it.
            final long windowStart = 0L;

            // Only right records so far: no result of any kind is expected.
            inputTopic2.pipeInput(0, "A0", windowStart + 1L);
            inputTopic2.pipeInput(1, "A1", windowStart + 2L);
            inputTopic2.pipeInput(0, "A0-0", windowStart + 3L);
            processor.checkAndClearProcessResult();

            // Key 1 finds its partner; still no null+X expected.
            inputTopic1.pipeInput(1, "a1", windowStart + 3L);
            processor.checkAndClearProcessResult(
                new KeyValueTimestamp<Integer, String>(1, "a1+A1", windowStart + 3L));

            // A much later LEFT record: the two unmatched key-0 right records are expected now.
            inputTopic1.pipeInput(2, "dummy", windowStart + 401L);
            processor.checkAndClearProcessResult(
                new KeyValueTimestamp<Integer, String>(0, "null+A0", windowStart + 1L),
                new KeyValueTimestamp<Integer, String>(0, "null+A0-0", windowStart + 3L));

            // The dummy itself joins with its right-side counterpart.
            inputTopic2.pipeInput(2, "dummy", windowStart + 402L);
            processor.checkAndClearProcessResult(
                new KeyValueTimestamp<Integer, String>(2, "dummy+dummy", windowStart + 402L));
        }
    }

    /**
     * Right-side records that never find a partner are expected to be reported as
     * {@code null+X} once a later record with a much larger timestamp is piped into the
     * RIGHT topic (100 ms window, zero grace).
     */
    @Test
    public void testRightExpiredNonJoinedRecordsAreEmittedByTheRightProcessor() {
        final StreamsBuilder builder = new StreamsBuilder();
        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
        final KStream<Integer, String> stream1 = builder.stream("topic1", this.consumed);
        final KStream<Integer, String> stream2 = builder.stream("topic2", this.consumed);
        final KStream<Integer, String> joined = stream1.outerJoin(
            stream2,
            MockValueJoiner.TOSTRING_JOINER,
            JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(100L), Duration.ofMillis(0L)),
            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
        joined.process(supplier);

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
            final TestInputTopic<Integer, String> inputTopic1 = driver.createInputTopic(
                "topic1", new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            final TestInputTopic<Integer, String> inputTopic2 = driver.createInputTopic(
                "topic2", new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();

            // Base timestamp; every record below is positioned relative to it.
            final long windowStart = 0L;

            // Only right records so far: no result of any kind is expected.
            inputTopic2.pipeInput(0, "A0", windowStart + 1L);
            inputTopic2.pipeInput(1, "A1", windowStart + 2L);
            inputTopic2.pipeInput(0, "A0-0", windowStart + 3L);
            processor.checkAndClearProcessResult();

            // Key 1 finds its partner; still no null+X expected.
            inputTopic1.pipeInput(1, "a1", windowStart + 3L);
            processor.checkAndClearProcessResult(
                new KeyValueTimestamp<Integer, String>(1, "a1+A1", windowStart + 3L));

            // A much later RIGHT record: the two unmatched key-0 right records are expected now.
            inputTopic2.pipeInput(2, "dummy", windowStart + 401L);
            processor.checkAndClearProcessResult(
                new KeyValueTimestamp<Integer, String>(0, "null+A0", windowStart + 1L),
                new KeyValueTimestamp<Integer, String>(0, "null+A0-0", windowStart + 3L));

            // The dummy itself joins with its left-side counterpart.
            inputTopic1.pipeInput(2, "dummy", windowStart + 402L);
            processor.checkAndClearProcessResult(
                new KeyValueTimestamp<Integer, String>(2, "dummy+dummy", windowStart + 402L));
        }
    }

    /**
     * Checks the order of results produced by a single input record: the test expects the
     * non-joined {@code A0+null} (timestamp 0) to come before the joined {@code A1+a1}
     * (timestamp 110) when the right record at 110 is piped.
     */
    @Test
    public void testOrdering() {
        final StreamsBuilder topology = new StreamsBuilder();
        final MockApiProcessorSupplier<Integer, String, Void, Void> sink = new MockApiProcessorSupplier<>();
        final KStream<Integer, String> leftStream = topology.stream("topic1", this.consumed);
        final KStream<Integer, String> rightStream = topology.stream("topic2", this.consumed);
        final JoinWindows windows = JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L));
        final StreamJoined<Integer, String, String> serdes =
            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String());
        final KStream<Integer, String> result =
            leftStream.outerJoin(rightStream, MockValueJoiner.TOSTRING_JOINER, windows, serdes);
        result.process(sink);

        try (TopologyTestDriver driver = new TopologyTestDriver(topology.build(), PROPS)) {
            final TestInputTopic<Integer, String> leftTopic = driver.createInputTopic(
                "topic1", new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            final TestInputTopic<Integer, String> rightTopic = driver.createInputTopic(
                "topic2", new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            final MockApiProcessor<Integer, String, Void, Void> output = sink.theCapturedProcessor();

            // Two left records, 100 ms apart; nothing is expected yet.
            leftTopic.pipeInput(0, "A0", 0L);
            leftTopic.pipeInput(1, "A1", 100L);
            output.checkAndClearProcessResult();

            // One right record at 110 is expected to yield both results, in this order.
            rightTopic.pipeInput(1, "a1", 110L);
            output.checkAndClearProcessResult(
                new KeyValueTimestamp<Integer, String>(0, "A0+null", 0L),
                new KeyValueTimestamp<Integer, String>(1, "A1+a1", 110L));
        }
    }

    /**
     * Outer join with a 100 ms window and 10 ms grace. Verifies the co-partition group of the
     * topology, then expects the timestamp-0 records to stay unreported while records at 101
     * are piped and to be reported as non-joined once a record at 211 is piped.
     */
    @Test
    public void testGracePeriod() {
        final StreamsBuilder topology = new StreamsBuilder();
        final MockApiProcessorSupplier<Integer, String, Void, Void> sink = new MockApiProcessorSupplier<>();
        final KStream<Integer, String> leftStream = topology.stream("topic1", this.consumed);
        final KStream<Integer, String> rightStream = topology.stream("topic2", this.consumed);
        final JoinWindows windows =
            JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(100L), Duration.ofMillis(10L));
        final StreamJoined<Integer, String, String> serdes =
            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String());
        final KStream<Integer, String> result =
            leftStream.outerJoin(rightStream, MockValueJoiner.TOSTRING_JOINER, windows, serdes);
        result.process(sink);

        // Both input topics must end up in one single co-partition group.
        final Collection<?> groups =
            TopologyWrapper.getInternalTopologyBuilder(topology.build()).copartitionGroups();
        Assert.assertEquals(1L, groups.size());
        Assert.assertEquals(new HashSet<String>(Arrays.asList("topic1", "topic2")), groups.iterator().next());

        try (TopologyTestDriver driver = new TopologyTestDriver(topology.build(), PROPS)) {
            final TestInputTopic<Integer, String> leftTopic = driver.createInputTopic(
                "topic1", new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            final TestInputTopic<Integer, String> rightTopic = driver.createInputTopic(
                "topic2", new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            final MockApiProcessor<Integer, String, Void, Void> output = sink.theCapturedProcessor();

            // Different keys on each side at timestamp 0: nothing is expected.
            leftTopic.pipeInput(0, "A0", 0L);
            rightTopic.pipeInput(1, "a1", 0L);
            output.checkAndClearProcessResult();

            // Matching keys, but 101 ms away from the timestamp-0 records: still nothing expected.
            rightTopic.pipeInput(0, "a0", 101L);
            leftTopic.pipeInput(1, "A1", 101L);
            output.checkAndClearProcessResult();

            // 211 is beyond 0 + 100 (window) + 10 (grace): the timestamp-0 records are expected now.
            rightTopic.pipeInput(0, "dummy", 211L);
            output.checkAndClearProcessResult(
                new KeyValueTimestamp<Integer, String>(1, "null+a1", 0L),
                new KeyValueTimestamp<Integer, String>(0, "A0+null", 0L));
        }
    }

    /**
     * Runs the shared {@code runOuterJoin} scenario with explicitly supplied in-memory window
     * stores for both sides of the join.
     */
    @Test
    public void testOuterJoinWithInMemoryCustomSuppliers() {
        final JoinWindows windows = JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L));

        // Both stores are sized identically: window size + grace, and window size.
        final Duration sizePlusGrace = Duration.ofMillis(windows.size() + windows.gracePeriodMs());
        final Duration size = Duration.ofMillis(windows.size());
        final WindowBytesStoreSupplier thisStore =
            Stores.inMemoryWindowStore("in-memory-join-store", sizePlusGrace, size, true);
        final WindowBytesStoreSupplier otherStore =
            Stores.inMemoryWindowStore("in-memory-join-store-other", sizePlusGrace, size, true);

        final StreamJoined<Integer, String, String> joinedWithStores = StreamJoined
            .with(Serdes.Integer(), Serdes.String(), Serdes.String())
            .withThisStoreSupplier(thisStore)
            .withOtherStoreSupplier(otherStore);
        this.runOuterJoin(joinedWithStores, windows);
    }

    /**
     * Runs the shared {@code runOuterJoin} scenario without configuring any store supplier on
     * the {@code StreamJoined}.
     */
    @Test
    public void testOuterJoinWithDefaultSuppliers() {
        final StreamJoined<Integer, String, String> serdesOnly =
            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String());
        final JoinWindows windows = JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L));
        this.runOuterJoin(serdesOnly, windows);
    }

    /**
     * Shared outer-join scenario used by the store-supplier tests above.
     *
     * <p>Builds {@code topic1 outerJoin topic2}, checks the co-partition group and that the
     * driver holds three state stores, then pipes several rounds of records (all without an
     * explicit timestamp) and checks the results after each round.
     *
     * @param streamJoined serdes (and optionally store suppliers) for the join
     * @param joinWindows  window definition for the join
     */
    public void runOuterJoin(StreamJoined<Integer, String, String> streamJoined, JoinWindows joinWindows) {
        final StreamsBuilder topology = new StreamsBuilder();
        final int[] expectedKeys = {0, 1, 2, 3};
        final MockApiProcessorSupplier<Integer, String, Void, Void> sink = new MockApiProcessorSupplier<>();
        final KStream<Integer, String> leftStream = topology.stream("topic1", this.consumed);
        final KStream<Integer, String> rightStream = topology.stream("topic2", this.consumed);
        final KStream<Integer, String> result =
            leftStream.outerJoin(rightStream, MockValueJoiner.TOSTRING_JOINER, joinWindows, streamJoined);
        result.process(sink);

        // Both input topics must end up in one single co-partition group.
        final Collection<?> groups =
            TopologyWrapper.getInternalTopologyBuilder(topology.build()).copartitionGroups();
        Assert.assertEquals(1L, groups.size());
        Assert.assertEquals(new HashSet<String>(Arrays.asList("topic1", "topic2")), groups.iterator().next());

        try (TopologyTestDriver driver = new TopologyTestDriver(topology.build(), PROPS)) {
            final TestInputTopic<Integer, String> leftTopic = driver.createInputTopic(
                "topic1", new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            final TestInputTopic<Integer, String> rightTopic = driver.createInputTopic(
                "topic2", new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            final MockApiProcessor<Integer, String, Void, Void> output = sink.theCapturedProcessor();

            // Three stores are expected here (the old-API test above expects two).
            Assert.assertEquals(3L, driver.getAllStateStores().size());

            // Round 1: left records for keys 0 and 1; nothing is expected yet.
            for (final int key : Arrays.copyOf(expectedKeys, 2)) {
                leftTopic.pipeInput(key, "A" + key);
            }
            output.checkAndClearProcessResult();

            // Round 2: right records for keys 0 and 1 join the round-1 records.
            for (final int key : Arrays.copyOf(expectedKeys, 2)) {
                rightTopic.pipeInput(key, "a" + key);
            }
            output.checkAndClearProcessResult(
                new KeyValueTimestamp<Integer, String>(0, "A0+a0", 0L),
                new KeyValueTimestamp<Integer, String>(1, "A1+a1", 0L));

            // Round 3: left records for keys 0..2; key 2 has no partner yet.
            for (final int key : Arrays.copyOf(expectedKeys, 3)) {
                leftTopic.pipeInput(key, "B" + key);
            }
            output.checkAndClearProcessResult(
                new KeyValueTimestamp<Integer, String>(0, "B0+a0", 0L),
                new KeyValueTimestamp<Integer, String>(1, "B1+a1", 0L));

            // Round 4: right records for every key join all left records seen so far.
            for (final int key : expectedKeys) {
                rightTopic.pipeInput(key, "b" + key);
            }
            output.checkAndClearProcessResult(
                new KeyValueTimestamp<Integer, String>(0, "A0+b0", 0L),
                new KeyValueTimestamp<Integer, String>(0, "B0+b0", 0L),
                new KeyValueTimestamp<Integer, String>(1, "A1+b1", 0L),
                new KeyValueTimestamp<Integer, String>(1, "B1+b1", 0L),
                new KeyValueTimestamp<Integer, String>(2, "B2+b2", 0L));

            // Round 5: left records for every key join all right records seen so far.
            for (final int key : expectedKeys) {
                leftTopic.pipeInput(key, "C" + key);
            }
            output.checkAndClearProcessResult(
                new KeyValueTimestamp<Integer, String>(0, "C0+a0", 0L),
                new KeyValueTimestamp<Integer, String>(0, "C0+b0", 0L),
                new KeyValueTimestamp<Integer, String>(1, "C1+a1", 0L),
                new KeyValueTimestamp<Integer, String>(1, "C1+b1", 0L),
                new KeyValueTimestamp<Integer, String>(2, "C2+b2", 0L),
                new KeyValueTimestamp<Integer, String>(3, "C3+b3", 0L));

            // A far-future record: no X+null / null+X result is expected for the earlier records.
            leftTopic.pipeInput(0, "dummy", 400L);
            output.checkAndClearProcessResult();
        }
    }

    /**
     * Outer join with a 100 ms window and no grace. Verifies the co-partition group, primes
     * both sides with timestamp-0 records, then delegates the boundary checks to
     * {@code testUpperWindowBound} and {@code testLowerWindowBound}.
     */
    @Test
    public void testWindowing() {
        final StreamsBuilder builder = new StreamsBuilder();
        final int[] expectedKeys = {0, 1, 2, 3};
        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
        final KStream<Integer, String> stream1 = builder.stream("topic1", this.consumed);
        final KStream<Integer, String> stream2 = builder.stream("topic2", this.consumed);
        final KStream<Integer, String> joined = stream1.outerJoin(
            stream2,
            MockValueJoiner.TOSTRING_JOINER,
            JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)),
            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
        joined.process(supplier);

        // Both input topics must end up in one single co-partition group.
        final Collection<?> copartitionGroups =
            TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
        Assert.assertEquals(1L, copartitionGroups.size());
        Assert.assertEquals(
            new HashSet<String>(Arrays.asList("topic1", "topic2")), copartitionGroups.iterator().next());

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
            final TestInputTopic<Integer, String> inputTopic1 = driver.createInputTopic(
                "topic1", new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            final TestInputTopic<Integer, String> inputTopic2 = driver.createInputTopic(
                "topic2", new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();

            // Left records for keys 0 and 1 at timestamp 0; nothing is expected yet.
            for (int i = 0; i < 2; i++) {
                inputTopic1.pipeInput(expectedKeys[i], "A" + expectedKeys[i], 0L);
            }
            processor.checkAndClearProcessResult();

            // Right records for every key at timestamp 0; only keys 0 and 1 have a partner.
            for (final int expectedKey : expectedKeys) {
                inputTopic2.pipeInput(expectedKey, "a" + expectedKey, 0L);
            }
            processor.checkAndClearProcessResult(
                new KeyValueTimestamp<Integer, String>(0, "A0+a0", 0L),
                new KeyValueTimestamp<Integer, String>(1, "A1+a1", 0L));

            this.testUpperWindowBound(expectedKeys, driver, processor);
            this.testLowerWindowBound(expectedKeys, driver, processor);
        }
    }

    /**
     * Outer join whose 100 ms window has its "before" side set to zero. The test walks a
     * clock forward (0, 100, 102, 103) and checks, step by step, which joined and non-joined
     * results are expected.
     */
    @Test
    public void shouldNotEmitLeftJoinResultForAsymmetricBeforeWindow() {
        final StreamsBuilder topology = new StreamsBuilder();
        final int[] expectedKeys = {0, 1, 2, 3};
        final MockApiProcessorSupplier<Integer, String, Void, Void> sink = new MockApiProcessorSupplier<>();
        final KStream<Integer, String> leftStream = topology.stream("topic1", this.consumed);
        final KStream<Integer, String> rightStream = topology.stream("topic2", this.consumed);
        final JoinWindows windows =
            JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)).before(Duration.ZERO);
        final StreamJoined<Integer, String, String> serdes =
            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String());
        final KStream<Integer, String> result =
            leftStream.outerJoin(rightStream, MockValueJoiner.TOSTRING_JOINER, windows, serdes);
        result.process(sink);

        try (TopologyTestDriver driver = new TopologyTestDriver(topology.build(), PROPS)) {
            final TestInputTopic<Integer, String> leftTopic = driver.createInputTopic(
                "topic1", new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            final TestInputTopic<Integer, String> rightTopic = driver.createInputTopic(
                "topic2", new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            final MockApiProcessor<Integer, String, Void, Void> output = sink.theCapturedProcessor();

            long now = 0L;

            // Left A0 at 0 and A1 at 1; nothing is expected yet.
            for (int offset = 0; offset < 2; offset++) {
                leftTopic.pipeInput(expectedKeys[offset], "A" + expectedKeys[offset], now + offset);
            }
            output.checkAndClearProcessResult();

            // now = 100: right a0 is expected to join A0.
            now += 100L;
            rightTopic.pipeInput(expectedKeys[0], "a" + expectedKeys[0], now);
            output.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(0, "A0+a0", 100L));

            // now = 102: right a1 is not expected to join A1 (timestamp 1); A1+null is expected instead.
            now += 2L;
            rightTopic.pipeInput(expectedKeys[1], "a" + expectedKeys[1], now);
            output.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(1, "A1+null", 1L));

            // now = 103: right a2 arrives; the unmatched a1 is expected as null+a1.
            now++;
            rightTopic.pipeInput(expectedKeys[2], "a" + expectedKeys[2], now);
            output.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(1, "null+a1", 102L));

            // Still 103: left A2 is expected to join a2.
            leftTopic.pipeInput(expectedKeys[2], "A" + expectedKeys[2], now);
            output.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(2, "A2+a2", 103L));
        }
    }

    /**
     * Outer join whose 100 ms window has its "after" side set to zero. The test walks a
     * clock forward (0, 1, 101, 201) and checks, step by step, which joined and non-joined
     * results are expected.
     */
    @Test
    public void shouldNotEmitLeftJoinResultForAsymmetricAfterWindow() {
        final StreamsBuilder topology = new StreamsBuilder();
        final int[] expectedKeys = {0, 1, 2, 3};
        final MockApiProcessorSupplier<Integer, String, Void, Void> sink = new MockApiProcessorSupplier<>();
        final KStream<Integer, String> leftStream = topology.stream("topic1", this.consumed);
        final KStream<Integer, String> rightStream = topology.stream("topic2", this.consumed);
        final JoinWindows windows =
            JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)).after(Duration.ZERO);
        final StreamJoined<Integer, String, String> serdes =
            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String());
        final KStream<Integer, String> result =
            leftStream.outerJoin(rightStream, MockValueJoiner.TOSTRING_JOINER, windows, serdes);
        result.process(sink);

        try (TopologyTestDriver driver = new TopologyTestDriver(topology.build(), PROPS)) {
            final TestInputTopic<Integer, String> leftTopic = driver.createInputTopic(
                "topic1", new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            final TestInputTopic<Integer, String> rightTopic = driver.createInputTopic(
                "topic2", new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            final MockApiProcessor<Integer, String, Void, Void> output = sink.theCapturedProcessor();

            long now = 0L;

            // Left A0 at 0 and A1 at 1; nothing is expected yet.
            for (int offset = 0; offset < 2; offset++) {
                leftTopic.pipeInput(expectedKeys[offset], "A" + expectedKeys[offset], now + offset);
            }
            output.checkAndClearProcessResult();

            // now = 1: right a1 is expected to join A1.
            now++;
            rightTopic.pipeInput(expectedKeys[1], "a" + expectedKeys[1], now);
            output.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(1, "A1+a1", 1L));

            // now = 101: right a2 arrives; the unmatched A0 is expected as A0+null.
            now += 100L;
            rightTopic.pipeInput(expectedKeys[2], "a" + expectedKeys[2], now);
            output.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(0, "A0+null", 0L));

            // Still 101: right a3 is expected to produce nothing.
            rightTopic.pipeInput(expectedKeys[3], "a" + expectedKeys[3], now);
            output.checkAndClearProcessResult();

            // now = 201: left A2 is expected to join a2 (timestamp 101).
            now += 100L;
            leftTopic.pipeInput(expectedKeys[2], "A" + expectedKeys[2], now);
            output.checkAndClearProcessResult(new KeyValueTimestamp<Integer, String>(2, "A2+a2", 201L));
        }
    }

    /**
     * Checks which headers travel with join results. Every input carries a single header
     * {@code "h"} with a distinct one-byte value; the test expects each result to carry the
     * header of the record that was piped last before the result appeared (value 3 for the
     * two non-joined results, value 5 for the joined one).
     */
    @Test
    public void shouldForwardCurrentHeaders() {
        final StreamsBuilder topology = new StreamsBuilder();
        final MockApiProcessorSupplier<Integer, String, Void, Void> sink = new MockApiProcessorSupplier<>();
        final KStream<Integer, String> leftStream = topology.stream("topic1", this.consumed);
        final KStream<Integer, String> rightStream = topology.stream("topic2", this.consumed);
        final JoinWindows windows =
            JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(100L), Duration.ofMillis(10L));
        final StreamJoined<Integer, String, String> serdes =
            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String());
        final KStream<Integer, String> result =
            leftStream.outerJoin(rightStream, MockValueJoiner.TOSTRING_JOINER, windows, serdes);
        result.process(sink);

        try (TopologyTestDriver driver = new TopologyTestDriver(topology.build(), PROPS)) {
            final TestInputTopic<Integer, String> leftTopic = driver.createInputTopic(
                "topic1", new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            final TestInputTopic<Integer, String> rightTopic = driver.createInputTopic(
                "topic2", new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            final MockApiProcessor<Integer, String, Void, Void> output = sink.theCapturedProcessor();

            // Non-matching keys at timestamp 0 (headers 1 and 2), then a dummy at 211 (header 3).
            final Headers headerOne = new RecordHeaders(new Header[]{new RecordHeader("h", new byte[]{1})});
            final Headers headerTwo = new RecordHeaders(new Header[]{new RecordHeader("h", new byte[]{2})});
            final Headers headerThree = new RecordHeaders(new Header[]{new RecordHeader("h", new byte[]{3})});
            leftTopic.pipeInput(new TestRecord<>(0, "A0", headerOne, 0L));
            rightTopic.pipeInput(new TestRecord<>(1, "a0", headerTwo, 0L));
            rightTopic.pipeInput(new TestRecord<>(3, "dummy", headerThree, 211L));

            // Both non-joined results are expected with the dummy's header, not their own.
            final Headers expectedFromDummy = new RecordHeaders(new Header[]{new RecordHeader("h", new byte[]{3})});
            output.checkAndClearProcessedRecords(
                new Record<>(1, "null+a0", 0L, expectedFromDummy),
                new Record<>(0, "A0+null", 0L, expectedFromDummy));

            // A real join at timestamp 200: left header 4, right header 5.
            final Headers headerFour = new RecordHeaders(new Header[]{new RecordHeader("h", new byte[]{4})});
            final Headers headerFive = new RecordHeaders(new Header[]{new RecordHeader("h", new byte[]{5})});
            leftTopic.pipeInput(new TestRecord<>(2, "A2", headerFour, 200L));
            rightTopic.pipeInput(new TestRecord<>(2, "a2", headerFive, 200L));

            // The joined result is expected with the header of the right record, which was piped last.
            final Headers expectedFromRight = new RecordHeaders(new Header[]{new RecordHeader("h", new byte[]{5})});
            output.checkAndClearProcessedRecords(new Record<>(2, "A2+a2", 200L, expectedFromRight));
        }
    }

    /**
     * Drives the upper-bound portion of a window-bounds scenario against an already built topology.
     *
     * <p>One "b"-prefixed record per key is piped into topic2 at timestamps 1000, 1001, ... (one
     * millisecond apart, in key order). Afterwards five rounds of records ("B" through "F") are
     * piped into topic1 at timestamps 1100 through 1104, and after each round the forwarded join
     * results are compared with the expected ones; each round is expected to match one key fewer
     * than the previous one. A final "dummy" record at timestamp 1405 is expected to release the
     * unmatched "X+null" results.
     *
     * <p>NOTE(review): the join window configuration and the origin of the "null+a2"/"null+a3"
     * results expected after the first step are set up by the caller and are not visible here.
     *
     * @param expectedKeys keys to pipe; the hard-coded expectations assume the keys 0 through 3
     * @param driver       test driver wrapping the topology under test
     * @param processor    mock processor capturing the join output
     */
    private void testUpperWindowBound(final int[] expectedKeys,
                                      final TopologyTestDriver driver,
                                      final MockApiProcessor<Integer, String, Void, Void> processor) {
        final TestInputTopic<Integer, String> leftInput = driver.createInputTopic(
            "topic1", new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
        final TestInputTopic<Integer, String> rightInput = driver.createInputTopic(
            "topic2", new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);

        // Right-hand side: one record per key, each one millisecond later than the previous one.
        long ts = 1000L;
        int offset = 0;
        for (final int key : expectedKeys) {
            rightInput.pipeInput(key, "b" + key, ts + offset);
            offset++;
        }
        processor.checkAndClearProcessResult(
            new KeyValueTimestamp<>(2, "null+a2", 0L),
            new KeyValueTimestamp<>(3, "null+a3", 0L)
        );

        // Round "B" at 1100: every key is expected to join.
        ts = 1100L;
        for (int i = 0; i < expectedKeys.length; i++) {
            leftInput.pipeInput(expectedKeys[i], "B" + expectedKeys[i], ts);
        }
        processor.checkAndClearProcessResult(
            new KeyValueTimestamp<>(0, "B0+b0", 1100L),
            new KeyValueTimestamp<>(1, "B1+b1", 1100L),
            new KeyValueTimestamp<>(2, "B2+b2", 1100L),
            new KeyValueTimestamp<>(3, "B3+b3", 1100L)
        );

        // Round "C" at 1101: key 0 no longer joins.
        ts++;
        for (int i = 0; i < expectedKeys.length; i++) {
            leftInput.pipeInput(expectedKeys[i], "C" + expectedKeys[i], ts);
        }
        processor.checkAndClearProcessResult(
            new KeyValueTimestamp<>(1, "C1+b1", 1101L),
            new KeyValueTimestamp<>(2, "C2+b2", 1101L),
            new KeyValueTimestamp<>(3, "C3+b3", 1101L)
        );

        // Round "D" at 1102: keys 0 and 1 no longer join.
        ts++;
        for (int i = 0; i < expectedKeys.length; i++) {
            leftInput.pipeInput(expectedKeys[i], "D" + expectedKeys[i], ts);
        }
        processor.checkAndClearProcessResult(
            new KeyValueTimestamp<>(2, "D2+b2", 1102L),
            new KeyValueTimestamp<>(3, "D3+b3", 1102L)
        );

        // Round "E" at 1103: only key 3 still joins.
        ts++;
        for (int i = 0; i < expectedKeys.length; i++) {
            leftInput.pipeInput(expectedKeys[i], "E" + expectedKeys[i], ts);
        }
        processor.checkAndClearProcessResult(
            new KeyValueTimestamp<>(3, "E3+b3", 1103L)
        );

        // Round "F" at 1104: nothing is expected to be forwarded.
        ts++;
        for (int i = 0; i < expectedKeys.length; i++) {
            leftInput.pipeInput(expectedKeys[i], "F" + expectedKeys[i], ts);
        }
        processor.checkAndClearProcessResult();

        // A much later record; the unmatched left-hand records are expected to be forwarded now.
        ts += 301L;
        leftInput.pipeInput(0, "dummy", ts);
        processor.checkAndClearProcessResult(
            new KeyValueTimestamp<>(0, "C0+null", 1101L),
            new KeyValueTimestamp<>(0, "D0+null", 1102L),
            new KeyValueTimestamp<>(1, "D1+null", 1102L),
            new KeyValueTimestamp<>(0, "E0+null", 1103L),
            new KeyValueTimestamp<>(1, "E1+null", 1103L),
            new KeyValueTimestamp<>(2, "E2+null", 1103L),
            new KeyValueTimestamp<>(0, "F0+null", 1104L),
            new KeyValueTimestamp<>(1, "F1+null", 1104L),
            new KeyValueTimestamp<>(2, "F2+null", 1104L),
            new KeyValueTimestamp<>(3, "F3+null", 1104L)
        );
    }

    /**
     * Drives the lower-bound portion of a window-bounds scenario against an already built topology.
     *
     * <p>Five rounds of records ("G" through "K") are piped into topic1 at timestamps 899 through
     * 903. After each round the forwarded results are compared with the expected ones: round "G"
     * is expected to yield only "X+null" results, and each later round is expected to join one
     * more key with a "b"-prefixed right-hand value. A final "dummy" record at timestamp 1103 is
     * expected to be forwarded as "dummy+null".
     *
     * <p>NOTE(review): the "b" values in the expectations are never piped here; presumably they
     * are the topic2 records written by {@code testUpperWindowBound} — confirm against the caller.
     *
     * @param expectedKeys keys to pipe; the hard-coded expectations assume the keys 0 through 3
     * @param driver       test driver wrapping the topology under test
     * @param processor    mock processor capturing the join output
     */
    private void testLowerWindowBound(final int[] expectedKeys,
                                      final TopologyTestDriver driver,
                                      final MockApiProcessor<Integer, String, Void, Void> processor) {
        final TestInputTopic<Integer, String> leftInput =
            driver.createInputTopic("topic1", new IntegerSerializer(), new StringSerializer());

        // Round "G" at 899: no key is expected to find a partner.
        long ts = 899L;
        for (int i = 0; i < expectedKeys.length; i++) {
            leftInput.pipeInput(expectedKeys[i], "G" + expectedKeys[i], ts);
        }
        processor.checkAndClearProcessResult(
            new KeyValueTimestamp<>(0, "G0+null", 899L),
            new KeyValueTimestamp<>(1, "G1+null", 899L),
            new KeyValueTimestamp<>(2, "G2+null", 899L),
            new KeyValueTimestamp<>(3, "G3+null", 899L)
        );

        // Round "H" at 900: only key 0 joins.
        ts++;
        for (int i = 0; i < expectedKeys.length; i++) {
            leftInput.pipeInput(expectedKeys[i], "H" + expectedKeys[i], ts);
        }
        processor.checkAndClearProcessResult(
            new KeyValueTimestamp<>(0, "H0+b0", 1000L),
            new KeyValueTimestamp<>(1, "H1+null", 900L),
            new KeyValueTimestamp<>(2, "H2+null", 900L),
            new KeyValueTimestamp<>(3, "H3+null", 900L)
        );

        // Round "I" at 901: keys 0 and 1 join.
        ts++;
        for (int i = 0; i < expectedKeys.length; i++) {
            leftInput.pipeInput(expectedKeys[i], "I" + expectedKeys[i], ts);
        }
        processor.checkAndClearProcessResult(
            new KeyValueTimestamp<>(0, "I0+b0", 1000L),
            new KeyValueTimestamp<>(1, "I1+b1", 1001L),
            new KeyValueTimestamp<>(2, "I2+null", 901L),
            new KeyValueTimestamp<>(3, "I3+null", 901L)
        );

        // Round "J" at 902: keys 0 through 2 join.
        ts++;
        for (int i = 0; i < expectedKeys.length; i++) {
            leftInput.pipeInput(expectedKeys[i], "J" + expectedKeys[i], ts);
        }
        processor.checkAndClearProcessResult(
            new KeyValueTimestamp<>(0, "J0+b0", 1000L),
            new KeyValueTimestamp<>(1, "J1+b1", 1001L),
            new KeyValueTimestamp<>(2, "J2+b2", 1002L),
            new KeyValueTimestamp<>(3, "J3+null", 902L)
        );

        // Round "K" at 903: every key joins.
        ts++;
        for (int i = 0; i < expectedKeys.length; i++) {
            leftInput.pipeInput(expectedKeys[i], "K" + expectedKeys[i], ts);
        }
        processor.checkAndClearProcessResult(
            new KeyValueTimestamp<>(0, "K0+b0", 1000L),
            new KeyValueTimestamp<>(1, "K1+b1", 1001L),
            new KeyValueTimestamp<>(2, "K2+b2", 1002L),
            new KeyValueTimestamp<>(3, "K3+b3", 1003L)
        );

        // Trailing record 200 ms after the last round (timestamp 1103).
        final long dummyTs = ts + 200L;
        leftInput.pipeInput(0, "dummy", dummyTs);
        processor.checkAndClearProcessResult(
            new KeyValueTimestamp<>(0, "dummy+null", 1103L)
        );
    }

    /**
     * Builds an outer join whose {@link StreamJoined} is configured with
     * {@link CapturingStoreSuppliers}, instantiates the topology through a
     * {@link TopologyTestDriver}, and asserts that the store produced by the captured
     * key-value store supplier is not reported as timestamped by
     * {@link WrappedStateStore#isTimestamped(StateStore)}.
     */
    @Test
    public void shouldJoinWithNonTimestampedStore() {
        final CapturingStoreSuppliers capturingSuppliers = new CapturingStoreSuppliers();
        final StreamJoined<Integer, String, String> joinConfig = StreamJoined
            .with(Serdes.Integer(), Serdes.String(), Serdes.String())
            .withDslStoreSuppliers(capturingSuppliers);

        final StreamsBuilder builder = new StreamsBuilder();
        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
        final KStream<Integer, String> left = builder.stream("topic1", consumed);
        final KStream<Integer, String> right = builder.stream("topic2", consumed);
        final KStream<Integer, String> joined = left.outerJoin(
            right,
            MockValueJoiner.TOSTRING_JOINER,
            JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)),
            joinConfig
        );
        joined.process(supplier);

        // The driver is never used directly; it is only created (and closed) around the assertion.
        try (TopologyTestDriver ignored = new TopologyTestDriver(builder.build(PROPS), PROPS)) {
            final StateStore capturedStore = capturingSuppliers.capture.get().get();
            final boolean timestamped = WrappedStateStore.isTimestamped(capturedStore);
            MatcherAssert.assertThat(
                "Expected stream joined to supply builders that create non-timestamped stores",
                !timestamped
            );
        }
    }

    public static class CapturingStoreSuppliers
    extends BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers {
        final AtomicReference<KeyValueBytesStoreSupplier> capture = new AtomicReference();

        public KeyValueBytesStoreSupplier keyValueStore(DslKeyValueParams params) {
            KeyValueBytesStoreSupplier result = super.keyValueStore(params);
            this.capture.set(result);
            return result;
        }
    }
}

