/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer.examples.vertx;

import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.vertx.JStreamVertxParallelStreamProcessor;
import io.confluent.parallelconsumer.vertx.VertxParallelEoSStreamProcessor;
import java.time.Duration;
import java.util.Collection;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Stream;
import org.apache.commons.lang3.RandomUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;
import pl.tlinkowski.unij.api.UniMaps;

/**
 * Example application that wires a Kafka consumer and producer into a
 * {@link JStreamVertxParallelStreamProcessor} and logs every element of the
 * resulting stream.
 *
 * <p>The consumer/producer factory methods and {@link #setupSubscription(Consumer)}
 * are package-private instance methods, so a subclass in the same package can
 * override them.
 */
public class VertxApp {
    private static final Logger log = LoggerFactory.getLogger(VertxApp.class);

    /** Name of the topic subscribed to; a random integer suffix is appended at class-load time. */
    static String inputTopic = "input-topic-" + RandomUtils.nextInt();

    /** Processor created by {@link #run()}; {@code null} until {@code run()} has assigned it. */
    JStreamVertxParallelStreamProcessor<String, String> parallelConsumer;

    /**
     * Creates the Kafka consumer handed to the parallel processor.
     *
     * @return a new {@link KafkaConsumer} built from an empty {@link Properties} instance
     */
    Consumer<String, String> getKafkaConsumer() {
        return new KafkaConsumer<>(new Properties());
    }

    /**
     * Creates the Kafka producer handed to the parallel processor.
     *
     * @return a new {@link KafkaProducer} built from an empty {@link Properties} instance
     */
    Producer<String, String> getKafkaProducer() {
        return new KafkaProducer<>(new Properties());
    }

    /**
     * Builds the processor options (KEY ordering, max concurrency 1000, at most
     * 10000 uncommitted messages per partition), subscribes the consumer to
     * {@link #inputTopic}, creates the stream processor and logs each element
     * of the stream it returns.
     */
    void run() {
        ParallelConsumerOptions options = ParallelConsumerOptions.builder()
                .ordering(ParallelConsumerOptions.ProcessingOrder.KEY)
                .maxConcurrency(1000)
                .maxUncommittedMessagesToHandlePerPartition(10000)
                .build();

        Consumer<String, String> kafkaConsumer = getKafkaConsumer();
        setupSubscription(kafkaConsumer);

        this.parallelConsumer = JStreamVertxParallelStreamProcessor.createEosStreamProcessor(
                kafkaConsumer, getKafkaProducer(), options);

        // For every record, describe a request carrying the record's key and value as parameters.
        Stream<?> resultStream = this.parallelConsumer.vertxHttpReqInfoStream(record -> {
            log.info("Concurrently constructing and returning RequestInfo from record: {}", record);
            Map<String, String> params =
                    UniMaps.of("recordKey", record.key(), "payload", record.value());
            return new VertxParallelEoSStreamProcessor.RequestInfo("localhost", "/api", params);
        });

        resultStream.forEach(x -> log.info("From result stream: {}", x));
    }

    /**
     * Subscribes the given consumer to {@link #inputTopic}.
     *
     * @param kafkaConsumer the consumer to subscribe
     */
    void setupSubscription(Consumer<String, String> kafkaConsumer) {
        kafkaConsumer.subscribe(UniLists.of(inputTopic));
    }

    /**
     * Closes the processor via {@code closeDrainFirst} with a two-second duration.
     * Does nothing when {@link #run()} has not yet created the processor.
     */
    void close() {
        // Guard against close() being called before run() has assigned the field.
        if (this.parallelConsumer == null) {
            return;
        }
        this.parallelConsumer.closeDrainFirst(Duration.ofSeconds(2L));
    }
}

