/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.binder.kafka;

import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.MeterBinder;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.springframework.cloud.stream.binder.BindingCreatedEvent;
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.context.ApplicationListener;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.lang.Nullable;
import org.springframework.util.ObjectUtils;

public class KafkaBinderMetrics
implements MeterBinder,
ApplicationListener<BindingCreatedEvent> {
    private static final int DEFAULT_TIMEOUT = 60;
    private static final Log LOG = LogFactory.getLog(KafkaBinderMetrics.class);
    static final String METRIC_NAME = "spring.cloud.stream.binder.kafka.offset";
    private final KafkaMessageChannelBinder binder;
    private final KafkaBinderConfigurationProperties binderConfigurationProperties;
    private ConsumerFactory<?, ?> defaultConsumerFactory;
    private final MeterRegistry meterRegistry;
    private Map<String, Consumer<?, ?>> metadataConsumers;
    private int timeout = 60;

    public KafkaBinderMetrics(KafkaMessageChannelBinder binder, KafkaBinderConfigurationProperties binderConfigurationProperties, ConsumerFactory<?, ?> defaultConsumerFactory, @Nullable MeterRegistry meterRegistry) {
        this.binder = binder;
        this.binderConfigurationProperties = binderConfigurationProperties;
        this.defaultConsumerFactory = defaultConsumerFactory;
        this.meterRegistry = meterRegistry;
        this.metadataConsumers = new ConcurrentHashMap();
    }

    public KafkaBinderMetrics(KafkaMessageChannelBinder binder, KafkaBinderConfigurationProperties binderConfigurationProperties) {
        this(binder, binderConfigurationProperties, null, null);
    }

    public void setTimeout(int timeout) {
        this.timeout = timeout;
    }

    public void bindTo(MeterRegistry registry) {
        for (Map.Entry<String, KafkaMessageChannelBinder.TopicInformation> topicInfo : this.binder.getTopicsInUse().entrySet()) {
            if (!topicInfo.getValue().isConsumerTopic()) continue;
            String topic = topicInfo.getKey();
            String group = topicInfo.getValue().getConsumerGroup();
            Gauge.builder((String)METRIC_NAME, (Object)this, o -> this.computeUnconsumedMessages(topic, group)).tag("group", group).tag("topic", topic).description("Unconsumed messages for a particular group and topic").register(registry);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private long computeUnconsumedMessages(String topic, String group) {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        Future<Long> future = exec.submit(() -> {
            long lag = 0L;
            try {
                Consumer metadataConsumer;
                Consumer consumer = metadataConsumer = this.metadataConsumers.computeIfAbsent(group, g -> this.createConsumerFactory().createConsumer(g, "monitoring"));
                synchronized (consumer) {
                    List partitionInfos = metadataConsumer.partitionsFor(topic);
                    LinkedList<TopicPartition> topicPartitions = new LinkedList<TopicPartition>();
                    for (PartitionInfo partitionInfo : partitionInfos) {
                        topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
                    }
                    Map endOffsets = metadataConsumer.endOffsets(topicPartitions);
                    for (Map.Entry endOffset : endOffsets.entrySet()) {
                        OffsetAndMetadata current = metadataConsumer.committed((TopicPartition)endOffset.getKey());
                        lag += ((Long)endOffset.getValue()).longValue();
                        if (current == null) continue;
                        lag -= current.offset();
                    }
                }
            }
            catch (Exception ex) {
                LOG.debug((Object)("Cannot generate metric for topic: " + topic), (Throwable)ex);
            }
            return lag;
        });
        try {
            long l = future.get(this.timeout, TimeUnit.SECONDS);
            return l;
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            long l = 0L;
            return l;
        }
        catch (ExecutionException | TimeoutException ex) {
            long l = 0L;
            return l;
        }
        finally {
            exec.shutdownNow();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private ConsumerFactory<?, ?> createConsumerFactory() {
        if (this.defaultConsumerFactory == null) {
            KafkaBinderMetrics kafkaBinderMetrics = this;
            synchronized (kafkaBinderMetrics) {
                if (this.defaultConsumerFactory == null) {
                    HashMap<String, Object> props = new HashMap<String, Object>();
                    props.put("key.deserializer", ByteArrayDeserializer.class);
                    props.put("value.deserializer", ByteArrayDeserializer.class);
                    Map mergedConfig = this.binderConfigurationProperties.mergedConsumerConfiguration();
                    if (!ObjectUtils.isEmpty((Object)mergedConfig)) {
                        props.putAll(mergedConfig);
                    }
                    if (!props.containsKey("bootstrap.servers")) {
                        props.put("bootstrap.servers", this.binderConfigurationProperties.getKafkaConnectionString());
                    }
                    this.defaultConsumerFactory = new DefaultKafkaConsumerFactory(props);
                }
            }
        }
        return this.defaultConsumerFactory;
    }

    public void onApplicationEvent(BindingCreatedEvent event) {
        if (this.meterRegistry != null) {
            this.bindTo(this.meterRegistry);
        }
    }
}

