/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=Parameterized.class)
@Category(value={IntegrationTest.class})
public class SuppressionDurabilityIntegrationTest {
    @Rule
    public Timeout globalTimeout = Timeout.seconds((long)600L);
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3, Utils.mkProperties((Map)Utils.mkMap((Map.Entry[])new Map.Entry[0])), 0L);
    @Rule
    public TestName testName = new TestName();
    private static final StringDeserializer STRING_DESERIALIZER = new StringDeserializer();
    private static final StringSerializer STRING_SERIALIZER = new StringSerializer();
    private static final Serde<String> STRING_SERDE = Serdes.String();
    private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer();
    private static final long COMMIT_INTERVAL = 100L;
    @Parameterized.Parameter
    public String processingGuaranteee;

    @BeforeClass
    public static void startCluster() throws IOException {
        CLUSTER.start();
    }

    @AfterClass
    public static void closeCluster() {
        CLUSTER.stop();
    }

    @Parameterized.Parameters(name="{0}")
    public static Collection<String[]> data() {
        return Arrays.asList({"at_least_once"}, {"exactly_once"}, {"exactly_once_v2"});
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void shouldRecoverBufferAfterShutdown() {
        String testId = IntegrationTestUtils.safeUniqueTestName(this.testName);
        String appId = "appId_" + testId;
        String input = "input" + testId;
        String storeName = "counts";
        String outputSuppressed = "output-suppressed" + testId;
        String outputRaw = "output-raw" + testId;
        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, 2, input, outputRaw, outputSuppressed);
        StreamsBuilder builder = new StreamsBuilder();
        KTable valueCounts = builder.stream(input, Consumed.with(STRING_SERDE, STRING_SERDE)).groupByKey().count(Materialized.as((String)"counts").withCachingDisabled());
        KStream suppressedCounts = valueCounts.suppress(Suppressed.untilTimeLimit((Duration)Duration.ofMillis(Long.MAX_VALUE), (Suppressed.BufferConfig)Suppressed.BufferConfig.maxRecords((long)3L).emitEarlyWhenFull())).toStream();
        AtomicInteger eventCount = new AtomicInteger(0);
        suppressedCounts.foreach((key, value) -> eventCount.incrementAndGet());
        MetadataValidator metadataValidator = new MetadataValidator(input);
        suppressedCounts.transform((TransformerSupplier)metadataValidator, new String[0]).to(outputSuppressed, Produced.with(STRING_SERDE, (Serde)Serdes.Long()));
        valueCounts.toStream().transform((TransformerSupplier)metadataValidator, new String[0]).to(outputRaw, Produced.with(STRING_SERDE, (Serde)Serdes.Long()));
        Properties streamsConfig = Utils.mkProperties((Map)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"application.id", (Object)appId), Utils.mkEntry((Object)"bootstrap.servers", (Object)CLUSTER.bootstrapServers()), Utils.mkEntry((Object)"poll.ms", (Object)Long.toString(100L)), Utils.mkEntry((Object)"processing.guarantee", (Object)this.processingGuaranteee), Utils.mkEntry((Object)"state.dir", (Object)TestUtils.tempDirectory().getPath())}));
        streamsConfig.put("commit.interval.ms", (Object)100L);
        KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true);
        try {
            SuppressionDurabilityIntegrationTest.produceSynchronouslyToPartitionZero(input, Arrays.asList(new KeyValueTimestamp<String, String>("k1", "v1", this.scaledTime(1L)), new KeyValueTimestamp<String, String>("k2", "v2", this.scaledTime(2L)), new KeyValueTimestamp<String, String>("k3", "v3", this.scaledTime(3L))));
            this.verifyOutput(outputRaw, new HashSet<KeyValueTimestamp<String, Long>>(Arrays.asList(new KeyValueTimestamp<String, Long>("k1", 1L, this.scaledTime(1L)), new KeyValueTimestamp<String, Long>("k2", 1L, this.scaledTime(2L)), new KeyValueTimestamp<String, Long>("k3", 1L, this.scaledTime(3L)))));
            MatcherAssert.assertThat((Object)eventCount.get(), (Matcher)CoreMatchers.is((Object)0));
            SuppressionDurabilityIntegrationTest.produceSynchronouslyToPartitionZero(input, Arrays.asList(new KeyValueTimestamp<String, String>("k4", "v4", this.scaledTime(4L)), new KeyValueTimestamp<String, String>("k5", "v5", this.scaledTime(5L))));
            this.verifyOutput(outputRaw, new HashSet<KeyValueTimestamp<String, Long>>(Arrays.asList(new KeyValueTimestamp<String, Long>("k4", 1L, this.scaledTime(4L)), new KeyValueTimestamp<String, Long>("k5", 1L, this.scaledTime(5L)))));
            MatcherAssert.assertThat((Object)eventCount.get(), (Matcher)CoreMatchers.is((Object)2));
            this.verifyOutput(outputSuppressed, Arrays.asList(new KeyValueTimestamp<String, Long>("k1", 1L, this.scaledTime(1L)), new KeyValueTimestamp<String, Long>("k2", 1L, this.scaledTime(2L))));
            driver.close();
            MatcherAssert.assertThat((Object)driver.state(), (Matcher)CoreMatchers.is((Object)KafkaStreams.State.NOT_RUNNING));
            driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, false);
            SuppressionDurabilityIntegrationTest.produceSynchronouslyToPartitionZero(input, Arrays.asList(new KeyValueTimestamp<String, String>("k6", "v6", this.scaledTime(6L)), new KeyValueTimestamp<String, String>("k7", "v7", this.scaledTime(7L)), new KeyValueTimestamp<String, String>("k8", "v8", this.scaledTime(8L))));
            this.verifyOutput(outputRaw, new HashSet<KeyValueTimestamp<String, Long>>(Arrays.asList(new KeyValueTimestamp<String, Long>("k6", 1L, this.scaledTime(6L)), new KeyValueTimestamp<String, Long>("k7", 1L, this.scaledTime(7L)), new KeyValueTimestamp<String, Long>("k8", 1L, this.scaledTime(8L)))));
            MatcherAssert.assertThat((String)"suppress has apparently produced some duplicates. There should only be 5 output events.", (Object)eventCount.get(), (Matcher)CoreMatchers.is((Object)5));
            this.verifyOutput(outputSuppressed, Arrays.asList(new KeyValueTimestamp<String, Long>("k3", 1L, this.scaledTime(3L)), new KeyValueTimestamp<String, Long>("k4", 1L, this.scaledTime(4L)), new KeyValueTimestamp<String, Long>("k5", 1L, this.scaledTime(5L))));
            metadataValidator.raiseExceptionIfAny();
        }
        finally {
            driver.close();
            IntegrationTestUtils.quietlyCleanStateAfterTest(CLUSTER, driver);
        }
    }

    private void verifyOutput(String topic, List<KeyValueTimestamp<String, Long>> keyValueTimestamps) {
        Properties properties = Utils.mkProperties((Map)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"group.id", (Object)"test-group"), Utils.mkEntry((Object)"bootstrap.servers", (Object)CLUSTER.bootstrapServers()), Utils.mkEntry((Object)"key.deserializer", (Object)STRING_DESERIALIZER.getClass().getName()), Utils.mkEntry((Object)"value.deserializer", (Object)LONG_DESERIALIZER.getClass().getName())}));
        IntegrationTestUtils.verifyKeyValueTimestamps(properties, topic, keyValueTimestamps);
    }

    private void verifyOutput(String topic, Set<KeyValueTimestamp<String, Long>> keyValueTimestamps) {
        Properties properties = Utils.mkProperties((Map)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"group.id", (Object)"test-group"), Utils.mkEntry((Object)"bootstrap.servers", (Object)CLUSTER.bootstrapServers()), Utils.mkEntry((Object)"key.deserializer", (Object)STRING_DESERIALIZER.getClass().getName()), Utils.mkEntry((Object)"value.deserializer", (Object)LONG_DESERIALIZER.getClass().getName())}));
        IntegrationTestUtils.verifyKeyValueTimestamps(properties, topic, keyValueTimestamps);
    }

    private long scaledTime(long unscaledTime) {
        return 200L * unscaledTime;
    }

    private static void produceSynchronouslyToPartitionZero(String topic, List<KeyValueTimestamp<String, String>> toProduce) {
        Properties producerConfig = Utils.mkProperties((Map)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"client.id", (Object)"anything"), Utils.mkEntry((Object)"key.serializer", (Object)STRING_SERIALIZER.getClass().getName()), Utils.mkEntry((Object)"value.serializer", (Object)STRING_SERIALIZER.getClass().getName()), Utils.mkEntry((Object)"bootstrap.servers", (Object)CLUSTER.bootstrapServers())}));
        IntegrationTestUtils.produceSynchronously(producerConfig, false, topic, Optional.of(0), toProduce);
    }

    private static final class MetadataValidator
    implements TransformerSupplier<String, Long, KeyValue<String, Long>> {
        private static final Logger LOG = LoggerFactory.getLogger(MetadataValidator.class);
        private final AtomicReference<Throwable> firstException = new AtomicReference();
        private final String topic;

        public MetadataValidator(String topic) {
            this.topic = topic;
        }

        public Transformer<String, Long, KeyValue<String, Long>> get() {
            return new Transformer<String, Long, KeyValue<String, Long>>(){
                private ProcessorContext context;

                public void init(ProcessorContext context) {
                    this.context = context;
                }

                public KeyValue<String, Long> transform(String key, Long value) {
                    try {
                        MatcherAssert.assertThat((Object)this.context.topic(), (Matcher)Matchers.equalTo((Object)topic));
                    }
                    catch (Throwable e) {
                        firstException.compareAndSet(null, e);
                        LOG.error("Validation Failed", e);
                    }
                    return new KeyValue((Object)key, (Object)value);
                }

                public void close() {
                }
            };
        }

        void raiseExceptionIfAny() {
            Throwable exception = this.firstException.get();
            if (exception != null) {
                throw new AssertionError("Got an exception during run", exception);
            }
        }
    }
}

