/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.plugins.kafka.util;

import io.micrometer.core.instrument.Counter;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.opensearch.dataprepper.metrics.PluginMetrics;

/**
 * Republishes selected Kafka producer metrics for one topic through {@link PluginMetrics}.
 *
 * <p>Four Kafka metric names are tracked ({@code byte-rate}, {@code byte-total},
 * {@code record-send-rate}, {@code record-send-total}). For every registered producer the
 * last observed value of each metric is kept in {@link #getMetricValues()}:
 * <ul>
 *   <li>metrics whose Kafka name does not contain {@code -total} are exposed as gauges that
 *       sum the last observed value over all registered producers;</li>
 *   <li>{@code record-send-total} and {@code byte-total} feed the
 *       {@code numberOfRecordsSent} / {@code numberOfBytesSent} counters with the delta
 *       between two consecutive {@link #update(KafkaProducer)} calls.</li>
 * </ul>
 *
 * <p>Thread-safety: the producer registry is a concurrent map and each per-producer value map
 * is only read or written while holding its own monitor, so {@link #register(KafkaProducer)},
 * {@link #update(KafkaProducer)} and the gauge callbacks may run on different threads.
 * NOTE(review): which thread evaluates the gauges depends on the metrics backend — confirm.
 */
public class KafkaTopicProducerMetrics {
    static final String NUMBER_OF_RECORDS_SENT = "numberOfRecordsSent";
    static final String NUMBER_OF_BYTES_SENT = "numberOfBytesSent";
    static final String RECORD_SEND_TOTAL = "record-send-total";
    static final String BYTE_TOTAL = "byte-total";
    static final String TOPIC = "topic";
    static final String METRIC_SUFFIX_TOTAL = "-total";
    static final String BYTE_RATE = "byte-rate";
    static final String RECORD_SEND_RATE = "record-send-rate";
    static final String BYTE_SEND_RATE = "byteSendRate";
    static final String BYTE_SEND_TOTAL = "byteSendTotal";
    static final String RECORD_SEND_RATE_MAP_VALUE = "recordSendRate";
    static final String RECORD_SEND_TOTAL_MAP_VALUE = "recordSendTotal";
    static final String NUMBER_OF_RAW_DATA_SEND_ERRORS = "numberOfRawDataSendErrors";
    static final String NUMBER_OF_RECORD_SEND_ERRORS = "numberOfRecordSendErrors";
    static final String NUMBER_OF_RECORD_PROCESSING_ERRORS = "numberOfRecordProcessingErrors";

    private final String topicName;
    /** Kafka metric name -> camelCase name used when publishing through {@link PluginMetrics}. */
    private final Map<String, String> metricsNameMap;
    /**
     * Producer -> (Kafka metric name -> last observed value). The raw {@code KafkaProducer}
     * type is kept on purpose so the type returned by {@link #getMetricValues()} is unchanged.
     */
    private final Map<KafkaProducer, Map<String, Double>> metricValues;
    private final PluginMetrics pluginMetrics;
    private final Counter numberOfRecordsSent;
    private final Counter numberOfBytesSent;
    private final Counter numberOfRawDataSendErrors;
    private final Counter numberOfRecordSendErrors;
    private final Counter numberOfRecordProcessingErrors;

    /**
     * Creates the metrics holder, registers the rate gauges and creates the counters.
     *
     * @param topicName          topic name; used as part of the metric names when
     *                           {@code topicNameInMetrics} is {@code true}
     * @param pluginMetrics      registry the gauges and counters are created on
     * @param topicNameInMetrics when {@code true}, every metric name is prefixed with
     *                           {@code "topic." + topicName + "."}
     */
    public KafkaTopicProducerMetrics(String topicName, PluginMetrics pluginMetrics, boolean topicNameInMetrics) {
        this.pluginMetrics = pluginMetrics;
        this.topicName = topicName;
        // Concurrent map: gauge callbacks iterate it while register() may add producers.
        this.metricValues = new ConcurrentHashMap<>();
        this.metricsNameMap = createMetricNamesMap();
        // Gauges capture this.metricValues, so they must be registered after it is assigned.
        this.registerRateGauges(topicNameInMetrics);
        this.numberOfRecordsSent = pluginMetrics.counter(this.getTopicMetricName(NUMBER_OF_RECORDS_SENT, topicNameInMetrics));
        this.numberOfBytesSent = pluginMetrics.counter(this.getTopicMetricName(NUMBER_OF_BYTES_SENT, topicNameInMetrics));
        this.numberOfRawDataSendErrors = pluginMetrics.counter(this.getTopicMetricName(NUMBER_OF_RAW_DATA_SEND_ERRORS, topicNameInMetrics));
        this.numberOfRecordSendErrors = pluginMetrics.counter(this.getTopicMetricName(NUMBER_OF_RECORD_SEND_ERRORS, topicNameInMetrics));
        this.numberOfRecordProcessingErrors = pluginMetrics.counter(this.getTopicMetricName(NUMBER_OF_RECORD_PROCESSING_ERRORS, topicNameInMetrics));
    }

