/*
 * Decompiled with CFR 0.152.
 */
package org.apache.rocketmq.client.impl.consumer;

import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.consumer.AckCallback;
import org.apache.rocketmq.client.consumer.AckResult;
import org.apache.rocketmq.client.consumer.AckStatus;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageQueueListener;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.PopCallback;
import org.apache.rocketmq.client.consumer.PopResult;
import org.apache.rocketmq.client.consumer.PopStatus;
import org.apache.rocketmq.client.consumer.PullCallback;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.listener.MessageListener;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
import org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.hook.ConsumeMessageContext;
import org.apache.rocketmq.client.hook.ConsumeMessageHook;
import org.apache.rocketmq.client.hook.FilterMessageContext;
import org.apache.rocketmq.client.hook.FilterMessageHook;
import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService;
import org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService;
import org.apache.rocketmq.client.impl.consumer.ConsumeMessagePopConcurrentlyService;
import org.apache.rocketmq.client.impl.consumer.ConsumeMessagePopOrderlyService;
import org.apache.rocketmq.client.impl.consumer.ConsumeMessageService;
import org.apache.rocketmq.client.impl.consumer.MQConsumerInner;
import org.apache.rocketmq.client.impl.consumer.PopProcessQueue;
import org.apache.rocketmq.client.impl.consumer.PopRequest;
import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
import org.apache.rocketmq.client.impl.consumer.PullAPIWrapper;
import org.apache.rocketmq.client.impl.consumer.PullRequest;
import org.apache.rocketmq.client.impl.consumer.RebalanceImpl;
import org.apache.rocketmq.client.impl.consumer.RebalancePushImpl;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.stat.ConsumerStatsManager;
import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.protocol.NamespaceUtil;
import org.apache.rocketmq.remoting.protocol.body.ConsumeStatus;
import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.remoting.protocol.body.PopProcessQueueInfo;
import org.apache.rocketmq.remoting.protocol.body.ProcessQueueInfo;
import org.apache.rocketmq.remoting.protocol.body.QueueTimeSpan;
import org.apache.rocketmq.remoting.protocol.filter.FilterAPI;
import org.apache.rocketmq.remoting.protocol.header.AckMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ChangeInvisibleTimeRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil;
import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;

