/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import kafka.utils.MockTime;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Repartitioned;
import org.apache.kafka.streams.kstream.TableJoined;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.utils.UniqueTopicSerdeScope;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Integration test for a KTable-KTable foreign-key inner join in which both
 * input tables are repartitioned with custom stream partitioners and the join
 * itself is given matching partitioners through {@link TableJoined}.
 *
 * <p>Three {@link KafkaStreams} instances sharing one application id are
 * started against a single embedded broker; the joined records are read back
 * from the output topic and compared with the expected set.
 */
@Category({IntegrationTest.class})
public class KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest {
    private static final int NUM_BROKERS = 1;
    /** Partition count used for the input/output topics and both repartition topics. */
    private static final int NUM_PARTITIONS = 4;
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
    private static final MockTime MOCK_TIME = CLUSTER.time;
    private static final String TABLE_1 = "table1";
    private static final String TABLE_2 = "table2";
    private static final String OUTPUT = "output-";
    private final Properties streamsConfig = getStreamsConfig();
    private final Properties streamsConfigTwo = getStreamsConfig();
    private final Properties streamsConfigThree = getStreamsConfig();
    private KafkaStreams streams;
    private KafkaStreams streamsTwo;
    private KafkaStreams streamsThree;
    private static final Properties CONSUMER_CONFIG = new Properties();
    private static final Properties PRODUCER_CONFIG_1 = new Properties();
    private static final Properties PRODUCER_CONFIG_2 = new Properties();

