/*
 * Decompiled with CFR 0.152.
 */
package org.apache.rocketmq.tools.command.consumer;

import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.OffsetWrapper;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.Connection;
import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
import org.apache.rocketmq.tools.command.consumer.GroupConsumeInfo;
import org.slf4j.Logger;

/**
 * Admin sub-command {@code consumerProgress}.
 *
 * <p>When the {@code -g} option is given it prints, for every message queue in the
 * consume stats of that group, the broker offset, the consumer offset, their
 * difference, the owning client and the last timestamp. Without {@code -g} it prints
 * one summary row for every consumer group derived from the {@code %RETRY%} topics.
 */
public class ConsumerProgressSubCommand implements SubCommand {
    /** Topic-name prefix used to recognise retry topics; the remainder is taken as the group name. */
    private static final String RETRY_TOPIC_PREFIX = "%RETRY%";

    private final Logger log = ClientLogger.getLog();

    /** @return the name under which this sub-command is invoked */
    @Override
    public String commandName() {
        return "consumerProgress";
    }

    /** @return a one-line description of this sub-command */
    @Override
    public String commandDesc() {
        return "Query consumers's progress, speed";
    }

    /**
     * Registers the optional {@code -g / --groupName} option.
     *
     * @param options the option set to extend
     * @return the same {@code options} instance, with the group option added
     */
    @Override
    public Options buildCommandlineOptions(Options options) {
        Option groupOption = new Option("g", "groupName", true, "consumer group name");
        groupOption.setRequired(false);
        options.addOption(groupOption);
        return options;
    }

    /**
     * Builds a queue-to-client lookup for a consumer group by asking every connected
     * client for its running info.
     *
     * <p>The lookup is best-effort: if any remote call fails, the failure is logged and
     * whatever was collected up to that point is returned.
     *
     * @param defaultMQAdminExt started admin client used for the remote calls
     * @param groupName consumer group to inspect
     * @return map from message queue to the part of the client id before the first
     *         {@code '@'} (presumably the client IP, as the output column is labelled
     *         "#Client IP"); never {@code null}, possibly empty or partial
     */
    private Map<MessageQueue, String> getMessageQueueAllocationResult(DefaultMQAdminExt defaultMQAdminExt, String groupName) {
        Map<MessageQueue, String> results = new HashMap<MessageQueue, String>();
        try {
            ConsumerConnection consumerConnection = defaultMQAdminExt.examineConsumerConnectionInfo(groupName);
            for (Connection connection : consumerConnection.getConnectionSet()) {
                String clientId = connection.getClientId();
                String clientLabel = clientId.split("@")[0];
                ConsumerRunningInfo consumerRunningInfo = defaultMQAdminExt.getConsumerRunningInfo(groupName, clientId, false);
                for (MessageQueue messageQueue : consumerRunningInfo.getMqTable().keySet()) {
                    results.put(messageQueue, clientLabel);
                }
            }
        } catch (Exception e) {
            // Best-effort: queues without an entry are printed as "NA" by the caller,
            // but the failure is logged instead of being silently dropped.
            this.log.warn("getMessageQueueAllocationResult exception, " + groupName, e);
        }
        return results;
    }

