/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Integration test that runs a Kafka Streams application configured with
 * {@code processing.guarantee=exactly_once_v2} and one standby replica on three instances that
 * are started one after another while input is being processed.
 *
 * <p>The topology assigns an id to every input key the first time the key is seen, remembers the
 * assignment in a state store and forwards the record re-keyed by that id. The test passes when
 * every record read (with {@code read_committed} isolation) from the output topic carries a
 * distinct value.
 */
@Category(IntegrationTest.class)
public class StandbyTaskEOSMultiRebalanceIntegrationTest {
    private static final Logger LOG = LoggerFactory.getLogger(StandbyTaskEOSMultiRebalanceIntegrationTest.class);
    private static final long TWO_MINUTE_TIMEOUT = Duration.ofMinutes(2L).toMillis();
    private String appId;
    private String inputTopic;
    private String storeName;
    private String counterName;
    private String outputTopic;
    private KafkaStreams streamInstanceOne;
    private KafkaStreams streamInstanceTwo;
    private KafkaStreams streamInstanceThree;
    private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);
    // Number of partitions of the input and output topics; also the stride used when deriving ids.
    private final int partitionCount = 12;

    /**
     * Starts the embedded cluster shared by all tests of this class.
     *
     * @throws IOException if the cluster cannot be started
     */
    @BeforeClass
    public static void startCluster() throws IOException {
        CLUSTER.start();
    }

    /** Stops the embedded cluster once all tests of this class have run. */
    @AfterClass
    public static void closeCluster() {
        CLUSTER.stop();
    }

    /**
     * Derives per-test names (application id, topics, stores) from a random UUID and (re)creates
     * the input and output topics with {@link #partitionCount} partitions and replication factor 3.
     *
     * @throws Exception if deleting or creating the topics fails
     */
    @Before
    public void createTopics() throws Exception {
        final String safeTestName = UUID.randomUUID().toString();
        appId = "app-" + safeTestName;
        inputTopic = "input-" + safeTestName;
        outputTopic = "output-" + safeTestName;
        storeName = "store-" + safeTestName;
        counterName = "counter-" + safeTestName;

        CLUSTER.deleteTopicsAndWait(inputTopic, outputTopic);
        CLUSTER.createTopic(inputTopic, partitionCount, 3);
        CLUSTER.createTopic(outputTopic, partitionCount, 3);
    }

    /**
     * Closes every Streams instance that was created by the test. The nested {@code finally}
     * blocks make sure a failure while closing one instance does not leave the others running.
     */
    @After
    public void cleanUp() {
        try {
            closeIfCreated(streamInstanceOne);
        } finally {
            try {
                closeIfCreated(streamInstanceTwo);
            } finally {
                closeIfCreated(streamInstanceThree);
            }
        }
    }

    /** Closes {@code streams} unless it is {@code null} (i.e. the test failed before creating it). */
    private static void closeIfCreated(final KafkaStreams streams) {
        if (streams != null) {
            streams.close();
        }
    }

    /**
     * Processes a first bulk on a single instance, waits until a second instance can serve key
     * {@code 0} from its (possibly stale) store, produces a second bulk, starts a third instance
     * and finally verifies that every output record has a distinct value.
     *
     * @throws Exception if producing, starting the applications or waiting for output fails
     */
    @Test
    public void shouldHonorEOSWhenUsingCachingAndStandbyReplicas() throws Exception {
        final long time = System.currentTimeMillis();
        final String base = TestUtils.tempDirectory(appId).getPath();

        final int initialBulk = 3000;
        final int secondBulk = 60000;

        produceInputRange(0, initialBulk, 10L + time);

        // Each instance gets its own state directory so they behave like separate hosts.
        streamInstanceOne = buildWithUniqueIdAssignmentTopology(base + "-1");
        streamInstanceTwo = buildWithUniqueIdAssignmentTopology(base + "-2");
        streamInstanceThree = buildWithUniqueIdAssignmentTopology(base + "-3");

        LOG.info("start first instance and wait for completed processing");
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(
            Collections.singletonList(streamInstanceOne), Duration.ofSeconds(30L));
        IntegrationTestUtils.waitUntilMinRecordsReceived(readCommittedConsumerConfig(), outputTopic, initialBulk);
        LOG.info("Finished reading the initial bulk");

        LOG.info("start second instance and wait for standby replication");
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(
            Collections.singletonList(streamInstanceTwo), Duration.ofSeconds(30L));
        TestUtils.waitForCondition(
            () -> {
                // Stale stores are enabled so that the query may be answered by a store that is
                // not the active one on this instance.
                final ReadOnlyKeyValueStore<Integer, Integer> queryableStore = streamInstanceTwo.store(
                    StoreQueryParameters.fromNameAndType(
                        storeName,
                        QueryableStoreTypes.<Integer, Integer>keyValueStore()
                    ).enableStaleStores());
                return queryableStore.get(0) != null;
            },
            TWO_MINUTE_TIMEOUT,
            "Could not get key from standby store");
        LOG.info("Second stream have some data in the state store");

        LOG.info("Produce the second bulk");
        produceInputRange(initialBulk, initialBulk + secondBulk, 1000L + time);

        LOG.info("Start stream three which will introduce a re-balancing event and hopefully some redistribution of tasks.");
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(
            Collections.singletonList(streamInstanceThree), Duration.ofSeconds(90L));

        LOG.info("Wait for the processing to be completed");
        final List<ConsumerRecord<Integer, Integer>> outputRecords = IntegrationTestUtils.waitUntilMinRecordsReceived(
            readCommittedConsumerConfig(),
            outputTopic,
            initialBulk + secondBulk,
            Duration.ofMinutes(10L).toMillis());
        LOG.info("Processing completed");

        outputRecords.stream()
            .collect(Collectors.groupingBy(ConsumerRecord::value))
            .forEach(this::logIfDuplicate);

        // count() is a long, so the expected value must be a long as well: comparing a boxed Long
        // with a boxed Integer via equalTo() never matches, even for identical numbers.
        final long distinctValues = outputRecords.stream().map(ConsumerRecord::value).distinct().count();
        MatcherAssert.assertThat(
            "Each output should correspond to one distinct value",
            distinctValues,
            Matchers.is(Matchers.equalTo((long) outputRecords.size())));
    }

    /**
     * Synchronously produces the records {@code (i, i)} for every {@code i} in
     * {@code [fromInclusive, toExclusive)} to the input topic, all with the given timestamp.
     */
    private void produceInputRange(final int fromInclusive, final int toExclusive, final long timestamp) throws Exception {
        final List<KeyValue<Integer, Integer>> records = IntStream.range(fromInclusive, toExclusive)
            .boxed()
            .map(i -> new KeyValue<>(i, i))
            .collect(Collectors.toList());
        final Properties producerConfig = TestUtils.producerConfig(
            CLUSTER.bootstrapServers(),
            IntegerSerializer.class,
            IntegerSerializer.class,
            new Properties());
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(inputTopic, records, producerConfig, timestamp);
    }

    /**
     * Builds a consumer configuration with a fresh random group id, integer deserializers and
     * {@code isolation.level=read_committed}.
     */
    private Properties readCommittedConsumerConfig() {
        final Properties readCommitted = new Properties();
        readCommitted.setProperty("isolation.level", "read_committed");
        return TestUtils.consumerConfig(
            CLUSTER.bootstrapServers(),
            UUID.randomUUID().toString(),
            IntegerDeserializer.class,
            IntegerDeserializer.class,
            readCommitted);
    }

    /**
     * Asserts that all records grouped under {@code id} really carry {@code id} as their value and
     * logs a warning listing the record keys when more than one record shares that value.
     *
     * @param id     the output value the records were grouped by
     * @param record all output records whose value equals {@code id}
     */
    private void logIfDuplicate(final Integer id, final List<ConsumerRecord<Integer, Integer>> record) {
        MatcherAssert.assertThat(
            "The id and the value in the records must match",
            record.stream().allMatch(r -> id.equals(r.value())));
        if (record.size() > 1) {
            final List<Integer> keys = record.stream().map(ConsumerRecord::key).collect(Collectors.toList());
            LOG.warn("Id : {} is assigned to the following {}", id, keys);
        }
    }

    /**
     * Creates (but does not start) a Streams instance whose topology reads the input topic,
     * assigns an id to each previously unseen key, and writes the re-keyed records to the output
     * topic.
     *
     * <p>Two persistent key-value stores are attached to the processor: one mapping input key to
     * assigned id, and one (with caching enabled) holding a running counter under key {@code 0}.
     *
     * @param stateDirPath the state directory to use for this instance
     * @return the configured, not yet started, Streams instance
     */
    private KafkaStreams buildWithUniqueIdAssignmentTopology(final String stateDirPath) {
        final StreamsBuilder builder = new StreamsBuilder();

        builder.addStateStore(Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore(storeName),
            Serdes.Integer(),
            Serdes.Integer()));
        builder.addStateStore(Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore(counterName),
            Serdes.Integer(),
            Serdes.Integer()).withCachingEnabled());

        builder.<Integer, Integer>stream(inputTopic)
            .process(
                () -> new Processor<Integer, Integer, Integer, Integer>() {
                    private KeyValueStore<Integer, Integer> store;
                    private KeyValueStore<Integer, Integer> counter;
                    private ProcessorContext<Integer, Integer> context;

                    @Override
                    public void init(final ProcessorContext<Integer, Integer> context) {
                        this.context = context;
                        store = context.getStateStore(storeName);
                        counter = context.getStateStore(counterName);
                    }

                    @Override
                    public void process(final Record<Integer, Integer> record) {
                        final Integer key = record.key();
                        final Integer value = record.value();
                        MatcherAssert.assertThat("Key and value mus be equal", key.equals(value));

                        Integer id = store.get(key);
                        if (id == null) {
                            // First time this key is seen: bump the counter and derive a new id.
                            final int counterKey = 0;
                            final Integer lastCounter = counter.get(counterKey);
                            final int newCounter = lastCounter == null ? 0 : lastCounter + 1;
                            counter.put(counterKey, newCounter);

                            final int partition = context.recordMetadata()
                                .orElseThrow(() -> new IllegalStateException(
                                    "No record metadata available for record with key " + key))
                                .partition();
                            // Offsetting by the partition keeps ids from different partitions
                            // apart, given that partition < partitionCount.
                            id = newCounter * partitionCount + partition;
                            store.put(key, id);
                        }
                        context.forward(record.withKey(id));
                    }
                },
                storeName,
                counterName)
            .to(outputTopic);

        return new KafkaStreams(builder.build(), props(stateDirPath));
    }

    /**
     * Builds the Streams configuration used by every instance of the test application.
     *
     * @param stateDirPath value for {@code state.dir}
     * @return the Streams configuration
     */
    private Properties props(final String stateDirPath) {
        final Properties streamsConfiguration = new Properties();
        streamsConfiguration.put("application.id", appId);
        streamsConfiguration.put("bootstrap.servers", CLUSTER.bootstrapServers());
        streamsConfiguration.put("state.dir", stateDirPath);
        streamsConfiguration.put("num.standby.replicas", 1);
        streamsConfiguration.put("num.stream.threads", 4);
        streamsConfiguration.put("max.warmup.replicas", 1);
        streamsConfiguration.put("processing.guarantee", "exactly_once_v2");
        streamsConfiguration.put("default.key.serde", Serdes.Integer().getClass());
        streamsConfiguration.put("default.value.serde", Serdes.Integer().getClass());
        streamsConfiguration.put("commit.interval.ms", 100L);
        streamsConfiguration.put("auto.offset.reset", "earliest");
        return streamsConfiguration;
    }
}

