/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

@Category(value={IntegrationTest.class})
public class StreamsUncaughtExceptionHandlerIntegrationTest {
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1, new Properties(), 0L, 0L);
    public static final Duration DEFAULT_DURATION = Duration.ofSeconds(30L);
    @Rule
    public TestName testName = new TestName();
    private static String inputTopic;
    private static StreamsBuilder builder;
    private static Properties properties;
    private static List<String> processorValueCollector;
    private static String appId;
    private static final AtomicBoolean THROW_ERROR;
    private static final AtomicBoolean THROW_ILLEGAL_STATE_EXCEPTION;
    private static final AtomicBoolean THROW_ILLEGAL_ARGUMENT_EXCEPTION;

    @BeforeClass
    public static void startCluster() throws IOException {
        CLUSTER.start();
    }

    @AfterClass
    public static void closeCluster() {
        CLUSTER.stop();
    }

    @Before
    public void setup() {
        String testId = IntegrationTestUtils.safeUniqueTestName(this.getClass(), this.testName);
        appId = "appId_" + testId;
        inputTopic = "input" + testId;
        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic);
        builder = new StreamsBuilder();
        processorValueCollector = new ArrayList<String>();
        KStream stream = builder.stream(inputTopic);
        stream.process(() -> new ShutdownProcessor(processorValueCollector), Named.as((String)"process"), new String[0]);
        properties = Utils.mkObjectProperties((Map)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"bootstrap.servers", (Object)CLUSTER.bootstrapServers()), Utils.mkEntry((Object)"application.id", (Object)appId), Utils.mkEntry((Object)"state.dir", (Object)TestUtils.tempDirectory().getPath()), Utils.mkEntry((Object)"num.stream.threads", (Object)2), Utils.mkEntry((Object)"default.key.serde", Serdes.StringSerde.class), Utils.mkEntry((Object)"default.value.serde", Serdes.StringSerde.class), Utils.mkEntry((Object)StreamsConfig.consumerPrefix((String)"session.timeout.ms"), (Object)10000)}));
    }

    @After
    public void teardown() throws IOException {
        IntegrationTestUtils.purgeLocalStreamsState(properties);
    }

    @Test
    public void shouldShutdownThreadUsingOldHandler() throws InterruptedException {
        try (KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);){
            AtomicInteger counter = new AtomicInteger(0);
            kafkaStreams.setUncaughtExceptionHandler((t, e) -> counter.incrementAndGet());
            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
            this.produceMessages(0L, inputTopic, "A");
            TestUtils.waitForCondition(() -> counter.get() == 1, (String)"Handler was called 1st time");
            TestUtils.waitForCondition(() -> counter.get() == 2, (long)DEFAULT_DURATION.toMillis(), (String)"Handler was called 2nd time");
            IntegrationTestUtils.waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.RUNNING, DEFAULT_DURATION);
            MatcherAssert.assertThat((Object)processorValueCollector.size(), (Matcher)CoreMatchers.equalTo((Object)2));
        }
    }

    @Test
    public void shouldShutdownClient() throws InterruptedException {
        try (KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);){
            kafkaStreams.setUncaughtExceptionHandler((t, e) -> Assert.fail((String)"should not hit old handler"));
            kafkaStreams.setUncaughtExceptionHandler(exception -> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT);
            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
            this.produceMessages(0L, inputTopic, "A");
            IntegrationTestUtils.waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.ERROR, DEFAULT_DURATION);
            MatcherAssert.assertThat((Object)processorValueCollector.size(), (Matcher)CoreMatchers.equalTo((Object)1));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void shouldShutdownClientWhenIllegalStateException() throws InterruptedException {
        THROW_ILLEGAL_STATE_EXCEPTION.compareAndSet(false, true);
        try (KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);){
            kafkaStreams.setUncaughtExceptionHandler((t, e) -> Assert.fail((String)"should not hit old handler"));
            kafkaStreams.setUncaughtExceptionHandler(exception -> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD);
            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
            this.produceMessages(0L, inputTopic, "A");
            IntegrationTestUtils.waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.ERROR, DEFAULT_DURATION);
            MatcherAssert.assertThat((Object)processorValueCollector.size(), (Matcher)CoreMatchers.equalTo((Object)1));
        }
        finally {
            THROW_ILLEGAL_STATE_EXCEPTION.compareAndSet(true, false);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void shouldShutdownClientWhenIllegalArgumentException() throws InterruptedException {
        THROW_ILLEGAL_ARGUMENT_EXCEPTION.compareAndSet(false, true);
        try (KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);){
            kafkaStreams.setUncaughtExceptionHandler((t, e) -> Assert.fail((String)"should not hit old handler"));
            kafkaStreams.setUncaughtExceptionHandler(exception -> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD);
            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
            this.produceMessages(0L, inputTopic, "A");
            IntegrationTestUtils.waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.ERROR, DEFAULT_DURATION);
            MatcherAssert.assertThat((Object)processorValueCollector.size(), (Matcher)CoreMatchers.equalTo((Object)1));
        }
        finally {
            THROW_ILLEGAL_ARGUMENT_EXCEPTION.compareAndSet(true, false);
        }
    }

    @Test
    public void shouldReplaceThreads() throws InterruptedException {
        this.testReplaceThreads(2);
    }

    @Test
    public void shouldReplaceSingleThread() throws InterruptedException {
        this.testReplaceThreads(1);
    }

    @Test
    public void shouldShutdownMultipleThreadApplication() throws InterruptedException {
        this.testShutdownApplication(2);
    }

    @Test
    public void shouldShutdownSingleThreadApplication() throws InterruptedException {
        this.testShutdownApplication(1);
    }

    @Test
    public void shouldShutDownClientIfGlobalStreamThreadWantsToReplaceThread() throws InterruptedException {
        builder = new StreamsBuilder();
        builder.addGlobalStore((StoreBuilder)new KeyValueStoreBuilder(Stores.persistentKeyValueStore((String)"globalStore"), Serdes.String(), Serdes.String(), (Time)StreamsUncaughtExceptionHandlerIntegrationTest.CLUSTER.time), inputTopic, Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String()), () -> new ShutdownProcessor(processorValueCollector));
        properties.put("num.stream.threads", (Object)0);
        try (KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);){
            kafkaStreams.setUncaughtExceptionHandler((t, e) -> Assert.fail((String)"should not hit old handler"));
            kafkaStreams.setUncaughtExceptionHandler(exception -> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD);
            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
            this.produceMessages(0L, inputTopic, "A");
            IntegrationTestUtils.waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.ERROR, DEFAULT_DURATION);
            MatcherAssert.assertThat((Object)processorValueCollector.size(), (Matcher)CoreMatchers.equalTo((Object)1));
        }
    }

    private void produceMessages(long timestamp, String streamOneInput, String msg) {
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(streamOneInput, Collections.singletonList(new KeyValue((Object)"1", (Object)msg)), TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, (Properties)new Properties()), timestamp);
    }

    private void testShutdownApplication(int numThreads) throws InterruptedException {
        properties.put("num.stream.threads", (Object)numThreads);
        Topology topology = builder.build();
        try (KafkaStreams kafkaStreams1 = new KafkaStreams(topology, properties);
             KafkaStreams kafkaStreams2 = new KafkaStreams(topology, properties);){
            kafkaStreams1.setUncaughtExceptionHandler((t, e) -> Assert.fail((String)"should not hit old handler"));
            kafkaStreams2.setUncaughtExceptionHandler((t, e) -> Assert.fail((String)"should not hit old handler"));
            kafkaStreams1.setUncaughtExceptionHandler(exception -> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_APPLICATION);
            kafkaStreams2.setUncaughtExceptionHandler(exception -> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_APPLICATION);
            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams1);
            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams2);
            this.produceMessages(0L, inputTopic, "A");
            IntegrationTestUtils.waitForApplicationState(Arrays.asList(kafkaStreams1, kafkaStreams2), KafkaStreams.State.ERROR, DEFAULT_DURATION);
            MatcherAssert.assertThat((Object)processorValueCollector.size(), (Matcher)CoreMatchers.equalTo((Object)1));
        }
    }

    private void testReplaceThreads(int numThreads) throws InterruptedException {
        properties.put("num.stream.threads", (Object)numThreads);
        try (KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);){
            kafkaStreams.setUncaughtExceptionHandler((t, e) -> Assert.fail((String)"should not hit old handler"));
            AtomicInteger count = new AtomicInteger();
            kafkaStreams.setUncaughtExceptionHandler(exception -> {
                if (count.incrementAndGet() == numThreads) {
                    THROW_ERROR.set(false);
                }
                return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
            });
            StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
            this.produceMessages(0L, inputTopic, "A");
            TestUtils.waitForCondition(() -> count.get() == numThreads, (String)"finished replacing threads");
            TestUtils.waitForCondition(() -> THROW_ERROR.get(), (String)"finished replacing threads");
            kafkaStreams.close();
            IntegrationTestUtils.waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);
            MatcherAssert.assertThat((String)"All initial threads have failed and the replacement thread had processed on record", (Object)processorValueCollector.size(), (Matcher)CoreMatchers.equalTo((Object)(numThreads + 1)));
        }
    }

    static {
        appId = "";
        THROW_ERROR = new AtomicBoolean(true);
        THROW_ILLEGAL_STATE_EXCEPTION = new AtomicBoolean(false);
        THROW_ILLEGAL_ARGUMENT_EXCEPTION = new AtomicBoolean(false);
    }

    private static class ShutdownProcessor
    extends AbstractProcessor<String, String> {
        final List<String> valueList;

        ShutdownProcessor(List<String> valueList) {
            this.valueList = valueList;
        }

        public void process(String key, String value) {
            this.valueList.add(value + " " + this.context.taskId());
            if (THROW_ERROR.get()) {
                if (THROW_ILLEGAL_STATE_EXCEPTION.get()) {
                    throw new IllegalStateException("Something unexpected happened in " + Thread.currentThread().getName());
                }
                if (THROW_ILLEGAL_ARGUMENT_EXCEPTION.get()) {
                    throw new IllegalArgumentException("Something unexpected happened in " + Thread.currentThread().getName());
                }
                throw new StreamsException(Thread.currentThread().getName());
            }
            THROW_ERROR.set(true);
        }
    }
}

