/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer.examples.reactor;

import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.reactor.ReactorProcessor;
import java.util.Collection;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.lang3.RandomUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;
import pl.tlinkowski.unij.api.UniMaps;
import reactor.core.publisher.Mono;

public class ReactorApp {
    private static final Logger log = LoggerFactory.getLogger(ReactorApp.class);
    static String inputTopic = "input-topic-" + RandomUtils.nextInt();
    ReactorProcessor<String, String> parallelConsumer;

    Consumer<String, String> getKafkaConsumer() {
        return new KafkaConsumer(new Properties());
    }

    Producer<String, String> getKafkaProducer() {
        return new KafkaProducer(new Properties());
    }

    void run() {
        Consumer<String, String> kafkaConsumer = this.getKafkaConsumer();
        Producer<String, String> kafkaProducer = this.getKafkaProducer();
        ParallelConsumerOptions options = ParallelConsumerOptions.builder().ordering(ParallelConsumerOptions.ProcessingOrder.KEY).consumer(kafkaConsumer).producer(kafkaProducer).build();
        this.parallelConsumer = new ReactorProcessor(options);
        this.parallelConsumer.subscribe((Collection)UniLists.of((Object)inputTopic));
        this.postSetup();
        int port = this.getPort();
        this.parallelConsumer.react(context -> {
            ConsumerRecord consumerRecord = context.getSingleRecord().getConsumerRecord();
            log.info("Concurrently constructing and returning RequestInfo from record: {}", (Object)consumerRecord);
            Map params = UniMaps.of((Object)"recordKey", (Object)((String)consumerRecord.key()), (Object)"payload", (Object)((String)consumerRecord.value()));
            return Mono.just((Object)"something todo");
        });
    }

    protected int getPort() {
        return 8080;
    }

    void close() {
        this.parallelConsumer.closeDrainFirst();
    }

    protected void postSetup() {
    }
}

