/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.function.Function;
import kafka.utils.MockTime;
import org.apache.kafka.common.serialization.FloatSerializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.utils.UniqueTopicSerdeScope;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(value={IntegrationTest.class})
public class KTableKTableForeignKeyInnerJoinMultiIntegrationTest {
    private static final int NUM_BROKERS = 1;
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    private static final MockTime MOCK_TIME = KTableKTableForeignKeyInnerJoinMultiIntegrationTest.CLUSTER.time;
    private static final String TABLE_1 = "table1";
    private static final String TABLE_2 = "table2";
    private static final String TABLE_3 = "table3";
    private static final String OUTPUT = "output-";
    private final Properties streamsConfig = KTableKTableForeignKeyInnerJoinMultiIntegrationTest.getStreamsConfig();
    private final Properties streamsConfigTwo = KTableKTableForeignKeyInnerJoinMultiIntegrationTest.getStreamsConfig();
    private final Properties streamsConfigThree = KTableKTableForeignKeyInnerJoinMultiIntegrationTest.getStreamsConfig();
    private KafkaStreams streams;
    private KafkaStreams streamsTwo;
    private KafkaStreams streamsThree;
    private static final Properties CONSUMER_CONFIG = new Properties();
    private static final Properties PRODUCER_CONFIG_1 = new Properties();
    private static final Properties PRODUCER_CONFIG_2 = new Properties();
    private static final Properties PRODUCER_CONFIG_3 = new Properties();