    /**
     * Runs the command: starts an admin client, prints either the per-queue progress of
     * one group ({@code -g}) or a summary of all groups, and always shuts the client down.
     *
     * @param commandLine parsed command line
     * @param options option definitions (not used here)
     * @param rpcHook hook passed to the admin client constructor
     * @throws SubCommandException wrapping any exception raised while the command runs
     */
    @Override
    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
        try {
            defaultMQAdminExt.start();
            if (commandLine.hasOption('g')) {
                String consumerGroup = commandLine.getOptionValue('g').trim();
                this.printGroupProgress(defaultMQAdminExt, consumerGroup);
            } else {
                this.printAllGroupsSummary(defaultMQAdminExt);
            }
        } catch (Exception e) {
            throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
        } finally {
            defaultMQAdminExt.shutdown();
        }
    }

    /**
     * Prints one row per message queue of the given group, followed by the group's
     * consume TPS and the sum of all per-queue differences.
     */
    private void printGroupProgress(DefaultMQAdminExt defaultMQAdminExt, String consumerGroup) throws Exception {
        ConsumeStats consumeStats = defaultMQAdminExt.examineConsumeStats(consumerGroup);

        // Typed list: a raw LinkedList cannot be iterated as MessageQueue.
        LinkedList<MessageQueue> mqList = new LinkedList<MessageQueue>(consumeStats.getOffsetTable().keySet());
        Collections.sort(mqList);

        Map<MessageQueue, String> messageQueueAllocationResult = this.getMessageQueueAllocationResult(defaultMQAdminExt, consumerGroup);

        System.out.printf("%-32s  %-32s  %-4s  %-20s  %-20s  %-20s %-20s  %s%n", "#Topic", "#Broker Name", "#QID", "#Broker Offset", "#Consumer Offset", "#Client IP", "#Diff", "#LastTime");

        long diffTotal = 0L;
        for (MessageQueue mq : mqList) {
            OffsetWrapper offsetWrapper = consumeStats.getOffsetTable().get(mq);
            long diff = offsetWrapper.getBrokerOffset() - offsetWrapper.getConsumerOffset();
            diffTotal += diff;

            String lastTime = "";
            try {
                lastTime = UtilAll.formatDate(new Date(offsetWrapper.getLastTimestamp()), "yyyy-MM-dd HH:mm:ss");
            } catch (Exception ignored) {
                // A timestamp that cannot be formatted only leaves the #LastTime column blank.
            }

            String clientIP = messageQueueAllocationResult.get(mq);
            System.out.printf("%-32s  %-32s  %-4d  %-20d  %-20d  %-20s %-20d  %s%n",
                UtilAll.frontStringAtLeast(mq.getTopic(), 32),
                UtilAll.frontStringAtLeast(mq.getBrokerName(), 32),
                mq.getQueueId(),
                offsetWrapper.getBrokerOffset(),
                offsetWrapper.getConsumerOffset(),
                null != clientIP ? clientIP : "NA",
                diff,
                lastTime);
        }

        System.out.printf("%n");
        System.out.printf("Consume TPS: %s%n", consumeStats.getConsumeTps());
        System.out.printf("Diff Total: %d%n", diffTotal);
    }

    /**
     * Prints one summary row for every consumer group whose retry topic appears in the
     * topic list. A group whose stats or connection info cannot be fetched is still
     * printed, with the corresponding fields left at their {@code GroupConsumeInfo} defaults.
     */
    private void printAllGroupsSummary(DefaultMQAdminExt defaultMQAdminExt) throws Exception {
        System.out.printf("%-32s  %-6s  %-24s %-5s  %-14s  %-7s  %s%n", "#Group", "#Count", "#Version", "#Type", "#Model", "#TPS", "#Diff Total");

        TopicList topicList = defaultMQAdminExt.fetchAllTopicList();
        for (String topic : topicList.getTopicList()) {
            if (!topic.startsWith(RETRY_TOPIC_PREFIX)) {
                continue;
            }
            String consumerGroup = topic.substring(RETRY_TOPIC_PREFIX.length());
            try {
                ConsumeStats consumeStats = null;
                try {
                    consumeStats = defaultMQAdminExt.examineConsumeStats(consumerGroup);
                } catch (Exception e) {
                    this.log.warn("examineConsumeStats exception, " + consumerGroup, e);
                }

                ConsumerConnection cc = null;
                try {
                    cc = defaultMQAdminExt.examineConsumerConnectionInfo(consumerGroup);
                } catch (Exception e) {
                    this.log.warn("examineConsumerConnectionInfo exception, " + consumerGroup, e);
                }

                GroupConsumeInfo groupConsumeInfo = new GroupConsumeInfo();
                groupConsumeInfo.setGroup(consumerGroup);
                if (consumeStats != null) {
                    groupConsumeInfo.setConsumeTps((int) consumeStats.getConsumeTps());
                    groupConsumeInfo.setDiffTotal(consumeStats.computeTotalDiff());
                }
                if (cc != null) {
                    groupConsumeInfo.setCount(cc.getConnectionSet().size());
                    groupConsumeInfo.setMessageModel(cc.getMessageModel());
                    groupConsumeInfo.setConsumeType(cc.getConsumeType());
                    groupConsumeInfo.setVersion(cc.computeMinVersion());
                }

                System.out.printf("%-32s  %-6d  %-24s %-5s  %-14s  %-7d  %d%n",
                    UtilAll.frontStringAtLeast(groupConsumeInfo.getGroup(), 32),
                    groupConsumeInfo.getCount(),
                    groupConsumeInfo.getCount() > 0 ? groupConsumeInfo.versionDesc() : "OFFLINE",
                    groupConsumeInfo.consumeTypeDesc(),
                    groupConsumeInfo.messageModelDesc(),
                    groupConsumeInfo.getConsumeTps(),
                    groupConsumeInfo.getDiffTotal());
            } catch (Exception e) {
                // One failing group must not abort the listing of the remaining groups.
                this.log.warn("examineConsumeStats or examineConsumerConnectionInfo exception, " + consumerGroup, e);
            }
        }
    }
}

