/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.kafka.spout.metrics2;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.internal.OffsetManager;
import org.apache.storm.kafka.spout.metrics2.KafkaOffsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaOffsetTopicMetrics
implements MetricSet {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetTopicMetrics.class);
    private String topic;
    Set<TopicPartition> assignment;
    Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier;
    Supplier<Admin> adminSupplier;

    public KafkaOffsetTopicMetrics(String topic, Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier, Supplier<Admin> adminSupplier, Set<TopicPartition> newAssignment) {
        this.topic = topic;
        this.assignment = newAssignment;
        this.offsetManagerSupplier = offsetManagerSupplier;
        this.adminSupplier = adminSupplier;
        LOG.info("Create KafkaOffsetTopicMetrics for topic: {}", (Object)topic);
    }

    public Map<String, Metric> getMetrics() {
        HashMap<String, Metric> metrics = new HashMap<String, Metric>();
        Gauge totalSpoutLagGauge = () -> {
            Long totalSpoutLag = 0L;
            for (TopicPartition topicPartition : this.assignment) {
                String topicOfPartition = topicPartition.topic();
                if (!topicOfPartition.equals(this.topic)) continue;
                Map<TopicPartition, Long> endOffsets = KafkaOffsetUtil.getEndOffsets(Collections.singleton(topicPartition), this.adminSupplier);
                if (endOffsets == null || endOffsets.isEmpty()) {
                    LOG.error("Failed to get endOffsets from Kafka for topic partitions: {}.", (Object)topicPartition);
                    return 0L;
                }
                OffsetManager offsetManager = this.offsetManagerSupplier.get().get(topicPartition);
                Long ret = endOffsets.get(topicPartition) - offsetManager.getCommittedOffset();
                totalSpoutLag = totalSpoutLag + ret;
            }
            return totalSpoutLag;
        };
        Gauge totalEarliestTimeOffsetGauge = () -> {
            Long totalEarliestTimeOffset = 0L;
            for (TopicPartition topicPartition : this.assignment) {
                String topicOfPartition = topicPartition.topic();
                if (!topicOfPartition.equals(this.topic)) continue;
                Map<TopicPartition, Long> beginningOffsets = KafkaOffsetUtil.getBeginningOffsets(Collections.singleton(topicPartition), this.adminSupplier);
                if (beginningOffsets == null || beginningOffsets.isEmpty()) {
                    LOG.error("Failed to get beginningOffsets from Kafka for topic partitions: {}.", (Object)topicPartition);
                    return 0L;
                }
                Long ret = beginningOffsets.get(topicPartition);
                totalEarliestTimeOffset = totalEarliestTimeOffset + ret;
            }
            return totalEarliestTimeOffset;
        };
        Gauge totalLatestTimeOffsetGauge = () -> {
            Long totalLatestTimeOffset = 0L;
            for (TopicPartition topicPartition : this.assignment) {
                String topicOfPartition = topicPartition.topic();
                if (!topicOfPartition.equals(this.topic)) continue;
                Map<TopicPartition, Long> endOffsets = KafkaOffsetUtil.getEndOffsets(Collections.singleton(topicPartition), this.adminSupplier);
                if (endOffsets == null || endOffsets.isEmpty()) {
                    LOG.error("Failed to get endOffsets from Kafka for topic partitions: {}.", (Object)topicPartition);
                    return 0L;
                }
                Long ret = endOffsets.get(topicPartition);
                totalLatestTimeOffset = totalLatestTimeOffset + ret;
            }
            return totalLatestTimeOffset;
        };
        Gauge totalLatestEmittedOffsetGauge = () -> {
            Long totalLatestEmittedOffset = 0L;
            for (TopicPartition topicPartition : this.assignment) {
                String topicOfPartition = topicPartition.topic();
                if (!topicOfPartition.equals(this.topic)) continue;
                OffsetManager offsetManager = this.offsetManagerSupplier.get().get(topicPartition);
                Long ret = offsetManager.getLatestEmittedOffset();
                totalLatestEmittedOffset = totalLatestEmittedOffset + ret;
            }
            return totalLatestEmittedOffset;
        };
        Gauge totalLatestCompletedOffsetGauge = () -> {
            Long totalLatestCompletedOffset = 0L;
            for (TopicPartition topicPartition : this.assignment) {
                String topicOfPartition = topicPartition.topic();
                if (!topicOfPartition.equals(this.topic)) continue;
                OffsetManager offsetManager = this.offsetManagerSupplier.get().get(topicPartition);
                Long ret = offsetManager.getCommittedOffset();
                totalLatestCompletedOffset = totalLatestCompletedOffset + ret;
            }
            return totalLatestCompletedOffset;
        };
        Gauge totalRecordsInPartitionsGauge = () -> {
            Long totalRecordsInPartitions = 0L;
            for (TopicPartition topicPartition : this.assignment) {
                String topicOfPartition = topicPartition.topic();
                if (!topicOfPartition.equals(this.topic)) continue;
                Map<TopicPartition, Long> endOffsets = KafkaOffsetUtil.getEndOffsets(Collections.singleton(topicPartition), this.adminSupplier);
                if (endOffsets == null || endOffsets.isEmpty()) {
                    LOG.error("Failed to get endOffsets from Kafka for topic partitions: {}.", (Object)topicPartition);
                    return 0L;
                }
                Map<TopicPartition, Long> beginningOffsets = KafkaOffsetUtil.getBeginningOffsets(Collections.singleton(topicPartition), this.adminSupplier);
                if (beginningOffsets == null || beginningOffsets.isEmpty()) {
                    LOG.error("Failed to get beginningOffsets from Kafka for topic partitions: {}.", (Object)topicPartition);
                    return 0L;
                }
                Long ret = endOffsets.get(topicPartition) - beginningOffsets.get(topicPartition);
                totalRecordsInPartitions = totalRecordsInPartitions + ret;
            }
            return totalRecordsInPartitions;
        };
        metrics.put(this.topic + "/totalSpoutLag", (Metric)totalSpoutLagGauge);
        metrics.put(this.topic + "/totalEarliestTimeOffset", (Metric)totalEarliestTimeOffsetGauge);
        metrics.put(this.topic + "/totalLatestTimeOffset", (Metric)totalLatestTimeOffsetGauge);
        metrics.put(this.topic + "/totalLatestEmittedOffset", (Metric)totalLatestEmittedOffsetGauge);
        metrics.put(this.topic + "/totalLatestCompletedOffset", (Metric)totalLatestCompletedOffsetGauge);
        metrics.put(this.topic + "/totalRecordsInPartitions", (Metric)totalRecordsInPartitionsGauge);
        return metrics;
    }
}

