/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TopologyTestDriverWrapper;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.KTableImpl;
import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.test.MockApiProcessor;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class KTableFilterTest {
    private final Consumed<String, Integer> consumed = Consumed.with((Serde)Serdes.String(), (Serde)Serdes.Integer());
    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.Integer());
    private final Predicate<String, Integer> predicate = (key, value) -> value % 2 == 0;

    @Before
    public void setUp() {
        this.props.setProperty("cache.max.bytes.buffering", "0");
    }

    private void doTestKTable(StreamsBuilder builder, KTable<String, Integer> table2, KTable<String, Integer> table3, String topic) {
        MockApiProcessorSupplier supplier = new MockApiProcessorSupplier();
        table2.toStream().process(supplier, new String[0]);
        table3.toStream().process(supplier, new String[0]);
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props);){
            TestInputTopic inputTopic = driver.createInputTopic(topic, (Serializer)new StringSerializer(), (Serializer)new IntegerSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            inputTopic.pipeInput((Object)"A", (Object)1, 10L);
            inputTopic.pipeInput((Object)"B", (Object)2, 5L);
            inputTopic.pipeInput((Object)"C", (Object)3, 8L);
            inputTopic.pipeInput((Object)"D", (Object)4, 14L);
            inputTopic.pipeInput((Object)"A", null, 18L);
            inputTopic.pipeInput((Object)"B", null, 15L);
        }
        List processors = supplier.capturedProcessors(2);
        processors.get(0).checkAndClearProcessResult(new KeyValueTimestamp<String, Object>("A", null, 10L), new KeyValueTimestamp<String, Integer>("B", 2, 5L), new KeyValueTimestamp<String, Object>("C", null, 8L), new KeyValueTimestamp<String, Integer>("D", 4, 14L), new KeyValueTimestamp<String, Object>("A", null, 18L), new KeyValueTimestamp<String, Object>("B", null, 15L));
        processors.get(1).checkAndClearProcessResult(new KeyValueTimestamp<String, Integer>("A", 1, 10L), new KeyValueTimestamp<String, Object>("B", null, 5L), new KeyValueTimestamp<String, Integer>("C", 3, 8L), new KeyValueTimestamp<String, Object>("D", null, 14L), new KeyValueTimestamp<String, Object>("A", null, 18L), new KeyValueTimestamp<String, Object>("B", null, 15L));
    }

    @Test
    public void shouldPassThroughWithoutMaterialization() {
        StreamsBuilder builder = new StreamsBuilder();
        String topic1 = "topic1";
        KTable table1 = builder.table("topic1", this.consumed);
        KTable table2 = table1.filter(this.predicate);
        KTable table3 = table1.filterNot(this.predicate);
        Assert.assertNull((Object)table1.queryableStoreName());
        Assert.assertNull((Object)table2.queryableStoreName());
        Assert.assertNull((Object)table3.queryableStoreName());
        this.doTestKTable(builder, (KTable<String, Integer>)table2, (KTable<String, Integer>)table3, "topic1");
    }

    @Test
    public void shouldPassThroughOnMaterialization() {
        StreamsBuilder builder = new StreamsBuilder();
        String topic1 = "topic1";
        KTable table1 = builder.table("topic1", this.consumed);
        KTable table2 = table1.filter(this.predicate, Materialized.as((String)"store2"));
        KTable table3 = table1.filterNot(this.predicate);
        Assert.assertNull((Object)table1.queryableStoreName());
        Assert.assertEquals((Object)"store2", (Object)table2.queryableStoreName());
        Assert.assertNull((Object)table3.queryableStoreName());
        this.doTestKTable(builder, (KTable<String, Integer>)table2, (KTable<String, Integer>)table3, "topic1");
    }

    private void doTestValueGetter(StreamsBuilder builder, KTableImpl<String, Integer, Integer> table2, KTableImpl<String, Integer, Integer> table3, String topic1) {
        Topology topology = builder.build();
        KTableValueGetterSupplier getterSupplier2 = table2.valueGetterSupplier();
        KTableValueGetterSupplier getterSupplier3 = table3.valueGetterSupplier();
        InternalTopologyBuilder topologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology);
        topologyBuilder.connectProcessorAndStateStores(table2.name, getterSupplier2.storeNames());
        topologyBuilder.connectProcessorAndStateStores(table3.name, getterSupplier3.storeNames());
        try (TopologyTestDriverWrapper driver = new TopologyTestDriverWrapper(topology, this.props);){
            TestInputTopic inputTopic = driver.createInputTopic(topic1, (Serializer)new StringSerializer(), (Serializer)new IntegerSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            KTableValueGetter getter2 = getterSupplier2.get();
            KTableValueGetter getter3 = getterSupplier3.get();
            getter2.init(driver.setCurrentNodeForProcessorContext(table2.name));
            getter3.init(driver.setCurrentNodeForProcessorContext(table3.name));
            inputTopic.pipeInput((Object)"A", (Object)1, 5L);
            inputTopic.pipeInput((Object)"B", (Object)1, 10L);
            inputTopic.pipeInput((Object)"C", (Object)1, 15L);
            Assert.assertNull((Object)getter2.get((Object)"A"));
            Assert.assertNull((Object)getter2.get((Object)"B"));
            Assert.assertNull((Object)getter2.get((Object)"C"));
            Assert.assertEquals((Object)ValueAndTimestamp.make((Object)1, (long)5L), (Object)getter3.get((Object)"A"));
            Assert.assertEquals((Object)ValueAndTimestamp.make((Object)1, (long)10L), (Object)getter3.get((Object)"B"));
            Assert.assertEquals((Object)ValueAndTimestamp.make((Object)1, (long)15L), (Object)getter3.get((Object)"C"));
            inputTopic.pipeInput((Object)"A", (Object)2, 10L);
            inputTopic.pipeInput((Object)"B", (Object)2, 5L);
            Assert.assertEquals((Object)ValueAndTimestamp.make((Object)2, (long)10L), (Object)getter2.get((Object)"A"));
            Assert.assertEquals((Object)ValueAndTimestamp.make((Object)2, (long)5L), (Object)getter2.get((Object)"B"));
            Assert.assertNull((Object)getter2.get((Object)"C"));
            Assert.assertNull((Object)getter3.get((Object)"A"));
            Assert.assertNull((Object)getter3.get((Object)"B"));
            Assert.assertEquals((Object)ValueAndTimestamp.make((Object)1, (long)15L), (Object)getter3.get((Object)"C"));
            inputTopic.pipeInput((Object)"A", (Object)3, 15L);
            Assert.assertNull((Object)getter2.get((Object)"A"));
            Assert.assertEquals((Object)ValueAndTimestamp.make((Object)2, (long)5L), (Object)getter2.get((Object)"B"));
            Assert.assertNull((Object)getter2.get((Object)"C"));
            Assert.assertEquals((Object)ValueAndTimestamp.make((Object)3, (long)15L), (Object)getter3.get((Object)"A"));
            Assert.assertNull((Object)getter3.get((Object)"B"));
            Assert.assertEquals((Object)ValueAndTimestamp.make((Object)1, (long)15L), (Object)getter3.get((Object)"C"));
            inputTopic.pipeInput((Object)"A", null, 10L);
            inputTopic.pipeInput((Object)"B", null, 20L);
            Assert.assertNull((Object)getter2.get((Object)"A"));
            Assert.assertNull((Object)getter2.get((Object)"B"));
            Assert.assertNull((Object)getter2.get((Object)"C"));
            Assert.assertNull((Object)getter3.get((Object)"A"));
            Assert.assertNull((Object)getter3.get((Object)"B"));
            Assert.assertEquals((Object)ValueAndTimestamp.make((Object)1, (long)15L), (Object)getter3.get((Object)"C"));
        }
    }

    @Test
    public void shouldGetValuesOnMaterialization() {
        StreamsBuilder builder = new StreamsBuilder();
        String topic1 = "topic1";
        KTableImpl table1 = (KTableImpl)builder.table("topic1", this.consumed);
        KTableImpl table2 = (KTableImpl)table1.filter(this.predicate, Materialized.as((String)"store2"));
        KTableImpl table3 = (KTableImpl)table1.filterNot(this.predicate, Materialized.as((String)"store3"));
        KTableImpl table4 = (KTableImpl)table1.filterNot(this.predicate);
        Assert.assertNull((Object)table1.queryableStoreName());
        Assert.assertEquals((Object)"store2", (Object)table2.queryableStoreName());
        Assert.assertEquals((Object)"store3", (Object)table3.queryableStoreName());
        Assert.assertNull((Object)table4.queryableStoreName());
        this.doTestValueGetter(builder, (KTableImpl<String, Integer, Integer>)table2, (KTableImpl<String, Integer, Integer>)table3, "topic1");
    }

    private void doTestNotSendingOldValue(StreamsBuilder builder, KTableImpl<String, Integer, Integer> table1, KTableImpl<String, Integer, Integer> table2, String topic1) {
        MockApiProcessorSupplier supplier = new MockApiProcessorSupplier();
        builder.build().addProcessor("proc1", supplier, new String[]{table1.name});
        builder.build().addProcessor("proc2", supplier, new String[]{table2.name});
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props);){
            TestInputTopic inputTopic = driver.createInputTopic(topic1, (Serializer)new StringSerializer(), (Serializer)new IntegerSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            inputTopic.pipeInput((Object)"A", (Object)1, 5L);
            inputTopic.pipeInput((Object)"B", (Object)1, 10L);
            inputTopic.pipeInput((Object)"C", (Object)1, 15L);
            List processors = supplier.capturedProcessors(2);
            processors.get(0).checkAndClearProcessResult(new KeyValueTimestamp<String, Change>("A", new Change((Object)1, null), 5L), new KeyValueTimestamp<String, Change>("B", new Change((Object)1, null), 10L), new KeyValueTimestamp<String, Change>("C", new Change((Object)1, null), 15L));
            processors.get(1).checkAndClearProcessResult(new KeyValueTimestamp<String, Change>("A", new Change(null, null), 5L), new KeyValueTimestamp<String, Change>("B", new Change(null, null), 10L), new KeyValueTimestamp<String, Change>("C", new Change(null, null), 15L));
            inputTopic.pipeInput((Object)"A", (Object)2, 15L);
            inputTopic.pipeInput((Object)"B", (Object)2, 8L);
            processors.get(0).checkAndClearProcessResult(new KeyValueTimestamp<String, Change>("A", new Change((Object)2, null), 15L), new KeyValueTimestamp<String, Change>("B", new Change((Object)2, null), 8L));
            processors.get(1).checkAndClearProcessResult(new KeyValueTimestamp<String, Change>("A", new Change((Object)2, null), 15L), new KeyValueTimestamp<String, Change>("B", new Change((Object)2, null), 8L));
            inputTopic.pipeInput((Object)"A", (Object)3, 20L);
            processors.get(0).checkAndClearProcessResult(new KeyValueTimestamp<String, Change>("A", new Change((Object)3, null), 20L));
            processors.get(1).checkAndClearProcessResult(new KeyValueTimestamp<String, Change>("A", new Change(null, null), 20L));
            inputTopic.pipeInput((Object)"A", null, 10L);
            inputTopic.pipeInput((Object)"B", null, 20L);
            processors.get(0).checkAndClearProcessResult(new KeyValueTimestamp<String, Change>("A", new Change(null, null), 10L), new KeyValueTimestamp<String, Change>("B", new Change(null, null), 20L));
            processors.get(1).checkAndClearProcessResult(new KeyValueTimestamp<String, Change>("A", new Change(null, null), 10L), new KeyValueTimestamp<String, Change>("B", new Change(null, null), 20L));
        }
    }

    @Test
    public void shouldNotSendOldValuesWithoutMaterialization() {
        StreamsBuilder builder = new StreamsBuilder();
        String topic1 = "topic1";
        KTableImpl table1 = (KTableImpl)builder.table("topic1", this.consumed);
        KTableImpl table2 = (KTableImpl)table1.filter(this.predicate);
        this.doTestNotSendingOldValue(builder, (KTableImpl<String, Integer, Integer>)table1, (KTableImpl<String, Integer, Integer>)table2, "topic1");
    }

    @Test
    public void shouldNotSendOldValuesOnMaterialization() {
        StreamsBuilder builder = new StreamsBuilder();
        String topic1 = "topic1";
        KTableImpl table1 = (KTableImpl)builder.table("topic1", this.consumed);
        KTableImpl table2 = (KTableImpl)table1.filter(this.predicate, Materialized.as((String)"store2"));
        this.doTestNotSendingOldValue(builder, (KTableImpl<String, Integer, Integer>)table1, (KTableImpl<String, Integer, Integer>)table2, "topic1");
    }

    @Test
    public void shouldNotEnableSendingOldValuesIfNotAlreadyMaterializedAndNotForcedToMaterialize() {
        StreamsBuilder builder = new StreamsBuilder();
        String topic1 = "topic1";
        KTableImpl table1 = (KTableImpl)builder.table("topic1", this.consumed);
        KTableImpl table2 = (KTableImpl)table1.filter(this.predicate);
        table2.enableSendingOldValues(false);
        this.doTestNotSendingOldValue(builder, (KTableImpl<String, Integer, Integer>)table1, (KTableImpl<String, Integer, Integer>)table2, "topic1");
    }

    private void doTestSendingOldValue(StreamsBuilder builder, KTableImpl<String, Integer, Integer> table1, KTableImpl<String, Integer, Integer> table2, String topic1) {
        MockApiProcessorSupplier supplier = new MockApiProcessorSupplier();
        Topology topology = builder.build();
        topology.addProcessor("proc1", supplier, new String[]{table1.name});
        topology.addProcessor("proc2", supplier, new String[]{table2.name});
        boolean parentSendOldVals = table1.sendingOldValueEnabled();
        try (TopologyTestDriver driver = new TopologyTestDriver(topology, this.props);){
            TestInputTopic inputTopic = driver.createInputTopic(topic1, (Serializer)new StringSerializer(), (Serializer)new IntegerSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            inputTopic.pipeInput((Object)"A", (Object)1, 5L);
            inputTopic.pipeInput((Object)"B", (Object)1, 10L);
            inputTopic.pipeInput((Object)"C", (Object)1, 15L);
            List processors = supplier.capturedProcessors(2);
            MockApiProcessor table1Output = processors.get(0);
            MockApiProcessor table2Output = processors.get(1);
            table1Output.checkAndClearProcessResult(new KeyValueTimestamp<String, Change>("A", new Change((Object)1, null), 5L), new KeyValueTimestamp<String, Change>("B", new Change((Object)1, null), 10L), new KeyValueTimestamp<String, Change>("C", new Change((Object)1, null), 15L));
            table2Output.checkEmptyAndClearProcessResult();
            inputTopic.pipeInput((Object)"A", (Object)2, 15L);
            inputTopic.pipeInput((Object)"B", (Object)2, 8L);
            table1Output.checkAndClearProcessResult(new KeyValueTimestamp<String, Change>("A", new Change((Object)2, (Object)(parentSendOldVals ? Integer.valueOf(1) : null)), 15L), new KeyValueTimestamp<String, Change>("B", new Change((Object)2, (Object)(parentSendOldVals ? Integer.valueOf(1) : null)), 8L));
            table2Output.checkAndClearProcessResult(new KeyValueTimestamp<String, Change>("A", new Change((Object)2, null), 15L), new KeyValueTimestamp<String, Change>("B", new Change((Object)2, null), 8L));
            inputTopic.pipeInput((Object)"A", (Object)3, 20L);
            table1Output.checkAndClearProcessResult(new KeyValueTimestamp<String, Change>("A", new Change((Object)3, (Object)(parentSendOldVals ? Integer.valueOf(2) : null)), 20L));
            table2Output.checkAndClearProcessResult(new KeyValueTimestamp<String, Change>("A", new Change(null, (Object)2), 20L));
            inputTopic.pipeInput((Object)"A", null, 10L);
            inputTopic.pipeInput((Object)"B", null, 20L);
            table1Output.checkAndClearProcessResult(new KeyValueTimestamp<String, Change>("A", new Change(null, (Object)(parentSendOldVals ? Integer.valueOf(3) : null)), 10L), new KeyValueTimestamp<String, Change>("B", new Change(null, (Object)(parentSendOldVals ? Integer.valueOf(2) : null)), 20L));
            table2Output.checkAndClearProcessResult(new KeyValueTimestamp<String, Change>("B", new Change(null, (Object)2), 20L));
        }
    }

    @Test
    public void shouldEnableSendOldValuesWhenNotMaterializedAlreadyButForcedToMaterialize() {
        StreamsBuilder builder = new StreamsBuilder();
        String topic1 = "topic1";
        KTableImpl table1 = (KTableImpl)builder.table("topic1", this.consumed);
        KTableImpl table2 = (KTableImpl)table1.filter(this.predicate);
        table2.enableSendingOldValues(true);
        MatcherAssert.assertThat((Object)table1.sendingOldValueEnabled(), (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)table2.sendingOldValueEnabled(), (Matcher)Matchers.is((Object)true));
        this.doTestSendingOldValue(builder, (KTableImpl<String, Integer, Integer>)table1, (KTableImpl<String, Integer, Integer>)table2, "topic1");
    }

    @Test
    public void shouldEnableSendOldValuesWhenMaterializedAlreadyAndForcedToMaterialize() {
        StreamsBuilder builder = new StreamsBuilder();
        String topic1 = "topic1";
        KTableImpl table1 = (KTableImpl)builder.table("topic1", this.consumed);
        KTableImpl table2 = (KTableImpl)table1.filter(this.predicate, Materialized.as((String)"store2"));
        table2.enableSendingOldValues(true);
        MatcherAssert.assertThat((Object)table1.sendingOldValueEnabled(), (Matcher)Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)table2.sendingOldValueEnabled(), (Matcher)Matchers.is((Object)true));
        this.doTestSendingOldValue(builder, (KTableImpl<String, Integer, Integer>)table1, (KTableImpl<String, Integer, Integer>)table2, "topic1");
    }

    @Test
    public void shouldSendOldValuesWhenEnabledOnUpStreamMaterialization() {
        StreamsBuilder builder = new StreamsBuilder();
        String topic1 = "topic1";
        KTableImpl table1 = (KTableImpl)builder.table("topic1", this.consumed, Materialized.as((String)"store2"));
        KTableImpl table2 = (KTableImpl)table1.filter(this.predicate);
        table2.enableSendingOldValues(false);
        MatcherAssert.assertThat((Object)table1.sendingOldValueEnabled(), (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)table2.sendingOldValueEnabled(), (Matcher)Matchers.is((Object)true));
        this.doTestSendingOldValue(builder, (KTableImpl<String, Integer, Integer>)table1, (KTableImpl<String, Integer, Integer>)table2, "topic1");
    }

    private void doTestSkipNullOnMaterialization(StreamsBuilder builder, KTableImpl<String, String, String> table1, KTableImpl<String, String, String> table2, String topic1) {
        MockApiProcessorSupplier supplier = new MockApiProcessorSupplier();
        Topology topology = builder.build();
        topology.addProcessor("proc1", supplier, new String[]{table1.name});
        topology.addProcessor("proc2", supplier, new String[]{table2.name});
        try (TopologyTestDriver driver = new TopologyTestDriver(topology, this.props);){
            TestInputTopic stringinputTopic = driver.createInputTopic(topic1, (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            stringinputTopic.pipeInput((Object)"A", (Object)"reject", 5L);
            stringinputTopic.pipeInput((Object)"B", (Object)"reject", 10L);
            stringinputTopic.pipeInput((Object)"C", (Object)"reject", 20L);
        }
        List processors = supplier.capturedProcessors(2);
        processors.get(0).checkAndClearProcessResult(new KeyValueTimestamp<String, Change>("A", new Change((Object)"reject", null), 5L), new KeyValueTimestamp<String, Change>("B", new Change((Object)"reject", null), 10L), new KeyValueTimestamp<String, Change>("C", new Change((Object)"reject", null), 20L));
        processors.get(1).checkEmptyAndClearProcessResult();
    }

    @Test
    public void shouldSkipNullToRepartitionWithoutMaterialization() {
        StreamsBuilder builder = new StreamsBuilder();
        String topic1 = "topic1";
        Consumed consumed = Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String());
        KTableImpl table1 = (KTableImpl)builder.table("topic1", consumed);
        KTableImpl table2 = (KTableImpl)table1.filter((key, value) -> value.equalsIgnoreCase("accept")).groupBy(MockMapper.noOpKeyValueMapper()).reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER);
        this.doTestSkipNullOnMaterialization(builder, (KTableImpl<String, String, String>)table1, (KTableImpl<String, String, String>)table2, "topic1");
    }

    @Test
    public void shouldSkipNullToRepartitionOnMaterialization() {
        StreamsBuilder builder = new StreamsBuilder();
        String topic1 = "topic1";
        Consumed consumed = Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String());
        KTableImpl table1 = (KTableImpl)builder.table("topic1", consumed);
        KTableImpl table2 = (KTableImpl)table1.filter((key, value) -> value.equalsIgnoreCase("accept"), Materialized.as((String)"store2")).groupBy(MockMapper.noOpKeyValueMapper()).reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, Materialized.as((String)"mock-result"));
        this.doTestSkipNullOnMaterialization(builder, (KTableImpl<String, String, String>)table1, (KTableImpl<String, String, String>)table2, "topic1");
    }

    @Test
    public void testTypeVariance() {
        Predicate numberKeyPredicate = (key, value) -> false;
        new StreamsBuilder().table("empty").filter(numberKeyPredicate).filterNot(numberKeyPredicate).toStream().to("nirvana");
    }
}

