/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer.examples.metrics;

import com.sun.net.httpserver.HttpServer;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelStreamProcessor;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics;
import io.micrometer.prometheus.PrometheusConfig;
import io.micrometer.prometheus.PrometheusMeterRegistry;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.lang3.RandomUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;

/**
 * Example application that runs a parallel consumer over {@link #inputTopic} and exposes the
 * metrics collected in a {@link PrometheusMeterRegistry} over HTTP at {@link #METRICS_ENDPOINT}.
 *
 * <p>Typical lifecycle: call {@link #run()} to build the consumer, start the metrics endpoint and
 * begin polling; call {@link #close()} to release everything again.
 */
public class CoreApp {
    private static final Logger log = LoggerFactory.getLogger(CoreApp.class);

    /** HTTP path under which the Prometheus scrape output is served. */
    public static final String METRICS_ENDPOINT = "/prometheus";

    /** TCP port the metrics HTTP server binds to. */
    private static final int METRICS_PORT = 7001;

    /** Registry shared by the parallel consumer and the Kafka client metrics binder. */
    final PrometheusMeterRegistry meterRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);

    String inputTopic = "input-topic";
    String outputTopic = "output-topic-" + RandomUtils.nextInt();

    /** Snapshot of the process environment, used to override the Kafka connection defaults. */
    private final Map<String, String> envVars = System.getenv();

    /** Set by {@link #setupParallelConsumer()}; {@code null} until then. */
    private KafkaClientMetrics kafkaClientMetrics;

    /** Set by {@link #run()}; {@code null} until then. */
    ParallelStreamProcessor<String, String> parallelConsumer;

    /** Executor on which the metrics HTTP server's {@code start()} is invoked. */
    private final ExecutorService metricsEndpointExecutor = Executors.newSingleThreadExecutor();

    /** Set by {@link #setupPrometheusEndpoint()}; kept so that {@link #close()} can stop it. */
    private HttpServer metricsServer;

    /**
     * Builds the Kafka consumer handed to the parallel consumer.
     *
     * <p>The bootstrap servers and group id are read from the {@code BOOTSTRAP_SERVERS} and
     * {@code GROUP_ID} environment variables, falling back to {@code kafka:9092} and
     * {@code pc-instance}. Auto commit is disabled and both key and value are deserialized as
     * strings.
     *
     * @return a newly created consumer
     */
    Consumer<String, String> getKafkaConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", this.envVars.getOrDefault("BOOTSTRAP_SERVERS", "kafka:9092"));
        props.put("group.id", this.envVars.getOrDefault("GROUP_ID", "pc-instance"));
        props.put("enable.auto.commit", false);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<>(props);
    }

    /**
     * Creates an HTTP server on port 7001 that answers requests to {@link #METRICS_ENDPOINT} with
     * the current scrape output of {@link #meterRegistry}, and starts it via
     * {@link #metricsEndpointExecutor}.
     *
     * @throws RuntimeException wrapping the {@link IOException} if the server cannot be created
     */
    void setupPrometheusEndpoint() {
        try {
            HttpServer server = HttpServer.create(new InetSocketAddress(METRICS_PORT), 0);
            server.createContext(METRICS_ENDPOINT, httpExchange -> {
                // Encode once with an explicit charset so the announced Content-Length always
                // matches the bytes written, independent of the platform default encoding.
                byte[] body = this.meterRegistry.scrape().getBytes(StandardCharsets.UTF_8);
                httpExchange.sendResponseHeaders(200, body.length);
                try (OutputStream os = httpExchange.getResponseBody()) {
                    os.write(body);
                }
            });
            // Remember the server: shutting down the executor alone does not stop it.
            this.metricsServer = server;
            this.metricsEndpointExecutor.submit(server::start);
        } catch (IOException e) {
            throw new RuntimeException("Failed to create metrics endpoint on port " + METRICS_PORT, e);
        }
    }

    /**
     * Sets up the parallel consumer, runs {@link #postSetup()} and then registers a handler that
     * logs every record it is given.
     */
    void run() {
        this.parallelConsumer = this.setupParallelConsumer();
        this.postSetup();
        this.parallelConsumer.poll(context -> log.info("Concurrently processing a record: {}", context));
    }

    /**
     * Hook invoked by {@link #run()} after the parallel consumer has been created and before
     * polling starts. This implementation starts the Prometheus endpoint.
     */
    protected void postSetup() {
        this.setupPrometheusEndpoint();
    }

    /**
     * Creates the parallel consumer with KEY ordering and a max concurrency of 1000, subscribes it
     * to {@link #inputTopic}, and binds the underlying Kafka consumer's client metrics to
     * {@link #meterRegistry}.
     *
     * @return the subscribed stream processor
     */
    ParallelStreamProcessor<String, String> setupParallelConsumer() {
        Consumer<String, String> kafkaConsumer = this.getKafkaConsumer();

        ParallelConsumerOptions<String, String> options = ParallelConsumerOptions.<String, String>builder()
                .ordering(ParallelConsumerOptions.ProcessingOrder.KEY)
                .maxConcurrency(1000)
                .consumer(kafkaConsumer)
                .meterRegistry(this.meterRegistry)
                .metricsTags(Tags.of(Tag.of("instance", "pc1")))
                .build();

        ParallelStreamProcessor<String, String> eosStreamProcessor =
                ParallelStreamProcessor.createEosStreamProcessor(options);
        eosStreamProcessor.subscribe(UniLists.of(this.inputTopic));

        this.kafkaClientMetrics = new KafkaClientMetrics(kafkaConsumer);
        this.kafkaClientMetrics.bindTo(this.meterRegistry);
        return eosStreamProcessor;
    }

    /**
     * Releases the Kafka client metrics binder, the parallel consumer, the metrics HTTP server and
     * the executor used to start it.
     *
     * <p>Components that were never created (for example when this is called before
     * {@link #run()}) are skipped. The HTTP server and executor are shut down even if closing the
     * metrics binder or the parallel consumer throws.
     */
    void close() {
        try {
            if (this.kafkaClientMetrics != null) {
                this.kafkaClientMetrics.close();
            }
            if (this.parallelConsumer != null) {
                this.parallelConsumer.close();
            }
        } finally {
            if (this.metricsServer != null) {
                // Delay of 0: do not wait for in-flight exchanges to finish.
                this.metricsServer.stop(0);
            }
            this.metricsEndpointExecutor.shutdownNow();
        }
    }
}

