/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer.examples.streams;

import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelStreamProcessor;
import java.util.Collection;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.RandomUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;

public class StreamsApp {
    private static final Logger log = LoggerFactory.getLogger(StreamsApp.class);
    static String inputTopic = "input-topic-" + RandomUtils.nextInt();
    static String outputTopicName = "my-output-topic-" + RandomUtils.nextInt();
    KafkaStreams streams;
    ParallelStreamProcessor<String, String> parallelConsumer;
    AtomicInteger messageCount = new AtomicInteger();

    Consumer<String, String> getKafkaConsumer() {
        return new KafkaConsumer(new Properties());
    }

    Producer<String, String> getKafkaProducer() {
        return new KafkaProducer(new Properties());
    }

    void run() {
        this.preprocess();
        this.concurrentProcess();
    }

    void preprocess() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream(inputTopic).mapValues((key, value) -> {
            log.info("Streams preprocessing key: {} value: {}", key, value);
            return String.valueOf(value.length());
        }).to(outputTopicName);
        this.startStreams(builder.build());
    }

    void startStreams(Topology topology) {
        this.streams = new KafkaStreams(topology, this.getStreamsProperties());
        this.streams.start();
    }

    void concurrentProcess() {
        this.setupParallelConsumer();
        this.parallelConsumer.poll(record -> {
            log.info("Concurrently processing a record: {}", record);
            this.messageCount.getAndIncrement();
        });
    }

    private void setupParallelConsumer() {
        Consumer<String, String> kafkaConsumer = this.getKafkaConsumer();
        Producer<String, String> kafkaProducer = this.getKafkaProducer();
        ParallelConsumerOptions options = ParallelConsumerOptions.builder().ordering(ParallelConsumerOptions.ProcessingOrder.KEY).maxMessagesToQueue(1000).maxNumberMessagesBeyondBaseCommitOffset(10000).consumer(kafkaConsumer).producer(kafkaProducer).build();
        this.parallelConsumer = ParallelStreamProcessor.createEosStreamProcessor((ParallelConsumerOptions)options);
        this.parallelConsumer.subscribe((Collection)UniLists.of((Object)outputTopicName));
    }

    Properties getStreamsProperties() {
        Properties props = new Properties();
        props.put("application.id", this.getClass().getName());
        props.put("bootstrap.servers", this.getServerConfig());
        props.put("default.key.serde", Serdes.String().getClass());
        props.put("default.value.serde", Serdes.String().getClass());
        return props;
    }

    String getServerConfig() {
        return "add your server here";
    }

    void close() {
        this.streams.close();
        this.parallelConsumer.close();
    }
}

