/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.metrics;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.TimerTask;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.metrics.CharacterFilter;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.View;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.reporter.Scheduled;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.ReporterSetup;
import org.apache.flink.runtime.metrics.TraceReporterSetup;
import org.apache.flink.runtime.metrics.ViewUpdater;
import org.apache.flink.runtime.metrics.dump.MetricQueryService;
import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
import org.apache.flink.runtime.metrics.groups.ReporterScopedSettings;
import org.apache.flink.runtime.metrics.scope.ScopeFormats;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway;
import org.apache.flink.traces.Span;
import org.apache.flink.traces.SpanBuilder;
import org.apache.flink.traces.reporter.TraceReporter;
import org.apache.flink.util.AutoCloseableAsync;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TimeUtils;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.QuadConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MetricRegistryImpl
implements MetricRegistry,
AutoCloseableAsync {
    private static final Logger LOG = LoggerFactory.getLogger(MetricRegistryImpl.class);
    private final Object lock = new Object();
    private final List<ReporterAndSettings> reporters;
    private final List<TraceReporter> traceReporters;
    private final ScheduledExecutorService reporterScheduledExecutor;
    private final ScheduledExecutorService viewUpdaterScheduledExecutor;
    private final ScopeFormats scopeFormats;
    private final char globalDelimiter;
    private final CompletableFuture<Void> terminationFuture;
    private final long maximumFramesize;
    @Nullable
    private MetricQueryService queryService;
    @Nullable
    private RpcService metricQueryServiceRpcService;
    private ViewUpdater viewUpdater;
    private boolean isShutdown;

    public MetricRegistryImpl(MetricRegistryConfiguration config) {
        this(config, Collections.emptyList(), Collections.emptyList());
    }

    public MetricRegistryImpl(MetricRegistryConfiguration config, Collection<ReporterSetup> reporterConfigurations) {
        this(config, reporterConfigurations, Collections.emptyList());
    }

    public MetricRegistryImpl(MetricRegistryConfiguration config, Collection<ReporterSetup> reporterConfigurations, Collection<TraceReporterSetup> traceReporterConfigurations) {
        this(config, reporterConfigurations, traceReporterConfigurations, Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory("Flink-Metric-Reporter")), Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory("Flink-Metric-View-Updater")));
    }

    @VisibleForTesting
    MetricRegistryImpl(MetricRegistryConfiguration config, Collection<ReporterSetup> reporterConfigurations, ScheduledExecutorService scheduledExecutor) {
        this(config, reporterConfigurations, Collections.emptyList(), scheduledExecutor);
    }

    @VisibleForTesting
    MetricRegistryImpl(MetricRegistryConfiguration config, Collection<ReporterSetup> reporterConfigurations, Collection<TraceReporterSetup> traceReporterConfigurations, ScheduledExecutorService scheduledExecutor) {
        this(config, reporterConfigurations, traceReporterConfigurations, scheduledExecutor, scheduledExecutor);
    }

    MetricRegistryImpl(MetricRegistryConfiguration config, Collection<ReporterSetup> reporterConfigurations, Collection<TraceReporterSetup> traceReporterConfigurations, ScheduledExecutorService reporterScheduledExecutor, ScheduledExecutorService viewUpdaterScheduledExecutor) {
        this.maximumFramesize = config.getQueryServiceMessageSizeLimit();
        this.scopeFormats = config.getScopeFormats();
        this.globalDelimiter = config.getDelimiter();
        this.terminationFuture = new CompletableFuture();
        this.isShutdown = false;
        this.reporters = new ArrayList<ReporterAndSettings>(4);
        this.traceReporters = new ArrayList<TraceReporter>(4);
        this.reporterScheduledExecutor = reporterScheduledExecutor;
        this.viewUpdaterScheduledExecutor = viewUpdaterScheduledExecutor;
        this.queryService = null;
        this.metricQueryServiceRpcService = null;
        this.initMetricReporters(reporterConfigurations, reporterScheduledExecutor);
        this.initTraceReporters(traceReporterConfigurations);
    }

    private void initMetricReporters(Collection<ReporterSetup> reporterConfigurations, ScheduledExecutorService reporterScheduledExecutor) {
        if (reporterConfigurations.isEmpty()) {
            LOG.info("No metrics reporter configured, no metrics will be exposed/reported.");
            return;
        }
        for (ReporterSetup reporterSetup : reporterConfigurations) {
            String namedReporter = reporterSetup.getName();
            try {
                MetricReporter reporterInstance = reporterSetup.getReporter();
                String className = reporterInstance.getClass().getName();
                if (reporterInstance instanceof Scheduled) {
                    Duration period = MetricRegistryImpl.getConfiguredIntervalOrDefault(reporterSetup);
                    LOG.info("Periodically reporting metrics in intervals of {} for reporter {} of type {}.", new Object[]{TimeUtils.formatWithHighestUnit(period), namedReporter, className});
                    reporterScheduledExecutor.scheduleWithFixedDelay(new ReporterTask((Scheduled)reporterInstance), period.toMillis(), period.toMillis(), TimeUnit.MILLISECONDS);
                } else {
                    LOG.info("Reporting metrics for reporter {} of type {}.", (Object)namedReporter, (Object)className);
                }
                String delimiterForReporter = reporterSetup.getDelimiter().orElse(String.valueOf(this.globalDelimiter));
                if (delimiterForReporter.length() != 1) {
                    LOG.warn("Failed to parse delimiter '{}' for reporter '{}', using global delimiter '{}'.", new Object[]{delimiterForReporter, namedReporter, Character.valueOf(this.globalDelimiter)});
                    delimiterForReporter = String.valueOf(this.globalDelimiter);
                }
                this.reporters.add(new ReporterAndSettings(reporterInstance, new ReporterScopedSettings(this.reporters.size(), delimiterForReporter.charAt(0), reporterSetup.getFilter(), reporterSetup.getExcludedVariables(), reporterSetup.getAdditionalVariables())));
            }
            catch (Throwable t) {
                LOG.error("Could not instantiate metrics reporter {}. Metrics might not be exposed/reported.", (Object)namedReporter, (Object)t);
            }
        }
    }

    private void initTraceReporters(Collection<TraceReporterSetup> traceReporterConfigurations) {
        if (traceReporterConfigurations.isEmpty()) {
            LOG.info("No trace reporter configured, no metrics will be exposed/reported.");
            return;
        }
        for (TraceReporterSetup reporterSetup : traceReporterConfigurations) {
            String namedReporter = reporterSetup.getName();
            try {
                TraceReporter reporterInstance = reporterSetup.getReporter();
                this.traceReporters.add(reporterInstance);
            }
            catch (Throwable t) {
                LOG.error("Could not instantiate metrics reporter {}. Metrics might not be exposed/reported.", (Object)namedReporter, (Object)t);
            }
        }
    }

    private static Duration getConfiguredIntervalOrDefault(ReporterSetup reporterSetup) {
        Optional<String> configuredPeriod = reporterSetup.getIntervalSettings();
        Duration period = MetricOptions.REPORTER_INTERVAL.defaultValue();
        if (configuredPeriod.isPresent()) {
            try {
                period = TimeUtils.parseDuration(configuredPeriod.get());
            }
            catch (Exception e) {
                LOG.error("Cannot parse report interval from config: " + configuredPeriod + " - please use values like '10 SECONDS' or '500 MILLISECONDS'. Using default reporting interval.");
            }
        }
        return period;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void startQueryService(RpcService rpcService, ResourceID resourceID) {
        Object object = this.lock;
        synchronized (object) {
            Preconditions.checkState(!this.isShutdown(), "The metric registry has already been shut down.");
            try {
                this.metricQueryServiceRpcService = rpcService;
                this.queryService = MetricQueryService.createMetricQueryService(rpcService, resourceID, this.maximumFramesize);
                this.queryService.start();
            }
            catch (Exception e) {
                LOG.warn("Could not start MetricDumpActor. No metrics will be submitted to the WebInterface.", (Throwable)e);
            }
        }
    }

    @Nullable
    public RpcService getMetricQueryServiceRpcService() {
        return this.metricQueryServiceRpcService;
    }

    @Override
    @Nullable
    public String getMetricQueryServiceGatewayRpcAddress() {
        if (this.queryService != null) {
            return ((MetricQueryServiceGateway)this.queryService.getSelfGateway(MetricQueryServiceGateway.class)).getAddress();
        }
        return null;
    }

    @Nullable
    @VisibleForTesting
    MetricQueryServiceGateway getMetricQueryServiceGateway() {
        if (this.queryService != null) {
            return (MetricQueryServiceGateway)this.queryService.getSelfGateway(MetricQueryServiceGateway.class);
        }
        return null;
    }

    @Override
    public char getDelimiter() {
        return this.globalDelimiter;
    }

    @VisibleForTesting
    char getDelimiter(int reporterIndex) {
        try {
            return this.reporters.get(reporterIndex).getSettings().getDelimiter();
        }
        catch (IndexOutOfBoundsException e) {
            LOG.warn("Delimiter for reporter index {} not found, returning global delimiter.", (Object)reporterIndex);
            return this.globalDelimiter;
        }
    }

    @Override
    public int getNumberReporters() {
        return this.reporters.size();
    }

    @VisibleForTesting
    public List<MetricReporter> getReporters() {
        return this.reporters.stream().map(ReporterAndSettings::getReporter).collect(Collectors.toList());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean isShutdown() {
        Object object = this.lock;
        synchronized (object) {
            return this.isShutdown;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public CompletableFuture<Void> closeAsync() {
        Object object = this.lock;
        synchronized (object) {
            if (this.isShutdown) {
                return this.terminationFuture;
            }
            this.isShutdown = true;
            ArrayList terminationFutures = new ArrayList(3);
            Duration gracePeriod = Duration.ofSeconds(1L);
            if (this.metricQueryServiceRpcService != null) {
                CompletableFuture metricQueryServiceRpcServiceTerminationFuture = this.metricQueryServiceRpcService.closeAsync();
                terminationFutures.add(metricQueryServiceRpcServiceTerminationFuture);
            }
            Throwable throwable = null;
            for (ReporterAndSettings reporterAndSettings : this.reporters) {
                try {
                    reporterAndSettings.getReporter().close();
                }
                catch (Throwable t) {
                    throwable = ExceptionUtils.firstOrSuppressed(t, throwable);
                }
            }
            this.reporters.clear();
            if (throwable != null) {
                terminationFutures.add(FutureUtils.completedExceptionally(new FlinkException("Could not shut down the metric reporters properly.", throwable)));
            }
            CompletableFuture<Void> reporterExecutorShutdownFuture = ExecutorUtils.nonBlockingShutdown(gracePeriod.toMillis(), TimeUnit.MILLISECONDS, this.reporterScheduledExecutor);
            terminationFutures.add(reporterExecutorShutdownFuture);
            CompletableFuture<Void> viewUpdaterExecutorShutdownFuture = ExecutorUtils.nonBlockingShutdown(gracePeriod.toMillis(), TimeUnit.MILLISECONDS, this.viewUpdaterScheduledExecutor);
            terminationFutures.add(viewUpdaterExecutorShutdownFuture);
            FutureUtils.completeAll(terminationFutures).whenComplete((ignored, error) -> {
                if (error != null) {
                    this.terminationFuture.completeExceptionally((Throwable)error);
                } else {
                    this.terminationFuture.complete(null);
                }
            });
            return this.terminationFuture;
        }
    }

    @Override
    public ScopeFormats getScopeFormats() {
        return this.scopeFormats;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void addSpan(SpanBuilder spanBuilder) {
        Object object = this.lock;
        synchronized (object) {
            if (this.isShutdown()) {
                LOG.warn("Cannot add span, because the MetricRegistry has already been shut down.");
                return;
            }
            if (LOG.isTraceEnabled()) {
                LOG.trace("addSpan");
            }
            if (this.reporters != null) {
                this.notifyTraceReportersOfAddedSpan(spanBuilder.build());
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void register(Metric metric, String metricName, AbstractMetricGroup group) {
        Object object = this.lock;
        synchronized (object) {
            if (this.isShutdown()) {
                LOG.warn("Cannot register metric, because the MetricRegistry has already been shut down.");
            } else {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Registering metric {}.{}.", (Object)group.getLogicalScope(CharacterFilter.NO_OP_FILTER), (Object)metricName);
                }
                if (this.reporters != null) {
                    this.forAllReporters(MetricReporter::notifyOfAddedMetric, metric, metricName, group);
                }
                try {
                    if (this.queryService != null) {
                        this.queryService.addMetric(metricName, metric, group);
                    }
                }
                catch (Exception e) {
                    LOG.warn("Error while registering metric: {}.", (Object)metricName, (Object)e);
                }
                try {
                    if (metric instanceof View) {
                        if (this.viewUpdater == null) {
                            this.viewUpdater = new ViewUpdater(this.viewUpdaterScheduledExecutor);
                        }
                        this.viewUpdater.notifyOfAddedView((View)metric);
                    }
                }
                catch (Exception e) {
                    LOG.warn("Error while registering metric: {}.", (Object)metricName, (Object)e);
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void unregister(Metric metric, String metricName, AbstractMetricGroup group) {
        Object object = this.lock;
        synchronized (object) {
            if (this.isShutdown()) {
                LOG.warn("Cannot unregister metric, because the MetricRegistry has already been shut down.");
            } else {
                if (this.reporters != null) {
                    this.forAllReporters(MetricReporter::notifyOfRemovedMetric, metric, metricName, group);
                }
                try {
                    if (this.queryService != null) {
                        this.queryService.removeMetric(metric);
                    }
                }
                catch (Exception e) {
                    LOG.warn("Error while unregistering metric: {}.", (Object)metricName, (Object)e);
                }
                try {
                    if (metric instanceof View && this.viewUpdater != null) {
                        this.viewUpdater.notifyOfRemovedView((View)metric);
                    }
                }
                catch (Exception e) {
                    LOG.warn("Error while unregistering metric: {}", (Object)metricName, (Object)e);
                }
            }
        }
    }

    @GuardedBy(value="lock")
    private void notifyTraceReportersOfAddedSpan(Span span) {
        for (int i = 0; i < this.traceReporters.size(); ++i) {
            try {
                this.traceReporters.get(i).notifyOfAddedSpan(span);
                continue;
            }
            catch (Exception e) {
                LOG.warn("Error while handling span: {}.", (Object)span, (Object)e);
            }
        }
    }

    @GuardedBy(value="lock")
    private void forAllReporters(QuadConsumer<MetricReporter, Metric, String, MetricGroup> operation, Metric metric, String metricName, AbstractMetricGroup group) {
        for (int i = 0; i < this.reporters.size(); ++i) {
            try {
                ReporterAndSettings reporterAndSettings = this.reporters.get(i);
                if (reporterAndSettings == null) continue;
                String logicalScope = group.getLogicalScope(CharacterFilter.NO_OP_FILTER);
                if (!reporterAndSettings.settings.getFilter().filter(metric, metricName, logicalScope)) {
                    LOG.trace("Ignoring metric {}.{} for reporter #{} due to filter rules.", new Object[]{logicalScope, metricName, i});
                    continue;
                }
                FrontMetricGroup<AbstractMetricGroup> front = new FrontMetricGroup<AbstractMetricGroup>(reporterAndSettings.getSettings(), group);
                operation.accept(reporterAndSettings.getReporter(), metric, metricName, front);
                continue;
            }
            catch (Exception e) {
                LOG.warn("Error while handling metric: {}.", (Object)metricName, (Object)e);
            }
        }
    }

    @Nullable
    @VisibleForTesting
    MetricQueryService getQueryService() {
        return this.queryService;
    }

    private static class ReporterAndSettings {
        private final MetricReporter reporter;
        private final ReporterScopedSettings settings;

        private ReporterAndSettings(MetricReporter reporter, ReporterScopedSettings settings) {
            this.reporter = Preconditions.checkNotNull(reporter);
            this.settings = Preconditions.checkNotNull(settings);
        }

        public MetricReporter getReporter() {
            return this.reporter;
        }

        public ReporterScopedSettings getSettings() {
            return this.settings;
        }
    }

    private static final class ReporterTask
    extends TimerTask {
        private final Scheduled reporter;

        private ReporterTask(Scheduled reporter) {
            this.reporter = reporter;
        }

        @Override
        public void run() {
            try {
                this.reporter.report();
            }
            catch (Throwable t) {
                LOG.warn("Error while reporting metrics", t);
            }
        }
    }
}

