/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.errors.UnknownStateStoreException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;

@Category(value={IntegrationTest.class})
public class JoinStoreIntegrationTest {
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    @Rule
    public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory());
    private static final String APP_ID = "join-store-integration-test";
    private static final Long COMMIT_INTERVAL = 100L;
    static final Properties STREAMS_CONFIG = new Properties();
    static final String INPUT_TOPIC_RIGHT = "inputTopicRight";
    static final String INPUT_TOPIC_LEFT = "inputTopicLeft";
    static final String OUTPUT_TOPIC = "outputTopic";
    StreamsBuilder builder;

    @BeforeClass
    public static void startCluster() throws IOException {
        CLUSTER.start();
        STREAMS_CONFIG.put("auto.offset.reset", "earliest");
        STREAMS_CONFIG.put("bootstrap.servers", CLUSTER.bootstrapServers());
        STREAMS_CONFIG.put("default.key.serde", Serdes.Long().getClass());
        STREAMS_CONFIG.put("default.value.serde", Serdes.String().getClass());
        STREAMS_CONFIG.put("commit.interval.ms", COMMIT_INTERVAL);
    }

    @AfterClass
    public static void closeCluster() {
        CLUSTER.stop();
    }

    @Before
    public void prepareTopology() throws InterruptedException {
        CLUSTER.createTopics(INPUT_TOPIC_LEFT, INPUT_TOPIC_RIGHT, OUTPUT_TOPIC);
        STREAMS_CONFIG.put("state.dir", this.testFolder.getRoot().getPath());
        this.builder = new StreamsBuilder();
    }

    @After
    public void cleanup() throws InterruptedException {
        CLUSTER.deleteAllTopicsAndWait(120000L);
    }

    @Test
    public void providingAJoinStoreNameShouldNotMakeTheJoinResultQueriable() throws InterruptedException {
        STREAMS_CONFIG.put("application.id", "join-store-integration-test-no-store-access");
        StreamsBuilder builder = new StreamsBuilder();
        KStream left = builder.stream(INPUT_TOPIC_LEFT, Consumed.with((Serde)Serdes.String(), (Serde)Serdes.Integer()));
        KStream right = builder.stream(INPUT_TOPIC_RIGHT, Consumed.with((Serde)Serdes.String(), (Serde)Serdes.Integer()));
        CountDownLatch latch = new CountDownLatch(1);
        left.join(right, (value1, value2) -> value1 + value2, JoinWindows.of((Duration)Duration.ofMillis(100L)), StreamJoined.with((Serde)Serdes.String(), (Serde)Serdes.Integer(), (Serde)Serdes.Integer()).withStoreName("join-store"));
        try (KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), STREAMS_CONFIG);){
            kafkaStreams.setStateListener((newState, oldState) -> {
                if (newState == KafkaStreams.State.RUNNING) {
                    latch.countDown();
                }
            });
            kafkaStreams.start();
            latch.await();
            UnknownStateStoreException exception = (UnknownStateStoreException)Assert.assertThrows(UnknownStateStoreException.class, () -> {
                ReadOnlyKeyValueStore cfr_ignored_0 = (ReadOnlyKeyValueStore)kafkaStreams.store(StoreQueryParameters.fromNameAndType((String)"join-store", (QueryableStoreType)QueryableStoreTypes.keyValueStore()));
            });
            MatcherAssert.assertThat((Object)exception.getMessage(), (Matcher)Matchers.is((Object)"Cannot get state store join-store because no such store is registered in the topology."));
        }
    }
}

