/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza.metrics;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.samza.context.Context;
import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.Gauge;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.metrics.Reservoir;
import org.apache.samza.metrics.SlidingTimeWindowReservoir;
import org.apache.samza.metrics.Timer;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;

public class SamzaTransformMetrics
implements Serializable {
    private static final @UnknownKeyFor @NonNull @Initialized String ENABLE_TASK_METRICS = "runner.samza.transform.enable.task.metrics";
    private static final @UnknownKeyFor @NonNull @Initialized int DEFAULT_LOOKBACK_TIMER_WINDOW_SIZE_MS = 180000;
    private static final @UnknownKeyFor @NonNull @Initialized String GROUP = "SamzaBeamTransformMetrics";
    private static final @UnknownKeyFor @NonNull @Initialized String METRIC_NAME_PATTERN = "%s-%s";
    private static final @UnknownKeyFor @NonNull @Initialized String TRANSFORM_LATENCY_METRIC = "handle-message-ns";
    private static final @UnknownKeyFor @NonNull @Initialized String TRANSFORM_WATERMARK_PROGRESS = "output-watermark-ms";
    private static final @UnknownKeyFor @NonNull @Initialized String TRANSFORM_IP_THROUGHPUT = "num-input-messages";
    private static final @UnknownKeyFor @NonNull @Initialized String TRANSFORM_OP_THROUGHPUT = "num-output-messages";
    private static final @UnknownKeyFor @NonNull @Initialized String TRANSFORM_ARRIVAL_TIME_CACHE_SIZE = "in-mem-cache-size";
    @SuppressFBWarnings(value={"SE_BAD_FIELD"})
    private final @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Timer> transformLatency = new ConcurrentHashMap<String, Timer>();
    @SuppressFBWarnings(value={"SE_BAD_FIELD"})
    private final @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Gauge<@UnknownKeyFor @NonNull @Initialized Long>> transformWatermarkProgress;
    @SuppressFBWarnings(value={"SE_BAD_FIELD"})
    private final @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Counter> transformInputThroughput;
    @SuppressFBWarnings(value={"SE_BAD_FIELD"})
    private final @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Counter> transformOutputThroughPut = new ConcurrentHashMap<String, Counter>();
    @SuppressFBWarnings(value={"SE_BAD_FIELD"})
    private final @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Gauge<@UnknownKeyFor @NonNull @Initialized Long>> transformCacheSize;

    public SamzaTransformMetrics() {
        this.transformWatermarkProgress = new ConcurrentHashMap<String, Gauge<Long>>();
        this.transformInputThroughput = new ConcurrentHashMap<String, Counter>();
        this.transformCacheSize = new ConcurrentHashMap<String, Gauge<Long>>();
    }

    public void register(@UnknownKeyFor @NonNull @Initialized String transformName, @UnknownKeyFor @NonNull @Initialized Context ctx) {
        this.transformWatermarkProgress.putIfAbsent(transformName, (Gauge<Long>)ctx.getTaskContext().getTaskMetricsRegistry().newGauge(GROUP, SamzaTransformMetrics.getMetricNameWithPrefix(TRANSFORM_WATERMARK_PROGRESS, transformName), (Object)0L));
        boolean enablePerTaskMetrics = ctx.getJobContext().getConfig().getBoolean(ENABLE_TASK_METRICS, false);
        MetricsRegistry metricsRegistry = enablePerTaskMetrics ? ctx.getTaskContext().getTaskMetricsRegistry() : ctx.getContainerContext().getContainerMetricsRegistry();
        this.transformLatency.putIfAbsent(transformName, metricsRegistry.newTimer(GROUP, SamzaTransformMetrics.getTimerWithCustomizedLookBackWindow(transformName)));
        this.transformOutputThroughPut.putIfAbsent(transformName, metricsRegistry.newCounter(GROUP, SamzaTransformMetrics.getMetricNameWithPrefix(TRANSFORM_OP_THROUGHPUT, transformName)));
        this.transformInputThroughput.putIfAbsent(transformName, metricsRegistry.newCounter(GROUP, SamzaTransformMetrics.getMetricNameWithPrefix(TRANSFORM_IP_THROUGHPUT, transformName)));
        this.transformCacheSize.putIfAbsent(transformName, (Gauge<Long>)ctx.getTaskContext().getTaskMetricsRegistry().newGauge(GROUP, SamzaTransformMetrics.getMetricNameWithPrefix(TRANSFORM_ARRIVAL_TIME_CACHE_SIZE, transformName), (Object)0L));
    }

    public @UnknownKeyFor @NonNull @Initialized Timer getTransformLatencyMetric(@UnknownKeyFor @NonNull @Initialized String transformName) {
        return this.transformLatency.get(transformName);
    }

    public @UnknownKeyFor @NonNull @Initialized Counter getTransformInputThroughput(@UnknownKeyFor @NonNull @Initialized String transformName) {
        return this.transformInputThroughput.get(transformName);
    }

    public @UnknownKeyFor @NonNull @Initialized Counter getTransformOutputThroughput(@UnknownKeyFor @NonNull @Initialized String transformName) {
        return this.transformOutputThroughPut.get(transformName);
    }

    public @UnknownKeyFor @NonNull @Initialized Gauge<@UnknownKeyFor @NonNull @Initialized Long> getTransformCacheSize(@UnknownKeyFor @NonNull @Initialized String transformName) {
        return this.transformCacheSize.get(transformName);
    }

    public @UnknownKeyFor @NonNull @Initialized Gauge<@UnknownKeyFor @NonNull @Initialized Long> getTransformWatermarkProgress(@UnknownKeyFor @NonNull @Initialized String transformName) {
        return this.transformWatermarkProgress.get(transformName);
    }

    private static @UnknownKeyFor @NonNull @Initialized Timer getTimerWithCustomizedLookBackWindow(@UnknownKeyFor @NonNull @Initialized String transformName) {
        return new Timer(SamzaTransformMetrics.getMetricNameWithPrefix(TRANSFORM_LATENCY_METRIC, transformName), (Reservoir)new SlidingTimeWindowReservoir(180000L));
    }

    private static @UnknownKeyFor @NonNull @Initialized String getMetricNameWithPrefix(@UnknownKeyFor @NonNull @Initialized String metricName, @UnknownKeyFor @NonNull @Initialized String transformName) {
        String samzaSafeMetricName = transformName.replaceAll("[^A-Za-z0-9_]", "_");
        return String.format(METRIC_NAME_PATTERN, samzaSafeMetricName, metricName);
    }
}

