/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.core.cnc.metrics;

import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.cnc.Counter;
import com.couchbase.client.core.cnc.EventBus;
import com.couchbase.client.core.cnc.Meter;
import com.couchbase.client.core.cnc.ValueRecorder;
import com.couchbase.client.core.cnc.events.metrics.LatencyMetricsAggregatedEvent;
import com.couchbase.client.core.cnc.metrics.AggregatingCounter;
import com.couchbase.client.core.cnc.metrics.AggregatingValueRecorder;
import com.couchbase.client.core.cnc.metrics.NameAndTags;
import com.couchbase.client.core.deps.org.HdrHistogram.Histogram;
import com.couchbase.client.core.env.AggregatingMeterConfig;
import java.time.Duration;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

/**
 * A {@link Meter} that keeps one {@link AggregatingValueRecorder} per name/tag combination and
 * periodically publishes an aggregated latency report on the {@link EventBus}.
 *
 * <p>A single daemon worker thread sleeps for the configured emit interval, then drains every
 * recorder registered under the request metric name and publishes a
 * {@link LatencyMetricsAggregatedEvent} if at least one recorder had data.
 *
 * <p>The worker {@link Thread} is created once in the constructor. Since a {@code Thread} can
 * only be started once, calling {@link #start()} again after {@link #stop(Duration)} is not
 * supported by this implementation.
 */
