/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.rest.service;

import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.metadata.model.ISourceAware;
import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.rest.exception.InternalErrorException;
import org.apache.kylin.rest.service.BasicService;
import org.apache.kylin.shaded.com.google.common.cache.Cache;
import org.apache.kylin.shaded.com.google.common.cache.CacheBuilder;
import org.apache.kylin.shaded.com.google.common.collect.Lists;
import org.apache.kylin.shaded.com.google.common.collect.Maps;
import org.apache.kylin.shaded.com.google.common.collect.Sets;
import org.apache.kylin.stream.coordinator.StreamMetadataStore;
import org.apache.kylin.stream.coordinator.StreamMetadataStoreFactory;
import org.apache.kylin.stream.coordinator.client.CoordinatorClient;
import org.apache.kylin.stream.coordinator.client.CoordinatorClientFactory;
import org.apache.kylin.stream.core.client.HttpReceiverAdminClient;
import org.apache.kylin.stream.core.client.ReceiverAdminClient;
import org.apache.kylin.stream.core.model.CubeAssignment;
import org.apache.kylin.stream.core.model.Node;
import org.apache.kylin.stream.core.model.ReplicaSet;
import org.apache.kylin.stream.core.model.StreamingCubeConsumeState;
import org.apache.kylin.stream.core.model.stats.ClusterState;
import org.apache.kylin.stream.core.model.stats.ConsumerStats;
import org.apache.kylin.stream.core.model.stats.CubeRealTimeState;
import org.apache.kylin.stream.core.model.stats.PartitionConsumeStats;
import org.apache.kylin.stream.core.model.stats.ReceiverCubeRealTimeState;
import org.apache.kylin.stream.core.model.stats.ReceiverCubeStats;
import org.apache.kylin.stream.core.model.stats.ReceiverState;
import org.apache.kylin.stream.core.model.stats.ReceiverStats;
import org.apache.kylin.stream.core.model.stats.ReplicaSetState;
import org.apache.kylin.stream.core.source.IStreamingSource;
import org.apache.kylin.stream.core.source.Partition;
import org.apache.kylin.stream.core.source.StreamingSourceConfig;
import org.apache.kylin.stream.core.source.StreamingSourceConfigManager;
import org.apache.kylin.stream.core.source.StreamingSourceFactory;
import org.apache.kylin.stream.core.util.NamedThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.stereotype.Component;

