/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pinot.controller.helix;

import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import org.apache.http.conn.HttpClientConnectionManager;
import org.apache.http.impl.conn.BasicHttpClientConnectionManager;
import org.apache.pinot.common.metrics.AbstractMetrics;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
import org.apache.pinot.controller.util.ConsumingSegmentInfoReader;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RealtimeConsumerMonitor
extends ControllerPeriodicTask<Context> {
    private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeConsumerMonitor.class);
    private static final int DEFAULT_TIMEOUT_MS = 10000;
    private final ConsumingSegmentInfoReader _consumingSegmentInfoReader;

    @VisibleForTesting
    public RealtimeConsumerMonitor(ControllerConf controllerConf, PinotHelixResourceManager pinotHelixResourceManager, LeadControllerManager leadControllerManager, ControllerMetrics controllerMetrics, ConsumingSegmentInfoReader consumingSegmentInfoReader) {
        super("RealtimeConsumerMonitor", controllerConf.getRealtimeConsumerMonitorRunFrequency(), controllerConf.getRealtimeConsumerMonitorInitialDelayInSeconds(), pinotHelixResourceManager, leadControllerManager, controllerMetrics);
        this._consumingSegmentInfoReader = consumingSegmentInfoReader;
    }

    public RealtimeConsumerMonitor(ControllerConf controllerConf, PinotHelixResourceManager pinotHelixResourceManager, LeadControllerManager leadControllerManager, ControllerMetrics controllerMetrics, ExecutorService executorService) {
        this(controllerConf, pinotHelixResourceManager, leadControllerManager, controllerMetrics, new ConsumingSegmentInfoReader(executorService, (HttpClientConnectionManager)new BasicHttpClientConnectionManager(), pinotHelixResourceManager));
    }

    protected void setUpTask() {
        LOGGER.info("Setting up RealtimeConsumerMonitor task");
    }

    @Override
    protected void processTable(String tableNameWithType) {
        if (!TableType.REALTIME.equals((Object)TableNameBuilder.getTableTypeFromTableName((String)tableNameWithType))) {
            return;
        }
        try {
            ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap segmentsInfoMap = this._consumingSegmentInfoReader.getConsumingSegmentsInfo(tableNameWithType, 10000);
            HashMap<String, List> partitionToLagSet = new HashMap<String, List>();
            HashMap<String, List> partitionToAvailabilityLagSet = new HashMap<String, List>();
            for (List<ConsumingSegmentInfoReader.ConsumingSegmentInfo> info : segmentsInfoMap._segmentToConsumingInfoMap.values()) {
                info.forEach(segment -> {
                    segment._partitionOffsetInfo._recordsLagMap.forEach((k, v) -> {
                        if (!"NOT_CALCULATED".equals(v)) {
                            try {
                                long recordsLag = Long.parseLong(v);
                                partitionToLagSet.computeIfAbsent((String)k, k1 -> new ArrayList()).add(recordsLag);
                            }
                            catch (NumberFormatException numberFormatException) {
                                // empty catch block
                            }
                        }
                    });
                    segment._partitionOffsetInfo._availabilityLagMap.forEach((k, v) -> {
                        if (!"NOT_CALCULATED".equals(v)) {
                            try {
                                long availabilityLagMs = Long.parseLong(v);
                                partitionToAvailabilityLagSet.computeIfAbsent((String)k, k1 -> new ArrayList()).add(availabilityLagMs);
                            }
                            catch (NumberFormatException numberFormatException) {
                                // empty catch block
                            }
                        }
                    });
                });
            }
            partitionToLagSet.forEach((partition, lagSet) -> this._controllerMetrics.setValueOfPartitionGauge(tableNameWithType, Integer.parseInt(partition), (AbstractMetrics.Gauge)ControllerGauge.MAX_RECORDS_LAG, ((Long)Collections.max(lagSet)).longValue()));
            partitionToAvailabilityLagSet.forEach((partition, lagSet) -> this._controllerMetrics.setValueOfPartitionGauge(tableNameWithType, Integer.parseInt(partition), (AbstractMetrics.Gauge)ControllerGauge.MAX_RECORD_AVAILABILITY_LAG_MS, ((Long)Collections.max(lagSet)).longValue()));
        }
        catch (Exception e) {
            LOGGER.error("Failed to fetch consuming segments info. Unable to update table consumption status metrics");
        }
    }

    public static final class Context {
    }
}