public class DefaultMQPushConsumerImpl
implements MQConsumerInner {
    private long pullTimeDelayMillsWhenException = 3000L;
    private static final long PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL = 50L;
    private static final long PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL = 20L;
    private static final long PULL_TIME_DELAY_MILLS_WHEN_SUSPEND = 1000L;
    private static final long BROKER_SUSPEND_MAX_TIME_MILLIS = 15000L;
    private static final long CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND = 30000L;
    private static final Logger log = LoggerFactory.getLogger(DefaultMQPushConsumerImpl.class);
    private final DefaultMQPushConsumer defaultMQPushConsumer;
    private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this);
    private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList();
    private final long consumerStartTimestamp = System.currentTimeMillis();
    private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList();
    private final RPCHook rpcHook;
    private volatile ServiceState serviceState = ServiceState.CREATE_JUST;
    private MQClientInstance mQClientFactory;
    private PullAPIWrapper pullAPIWrapper;
    private volatile boolean pause = false;
    private boolean consumeOrderly = false;
    private MessageListener messageListenerInner;
    private OffsetStore offsetStore;
    private ConsumeMessageService consumeMessageService;
    private ConsumeMessageService consumeMessagePopService;
    private long queueFlowControlTimes = 0L;
    private long queueMaxSpanFlowControlTimes = 0L;
    private final int[] popDelayLevel = new int[]{10, 30, 60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200};
    private static final int MAX_POP_INVISIBLE_TIME = 300000;
    private static final int MIN_POP_INVISIBLE_TIME = 5000;
    private static final int ASYNC_TIMEOUT = 3000;
    private static boolean doNotUpdateTopicSubscribeInfoWhenSubscriptionChanged = false;

    public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) {
        this.defaultMQPushConsumer = defaultMQPushConsumer;
        this.rpcHook = rpcHook;
        this.pullTimeDelayMillsWhenException = defaultMQPushConsumer.getPullTimeDelayMillsWhenException();
    }

    public void registerFilterMessageHook(FilterMessageHook hook) {
        this.filterMessageHookList.add(hook);
        log.info("register FilterMessageHook Hook, {}", (Object)hook.hookName());
    }

    public boolean hasHook() {
        return !this.consumeMessageHookList.isEmpty();
    }

    public void registerConsumeMessageHook(ConsumeMessageHook hook) {
        this.consumeMessageHookList.add(hook);
        log.info("register consumeMessageHook Hook, {}", (Object)hook.hookName());
    }

    public void executeHookBefore(ConsumeMessageContext context) {
        if (!this.consumeMessageHookList.isEmpty()) {
            for (ConsumeMessageHook hook : this.consumeMessageHookList) {
                try {
                    hook.consumeMessageBefore(context);
                }
                catch (Throwable e) {
                    log.warn("consumeMessageHook {} executeHookBefore exception", (Object)hook.hookName(), (Object)e);
                }
            }
        }
    }

    public void executeHookAfter(ConsumeMessageContext context) {
        if (!this.consumeMessageHookList.isEmpty()) {
            for (ConsumeMessageHook hook : this.consumeMessageHookList) {
                try {
                    hook.consumeMessageAfter(context);
                }
                catch (Throwable e) {
                    log.warn("consumeMessageHook {} executeHookAfter exception", (Object)hook.hookName(), (Object)e);
                }
            }
        }
    }

    public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
        this.createTopic(key, newTopic, queueNum, 0);
    }

    public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
        this.mQClientFactory.getMQAdminImpl().createTopic(key, newTopic, queueNum, topicSysFlag, null);
    }

    public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {
        Set result = (Set)this.rebalanceImpl.getTopicSubscribeInfoTable().get(topic);
        if (null == result) {
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            result = (Set)this.rebalanceImpl.getTopicSubscribeInfoTable().get(topic);
        }
        if (null == result) {
            throw new MQClientException("The topic[" + topic + "] not exist", null);
        }
        return this.parseSubscribeMessageQueues(result);
    }

    public Set<MessageQueue> parseSubscribeMessageQueues(Set<MessageQueue> messageQueueList) {
        HashSet<MessageQueue> resultQueues = new HashSet<MessageQueue>();
        for (MessageQueue queue : messageQueueList) {
            String userTopic = NamespaceUtil.withoutNamespace((String)queue.getTopic(), (String)this.defaultMQPushConsumer.getNamespace());
            resultQueues.add(new MessageQueue(userTopic, queue.getBrokerName(), queue.getQueueId()));
        }
        return resultQueues;
    }

    public DefaultMQPushConsumer getDefaultMQPushConsumer() {
        return this.defaultMQPushConsumer;
    }

    public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
        return this.mQClientFactory.getMQAdminImpl().earliestMsgStoreTime(mq);
    }

    public long maxOffset(MessageQueue mq) throws MQClientException {
        return this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
    }

    public long minOffset(MessageQueue mq) throws MQClientException {
        return this.mQClientFactory.getMQAdminImpl().minOffset(mq);
    }

    public OffsetStore getOffsetStore() {
        return this.offsetStore;
    }

    public void setOffsetStore(OffsetStore offsetStore) {
        this.offsetStore = offsetStore;
    }

    public void pullMessage(final PullRequest pullRequest) {
        final ProcessQueue processQueue = pullRequest.getProcessQueue();
        if (processQueue.isDropped()) {
            log.info("the pull request[{}] is dropped.", (Object)pullRequest.toString());
            return;
        }
        pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
        try {
            this.makeSureStateOK();
        }
        catch (MQClientException e) {
            log.warn("pullMessage exception, consumer state not ok", (Throwable)e);
            this.executePullRequestLater(pullRequest, this.pullTimeDelayMillsWhenException);
            return;
        }
        if (this.isPause()) {
            log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", (Object)this.defaultMQPushConsumer.getInstanceName(), (Object)this.defaultMQPushConsumer.getConsumerGroup());
            this.executePullRequestLater(pullRequest, 1000L);
            return;
        }
        long cachedMessageCount = processQueue.getMsgCount().get();
        long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / 0x100000L;
        if (cachedMessageCount > (long)this.defaultMQPushConsumer.getPullThresholdForQueue()) {
            this.executePullRequestLater(pullRequest, 50L);
            if (this.queueFlowControlTimes++ % 1000L == 0L) {
                log.warn("the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", new Object[]{this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, this.queueFlowControlTimes});
            }
            return;
        }
        if (cachedMessageSizeInMiB > (long)this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
            this.executePullRequestLater(pullRequest, 50L);
            if (this.queueFlowControlTimes++ % 1000L == 0L) {
                log.warn("the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", new Object[]{this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, this.queueFlowControlTimes});
            }
            return;
        }
        if (!this.consumeOrderly) {
            if (processQueue.getMaxSpan() > (long)this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
                this.executePullRequestLater(pullRequest, 50L);
                if (this.queueMaxSpanFlowControlTimes++ % 1000L == 0L) {
                    log.warn("the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}", new Object[]{processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), pullRequest, this.queueMaxSpanFlowControlTimes});
                }
                return;
            }
        } else if (processQueue.isLocked()) {
            if (!pullRequest.isPreviouslyLocked()) {
                long offset = -1L;
                try {
                    offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue());
                    if (offset < 0L) {
                        throw new MQClientException(1, "Unexpected offset " + offset);
                    }
                }
                catch (Exception e) {
                    this.executePullRequestLater(pullRequest, this.pullTimeDelayMillsWhenException);
                    log.error("Failed to compute pull offset, pullResult: {}", (Object)pullRequest, (Object)e);
                    return;
                }
                boolean brokerBusy = offset < pullRequest.getNextOffset();
                log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}", new Object[]{pullRequest, offset, brokerBusy});
                if (brokerBusy) {
                    log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}", (Object)pullRequest, (Object)offset);
                }
                pullRequest.setPreviouslyLocked(true);
                pullRequest.setNextOffset(offset);
            }
        } else {
            this.executePullRequestLater(pullRequest, this.pullTimeDelayMillsWhenException);
            log.info("pull message later because not locked in broker, {}", (Object)pullRequest);
            return;
        }
        final MessageQueue messageQueue = pullRequest.getMessageQueue();
        final SubscriptionData subscriptionData = (SubscriptionData)this.rebalanceImpl.getSubscriptionInner().get(messageQueue.getTopic());
        if (null == subscriptionData) {
            this.executePullRequestLater(pullRequest, this.pullTimeDelayMillsWhenException);
            log.warn("find the consumer's subscription failed, {}", (Object)pullRequest);
            return;
        }
        final long beginTimestamp = System.currentTimeMillis();
        PullCallback pullCallback = new PullCallback(){

            @Override
            public void onSuccess(PullResult pullResult) {
                if (pullResult != null) {
                    pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, subscriptionData);
                    switch (pullResult.getPullStatus()) {
                        case FOUND: {
                            long prevRequestOffset = pullRequest.getNextOffset();
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            long pullRT = System.currentTimeMillis() - beginTimestamp;
                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullRT);
                            long firstMsgOffset = Long.MAX_VALUE;
                            if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            } else {
                                firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
                                DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
                                boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                                DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume);
                                if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0L) {
                                    DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                                } else {
                                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                }
                            }
                            if (pullResult.getNextBeginOffset() >= prevRequestOffset && firstMsgOffset >= prevRequestOffset) break;
                            log.warn("[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}", new Object[]{pullResult.getNextBeginOffset(), firstMsgOffset, prevRequestOffset});
                            break;
                        }
                        case NO_NEW_MSG: 
                        case NO_MATCHED_MSG: {
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            break;
                        }
                        case OFFSET_ILLEGAL: {
                            log.warn("the pull request offset illegal, {} {}", (Object)pullRequest.toString(), (Object)pullResult.toString());
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            pullRequest.getProcessQueue().setDropped(true);
                            DefaultMQPushConsumerImpl.this.executeTask(new Runnable(){

                                @Override
                                public void run() {
                                    try {
                                        DefaultMQPushConsumerImpl.this.offsetStore.updateAndFreezeOffset(pullRequest.getMessageQueue(), pullRequest.getNextOffset());
                                        DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
                                        DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
                                        DefaultMQPushConsumerImpl.this.rebalanceImpl.getmQClientFactory().rebalanceImmediately();
                                        log.warn("fix the pull request offset, {}", (Object)pullRequest);
                                    }
                                    catch (Throwable e) {
                                        log.error("executeTaskLater Exception", e);
                                    }
                                }
                            });
                            break;
                        }
                    }
                }
            }

            @Override
            public void onException(Throwable e) {
                if (!pullRequest.getMessageQueue().getTopic().startsWith("%RETRY%")) {
                    if (e instanceof MQBrokerException && ((MQBrokerException)e).getResponseCode() == 25) {
                        log.warn("the subscription is not latest, group={}, messageQueue={}", (Object)DefaultMQPushConsumerImpl.this.groupName(), (Object)messageQueue);
                    } else {
                        log.warn("execute the pull request exception, group={}, messageQueue={}", new Object[]{DefaultMQPushConsumerImpl.this.groupName(), messageQueue, e});
                    }
                }
                if (e instanceof MQBrokerException && ((MQBrokerException)e).getResponseCode() == 215) {
                    DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, 20L);
                } else {
                    DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, DefaultMQPushConsumerImpl.this.pullTimeDelayMillsWhenException);
                }
            }
        };
        boolean commitOffsetEnable = false;
        long commitOffsetValue = 0L;
        if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel() && (commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY)) > 0L) {
            commitOffsetEnable = true;
        }
        String subExpression = null;
        boolean classFilter = false;
        SubscriptionData sd = (SubscriptionData)this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
        if (sd != null) {
            if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
                subExpression = sd.getSubString();
            }
            classFilter = sd.isClassFilterMode();
        }
        int sysFlag = PullSysFlag.buildSysFlag((boolean)commitOffsetEnable, (boolean)true, (subExpression != null ? 1 : 0) != 0, (boolean)classFilter);
        try {
            this.pullAPIWrapper.pullKernelImpl(pullRequest.getMessageQueue(), subExpression, subscriptionData.getExpressionType(), subscriptionData.getSubVersion(), pullRequest.getNextOffset(), this.defaultMQPushConsumer.getPullBatchSize(), this.defaultMQPushConsumer.getPullBatchSizeInBytes(), sysFlag, commitOffsetValue, 15000L, 30000L, CommunicationMode.ASYNC, pullCallback);
        }
        catch (Exception e) {
            log.error("pullKernelImpl exception", (Throwable)e);
            this.executePullRequestLater(pullRequest, this.pullTimeDelayMillsWhenException);
        }
    }

    void popMessage(final PopRequest popRequest) {
        final PopProcessQueue processQueue = popRequest.getPopProcessQueue();
        if (processQueue.isDropped()) {
            log.info("the pop request[{}] is dropped.", (Object)popRequest.toString());
            return;
        }
        processQueue.setLastPopTimestamp(System.currentTimeMillis());
        try {
            this.makeSureStateOK();
        }
        catch (MQClientException e) {
            log.warn("pullMessage exception, consumer state not ok", (Throwable)e);
            this.executePopPullRequestLater(popRequest, this.pullTimeDelayMillsWhenException);
            return;
        }
        if (this.isPause()) {
            log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", (Object)this.defaultMQPushConsumer.getInstanceName(), (Object)this.defaultMQPushConsumer.getConsumerGroup());
            this.executePopPullRequestLater(popRequest, 1000L);
            return;
        }
        if (processQueue.getWaiAckMsgCount() > this.defaultMQPushConsumer.getPopThresholdForQueue()) {
            this.executePopPullRequestLater(popRequest, 50L);
            if (this.queueFlowControlTimes++ % 1000L == 0L) {
                log.warn("the messages waiting to ack exceeds the threshold {}, so do flow control, popRequest={}, flowControlTimes={}, wait count={}", new Object[]{this.defaultMQPushConsumer.getPopThresholdForQueue(), popRequest, this.queueFlowControlTimes, processQueue.getWaiAckMsgCount()});
            }
            return;
        }
        final SubscriptionData subscriptionData = (SubscriptionData)this.rebalanceImpl.getSubscriptionInner().get(popRequest.getMessageQueue().getTopic());
        if (null == subscriptionData) {
            this.executePopPullRequestLater(popRequest, this.pullTimeDelayMillsWhenException);
            log.warn("find the consumer's subscription failed, {}", (Object)popRequest);
            return;
        }
        final long beginTimestamp = System.currentTimeMillis();
        PopCallback popCallback = new PopCallback(){

            @Override
            public void onSuccess(PopResult popResult) {
                if (popResult == null) {
                    log.error("pop callback popResult is null");
                    DefaultMQPushConsumerImpl.this.executePopPullRequestImmediately(popRequest);
                    return;
                }
                DefaultMQPushConsumerImpl.this.processPopResult(popResult, subscriptionData);
                switch (popResult.getPopStatus()) {
                    case FOUND: {
                        long pullRT = System.currentTimeMillis() - beginTimestamp;
                        DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(popRequest.getConsumerGroup(), popRequest.getMessageQueue().getTopic(), pullRT);
                        if (popResult.getMsgFoundList() == null || popResult.getMsgFoundList().isEmpty()) {
                            DefaultMQPushConsumerImpl.this.executePopPullRequestImmediately(popRequest);
                            break;
                        }
                        DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(popRequest.getConsumerGroup(), popRequest.getMessageQueue().getTopic(), popResult.getMsgFoundList().size());
                        popRequest.getPopProcessQueue().incFoundMsg(popResult.getMsgFoundList().size());
                        DefaultMQPushConsumerImpl.this.consumeMessagePopService.submitPopConsumeRequest(popResult.getMsgFoundList(), processQueue, popRequest.getMessageQueue());
                        if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0L) {
                            DefaultMQPushConsumerImpl.this.executePopPullRequestLater(popRequest, DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                            break;
                        }
                        DefaultMQPushConsumerImpl.this.executePopPullRequestImmediately(popRequest);
                        break;
                    }
                    case NO_NEW_MSG: 
                    case POLLING_NOT_FOUND: {
                        DefaultMQPushConsumerImpl.this.executePopPullRequestImmediately(popRequest);
                        break;
                    }
                    case POLLING_FULL: {
                        DefaultMQPushConsumerImpl.this.executePopPullRequestLater(popRequest, DefaultMQPushConsumerImpl.this.pullTimeDelayMillsWhenException);
                        break;
                    }
                    default: {
                        DefaultMQPushConsumerImpl.this.executePopPullRequestLater(popRequest, DefaultMQPushConsumerImpl.this.pullTimeDelayMillsWhenException);
                    }
                }
            }

            @Override
            public void onException(Throwable e) {
                if (!popRequest.getMessageQueue().getTopic().startsWith("%RETRY%")) {
                    log.warn("execute the pull request exception: {}", e);
                }
                if (e instanceof MQBrokerException && ((MQBrokerException)e).getResponseCode() == 215) {
                    DefaultMQPushConsumerImpl.this.executePopPullRequestLater(popRequest, 20L);
                } else {
                    DefaultMQPushConsumerImpl.this.executePopPullRequestLater(popRequest, DefaultMQPushConsumerImpl.this.pullTimeDelayMillsWhenException);
                }
            }
        };
        try {
            long invisibleTime = this.defaultMQPushConsumer.getPopInvisibleTime();
            if (invisibleTime < 5000L || invisibleTime > 300000L) {
                invisibleTime = 60000L;
            }
            this.pullAPIWrapper.popAsync(popRequest.getMessageQueue(), invisibleTime, this.defaultMQPushConsumer.getPopBatchNums(), popRequest.getConsumerGroup(), 15000L, popCallback, true, popRequest.getInitMode(), false, subscriptionData.getExpressionType(), subscriptionData.getSubString());
        }
        catch (Exception e) {
            log.error("popAsync exception", (Throwable)e);
            this.executePopPullRequestLater(popRequest, this.pullTimeDelayMillsWhenException);
        }
    }

    private PopResult processPopResult(PopResult popResult, SubscriptionData subscriptionData) {
        if (PopStatus.FOUND == popResult.getPopStatus()) {
            List<MessageExt> msgFoundList;
            List<MessageExt> msgListFilterAgain = msgFoundList = popResult.getMsgFoundList();
            if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode() && popResult.getMsgFoundList().size() > 0) {
                msgListFilterAgain = new ArrayList<MessageExt>(popResult.getMsgFoundList().size());
                for (MessageExt messageExt : popResult.getMsgFoundList()) {
                    if (messageExt.getTags() == null || !subscriptionData.getTagsSet().contains(messageExt.getTags())) continue;
                    msgListFilterAgain.add(messageExt);
                }
            }
            if (!this.filterMessageHookList.isEmpty()) {
                FilterMessageContext filterMessageContext = new FilterMessageContext();
                filterMessageContext.setUnitMode(this.defaultMQPushConsumer.isUnitMode());
                filterMessageContext.setMsgList(msgListFilterAgain);
                if (!this.filterMessageHookList.isEmpty()) {
                    for (FilterMessageHook hook : this.filterMessageHookList) {
                        try {
                            hook.filterMessage(filterMessageContext);
                        }
                        catch (Throwable e) {
                            log.error("execute hook error. hookName={}", (Object)hook.hookName());
                        }
                    }
                }
            }
            if (msgFoundList.size() != msgListFilterAgain.size()) {
                for (MessageExt messageExt : msgFoundList) {
                    if (msgListFilterAgain.contains(messageExt)) continue;
                    this.ackAsync(messageExt, this.groupName());
                }
            }
            popResult.setMsgFoundList(msgListFilterAgain);
        }
        return popResult;
    }

    private void makeSureStateOK() throws MQClientException {
        if (this.serviceState != ServiceState.RUNNING) {
            throw new MQClientException("The consumer service state not OK, " + this.serviceState + FAQUrl.suggestTodo((String)"https://rocketmq.apache.org/docs/bestPractice/06FAQ"), null);
        }
    }

    void executePullRequestLater(PullRequest pullRequest, long timeDelay) {
        this.mQClientFactory.getPullMessageService().executePullRequestLater(pullRequest, timeDelay);
    }

    public boolean isPause() {
        return this.pause;
    }

    public void setPause(boolean pause) {
        this.pause = pause;
    }

    public ConsumerStatsManager getConsumerStatsManager() {
        return this.mQClientFactory.getConsumerStatsManager();
    }

    public void executePullRequestImmediately(PullRequest pullRequest) {
        this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);
    }

    void executePopPullRequestLater(PopRequest pullRequest, long timeDelay) {
        this.mQClientFactory.getPullMessageService().executePopPullRequestLater(pullRequest, timeDelay);
    }

    void executePopPullRequestImmediately(PopRequest pullRequest) {
        this.mQClientFactory.getPullMessageService().executePopPullRequestImmediately(pullRequest);
    }

    private void correctTagsOffset(PullRequest pullRequest) {
        if (0L == pullRequest.getProcessQueue().getMsgCount().get()) {
            this.offsetStore.updateOffset(pullRequest.getMessageQueue(), pullRequest.getNextOffset(), true);
        }
    }

    public void executeTaskLater(Runnable r, long timeDelay) {
        this.mQClientFactory.getPullMessageService().executeTaskLater(r, timeDelay);
    }

    public void executeTask(Runnable r) {
        this.mQClientFactory.getPullMessageService().executeTask(r);
    }

    public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) throws MQClientException, InterruptedException {
        return this.mQClientFactory.getMQAdminImpl().queryMessage(topic, key, maxNum, begin, end);
    }

    public MessageExt queryMessageByUniqKey(String topic, String uniqKey) throws MQClientException, InterruptedException {
        return this.mQClientFactory.getMQAdminImpl().queryMessageByUniqKey(topic, uniqKey);
    }

    public void registerMessageListener(MessageListener messageListener) {
        this.messageListenerInner = messageListener;
    }

    public void resume() {
        this.pause = false;
        this.doRebalance();
        log.info("resume this consumer, {}", (Object)this.defaultMQPushConsumer.getConsumerGroup());
    }

    @Deprecated
    public void sendMessageBack(MessageExt msg, int delayLevel, String brokerName) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        this.sendMessageBack(msg, delayLevel, brokerName, null);
    }

    public void sendMessageBack(MessageExt msg, int delayLevel, MessageQueue mq) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        this.sendMessageBack(msg, delayLevel, null, mq);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void sendMessageBack(MessageExt msg, int delayLevel, String brokerName, MessageQueue mq) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        boolean needRetry = true;
        try {
            if (brokerName != null && brokerName.startsWith("__syslo__") || mq != null && mq.getBrokerName().startsWith("__syslo__")) {
                needRetry = false;
                this.sendMessageBackAsNormalMessage(msg);
            } else {
                String brokerAddr = null != brokerName ? this.mQClientFactory.findBrokerAddressInPublish(brokerName) : RemotingHelper.parseSocketAddressAddr((SocketAddress)msg.getStoreHost());
                this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, brokerName, msg, this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000L, this.getMaxReconsumeTimes());
            }
        }
        catch (Throwable t) {
            log.error("Failed to send message back, consumerGroup={}, brokerName={}, mq={}, message={}", new Object[]{this.defaultMQPushConsumer.getConsumerGroup(), brokerName, mq, msg, t});
            if (needRetry) {
                this.sendMessageBackAsNormalMessage(msg);
            }
        }
        finally {
            msg.setTopic(NamespaceUtil.withoutNamespace((String)msg.getTopic(), (String)this.defaultMQPushConsumer.getNamespace()));
        }
    }

    private void sendMessageBackAsNormalMessage(MessageExt msg) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        Message newMsg = new Message(MixAll.getRetryTopic((String)this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());
        String originMsgId = MessageAccessor.getOriginMessageId((Message)msg);
        MessageAccessor.setOriginMessageId((Message)newMsg, (String)(UtilAll.isBlank((String)originMsgId) ? msg.getMsgId() : originMsgId));
        newMsg.setFlag(msg.getFlag());
        MessageAccessor.setProperties((Message)newMsg, (Map)msg.getProperties());
        MessageAccessor.putProperty((Message)newMsg, (String)"RETRY_TOPIC", (String)msg.getTopic());
        MessageAccessor.setReconsumeTime((Message)newMsg, (String)String.valueOf(msg.getReconsumeTimes() + 1));
        MessageAccessor.setMaxReconsumeTimes((Message)newMsg, (String)String.valueOf(this.getMaxReconsumeTimes()));
        MessageAccessor.clearProperty((Message)newMsg, (String)"TRAN_MSG");
        newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
        this.mQClientFactory.getDefaultMQProducer().send(newMsg);
    }

    void ackAsync(MessageExt message, String consumerGroup) {
        final String extraInfo = message.getProperty("POP_CK");
        try {
            FindBrokerResult findBrokerResult;
            String[] extraInfoStrs = ExtraInfoUtil.split((String)extraInfo);
            String brokerName = ExtraInfoUtil.getBrokerName((String[])extraInfoStrs);
            int queueId = ExtraInfoUtil.getQueueId((String[])extraInfoStrs);
            long queueOffset = ExtraInfoUtil.getQueueOffset((String[])extraInfoStrs);
            String topic = message.getTopic();
            String desBrokerName = brokerName;
            if (brokerName != null && brokerName.startsWith("__syslo__")) {
                desBrokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(this.defaultMQPushConsumer.queueWithNamespace(new MessageQueue(topic, brokerName, queueId)));
            }
            if (null == (findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(desBrokerName, 0L, true))) {
                this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
                findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(desBrokerName, 0L, true);
            }
            if (findBrokerResult == null) {
                log.error("The broker[" + desBrokerName + "] not exist");
                return;
            }
            AckMessageRequestHeader requestHeader = new AckMessageRequestHeader();
            requestHeader.setTopic(ExtraInfoUtil.getRealTopic((String[])extraInfoStrs, (String)topic, (String)consumerGroup));
            requestHeader.setQueueId(Integer.valueOf(queueId));
            requestHeader.setOffset(Long.valueOf(queueOffset));
            requestHeader.setConsumerGroup(consumerGroup);
            requestHeader.setExtraInfo(extraInfo);
            requestHeader.setBrokerName(brokerName);
            this.mQClientFactory.getMQClientAPIImpl().ackMessageAsync(findBrokerResult.getBrokerAddr(), 3000L, new AckCallback(){

                @Override
                public void onSuccess(AckResult ackResult) {
                    if (ackResult != null && !AckStatus.OK.equals((Object)ackResult.getStatus())) {
                        log.warn("Ack message fail. ackResult: {}, extraInfo: {}", (Object)ackResult, (Object)extraInfo);
                    }
                }

                @Override
                public void onException(Throwable e) {
                    log.warn("Ack message fail. extraInfo: {}  error message: {}", (Object)extraInfo, (Object)e.toString());
                }
            }, requestHeader);
        }
        catch (Throwable t) {
            log.error("ack async error.", t);
        }
    }

    void changePopInvisibleTimeAsync(String topic, String consumerGroup, String extraInfo, long invisibleTime, AckCallback callback) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        FindBrokerResult findBrokerResult;
        String[] extraInfoStrs = ExtraInfoUtil.split((String)extraInfo);
        String brokerName = ExtraInfoUtil.getBrokerName((String[])extraInfoStrs);
        int queueId = ExtraInfoUtil.getQueueId((String[])extraInfoStrs);
        String desBrokerName = brokerName;
        if (brokerName != null && brokerName.startsWith("__syslo__")) {
            desBrokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(this.defaultMQPushConsumer.queueWithNamespace(new MessageQueue(topic, brokerName, queueId)));
        }
        if (null == (findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(desBrokerName, 0L, true))) {
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(desBrokerName, 0L, true);
        }
        if (findBrokerResult != null) {
            ChangeInvisibleTimeRequestHeader requestHeader = new ChangeInvisibleTimeRequestHeader();
            requestHeader.setTopic(ExtraInfoUtil.getRealTopic((String[])extraInfoStrs, (String)topic, (String)consumerGroup));
            requestHeader.setQueueId(Integer.valueOf(queueId));
            requestHeader.setOffset(Long.valueOf(ExtraInfoUtil.getQueueOffset((String[])extraInfoStrs)));
            requestHeader.setConsumerGroup(consumerGroup);
            requestHeader.setExtraInfo(extraInfo);
            requestHeader.setInvisibleTime(Long.valueOf(invisibleTime));
            requestHeader.setBrokerName(brokerName);
            this.mQClientFactory.getMQClientAPIImpl().changeInvisibleTimeAsync(brokerName, findBrokerResult.getBrokerAddr(), requestHeader, 3000L, callback);
            return;
        }
        throw new MQClientException("The broker[" + desBrokerName + "] not exist", null);
    }

    public int getMaxReconsumeTimes() {
        if (this.defaultMQPushConsumer.getMaxReconsumeTimes() == -1) {
            return 16;
        }
        return this.defaultMQPushConsumer.getMaxReconsumeTimes();
    }

    public void shutdown() {
        this.shutdown(0L);
    }

    public synchronized void shutdown(long awaitTerminateMillis) {
        switch (this.serviceState) {
            case CREATE_JUST: {
                break;
            }
            case RUNNING: {
                this.consumeMessageService.shutdown(awaitTerminateMillis);
                this.persistConsumerOffset();
                this.mQClientFactory.unregisterConsumer(this.defaultMQPushConsumer.getConsumerGroup());
                this.mQClientFactory.shutdown();
                log.info("the consumer [{}] shutdown OK", (Object)this.defaultMQPushConsumer.getConsumerGroup());
                this.rebalanceImpl.destroy();
                this.serviceState = ServiceState.SHUTDOWN_ALREADY;
                break;
            }
            case SHUTDOWN_ALREADY: {
                break;
            }
        }
    }

    public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST: {
                log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", new Object[]{this.defaultMQPushConsumer.getConsumerGroup(), this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode()});
                this.serviceState = ServiceState.START_FAILED;
                this.checkConfig();
                this.copySubscription();
                if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                    this.defaultMQPushConsumer.changeInstanceNameToPID();
                }
                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
                this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
                this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
                this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
                this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
                if (this.pullAPIWrapper == null) {
                    this.pullAPIWrapper = new PullAPIWrapper(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup(), this.isUnitMode());
                }
                this.pullAPIWrapper.registerFilterMessageHook(this.filterMessageHookList);
                if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
                } else {
                    switch (this.defaultMQPushConsumer.getMessageModel()) {
                        case BROADCASTING: {
                            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        }
                        case CLUSTERING: {
                            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        }
                    }
                    this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
                }
                this.offsetStore.load();
                if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                    this.consumeOrderly = true;
                    this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly)this.getMessageListenerInner());
                    this.consumeMessagePopService = new ConsumeMessagePopOrderlyService(this, (MessageListenerOrderly)this.getMessageListenerInner());
                } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                    this.consumeOrderly = false;
                    this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently)this.getMessageListenerInner());
                    this.consumeMessagePopService = new ConsumeMessagePopConcurrentlyService(this, (MessageListenerConcurrently)this.getMessageListenerInner());
                }
                this.consumeMessageService.start();
                this.consumeMessagePopService.start();
                boolean registerOK = this.mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    this.consumeMessageService.shutdown(this.defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
                    throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo((String)"https://rocketmq.apache.org/docs/bestPractice/06FAQ"), null);
                }
                this.mQClientFactory.start();
                log.info("the consumer [{}] start OK.", (Object)this.defaultMQPushConsumer.getConsumerGroup());
                this.serviceState = ServiceState.RUNNING;
                break;
            }
            case RUNNING: 
            case SHUTDOWN_ALREADY: 
            case START_FAILED: {
                throw new MQClientException("The PushConsumer service state not OK, maybe started once, " + this.serviceState + FAQUrl.suggestTodo((String)"https://rocketmq.apache.org/docs/bestPractice/06FAQ"), null);
            }
        }
        this.updateTopicSubscribeInfoWhenSubscriptionChanged();
        this.mQClientFactory.checkClientInBroker();
        if (this.mQClientFactory.sendHeartbeatToAllBrokerWithLock()) {
            this.mQClientFactory.rebalanceImmediately();
        }
    }

    private void checkConfig() throws MQClientException {
        Validators.checkGroup(this.defaultMQPushConsumer.getConsumerGroup());
        if (null == this.defaultMQPushConsumer.getConsumerGroup()) {
            throw new MQClientException("consumerGroup is null" + FAQUrl.suggestTodo((String)"https://rocketmq.apache.org/docs/bestPractice/06FAQ"), null);
        }
        if (this.defaultMQPushConsumer.getConsumerGroup().equals("DEFAULT_CONSUMER")) {
            throw new MQClientException("consumerGroup can not equal DEFAULT_CONSUMER, please specify another one." + FAQUrl.suggestTodo((String)"https://rocketmq.apache.org/docs/bestPractice/06FAQ"), null);
        }
        if (null == this.defaultMQPushConsumer.getMessageModel()) {
            throw new MQClientException("messageModel is null" + FAQUrl.suggestTodo((String)"https://rocketmq.apache.org/docs/bestPractice/06FAQ"), null);
        }
        if (null == this.defaultMQPushConsumer.getConsumeFromWhere()) {
            throw new MQClientException("consumeFromWhere is null" + FAQUrl.suggestTodo((String)"https://rocketmq.apache.org/docs/bestPractice/06FAQ"), null);
        }
        Date dt = UtilAll.parseDate((String)this.defaultMQPushConsumer.getConsumeTimestamp(), (String)"yyyyMMddHHmmss");
        if (null == dt) {
            throw new MQClientException("consumeTimestamp is invalid, the valid format is yyyyMMddHHmmss,but received " + this.defaultMQPushConsumer.getConsumeTimestamp() + " " + FAQUrl.suggestTodo((String)"https://rocketmq.apache.org/docs/bestPractice/06FAQ"), null);
        }
        if (null == this.defaultMQPushConsumer.getAllocateMessageQueueStrategy()) {
            throw new MQClientException("allocateMessageQueueStrategy is null" + FAQUrl.suggestTodo((String)"https://rocketmq.apache.org/docs/bestPractice/06FAQ"), null);
        }
        if (null == this.defaultMQPushConsumer.getSubscription()) {
            throw new MQClientException("subscription is null" + FAQUrl.suggestTodo((String)"https://rocketmq.apache.org/docs/bestPractice/06FAQ"), null);
        }
        if (null == this.defaultMQPushConsumer.getMessageListener()) {
            throw new MQClientException("messageListener is null" + FAQUrl.suggestTodo((String)"https://rocketmq.apache.org/docs/bestPractice/06FAQ"), null);
        }
        boolean orderly = this.defaultMQPushConsumer.getMessageListener() instanceof MessageListenerOrderly;
        boolean concurrently = this.defaultMQPushConsumer.getMessageListener() instanceof MessageListenerConcurrently;
        if (!orderly && !concurrently) {
            throw new MQClientException("messageListener must be instanceof MessageListenerOrderly or MessageListenerConcurrently" + FAQUrl.suggestTodo((String)"https://rocketmq.apache.org/docs/bestPractice/06FAQ"), null);
        }
        if (this.defaultMQPushConsumer.getConsumeThreadMin() < 1 || this.defaultMQPushConsumer.getConsumeThreadMin() > 1000) {
            throw new MQClientException("consumeThreadMin Out of range [1, 1000]" + FAQUrl.suggestTodo((String)"https://rocketmq.apache.org/docs/bestPractice/06FAQ"), null);
        }
        if (this.defaultMQPushConsumer.getConsumeThreadMax() < 1 || this.defaultMQPushConsumer.getConsumeThreadMax() > 1000) {
            throw new MQClientException("consumeThreadMax Out of range [1, 1000]" + FAQUrl.suggestTodo((String)"https://rocketmq.apache.org/docs/bestPractice/06FAQ"), null);
        }
        if (this.defaultMQPushConsumer.getConsumeThreadMin() > this.defaultMQPushConsumer.getConsumeThreadMax()) {
            throw new MQClientException("consumeThreadMin (" + this.defaultMQPushConsumer.getConsumeThreadMin() + ") is larger than consumeThreadMax (" + this.defaultMQPushConsumer.getConsumeThreadMax() + ")", null);
        }
        if (this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() < 1 || this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() > 65535) {
            throw new MQClientException("consumeConcurrentlyMaxSpan Out of range [1, 65535]" + FAQUrl.suggestTodo((String)"https://rocketmq.apache.org/docs/bestPractice/06FAQ"), null);
        }
        if (this.defaultMQPushConsumer.getPullThresholdForQueue() < 1 || this.defaultMQPushConsumer.getPullThresholdForQueue() > 65535) {
            throw new MQClientException("pullThresholdForQueue Out of range [1, 65535]" + FAQUrl.suggestTodo((String)"https://rocketmq.apache.org/docs/bestPractice/06FAQ"), null);
        }
        if (this.defaultMQPushConsumer.getPullThresholdForTopic() != -1 && (this.defaultMQPushConsumer.getPullThresholdForTopic() < 1 || this.defaultMQPushConsumer.getPullThresholdForTopic() > 6553500)) {
            throw new MQClientException("pullThresholdForTopic Out of range [1, 6553500]" + FAQUrl.suggestTodo((String)"https://rocketmq.apache.org/docs/bestPractice/06FAQ"), null);
        }
        if (this.defaultMQPushConsumer.getPullThresholdSizeForQueue() < 1 || this.defaultMQPushConsumer.getPullThresholdSizeForQueue() > 1024) {
            throw new MQClientException("pullThresholdSizeForQueue Out of range [1, 1024]" + FAQUrl.suggestTodo((String)"https://rocketmq.apache.org/docs/bestPractice/06FAQ"), null);
        }
        if (this.defaultMQPushConsumer.getPullThresholdSizeForTopic() != -1 && (this.defaultMQPushConsumer.getPullThresholdSizeForTopic() < 1 || this.defaultMQPushConsumer.getPullThresholdSizeForTopic() > 102400)) {
            throw new MQClientException("pullThresholdSizeForTopic Out of range [1, 102400]" + FAQUrl.suggestTodo((String)"https://rocketmq.apache.org/docs/bestPractice/06FAQ"), null);
        }
        if (this.defaultMQPushConsumer.getPullInterval() < 0L || this.defaultMQPushConsumer.getPullInterval() > 65535L) {
            throw new MQClientException("pullInterval Out of range [0, 65535]" + FAQUrl.suggestTodo((String)"https://rocketmq.apache.org/docs/bestPractice/06FAQ"), null);
        }
        if (this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize() < 1 || this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize() > 1024) {
            throw new MQClientException("consumeMessageBatchMaxSize Out of range [1, 1024]" + FAQUrl.suggestTodo((String)"https://rocketmq.apache.org/docs/bestPractice/06FAQ"), null);
        }
        if (this.defaultMQPushConsumer.getPullBatchSize() < 1 || this.defaultMQPushConsumer.getPullBatchSize() > 1024) {
            throw new MQClientException("pullBatchSize Out of range [1, 1024]" + FAQUrl.suggestTodo((String)"https://rocketmq.apache.org/docs/bestPractice/06FAQ"), null);
        }
        if (this.defaultMQPushConsumer.getPopInvisibleTime() < 5000L || this.defaultMQPushConsumer.getPopInvisibleTime() > 300000L) {
            throw new MQClientException("popInvisibleTime Out of range [5000, 300000]" + FAQUrl.suggestTodo((String)"https://rocketmq.apache.org/docs/bestPractice/06FAQ"), null);
        }
        if (this.defaultMQPushConsumer.getPopBatchNums() <= 0 || this.defaultMQPushConsumer.getPopBatchNums() > 32) {
            throw new MQClientException("popBatchNums Out of range [1, 32]" + FAQUrl.suggestTodo((String)"https://rocketmq.apache.org/docs/bestPractice/06FAQ"), null);
        }
    }

    private void copySubscription() throws MQClientException {
        try {
            Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
            if (sub != null) {
                for (Map.Entry<String, String> entry : sub.entrySet()) {
                    String topic = entry.getKey();
                    String subString = entry.getValue();
                    SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData((String)topic, (String)subString);
                    this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
                }
            }
            if (null == this.messageListenerInner) {
                this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
            }
            switch (this.defaultMQPushConsumer.getMessageModel()) {
                case BROADCASTING: {
                    break;
                }
                case CLUSTERING: {
                    String retryTopic = MixAll.getRetryTopic((String)this.defaultMQPushConsumer.getConsumerGroup());
                    SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData((String)retryTopic, (String)"*");
                    this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
                    break;
                }
            }
        }
        catch (Exception e) {
            throw new MQClientException("subscription exception", e);
        }
    }

    public MessageListener getMessageListenerInner() {
        return this.messageListenerInner;
    }

    private void updateTopicSubscribeInfoWhenSubscriptionChanged() {
        if (doNotUpdateTopicSubscribeInfoWhenSubscriptionChanged) {
            return;
        }
        ConcurrentMap<String, SubscriptionData> subTable = this.getSubscriptionInner();
        if (subTable != null) {
            for (Map.Entry entry : subTable.entrySet()) {
                String topic = (String)entry.getKey();
                this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            }
        }
    }

    public ConcurrentMap<String, SubscriptionData> getSubscriptionInner() {
        return this.rebalanceImpl.getSubscriptionInner();
    }

    public void subscribe(String topic, String subExpression) throws MQClientException {
        try {
            SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData((String)topic, (String)subExpression);
            this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
            if (this.mQClientFactory != null) {
                this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
            }
        }
        catch (Exception e) {
            throw new MQClientException("subscription exception", e);
        }
    }

    public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException {
        try {
            SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData((String)topic, (String)"*");
            subscriptionData.setSubString(fullClassName);
            subscriptionData.setClassFilterMode(true);
            subscriptionData.setFilterClassSource(filterClassSource);
            this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
            if (this.mQClientFactory != null) {
                this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
            }
        }
        catch (Exception e) {
            throw new MQClientException("subscription exception", e);
        }
    }

    public void subscribe(String topic, MessageSelector messageSelector) throws MQClientException {
        try {
            if (messageSelector == null) {
                this.subscribe(topic, "*");
                return;
            }
            SubscriptionData subscriptionData = FilterAPI.build((String)topic, (String)messageSelector.getExpression(), (String)messageSelector.getExpressionType());
            this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
            if (this.mQClientFactory != null) {
                this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
            }
        }
        catch (Exception e) {
            throw new MQClientException("subscription exception", e);
        }
    }

    public void suspend() {
        this.pause = true;
        log.info("suspend this consumer, {}", (Object)this.defaultMQPushConsumer.getConsumerGroup());
    }

    public void unsubscribe(String topic) {
        this.rebalanceImpl.getSubscriptionInner().remove(topic);
    }

    public void updateConsumeOffset(MessageQueue mq, long offset) {
        this.offsetStore.updateOffset(mq, offset, false);
    }

    public void updateCorePoolSize(int corePoolSize) {
        this.consumeMessageService.updateCorePoolSize(corePoolSize);
    }

    public MessageExt viewMessage(String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        return this.mQClientFactory.getMQAdminImpl().viewMessage(msgId);
    }

    public RebalanceImpl getRebalanceImpl() {
        return this.rebalanceImpl;
    }

    public boolean isConsumeOrderly() {
        return this.consumeOrderly;
    }

    public void setConsumeOrderly(boolean consumeOrderly) {
        this.consumeOrderly = consumeOrderly;
    }

    public void resetOffsetByTimeStamp(long timeStamp) throws MQClientException {
        for (String topic : this.rebalanceImpl.getSubscriptionInner().keySet()) {
            Set mqs = (Set)this.rebalanceImpl.getTopicSubscribeInfoTable().get(topic);
            if (!CollectionUtils.isNotEmpty((Collection)mqs)) continue;
            HashMap<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>(mqs.size(), 1.0f);
            for (MessageQueue mq : mqs) {
                long offset = this.searchOffset(mq, timeStamp);
                offsetTable.put(mq, offset);
            }
            this.mQClientFactory.resetOffset(topic, this.groupName(), offsetTable);
        }
    }

    public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
        return this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
    }

    @Override
    public String groupName() {
        return this.defaultMQPushConsumer.getConsumerGroup();
    }

    @Override
    public MessageModel messageModel() {
        return this.defaultMQPushConsumer.getMessageModel();
    }

    @Override
    public ConsumeType consumeType() {
        return ConsumeType.CONSUME_PASSIVELY;
    }

    @Override
    public ConsumeFromWhere consumeFromWhere() {
        return this.defaultMQPushConsumer.getConsumeFromWhere();
    }

    @Override
    public Set<SubscriptionData> subscriptions() {
        return new HashSet<SubscriptionData>(this.rebalanceImpl.getSubscriptionInner().values());
    }

    @Override
    public void doRebalance() {
        if (!this.pause) {
            this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
        }
    }

    @Override
    public boolean tryRebalance() {
        if (!this.pause) {
            return this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
        }
        return false;
    }

    @Override
    public void persistConsumerOffset() {
        try {
            this.makeSureStateOK();
            HashSet<MessageQueue> mqs = new HashSet<MessageQueue>();
            Set allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();
            mqs.addAll(allocateMq);
            this.offsetStore.persistAll(mqs);
        }
        catch (Exception e) {
            log.error("group: " + this.defaultMQPushConsumer.getConsumerGroup() + " persistConsumerOffset exception", (Throwable)e);
        }
    }

    @Override
    public void updateTopicSubscribeInfo(String topic, Set<MessageQueue> info) {
        ConcurrentMap<String, SubscriptionData> subTable = this.getSubscriptionInner();
        if (subTable != null && subTable.containsKey(topic)) {
            this.rebalanceImpl.topicSubscribeInfoTable.put(topic, info);
        }
    }

    @Override
    public boolean isSubscribeTopicNeedUpdate(String topic) {
        ConcurrentMap<String, SubscriptionData> subTable = this.getSubscriptionInner();
        if (subTable != null && subTable.containsKey(topic)) {
            return !this.rebalanceImpl.topicSubscribeInfoTable.containsKey(topic);
        }
        return false;
    }

    @Override
    public boolean isUnitMode() {
        return this.defaultMQPushConsumer.isUnitMode();
    }

    @Override
    public ConsumerRunningInfo consumerRunningInfo() {
        ConsumerRunningInfo info = new ConsumerRunningInfo();
        Properties prop = MixAll.object2Properties((Object)this.defaultMQPushConsumer);
        prop.put("PROP_CONSUMEORDERLY", String.valueOf(this.consumeOrderly));
        prop.put("PROP_THREADPOOL_CORE_SIZE", String.valueOf(this.consumeMessageService.getCorePoolSize()));
        prop.put("PROP_CONSUMER_START_TIMESTAMP", String.valueOf(this.consumerStartTimestamp));
        info.setProperties(prop);
        Set<SubscriptionData> subSet = this.subscriptions();
        info.getSubscriptionSet().addAll(subSet);
        for (Map.Entry next : this.rebalanceImpl.getProcessQueueTable().entrySet()) {
            MessageQueue mq = (MessageQueue)next.getKey();
            ProcessQueue pq = (ProcessQueue)next.getValue();
            ProcessQueueInfo pqinfo = new ProcessQueueInfo();
            pqinfo.setCommitOffset(this.offsetStore.readOffset(mq, ReadOffsetType.MEMORY_FIRST_THEN_STORE));
            pq.fillProcessQueueInfo(pqinfo);
            info.getMqTable().put(mq, pqinfo);
        }
        for (Map.Entry next : this.rebalanceImpl.getPopProcessQueueTable().entrySet()) {
            MessageQueue mq = (MessageQueue)next.getKey();
            PopProcessQueue pq = (PopProcessQueue)next.getValue();
            PopProcessQueueInfo pqinfo = new PopProcessQueueInfo();
            pq.fillPopProcessQueueInfo(pqinfo);
            info.getMqPopTable().put(mq, pqinfo);
        }
        for (SubscriptionData sd : subSet) {
            ConsumeStatus consumeStatus = this.mQClientFactory.getConsumerStatsManager().consumeStatus(this.groupName(), sd.getTopic());
            info.getStatusTable().put(sd.getTopic(), consumeStatus);
        }
        return info;
    }

    public MQClientInstance getmQClientFactory() {
        return this.mQClientFactory;
    }

    public void setmQClientFactory(MQClientInstance mQClientFactory) {
        this.mQClientFactory = mQClientFactory;
    }

    public ServiceState getServiceState() {
        return this.serviceState;
    }

    @Deprecated
    public synchronized void setServiceState(ServiceState serviceState) {
        this.serviceState = serviceState;
    }

    public void adjustThreadPool() {
        long computeAccTotal = this.computeAccumulationTotal();
        long adjustThreadPoolNumsThreshold = this.defaultMQPushConsumer.getAdjustThreadPoolNumsThreshold();
        long incThreshold = (long)((double)adjustThreadPoolNumsThreshold * 1.0);
        long decThreshold = (long)((double)adjustThreadPoolNumsThreshold * 0.8);
        if (computeAccTotal >= incThreshold) {
            this.consumeMessageService.incCorePoolSize();
        }
        if (computeAccTotal < decThreshold) {
            this.consumeMessageService.decCorePoolSize();
        }
    }

    private long computeAccumulationTotal() {
        long msgAccTotal = 0L;
        ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = this.rebalanceImpl.getProcessQueueTable();
        for (Map.Entry next : processQueueTable.entrySet()) {
            ProcessQueue value = (ProcessQueue)next.getValue();
            msgAccTotal += value.getMsgAccCnt();
        }
        return msgAccTotal;
    }

    public List<QueueTimeSpan> queryConsumeTimeSpan(String topic) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
        ArrayList<QueueTimeSpan> queueTimeSpan = new ArrayList<QueueTimeSpan>();
        TopicRouteData routeData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, 3000L);
        for (BrokerData brokerData : routeData.getBrokerDatas()) {
            String addr = brokerData.selectBrokerAddr();
            queueTimeSpan.addAll(this.mQClientFactory.getMQClientAPIImpl().queryConsumeTimeSpan(addr, topic, this.groupName(), 3000L));
        }
        return queueTimeSpan;
    }

    public void tryResetPopRetryTopic(List<MessageExt> msgs, String consumerGroup) {
        String popRetryPrefix = "%RETRY%" + consumerGroup + "_";
        for (MessageExt msg : msgs) {
            String normalTopic;
            if (!msg.getTopic().startsWith(popRetryPrefix) || (normalTopic = KeyBuilder.parseNormalTopic((String)msg.getTopic(), (String)consumerGroup)) == null || normalTopic.isEmpty()) continue;
            msg.setTopic(normalTopic);
        }
    }

    public void resetRetryAndNamespace(List<MessageExt> msgs, String consumerGroup) {
        String groupTopic = MixAll.getRetryTopic((String)consumerGroup);
        for (MessageExt msg : msgs) {
            String retryTopic = msg.getProperty("RETRY_TOPIC");
            if (retryTopic != null && groupTopic.equals(msg.getTopic())) {
                msg.setTopic(retryTopic);
            }
            if (!StringUtils.isNotEmpty((CharSequence)this.defaultMQPushConsumer.getNamespace())) continue;
            msg.setTopic(NamespaceUtil.withoutNamespace((String)msg.getTopic(), (String)this.defaultMQPushConsumer.getNamespace()));
        }
    }

    public ConsumeMessageService getConsumeMessageService() {
        return this.consumeMessageService;
    }

    public void setConsumeMessageService(ConsumeMessageService consumeMessageService) {
        this.consumeMessageService = consumeMessageService;
    }

    public void setPullTimeDelayMillsWhenException(long pullTimeDelayMillsWhenException) {
        this.pullTimeDelayMillsWhenException = pullTimeDelayMillsWhenException;
    }

    int[] getPopDelayLevel() {
        return this.popDelayLevel;
    }

    public MessageQueueListener getMessageQueueListener() {
        if (null == this.defaultMQPushConsumer) {
            return null;
        }
        return this.defaultMQPushConsumer.getMessageQueueListener();
    }
}