@Component(value="streamingServiceV2")
public class StreamingV2Service
extends BasicService {
    private static final Logger logger = LoggerFactory.getLogger(StreamingV2Service.class);
    private static final String CLUSTER_STATE = "cluster_state";
    private StreamMetadataStore streamMetadataStore;
    private ReceiverAdminClient receiverAdminClient;
    private Cache<String, ClusterState> clusterStateCache = CacheBuilder.newBuilder().expireAfterWrite(10L, TimeUnit.SECONDS).build();
    private ExecutorService clusterStateExecutor = new ThreadPoolExecutor(0, 20, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), (ThreadFactory)new NamedThreadFactory("fetch_receiver_state"));

    public StreamingV2Service() {
        this.streamMetadataStore = StreamMetadataStoreFactory.getStreamMetaDataStore();
        this.receiverAdminClient = new HttpReceiverAdminClient();
    }

    StreamingV2Service(StreamMetadataStore metadataStore, ReceiverAdminClient adminClient) {
        this.streamMetadataStore = metadataStore;
        this.receiverAdminClient = adminClient;
    }

    public List<StreamingSourceConfig> listAllStreamingConfigs(String table, String projectName) throws IOException {
        List<StreamingSourceConfig> streamingSourceConfigs = Lists.newArrayList();
        if (StringUtils.isEmpty((CharSequence)table) || StringUtils.isEmpty((CharSequence)projectName)) {
            streamingSourceConfigs = this.getStreamingManagerV2().listAllStreaming();
        } else {
            StreamingSourceConfig config = this.getStreamingManagerV2().getConfig(table, projectName);
            if (config != null) {
                streamingSourceConfigs.add(config);
            }
        }
        return streamingSourceConfigs;
    }

    public List<StreamingSourceConfig> getStreamingConfigs(String table, String projectName, Integer limit, Integer offset) throws IOException {
        List<StreamingSourceConfig> streamingSourceConfigs = this.listAllStreamingConfigs(table, projectName);
        if (limit == null || offset == null) {
            return streamingSourceConfigs;
        }
        if (streamingSourceConfigs.size() - offset < limit) {
            return streamingSourceConfigs.subList(offset, streamingSourceConfigs.size());
        }
        return streamingSourceConfigs.subList(offset, offset + limit);
    }

    @PreAuthorize(value="hasRole('ROLE_ADMIN') or hasPermission(#project, 'ADMINISTRATION')")
    public StreamingSourceConfig createStreamingConfig(StreamingSourceConfig config, ProjectInstance project) throws IOException {
        if (this.getStreamingManagerV2().getConfigMustWithProject(config.getName(), config.getProjectName()) != null) {
            throw new InternalErrorException("The streamingSourceConfig named " + config.getName() + " already exists");
        }
        StreamingSourceConfig streamingSourceConfig = this.getStreamingManagerV2().saveStreamingConfig(config);
        return streamingSourceConfig;
    }

    @PreAuthorize(value="hasRole('ROLE_ADMIN')")
    public StreamingSourceConfig updateStreamingConfig(StreamingSourceConfig config) throws IOException {
        return this.getStreamingManagerV2().updateStreamingConfig(config);
    }

    @PreAuthorize(value="hasRole('ROLE_ADMIN')")
    public void dropStreamingConfig(StreamingSourceConfig config) throws IOException {
        this.getStreamingManagerV2().removeStreamingConfig(config);
    }

    public String getParserTemplate(final int sourceType, StreamingSourceConfig config) {
        IStreamingSource streamingSource = StreamingSourceFactory.getStreamingSource((ISourceAware)new ISourceAware(){

            public int getSourceType() {
                return sourceType;
            }

            public KylinConfig getConfig() {
                return this.getConfig();
            }
        });
        return streamingSource.getMessageTemplate(config);
    }

    @PreAuthorize(value="hasRole('ROLE_ADMIN') or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'MANAGEMENT')")
    public List<CubeAssignment> getStreamingCubeAssignments(CubeInstance cube) {
        if (cube == null) {
            return this.streamMetadataStore.getAllCubeAssignments();
        }
        ArrayList result = Lists.newArrayList();
        CubeAssignment assignment = this.streamMetadataStore.getAssignmentsByCube(cube.getName());
        if (assignment != null) {
            result.add(assignment);
        }
        return result;
    }

    @PreAuthorize(value="hasRole('ROLE_ADMIN')")
    public Map<Integer, Map<String, List<Partition>>> getStreamingReplicaSetAssignments(Integer replicaSetID) {
        if (replicaSetID == null) {
            return this.streamMetadataStore.getAllReplicaSetAssignments();
        }
        HashMap result = Maps.newHashMap();
        Map assignment = this.streamMetadataStore.getAssignmentsByReplicaSet(replicaSetID.intValue());
        if (assignment != null) {
            result.put(replicaSetID, assignment);
        }
        return result;
    }

    @PreAuthorize(value="hasRole('ROLE_ADMIN')")
    public Map<Integer, Map<String, List<Partition>>> reBalancePlan() {
        return this.getCoordinatorClient().reBalanceRecommend();
    }

    @PreAuthorize(value="hasRole('ROLE_ADMIN')")
    public void reBalance(Map<Integer, Map<String, List<Partition>>> reBalancePlan) {
        this.getCoordinatorClient().reBalance(reBalancePlan);
    }

    public List<String> getStreamingCubes() {
        return this.streamMetadataStore.getCubes();
    }

    public StreamingCubeConsumeState getStreamingCubeConsumeState(String cubeName) {
        return this.streamMetadataStore.getStreamingCubeConsumeState(cubeName);
    }

    @PreAuthorize(value="hasRole('ROLE_ADMIN') or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'MANAGEMENT')")
    public void assignCube(CubeInstance cube) {
        this.getCoordinatorClient().assignCube(cube.getName());
    }

    @PreAuthorize(value="hasRole('ROLE_ADMIN') or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'MANAGEMENT')")
    public void unAssignCube(CubeInstance cube) {
        this.getCoordinatorClient().unAssignCube(cube.getName());
    }

    @PreAuthorize(value="hasRole('ROLE_ADMIN')")
    public void reAssignCube(String cubeName, CubeAssignment newAssignment) {
        this.validateAssignment(newAssignment);
        this.getCoordinatorClient().reAssignCube(cubeName, newAssignment);
    }

    void validateAssignment(CubeAssignment newAssignment) {
        Map assignments = newAssignment.getAssignments();
        Map assignmentSet = assignments.keySet().stream().collect(Collectors.toMap(Function.identity(), x -> new HashSet((Collection)assignments.get(x))));
        Set inputReplicaSetIDs = assignments.keySet();
        HashSet allReplicaSetIDs = Sets.newHashSet((Iterable)this.streamMetadataStore.getReplicaSetIDs());
        for (Integer inputReplicaSetID : inputReplicaSetIDs) {
            if (!allReplicaSetIDs.contains(inputReplicaSetID)) {
                throw new IllegalArgumentException("The replica set id:" + inputReplicaSetID + " does not exist");
            }
            Set partitionSet = assignmentSet.get(inputReplicaSetID);
            if (partitionSet.isEmpty()) {
                throw new IllegalArgumentException("PartitionList is empty :" + inputReplicaSetID);
            }
            for (Map.Entry entry : assignmentSet.entrySet()) {
                Set anotherPartitionSet;
                int intersection;
                if (((Integer)entry.getKey()).equals(inputReplicaSetID) || (intersection = Sets.intersection((Set)(anotherPartitionSet = entry.getValue()), (Set)partitionSet).size()) <= 0) continue;
                throw new IllegalArgumentException("Intersection detected between : " + inputReplicaSetID + " with " + entry.getKey());
            }
        }
    }

    @PreAuthorize(value="hasRole('ROLE_ADMIN') or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'MANAGEMENT')")
    public void pauseConsumers(CubeInstance cube) {
        this.getCoordinatorClient().pauseConsumers(cube.getName());
    }

    @PreAuthorize(value="hasRole('ROLE_ADMIN') or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'MANAGEMENT')")
    public void resumeConsumers(CubeInstance cube) {
        this.getCoordinatorClient().resumeConsumers(cube.getName());
    }

    public StreamingSourceConfigManager getStreamingManagerV2() {
        return StreamingSourceConfigManager.getInstance((KylinConfig)this.getConfig());
    }

    @PreAuthorize(value="hasRole('ROLE_ADMIN')")
    public void removeCubeAssignment() {
    }

    @PreAuthorize(value="hasRole('ROLE_ADMIN')")
    public List<Node> getReceivers() {
        List result = this.streamMetadataStore.getReceivers();
        return result;
    }

    @PreAuthorize(value="hasRole('ROLE_ADMIN')")
    public void removeReceiver(Node receiver) {
        List replicaSets = this.streamMetadataStore.getReplicaSets();
        for (ReplicaSet replicaSet : replicaSets) {
            Set receivers = replicaSet.getNodes();
            if (receivers == null || !receivers.contains(receiver)) continue;
            throw new IllegalStateException("Before remove receiver, it must be firstly removed from replica set:" + replicaSet.getReplicaSetID());
        }
        this.streamMetadataStore.removeReceiver(receiver);
    }

    @PreAuthorize(value="hasRole('ROLE_ADMIN')")
    public void createReplicaSet(ReplicaSet rs) {
        this.getCoordinatorClient().createReplicaSet(rs);
        this.clusterStateCache.invalidate((Object)CLUSTER_STATE);
    }

    @PreAuthorize(value="hasRole('ROLE_ADMIN')")
    public void removeReplicaSet(int rsID) {
        this.getCoordinatorClient().removeReplicaSet(rsID);
        this.clusterStateCache.invalidate((Object)CLUSTER_STATE);
    }

    @PreAuthorize(value="hasRole('ROLE_ADMIN')")
    public void addNodeToReplicaSet(Integer replicaSetID, String nodeID) {
        this.getCoordinatorClient().addNodeToReplicaSet(replicaSetID, nodeID);
        this.clusterStateCache.invalidate((Object)CLUSTER_STATE);
    }

    @PreAuthorize(value="hasRole('ROLE_ADMIN')")
    public void removeNodeFromReplicaSet(Integer replicaSetID, String nodeID) {
        this.getCoordinatorClient().removeNodeFromReplicaSet(replicaSetID, nodeID);
        this.clusterStateCache.invalidate((Object)CLUSTER_STATE);
    }

    @PreAuthorize(value="hasRole('ROLE_ADMIN')")
    public List<ReplicaSet> getReplicaSets() {
        List result = this.streamMetadataStore.getReplicaSets();
        return result;
    }

    public ReceiverStats getReceiverStats(Node receiver) {
        try {
            return this.receiverAdminClient.getReceiverStats(receiver);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public ReceiverCubeStats getReceiverCubeStats(Node receiver, String cubeName) {
        try {
            return this.receiverAdminClient.getReceiverCubeStats(receiver, cubeName);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @PreAuthorize(value="hasRole('ROLE_ADMIN') or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'MANAGEMENT') or hasPermission(#cube.getProjectInstance(), 'MANAGEMENT') or hasPermission(#cube.getProjectInstance(), 'ADMINISTRATION')")
    public CubeRealTimeState getCubeRealTimeState(CubeInstance cube) {
        CubeRealTimeState result = new CubeRealTimeState();
        result.setCubeName(cube.getName());
        CubeAssignment cubeAssignment = this.streamMetadataStore.getAssignmentsByCube(cube.getName());
        HashMap rsReceiverCubeStateMap = Maps.newHashMap();
        for (Integer replicaSetID : cubeAssignment.getReplicaSetIDs()) {
            ReplicaSet replicaSet = this.streamMetadataStore.getReplicaSet(replicaSetID.intValue());
            HashMap receiverCubeStateMap = Maps.newHashMap();
            Set receivers = replicaSet.getNodes();
            for (Node receiver : receivers) {
                ReceiverCubeRealTimeState receiverCubeRealTimeState = new ReceiverCubeRealTimeState();
                try {
                    ReceiverCubeStats receiverCubeStats = this.receiverAdminClient.getReceiverCubeStats(receiver, cube.getName());
                    receiverCubeRealTimeState.setState(ReceiverState.State.HEALTHY);
                    receiverCubeRealTimeState.setReceiverCubeStats(receiverCubeStats);
                }
                catch (IOException e) {
                    logger.error("exception when get receiver cube stats", (Throwable)e);
                    if (!this.isReceiverReachable(receiver)) {
                        receiverCubeRealTimeState.setState(ReceiverState.State.UNREACHABLE);
                    }
                    receiverCubeRealTimeState.setState(ReceiverState.State.DOWN);
                }
                receiverCubeStateMap.put(receiver, receiverCubeRealTimeState);
            }
            rsReceiverCubeStateMap.put(replicaSetID, receiverCubeStateMap);
        }
        result.setReceiverCubeStateMap((Map)rsReceiverCubeStateMap);
        return result;
    }

    public ClusterState getClusterState() {
        ClusterState clusterState = (ClusterState)this.clusterStateCache.getIfPresent((Object)CLUSTER_STATE);
        if (clusterState != null) {
            return clusterState;
        }
        List replicaSets = this.streamMetadataStore.getReplicaSets();
        List allReceivers = this.streamMetadataStore.getReceivers();
        Map rsAssignments = this.streamMetadataStore.getAllReplicaSetAssignments();
        HashMap statsFuturesMap = Maps.newHashMap();
        for (Node receiver : allReceivers) {
            Future<ReceiverStats> receiverStatsFuture = this.clusterStateExecutor.submit(() -> this.receiverAdminClient.getReceiverStats(receiver));
            statsFuturesMap.put(receiver, receiverStatsFuture);
        }
        clusterState = new ClusterState();
        for (ReplicaSet replicaSet : replicaSets) {
            ReplicaSetState replicaSetState = this.calReplicaSetState(replicaSet, (Map)rsAssignments.get(replicaSet.getReplicaSetID()), statsFuturesMap);
            clusterState.addReplicaSetState(replicaSetState);
            allReceivers.removeAll(replicaSet.getNodes());
        }
        for (Node receiver : allReceivers) {
            Future futureStats = (Future)statsFuturesMap.get(receiver);
            ReceiverState receiverState = this.getReceiverStateFromStats(receiver, futureStats);
            clusterState.addAvailableReveiverState(receiverState);
        }
        clusterState.setLastUpdateTime(System.currentTimeMillis());
        this.clusterStateCache.put((Object)CLUSTER_STATE, (Object)clusterState);
        return clusterState;
    }

    private ReplicaSetState calReplicaSetState(ReplicaSet replicaSet, Map<String, List<Partition>> rsAssignment, Map<Node, Future<ReceiverStats>> statsFuturesMap) {
        ReplicaSetState replicaSetState = new ReplicaSetState();
        replicaSetState.setRsID(replicaSet.getReplicaSetID());
        replicaSetState.setAssignment(rsAssignment);
        Set receivers = replicaSet.getNodes();
        if (receivers == null || receivers.isEmpty()) {
            return replicaSetState;
        }
        Node leadReceiver = replicaSet.getLeader();
        replicaSetState.setLead(leadReceiver);
        HashMap receiverStatsMap = Maps.newHashMap();
        for (Object receiver : receivers) {
            Future<ReceiverStats> futureStats = statsFuturesMap.get(receiver);
            try {
                ReceiverStats receiverStats = futureStats.get();
                receiverStatsMap.put(receiver, receiverStats);
            }
            catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            catch (ExecutionException e) {
                replicaSetState.addReveiverState(this.getReceiverStateFromException((Node)receiver, e));
            }
        }
        HashMap cubeLatestEventMap = Maps.newHashMap();
        for (ReceiverStats receiverStats : receiverStatsMap.values()) {
            Map cubeStatsMap = receiverStats.getCubeStatsMap();
            for (Map.Entry cubeStatsEntry : cubeStatsMap.entrySet()) {
                String cubeName = (String)cubeStatsEntry.getKey();
                ReceiverCubeStats cubeStats = (ReceiverCubeStats)cubeStatsEntry.getValue();
                Long latestEventTime = (Long)cubeLatestEventMap.get(cubeName);
                if (latestEventTime != null && latestEventTime >= cubeStats.getLatestEventTime()) continue;
                cubeLatestEventMap.put(cubeName, cubeStats.getLatestEventTime());
            }
        }
        long consumeEventLagThreshold = 300000L;
        for (Map.Entry receiverStatsEntry : receiverStatsMap.entrySet()) {
            ReceiverState.State state;
            Node receiver = (Node)receiverStatsEntry.getKey();
            ReceiverStats receiverStats = (ReceiverStats)receiverStatsEntry.getValue();
            ReceiverState receiverState = new ReceiverState();
            receiverState.setReceiver(receiver);
            receiverState.setState(ReceiverState.State.HEALTHY);
            Map receiverAssignment = receiverStats.getAssignments();
            if (!this.assignmentEqual(receiverAssignment, rsAssignment)) {
                state = ReceiverState.State.WARN;
                receiverState.setState(state);
                receiverState.addInfo("assignment is inconsistent");
            }
            if (receiverStats.isLead() && !receiver.equals((Object)leadReceiver)) {
                state = ReceiverState.State.WARN;
                receiverState.setState(state);
                receiverState.addInfo("lead state is inconsistent");
            }
            Map cubeStatsMap = receiverStats.getCubeStatsMap();
            for (Map.Entry cubeStatsEntry : cubeStatsMap.entrySet()) {
                String cubeName = (String)cubeStatsEntry.getKey();
                ReceiverCubeStats cubeStats = (ReceiverCubeStats)cubeStatsEntry.getValue();
                Long latestEventTime = (Long)cubeLatestEventMap.get(cubeName);
                if (latestEventTime - cubeStats.getLatestEventTime() < consumeEventLagThreshold) continue;
                ReceiverState.State state2 = ReceiverState.State.WARN;
                receiverState.setState(state2);
                receiverState.addInfo("cube:" + cubeName + " consuming is lagged");
            }
            receiverState.setRateInOneMin(this.calConsumeRate(receiver, receiverStats));
            replicaSetState.addReveiverState(receiverState);
        }
        return replicaSetState;
    }

    private boolean assignmentEqual(Map<String, List<Partition>> receiverAssignment, Map<String, List<Partition>> rsAssignment) {
        if (this.emptyMap(receiverAssignment) && this.emptyMap(rsAssignment)) {
            return true;
        }
        if (receiverAssignment != null) {
            for (Map.Entry<String, List<Partition>> entry : receiverAssignment.entrySet()) {
                Collections.sort(entry.getValue());
                entry.setValue(entry.getValue());
            }
        }
        if (rsAssignment != null) {
            for (Map.Entry<String, List<Partition>> entry : rsAssignment.entrySet()) {
                Collections.sort(entry.getValue());
                entry.setValue(entry.getValue());
            }
        }
        return receiverAssignment != null && receiverAssignment.equals(rsAssignment);
    }

    private boolean emptyMap(Map map) {
        return map == null || map.isEmpty();
    }

    private ReceiverState getReceiverStateFromException(Node receiver, ExecutionException e) {
        ReceiverState receiverState = new ReceiverState();
        receiverState.setReceiver(receiver);
        if (!this.isReceiverReachable(receiver)) {
            receiverState.setState(ReceiverState.State.UNREACHABLE);
        } else {
            receiverState.setState(ReceiverState.State.DOWN);
        }
        return receiverState;
    }

    private boolean isReceiverReachable(Node receiver) {
        try {
            InetAddress address = InetAddress.getByName(receiver.getHost());
            boolean reachable = address.isReachable(1000);
            return reachable;
        }
        catch (Exception exception) {
            logger.error("exception when try ping host:" + receiver.getHost(), (Throwable)exception);
            return false;
        }
    }

    private ReceiverState getReceiverStateFromStats(Node receiver, Future<ReceiverStats> futureStats) {
        ReceiverState receiverState = new ReceiverState();
        try {
            ReceiverStats receiverStats = futureStats.get();
            receiverState.setReceiver(receiver);
            receiverState.setState(ReceiverState.State.HEALTHY);
            receiverState.setRateInOneMin(this.calConsumeRate(receiver, receiverStats));
        }
        catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        catch (ExecutionException e) {
            receiverState = this.getReceiverStateFromException(receiver, e);
        }
        return receiverState;
    }

    private double calConsumeRate(Node receiver, ReceiverStats receiverStats) {
        double result = 0.0;
        Map cubeStatsMap = receiverStats.getCubeStatsMap();
        for (Map.Entry receiverCubeStatsEntry : cubeStatsMap.entrySet()) {
            ReceiverCubeStats cubeStats = (ReceiverCubeStats)receiverCubeStatsEntry.getValue();
            ConsumerStats consumerStats = cubeStats.getConsumerStats();
            if (consumerStats == null) {
                logger.warn("no consumer stats exist for cube:{} in receiver:{}", receiverCubeStatsEntry.getKey(), (Object)receiver);
                continue;
            }
            Map partitionConsumeStatsMap = consumerStats.getPartitionConsumeStatsMap();
            for (PartitionConsumeStats partitionStats : partitionConsumeStatsMap.values()) {
                result += partitionStats.getOneMinRate();
            }
        }
        return result;
    }

    private synchronized CoordinatorClient getCoordinatorClient() {
        return CoordinatorClientFactory.createCoordinatorClient((StreamMetadataStore)this.streamMetadataStore);
    }
}