    /** Builds the mapping from tracked Kafka metric names to their published camelCase names. */
    private static Map<String, String> createMetricNamesMap() {
        final Map<String, String> names = new HashMap<>();
        names.put(BYTE_RATE, BYTE_SEND_RATE);
        names.put(BYTE_TOTAL, BYTE_SEND_TOTAL);
        names.put(RECORD_SEND_RATE, RECORD_SEND_RATE_MAP_VALUE);
        names.put(RECORD_SEND_TOTAL, RECORD_SEND_TOTAL_MAP_VALUE);
        return names;
    }

    /**
     * Registers one gauge per tracked metric whose Kafka name does not contain
     * {@code -total}; each gauge reports the sum of that metric over all registered producers.
     */
    private void registerRateGauges(boolean topicNameInMetrics) {
        this.metricsNameMap.forEach((kafkaMetricName, camelCaseName) -> {
            if (kafkaMetricName.contains(METRIC_SUFFIX_TOTAL)) {
                // Totals are published as counters from update(), not as gauges.
                return;
            }
            this.pluginMetrics.gauge(
                    this.getTopicMetricName(camelCaseName, topicNameInMetrics),
                    this.metricValues,
                    values -> sumAcrossProducers(values, kafkaMetricName));
        });
    }

    /** Sums the last observed value of {@code kafkaMetricName} over every producer in {@code values}. */
    private static double sumAcrossProducers(Map<KafkaProducer, Map<String, Double>> values, String kafkaMetricName) {
        double sum = 0.0;
        for (Map<String, Double> producerMetrics : values.values()) {
            synchronized (producerMetrics) {
                // A missing entry counts as zero instead of failing on null unboxing.
                sum += producerMetrics.getOrDefault(kafkaMetricName, 0.0);
            }
        }
        return sum;
    }

    /**
     * Starts tracking {@code producer} with every tracked metric initialised to {@code 0.0}.
     * Registering the same producer again resets its stored values to zero.
     *
     * @param producer producer to track
     * @throws NullPointerException if {@code producer} is {@code null}
     */
    public void register(KafkaProducer producer) {
        // Fill the map completely before publishing it so a concurrent gauge read never
        // observes a producer entry with missing keys.
        final Map<String, Double> producerMetrics = new HashMap<>();
        for (String kafkaMetricName : this.metricsNameMap.keySet()) {
            producerMetrics.put(kafkaMetricName, 0.0);
        }
        this.metricValues.put(producer, producerMetrics);
    }

    Counter getNumberOfRecordsSent() {
        return this.numberOfRecordsSent;
    }

    Counter getNumberOfBytesSent() {
        return this.numberOfBytesSent;
    }

    public Counter getNumberOfRawDataSendErrors() {
        return this.numberOfRawDataSendErrors;
    }

    public Counter getNumberOfRecordSendErrors() {
        return this.numberOfRecordSendErrors;
    }

    public Counter getNumberOfRecordProcessingErrors() {
        return this.numberOfRecordProcessingErrors;
    }

    /**
     * Returns {@code metricName} unchanged, or prefixed with
     * {@code "topic." + topicName + "."} when {@code topicNameInMetrics} is {@code true}.
     */
    private String getTopicMetricName(String metricName, boolean topicNameInMetrics) {
        if (topicNameInMetrics) {
            return "topic." + this.topicName + "." + metricName;
        }
        return metricName;
    }

    /** Returns the live (not copied) producer -> metric-name -> last-value registry. */
    Map<KafkaProducer, Map<String, Double>> getMetricValues() {
        return this.metricValues;
    }

    /**
     * Reads the current metrics of {@code producer} and records every tracked metric that
     * carries a {@code topic} tag.
     *
     * <p>For {@code record-send-total} and {@code byte-total} the difference to the previously
     * stored value is added to the corresponding counter; the new value is then stored for all
     * tracked metrics. Reading the previous value, incrementing the counter and storing the new
     * value happen under one lock so concurrent updates of the same producer cannot count the
     * same delta twice.
     *
     * <p>A producer that was never passed to {@link #register(KafkaProducer)} is ignored.
     * NOTE(review): metrics are matched on the presence of the {@code topic} tag, not on its
     * value — presumably each producer writes to a single topic; confirm against callers.
     *
     * @param producer producer whose metrics should be sampled
     */
    @SuppressWarnings("unchecked") // raw KafkaProducer#metrics() result is assigned to its documented type
    public void update(KafkaProducer producer) {
        final Map<String, Double> producerMetrics = this.metricValues.get(producer);
        if (producerMetrics == null) {
            // Unregistered producer: nothing to compare against, so there is nothing to record.
            return;
        }
        final Map<MetricName, ? extends Metric> metrics = producer.metrics();
        for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
            final MetricName metric = entry.getKey();
            final String metricName = metric.name();
            if (!this.metricsNameMap.containsKey(metricName) || !metric.tags().containsKey(TOPIC)) {
                continue;
            }
            final double newValue = (Double) entry.getValue().metricValue();
            synchronized (producerMetrics) {
                final double prevValue = producerMetrics.getOrDefault(metricName, 0.0);
                if (RECORD_SEND_TOTAL.equals(metricName)) {
                    this.numberOfRecordsSent.increment(newValue - prevValue);
                } else if (BYTE_TOTAL.equals(metricName)) {
                    this.numberOfBytesSent.increment(newValue - prevValue);
                }
                producerMetrics.put(metricName, newValue);
            }
        }
    }
}