    @BeforeClass
    public static void startCluster() throws IOException, InterruptedException {
        CLUSTER.start();
        CLUSTER.createTopic(TABLE_1, 3, 1);
        CLUSTER.createTopic(TABLE_2, 5, 1);
        CLUSTER.createTopic(TABLE_3, 7, 1);
        CLUSTER.createTopic(OUTPUT, 11, 1);
        PRODUCER_CONFIG_1.put("bootstrap.servers", CLUSTER.bootstrapServers());
        PRODUCER_CONFIG_1.put("acks", "all");
        PRODUCER_CONFIG_1.put("key.serializer", IntegerSerializer.class);
        PRODUCER_CONFIG_1.put("value.serializer", FloatSerializer.class);
        PRODUCER_CONFIG_2.put("bootstrap.servers", CLUSTER.bootstrapServers());
        PRODUCER_CONFIG_2.put("acks", "all");
        PRODUCER_CONFIG_2.put("key.serializer", StringSerializer.class);
        PRODUCER_CONFIG_2.put("value.serializer", LongSerializer.class);
        PRODUCER_CONFIG_3.put("bootstrap.servers", CLUSTER.bootstrapServers());
        PRODUCER_CONFIG_3.put("acks", "all");
        PRODUCER_CONFIG_3.put("key.serializer", IntegerSerializer.class);
        PRODUCER_CONFIG_3.put("value.serializer", StringSerializer.class);
        List table1 = Arrays.asList(new KeyValue((Object)1, (Object)Float.valueOf(1.33f)), new KeyValue((Object)2, (Object)Float.valueOf(2.22f)), new KeyValue((Object)3, (Object)Float.valueOf(-1.22f)), new KeyValue((Object)4, (Object)Float.valueOf(-2.22f)));
        List table2 = Arrays.asList(new KeyValue((Object)"0", (Object)0L), new KeyValue((Object)"1", (Object)10L), new KeyValue((Object)"2", (Object)20L), new KeyValue((Object)"3", (Object)30L), new KeyValue((Object)"4", (Object)40L), new KeyValue((Object)"5", (Object)50L), new KeyValue((Object)"6", (Object)60L), new KeyValue((Object)"7", (Object)70L), new KeyValue((Object)"8", (Object)80L), new KeyValue((Object)"9", (Object)90L));
        List table3 = Collections.singletonList(new KeyValue((Object)10, (Object)"waffle"));
        IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_1, table1, PRODUCER_CONFIG_1, (Time)MOCK_TIME);
        IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_2, table2, PRODUCER_CONFIG_2, (Time)MOCK_TIME);
        IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_3, table3, PRODUCER_CONFIG_3, (Time)MOCK_TIME);
        CONSUMER_CONFIG.put("bootstrap.servers", CLUSTER.bootstrapServers());
        CONSUMER_CONFIG.put("group.id", "ktable-ktable-consumer");
        CONSUMER_CONFIG.put("key.deserializer", IntegerDeserializer.class);
        CONSUMER_CONFIG.put("value.deserializer", StringDeserializer.class);
    }

    @AfterClass
    public static void closeCluster() {
        CLUSTER.stop();
    }

    @Before
    public void before() throws IOException {
        String stateDirBasePath = TestUtils.tempDirectory().getPath();
        this.streamsConfig.put("state.dir", stateDirBasePath + "-1");
        this.streamsConfigTwo.put("state.dir", stateDirBasePath + "-2");
        this.streamsConfigThree.put("state.dir", stateDirBasePath + "-3");
    }

    @After
    public void after() throws IOException {
        if (this.streams != null) {
            this.streams.close();
            this.streams = null;
        }
        if (this.streamsTwo != null) {
            this.streamsTwo.close();
            this.streamsTwo = null;
        }
        if (this.streamsThree != null) {
            this.streamsThree.close();
            this.streamsThree = null;
        }
        IntegrationTestUtils.purgeLocalStreamsState(Arrays.asList(this.streamsConfig, this.streamsConfigTwo, this.streamsConfigThree));
    }

    @Test
    public void shouldInnerJoinMultiPartitionQueryable() throws Exception {
        HashSet<KeyValue<Integer, String>> expectedOne = new HashSet<KeyValue<Integer, String>>();
        expectedOne.add(new KeyValue((Object)1, (Object)"value1=1.33,value2=10,value3=waffle"));
        this.verifyKTableKTableJoin(expectedOne);
    }

    private void verifyKTableKTableJoin(Set<KeyValue<Integer, String>> expectedResult) throws Exception {
        String innerJoinType = "INNER";
        String queryableName = "INNER-store1";
        String queryableNameTwo = "INNER-store2";
        this.streams = KTableKTableForeignKeyInnerJoinMultiIntegrationTest.prepareTopology("INNER-store1", "INNER-store2", this.streamsConfig);
        this.streamsTwo = KTableKTableForeignKeyInnerJoinMultiIntegrationTest.prepareTopology("INNER-store1", "INNER-store2", this.streamsConfigTwo);
        this.streamsThree = KTableKTableForeignKeyInnerJoinMultiIntegrationTest.prepareTopology("INNER-store1", "INNER-store2", this.streamsConfigThree);
        List<KafkaStreams> kafkaStreamsList = Arrays.asList(this.streams, this.streamsTwo, this.streamsThree);
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreamsList, Duration.ofSeconds(120L));
        HashSet result = new HashSet(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(CONSUMER_CONFIG, OUTPUT, expectedResult.size()));
        Assert.assertEquals(expectedResult, result);
    }

    private static Properties getStreamsConfig() {
        Properties streamsConfig = new Properties();
        streamsConfig.put("application.id", "KTable-FKJ-Multi");
        streamsConfig.put("bootstrap.servers", CLUSTER.bootstrapServers());
        streamsConfig.put("auto.offset.reset", "earliest");
        streamsConfig.put("cache.max.bytes.buffering", (Object)0);
        streamsConfig.put("commit.interval.ms", (Object)100L);
        return streamsConfig;
    }

    private static KafkaStreams prepareTopology(String queryableName, String queryableNameTwo, Properties streamsConfig) {
        UniqueTopicSerdeScope serdeScope = new UniqueTopicSerdeScope();
        StreamsBuilder builder = new StreamsBuilder();
        KTable table1 = builder.table(TABLE_1, Consumed.with(serdeScope.decorateSerde(Serdes.Integer(), streamsConfig, true), serdeScope.decorateSerde(Serdes.Float(), streamsConfig, false)));
        KTable table2 = builder.table(TABLE_2, Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true), serdeScope.decorateSerde(Serdes.Long(), streamsConfig, false)));
        KTable table3 = builder.table(TABLE_3, Consumed.with(serdeScope.decorateSerde(Serdes.Integer(), streamsConfig, true), serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)));
        if (queryableName == null) {
            throw new RuntimeException("Current implementation of joinOnForeignKey requires a materialized store");
        }
        Materialized materialized = Materialized.as((String)queryableName).withKeySerde(serdeScope.decorateSerde(Serdes.Integer(), streamsConfig, true)).withValueSerde(serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)).withCachingDisabled();
        if (queryableNameTwo == null) {
            throw new RuntimeException("Current implementation of joinOnForeignKey requires a materialized store");
        }
        Materialized materializedTwo = Materialized.as((String)queryableNameTwo).withKeySerde(serdeScope.decorateSerde(Serdes.Integer(), streamsConfig, true)).withValueSerde(serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)).withCachingDisabled();
        Function<Float, String> tableOneKeyExtractor = value -> Integer.toString((int)value.floatValue());
        Function<String, Integer> joinedTableKeyExtractor = value -> {
            if (value.contains("value2=10")) {
                return 10;
            }
            return 0;
        };
        ValueJoiner joiner = (value1, value2) -> "value1=" + value1 + ",value2=" + value2;
        ValueJoiner joinerTwo = (value1, value2) -> value1 + ",value3=" + value2;
        table1.join(table2, tableOneKeyExtractor, joiner, materialized).join(table3, joinedTableKeyExtractor, joinerTwo, materializedTwo).toStream().to(OUTPUT, Produced.with(serdeScope.decorateSerde(Serdes.Integer(), streamsConfig, true), serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)));
        return new KafkaStreams(builder.build(streamsConfig), streamsConfig);
    }
}