    /**
     * Starts the embedded cluster, creates the two input topics and the output
     * topic, produces the fixture records and fills in the shared
     * producer/consumer configurations.
     *
     * @throws IOException          propagated from cluster start-up
     * @throws InterruptedException propagated from cluster start-up / topic creation
     */
    @BeforeClass
    public static void startCluster() throws IOException, InterruptedException {
        CLUSTER.start();
        CLUSTER.createTopic(TABLE_1, NUM_PARTITIONS, 1);
        CLUSTER.createTopic(TABLE_2, NUM_PARTITIONS, 1);
        CLUSTER.createTopic(OUTPUT, NUM_PARTITIONS, 1);

        PRODUCER_CONFIG_1.put("bootstrap.servers", CLUSTER.bootstrapServers());
        PRODUCER_CONFIG_1.put("acks", "all");
        PRODUCER_CONFIG_1.put("key.serializer", StringSerializer.class);
        PRODUCER_CONFIG_1.put("value.serializer", StringSerializer.class);

        PRODUCER_CONFIG_2.put("bootstrap.servers", CLUSTER.bootstrapServers());
        PRODUCER_CONFIG_2.put("acks", "all");
        PRODUCER_CONFIG_2.put("key.serializer", StringSerializer.class);
        PRODUCER_CONFIG_2.put("value.serializer", StringSerializer.class);

        // Every table1 value carries the foreign key "ID123" as the prefix before '-'.
        final List<KeyValue<String, String>> table1 = Arrays.asList(
            new KeyValue<>("ID123-1", "ID123-A1"),
            new KeyValue<>("ID123-2", "ID123-A2"),
            new KeyValue<>("ID123-3", "ID123-A3"),
            new KeyValue<>("ID123-4", "ID123-A4"));
        final List<KeyValue<String, String>> table2 = Arrays.asList(
            new KeyValue<>("ID123", "BBB"));

        IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_1, table1, PRODUCER_CONFIG_1, MOCK_TIME);
        IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_2, table2, PRODUCER_CONFIG_2, MOCK_TIME);

        CONSUMER_CONFIG.put("bootstrap.servers", CLUSTER.bootstrapServers());
        CONSUMER_CONFIG.put("group.id", "ktable-ktable-consumer");
        CONSUMER_CONFIG.put("key.deserializer", StringDeserializer.class);
        CONSUMER_CONFIG.put("value.deserializer", StringDeserializer.class);
    }

    /** Stops the embedded cluster once all tests in this class have run. */
    @AfterClass
    public static void closeCluster() {
        CLUSTER.stop();
    }

    /**
     * Gives each of the three Streams configurations its own state directory,
     * derived from one fresh temporary directory path.
     */
    @Before
    public void before() throws IOException {
        final String stateDirBasePath = TestUtils.tempDirectory().getPath();
        streamsConfig.put("state.dir", stateDirBasePath + "-1");
        streamsConfigTwo.put("state.dir", stateDirBasePath + "-2");
        streamsConfigThree.put("state.dir", stateDirBasePath + "-3");
    }

    /**
     * Closes whichever Streams instances were created by the test and purges
     * the local state of all three configurations.
     */
    @After
    public void after() throws IOException {
        if (streams != null) {
            streams.close();
            streams = null;
        }
        if (streamsTwo != null) {
            streamsTwo.close();
            streamsTwo = null;
        }
        if (streamsThree != null) {
            streamsThree.close();
            streamsThree = null;
        }
        IntegrationTestUtils.purgeLocalStreamsState(
            Arrays.asList(streamsConfig, streamsConfigTwo, streamsConfigThree));
    }

    /** Every table1 record must be joined with the single table2 record keyed "ID123". */
    @Test
    public void shouldInnerJoinMultiPartitionQueryable() throws Exception {
        final Set<KeyValue<String, String>> expectedOne = new HashSet<>();
        expectedOne.add(new KeyValue<>("ID123-1", "value1=ID123-A1,value2=BBB"));
        expectedOne.add(new KeyValue<>("ID123-2", "value1=ID123-A2,value2=BBB"));
        expectedOne.add(new KeyValue<>("ID123-3", "value1=ID123-A3,value2=BBB"));
        expectedOne.add(new KeyValue<>("ID123-4", "value1=ID123-A4,value2=BBB"));
        verifyKTableKTableJoin(expectedOne);
    }

    /**
     * Starts three instances of the join topology, waits until they are running,
     * then asserts that the records read from the output topic equal
     * {@code expectedResult}.
     *
     * @param expectedResult the joined key/value pairs expected on the output topic
     */
    private void verifyKTableKTableJoin(final Set<KeyValue<String, String>> expectedResult) throws Exception {
        final String queryableName = "INNER-store1";

        streams = prepareTopology(queryableName, streamsConfig);
        streamsTwo = prepareTopology(queryableName, streamsConfigTwo);
        streamsThree = prepareTopology(queryableName, streamsConfigThree);

        final List<KafkaStreams> kafkaStreamsList = Arrays.asList(streams, streamsTwo, streamsThree);
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreamsList, Duration.ofSeconds(120L));

        final Set<KeyValue<String, String>> result = new HashSet<>(
            IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(CONSUMER_CONFIG, OUTPUT, expectedResult.size()));
        Assert.assertEquals(expectedResult, result);
    }

    /**
     * Builds a fresh Streams configuration; all instances share the same
     * application id. The state directory is filled in later by {@link #before()}.
     */
    private static Properties getStreamsConfig() {
        final Properties config = new Properties();
        config.put("application.id", "KTable-FKJ-Partitioner");
        config.put("bootstrap.servers", CLUSTER.bootstrapServers());
        config.put("auto.offset.reset", "earliest");
        config.put("cache.max.bytes.buffering", 0);
        config.put("commit.interval.ms", 100L);
        return config;
    }

    /**
     * Builds the foreign-key join topology and wraps it in a (not yet started)
     * {@link KafkaStreams} instance.
     *
     * @param queryableName name of the store materializing the join result; must not be {@code null}
     * @param streamsConfig configuration for the returned instance
     * @return a new, unstarted {@link KafkaStreams} instance
     * @throws RuntimeException if {@code queryableName} is {@code null}
     */
    private static KafkaStreams prepareTopology(final String queryableName, final Properties streamsConfig) {
        // Fail before doing any topology work; the join below needs a named store.
        if (queryableName == null) {
            throw new RuntimeException("Current implementation of joinOnForeignKey requires a materialized store");
        }

        final UniqueTopicSerdeScope serdeScope = new UniqueTopicSerdeScope();
        final StreamsBuilder builder = new StreamsBuilder();

        final KTable<String, String> table1 = builder
            .stream(TABLE_1, Consumed.with(
                serdeScope.decorateSerde(Serdes.String(), streamsConfig, true),
                serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)))
            .repartition(repartitionA())
            .toTable(Named.as("table.a"));

        final KTable<String, String> table2 = builder
            .stream(TABLE_2, Consumed.with(
                serdeScope.decorateSerde(Serdes.String(), streamsConfig, true),
                serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)))
            .repartition(repartitionB())
            .toTable(Named.as("table.b"));

        // The explicit type witness is required: without it the chained with*() calls
        // would be checked against Object key/value types.
        final Materialized<String, String, KeyValueStore<Bytes, byte[]>> materialized =
            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(queryableName)
                .withKeySerde(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true))
                .withValueSerde(serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
                .withCachingDisabled();

        final ValueJoiner<String, String, String> joiner =
            (value1, value2) -> "value1=" + value1 + ",value2=" + value2;

        // Same partitioning functions as repartitionA()/repartitionB().
        final TableJoined<String, String> tableJoined = TableJoined.with(
            (topic, key, value, numPartitions) -> partitionFor(getKeyB(key), numPartitions),
            (topic, key, value, numPartitions) -> partitionFor(key, numPartitions));

        table1
            .join(table2,
                KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest::getKeyB,
                joiner,
                tableJoined,
                materialized)
            .toStream()
            .to(OUTPUT, Produced.with(
                serdeScope.decorateSerde(Serdes.String(), streamsConfig, true),
                serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)));

        return new KafkaStreams(builder.build(streamsConfig), streamsConfig);
    }

    /**
     * Repartition spec named "a": records are placed by the hash of the key's
     * prefix (see {@link #getKeyB(String)}) rather than by the full key.
     */
    private static Repartitioned<String, String> repartitionA() {
        final Repartitioned<String, String> repartitioned = Repartitioned.as("a");
        return repartitioned
            .withKeySerde(Serdes.String())
            .withValueSerde(Serdes.String())
            .withStreamPartitioner((topic, key, value, numPartitions) -> partitionFor(getKeyB(key), numPartitions))
            .withNumberOfPartitions(NUM_PARTITIONS);
    }

    /** Repartition spec named "b": records are placed by the hash of the full key. */
    private static Repartitioned<String, String> repartitionB() {
        final Repartitioned<String, String> repartitioned = Repartitioned.as("b");
        return repartitioned
            .withKeySerde(Serdes.String())
            .withValueSerde(Serdes.String())
            .withStreamPartitioner((topic, key, value, numPartitions) -> partitionFor(key, numPartitions))
            .withNumberOfPartitions(NUM_PARTITIONS);
    }

    /**
     * Maps a key to a partition index in {@code [0, numPartitions)}.
     *
     * <p>The remainder is taken before the absolute value because
     * {@code Math.abs(Integer.MIN_VALUE)} is still negative; for every other hash
     * the result equals {@code Math.abs(hash) % numPartitions}.
     *
     * @param key           the string whose hash selects the partition
     * @param numPartitions number of partitions of the target topic
     * @return the partition index
     */
    private static int partitionFor(final String key, final int numPartitions) {
        return Math.abs(key.hashCode() % numPartitions);
    }

    /**
     * Returns the part of {@code value} before its first {@code '-'}.
     *
     * @param value a string containing at least one {@code '-'}
     * @return the prefix preceding the first {@code '-'}
     * @throws StringIndexOutOfBoundsException if {@code value} contains no {@code '-'}
     */
    private static String getKeyB(final String value) {
        return value.substring(0, value.indexOf("-"));
    }
}