@Stability.Volatile
public class AggregatingMeter
implements Meter {
    private static final AtomicInteger METER_ID = new AtomicInteger();

    /** Only value recorders registered under this name are included in the emitted report. */
    private static final String REQUEST_METRIC_NAME = "db.couchbase.requests";
    /** Tag whose value becomes the first-level grouping key of the report. */
    private static final String SERVICE_TAG = "db.couchbase.service";
    /** Tag whose value becomes the second-level grouping key of the report. */
    private static final String HOSTNAME_TAG = "net.peer.name";

    private final EventBus eventBus;
    private final Thread worker;
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final Map<NameAndTags, AggregatingValueRecorder> valueRecorders = new ConcurrentHashMap<NameAndTags, AggregatingValueRecorder>();
    private final long emitIntervalMs;

    /**
     * Creates a new, not yet started, meter.
     *
     * @param eventBus the bus on which aggregated reports are published.
     * @param config the configuration providing the emit interval.
     * @return the created meter; call {@link #start()} to begin emitting.
     */
    public static AggregatingMeter create(EventBus eventBus, AggregatingMeterConfig config) {
        return new AggregatingMeter(config, eventBus);
    }

    private AggregatingMeter(AggregatingMeterConfig config, EventBus eventBus) {
        this.eventBus = eventBus;
        this.emitIntervalMs = config.emitInterval().toMillis();
        this.worker = new Thread(new Worker());
        // Daemon so that a meter which is never stopped does not keep the JVM alive.
        this.worker.setDaemon(true);
    }

    /**
     * Returns the shared {@link AggregatingCounter#INSTANCE}; {@code name} and {@code tags} are
     * not used by this implementation.
     */
    @Override
    public Counter counter(String name, Map<String, String> tags) {
        return AggregatingCounter.INSTANCE;
    }

    /**
     * Returns the recorder registered for the given name/tag combination, creating and
     * registering it on first use.
     *
     * @param name the metric name.
     * @param tags the tags attached to the metric.
     * @return the (possibly newly created) recorder for this combination.
     */
    @Override
    public synchronized ValueRecorder valueRecorder(String name, Map<String, String> tags) {
        return this.valueRecorders.computeIfAbsent(new NameAndTags(name, tags), key -> new AggregatingValueRecorder(name, tags));
    }

    /**
     * Starts the worker thread when the returned {@link Mono} is subscribed to, unless the meter
     * is already running.
     *
     * @return a deferred {@link Mono} that starts the worker and then completes empty.
     */
    @Override
    public Mono<Void> start() {
        return Mono.defer(() -> {
            if (this.running.compareAndSet(false, true)) {
                this.worker.start();
            }
            return Mono.empty();
        });
    }

    /**
     * Marks the meter as stopped and interrupts the worker when the returned {@link Mono} is
     * subscribed to. This method does not wait for the worker to terminate.
     *
     * @param timeout not used by this implementation.
     * @return a deferred {@link Mono} that signals the worker and then completes empty.
     */
    @Override
    public Mono<Void> stop(Duration timeout) {
        return Mono.defer(() -> {
            // The flag is cleared before the interrupt so the worker observes "not running"
            // once it wakes up.
            if (this.running.compareAndSet(true, false)) {
                this.worker.interrupt();
            }
            return Mono.empty();
        });
    }

    /**
     * The periodic task executed on the worker thread: sleep for the emit interval, dump the
     * metrics, repeat until the meter is stopped or the thread is interrupted.
     */
    private class Worker
    implements Runnable {
        private Worker() {
        }

        @Override
        public void run() {
            Thread.currentThread().setName("cb-metrics-" + METER_ID.incrementAndGet());
            while (AggregatingMeter.this.running.get()) {
                try {
                    Thread.sleep(AggregatingMeter.this.emitIntervalMs);
                    this.dumpMetrics();
                }
                catch (InterruptedException ex) {
                    // Always leave the loop on interruption. Re-setting the interrupt flag and
                    // continuing (as was previously done while still "running") makes the next
                    // sleep() throw immediately, turning the loop into a busy spin.
                    Thread.currentThread().interrupt();
                    return;
                }
                catch (Exception ignored) {
                    // Best effort: a failing dump must not terminate the periodic worker, so
                    // the failure is dropped and the next interval is attempted.
                }
            }
        }

        /**
         * Drains all request recorders and publishes one aggregated event, if any recorder had
         * recorded at least one value since the previous dump.
         *
         * <p>The resulting structure is {@code service -> hostname -> stats}, plus a
         * {@code "meta"} entry holding the emit interval in seconds.
         */
        private synchronized void dumpMetrics() {
            Map<String, Object> output = new HashMap<>();
            Map<String, Object> meta = new HashMap<>();
            meta.put("emit_interval_s", TimeUnit.MILLISECONDS.toSeconds(AggregatingMeter.this.emitIntervalMs));
            output.put("meta", meta);

            boolean wroteRow = false;
            for (Map.Entry<NameAndTags, AggregatingValueRecorder> entry : AggregatingMeter.this.valueRecorders.entrySet()) {
                if (!entry.getKey().name().equals(REQUEST_METRIC_NAME)) {
                    continue;
                }
                AggregatingValueRecorder avr = entry.getValue();
                // clearStats() is only invoked for request recorders; its result is the data
                // reported for this interval.
                Histogram histogram = avr.clearStats();
                if (histogram.getTotalCount() == 0L) {
                    continue;
                }
                wroteRow = true;

                String service = avr.tags().get(SERVICE_TAG);
                String hostname = avr.tags().get(HOSTNAME_TAG);

                // The nested values are only ever maps created right here, so the casts are safe.
                @SuppressWarnings("unchecked")
                Map<String, Object> serviceMap = (Map<String, Object>) output.computeIfAbsent(service, k -> new HashMap<String, Object>());
                @SuppressWarnings("unchecked")
                Map<String, Object> hostMap = (Map<String, Object>) serviceMap.computeIfAbsent(hostname, k -> new HashMap<String, Object>());

                hostMap.put("total_count", histogram.getTotalCount());

                // Insertion-ordered so percentiles appear in ascending order. Values are divided
                // by 1000 for the "_us" output (presumably recorded in nanoseconds - confirm
                // against AggregatingValueRecorder).
                Map<String, Double> percentiles = new LinkedHashMap<>();
                percentiles.put("50.0", (double) histogram.getValueAtPercentile(50.0) / 1000.0);
                percentiles.put("90.0", (double) histogram.getValueAtPercentile(90.0) / 1000.0);
                percentiles.put("99.0", (double) histogram.getValueAtPercentile(99.0) / 1000.0);
                percentiles.put("99.9", (double) histogram.getValueAtPercentile(99.9) / 1000.0);
                percentiles.put("100.0", (double) histogram.getMaxValue() / 1000.0);
                hostMap.put("percentiles_us", percentiles);
            }

            // Avoid publishing reports that contain nothing but the meta section.
            if (wroteRow) {
                AggregatingMeter.this.eventBus.publish(new LatencyMetricsAggregatedEvent(Duration.ofMillis(AggregatingMeter.this.emitIntervalMs), output));
            }
        }
    }
}

