/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.core.cnc.apptelemetry.collector;

import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.cnc.apptelemetry.collector.AppTelemetryCollector;
import com.couchbase.client.core.cnc.apptelemetry.collector.AppTelemetryCounter;
import com.couchbase.client.core.cnc.apptelemetry.collector.AppTelemetryCounterType;
import com.couchbase.client.core.cnc.apptelemetry.collector.AppTelemetryHistogram;
import com.couchbase.client.core.cnc.apptelemetry.collector.AppTelemetryMetricSet;
import com.couchbase.client.core.cnc.apptelemetry.collector.AppTelemetryRequestClassifier;
import com.couchbase.client.core.cnc.apptelemetry.collector.AppTelemetryRequestType;
import com.couchbase.client.core.cnc.apptelemetry.collector.NodeAndBucket;
import com.couchbase.client.core.config.ClusterConfig;
import com.couchbase.client.core.env.UserAgent;
import com.couchbase.client.core.logging.RedactableArgument;
import com.couchbase.client.core.msg.Request;
import com.couchbase.client.core.msg.RequestContext;
import com.couchbase.client.core.service.ServiceType;
import com.couchbase.client.core.topology.ClusterTopology;
import com.couchbase.client.core.topology.HostAndServicePorts;
import com.couchbase.client.core.topology.NodeIdentifier;
import com.couchbase.client.core.util.CbStrings;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.util.annotation.Nullable;

@Stability.Internal
public final class AppTelemetryCollectorImpl
implements AppTelemetryCollector {
    private static final Logger log = LoggerFactory.getLogger(AppTelemetryCollectorImpl.class);
    private final String agent;
    private final ConcurrentMap<NodeAndBucket, AppTelemetryMetricSet> metricSets = new ConcurrentHashMap<NodeAndBucket, AppTelemetryMetricSet>();
    private volatile boolean paused;
    @Nullable
    private volatile ClusterTopology latestTopology;

    public AppTelemetryCollectorImpl(Flux<ClusterConfig> configs, UserAgent userAgent) {
        this.agent = userAgent.name() + "/" + userAgent.version();
        configs.doOnNext(config -> {
            try {
                ClusterTopology topology;
                this.latestTopology = topology = config.globalTopology();
                this.prune(topology == null ? Collections.emptyList() : topology.nodes());
            }
            catch (Exception e) {
                log.warn("PRUNE: Failed to prune App Telemetry metrics", (Throwable)e);
            }
        }).subscribe();
    }

    private void prune(List<HostAndServicePorts> existingNodes) {
        Set canonicalAddressesOfExistingNodes = existingNodes.stream().map(it -> it.id().canonical()).collect(Collectors.toSet());
        log.debug("PRUNE: Canonical addresses of nodes in cluster: {}", (Object)RedactableArgument.redactSystem(canonicalAddressesOfExistingNodes));
        this.metricSets.keySet().removeIf(it -> {
            boolean remove;
            boolean bl = remove = !canonicalAddressesOfExistingNodes.contains(it.nodeId.canonical());
            if (remove) {
                log.info("PRUNE: Discarding App Telemetry metrics for node that is no longer in cluster: {}", (Object)RedactableArgument.redactSystem(it.nodeId));
            }
            return remove;
        });
    }

    @Override
    public synchronized void setPaused(boolean paused) {
        if (this.paused == paused) {
            return;
        }
        log.info("{} app telemetry collection.", (Object)(paused ? "Pausing" : "Resuming"));
        this.paused = paused;
        if (paused) {
            this.prune(Collections.emptyList());
        }
    }

    @Override
    public void recordLatency(Request<?> request) {
        if (this.paused) {
            return;
        }
        AppTelemetryRequestType type = AppTelemetryRequestClassifier.classify(request);
        if (type == null) {
            return;
        }
        RequestContext ctx = request.context();
        this.recordLatency(ctx.lastDispatchedToNode(), request.bucket(), type, ctx.dispatchLatency());
    }

    private void recordLatency(NodeIdentifier nodeId, @Nullable String bucket, AppTelemetryRequestType type, long latencyNanos) {
        NodeAndBucket nodeAndBucket = new NodeAndBucket(nodeId, bucket);
        AppTelemetryMetricSet histogramGroup = this.metricSets.computeIfAbsent(nodeAndBucket, key -> new AppTelemetryMetricSet());
        AppTelemetryHistogram histogram = histogramGroup.histograms.get((Object)type);
        histogram.record(latencyNanos);
        this.increment(nodeId, bucket, type.service, AppTelemetryCounterType.TOTAL);
    }

    @Override
    public void increment(Request<?> request, AppTelemetryCounterType counterType) {
        if (this.paused) {
            return;
        }
        AppTelemetryRequestType type = AppTelemetryRequestClassifier.classify(request);
        if (type == null) {
            return;
        }
        RequestContext ctx = request.context();
        NodeIdentifier nodeId = ctx.lastDispatchedToNode();
        if (nodeId == null) {
            return;
        }
        this.increment(nodeId, request.bucket(), request.serviceType(), counterType);
    }

    private void increment(NodeIdentifier nodeId, @Nullable String bucket, ServiceType serviceType, AppTelemetryCounterType counterType) {
        NodeAndBucket nodeAndBucket = new NodeAndBucket(nodeId, bucket);
        AppTelemetryMetricSet metricSet = this.metricSets.computeIfAbsent(nodeAndBucket, key -> new AppTelemetryMetricSet());
        Map<AppTelemetryCounterType, AppTelemetryCounter> countersByType = metricSet.counters.get((Object)serviceType);
        if (countersByType == null) {
            return;
        }
        countersByType.get((Object)counterType).increment();
        if (counterType != AppTelemetryCounterType.TOTAL) {
            countersByType.get((Object)AppTelemetryCounterType.TOTAL).increment();
        }
    }

    @Override
    public synchronized void reportTo(Consumer<? super CharSequence> charSink) {
        long currentTimeMillis = System.currentTimeMillis();
        this.metricSets.forEach((nodeAndBucket, metricSet) -> {
            String nodeUuid = this.getNodeUuid((NodeAndBucket)nodeAndBucket);
            if (nodeUuid == null) {
                return;
            }
            LinkedHashMap<String, String> commonTags = new LinkedHashMap<String, String>();
            commonTags.put("agent", this.agent);
            commonTags.put("node_uuid", nodeUuid);
            nodeAndBucket.writeTo(commonTags);
            metricSet.forEachMetric(reportable -> reportable.reportTo(charSink, commonTags, currentTimeMillis));
        });
    }

    @Nullable
    private String getNodeUuid(NodeAndBucket nodeAndBucket) {
        ClusterTopology topology = this.latestTopology;
        if (topology == null) {
            return null;
        }
        return topology.nodes().stream().filter(node -> node.id().equals(nodeAndBucket.nodeId)).findFirst().map(node -> CbStrings.nullToEmpty(node.uuid())).orElse(null);
    }
}

