/*
 * Decompiled with CFR 0.152.
 */
package org.apache.rocketmq.client.impl.factory;

import java.io.UnsupportedEncodingException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.admin.MQAdminExtInner;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.ClientRemotingProcessor;
import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.impl.MQAdminImpl;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import org.apache.rocketmq.client.impl.consumer.MQConsumerInner;
import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
import org.apache.rocketmq.client.impl.consumer.PullMessageService;
import org.apache.rocketmq.client.impl.consumer.RebalanceService;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.impl.producer.MQProducerInner;
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.stat.ConsumerStatsManager;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
import org.apache.rocketmq.common.protocol.heartbeat.ProducerData;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;

public class MQClientInstance {
    // Lock-acquisition timeout in milliseconds (same 3000 ms value that is passed to lockNamesrv.tryLock).
    private static final long LOCK_TIMEOUT_MILLIS = 3000L;
    private final InternalLogger log = ClientLogger.getLog();
    private final ClientConfig clientConfig;
    private final int instanceIndex;
    private final String clientId;
    // Wall-clock time (ms since epoch) captured when this instance was constructed.
    private final long bootTimestamp = System.currentTimeMillis();
    // Registered producers; the key is reported as the producer group name in heartbeats.
    private final ConcurrentMap<String, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
    // Registered consumers; the key is presumably the consumer group name -- TODO confirm against the registration code.
    private final ConcurrentMap<String, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
    private final ConcurrentMap<String, MQAdminExtInner> adminExtTable = new ConcurrentHashMap<String, MQAdminExtInner>();
    private final NettyClientConfig nettyClientConfig;
    private final MQClientAPIImpl mQClientAPIImpl;
    private final MQAdminImpl mQAdminImpl;
    // topic -> last route data stored by updateTopicRouteInfoFromNameServer.
    private final ConcurrentMap<String, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();
    // Held while refreshing topic routes and while pruning offline brokers.
    private final Lock lockNamesrv = new ReentrantLock();
    // Held while a heartbeat round is in progress; contenders skip instead of waiting.
    private final Lock lockHeartbeat = new ReentrantLock();
    // broker name -> (broker id -> broker address).
    private final ConcurrentMap<String, HashMap<Long, String>> brokerAddrTable = new ConcurrentHashMap<String, HashMap<Long, String>>();
    // broker name -> (broker address -> version returned by the last successful heartbeat).
    private final ConcurrentMap<String, HashMap<String, Integer>> brokerVersionTable = new ConcurrentHashMap<String, HashMap<String, Integer>>();
    // Single-threaded scheduler running the periodic tasks registered in startScheduledTask.
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory(){

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "MQClientFactoryScheduledThread");
        }
    });
    private final ClientRemotingProcessor clientRemotingProcessor;
    private final PullMessageService pullMessageService;
    private final RebalanceService rebalanceService;
    // Internal producer created with group "CLIENT_INNER_PRODUCER"; started together with this instance.
    private final DefaultMQProducer defaultMQProducer;
    private final ConsumerStatsManager consumerStatsManager;
    // Number of heartbeat rounds started so far; used to throttle the success log to every 20th round.
    private final AtomicLong sendHeartbeatTimesTotal = new AtomicLong(0L);
    // Lifecycle state; read and written under synchronized (this) in start().
    private ServiceState serviceState = ServiceState.CREATE_JUST;
    private Random random = new Random();

    /**
     * Creates a client instance without an RPC hook.
     *
     * <p>Delegates to the four-argument constructor, passing {@code null} as the hook.
     *
     * @param clientConfig  client configuration
     * @param instanceIndex index of this instance, used only for logging in the visible code
     * @param clientId      identifier reported to brokers in heartbeats
     */
    public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId) {
        this(clientConfig, instanceIndex, clientId, null);
    }

    public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
        this.clientConfig = clientConfig;
        this.instanceIndex = instanceIndex;
        this.nettyClientConfig = new NettyClientConfig();
        this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
        this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
        this.clientRemotingProcessor = new ClientRemotingProcessor(this);
        this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
        if (this.clientConfig.getNamesrvAddr() != null) {
            this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
            this.log.info("user specified name server address: {}", (Object)this.clientConfig.getNamesrvAddr());
        }
        this.clientId = clientId;
        this.mQAdminImpl = new MQAdminImpl(this);
        this.pullMessageService = new PullMessageService(this);
        this.rebalanceService = new RebalanceService(this);
        this.defaultMQProducer = new DefaultMQProducer("CLIENT_INNER_PRODUCER");
        this.defaultMQProducer.resetClientConfig(clientConfig);
        this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);
        this.log.info("Created a new client Instance, InstanceIndex:{}, ClientID:{}, ClientConfig:{}, ClientVersion:{}, SerializerType:{}", new Object[]{this.instanceIndex, this.clientId, this.clientConfig, MQVersion.getVersionDesc((int)MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer()});
    }

    /**
     * Converts route data into the publish view (the list of writable queues) for a topic.
     *
     * <p>If the route carries an order-topic configuration ({@code broker:count} pairs
     * separated by {@code ;}), queues are generated from that configuration and the result
     * is flagged as an order topic. Otherwise the route's queue data is sorted in place and
     * every writable queue-data entry whose broker has an address under broker id {@code 0}
     * contributes its write queues.
     *
     * @param topic topic the queues belong to
     * @param route route data; its queue-data list is sorted in place in the non-ordered case
     * @return publish info holding {@code route} and the derived message queues
     */
    public static TopicPublishInfo topicRouteData2TopicPublishInfo(String topic, TopicRouteData route) {
        TopicPublishInfo publishInfo = new TopicPublishInfo();
        publishInfo.setTopicRouteData(route);

        String orderConf = route.getOrderTopicConf();
        boolean ordered = orderConf != null && orderConf.length() > 0;

        if (ordered) {
            for (String brokerConf : orderConf.split(";")) {
                String[] parts = brokerConf.split(":");
                int queueCount = Integer.parseInt(parts[1]);
                for (int queueId = 0; queueId < queueCount; ++queueId) {
                    publishInfo.getMessageQueueList().add(new MessageQueue(topic, parts[0], queueId));
                }
            }
        } else {
            List<QueueData> queueDatas = route.getQueueDatas();
            Collections.sort(queueDatas);
            for (QueueData queueData : queueDatas) {
                if (!PermName.isWriteable(queueData.getPerm())) {
                    continue;
                }

                // Locate the broker entry that owns this queue data.
                BrokerData owner = null;
                for (BrokerData candidate : route.getBrokerDatas()) {
                    if (candidate.getBrokerName().equals(queueData.getBrokerName())) {
                        owner = candidate;
                        break;
                    }
                }

                // Skip brokers that are unknown or have no address registered under id 0.
                if (owner == null || !owner.getBrokerAddrs().containsKey(0L)) {
                    continue;
                }

                for (int queueId = 0; queueId < queueData.getWriteQueueNums(); ++queueId) {
                    publishInfo.getMessageQueueList().add(new MessageQueue(topic, queueData.getBrokerName(), queueId));
                }
            }
        }

        publishInfo.setOrderTopic(ordered);
        return publishInfo;
    }

    /**
     * Converts route data into the subscribe view (the set of readable queues) for a topic.
     *
     * @param topic topic the queues belong to
     * @param route route data whose queue-data entries are inspected
     * @return one {@link MessageQueue} per read queue of every readable queue-data entry
     */
    public static Set<MessageQueue> topicRouteData2TopicSubscribeInfo(String topic, TopicRouteData route) {
        Set<MessageQueue> readableQueues = new HashSet<MessageQueue>();
        List<QueueData> queueDatas = route.getQueueDatas();
        for (QueueData queueData : queueDatas) {
            if (PermName.isReadable(queueData.getPerm())) {
                for (int queueId = 0; queueId < queueData.getReadQueueNums(); ++queueId) {
                    readableQueues.add(new MessageQueue(topic, queueData.getBrokerName(), queueId));
                }
            }
        }
        return readableQueues;
    }

    /**
     * Starts this client instance once.
     *
     * <p>Only acts when the state is {@code CREATE_JUST}: the state is first set to
     * {@code START_FAILED} so that an exception from any start-up step leaves the instance
     * marked as failed, and it becomes {@code RUNNING} only after every step succeeded.
     * Calling it again in any other state except {@code START_FAILED} is a no-op.
     *
     * @throws MQClientException if a previous start attempt failed, or a start-up step throws it
     */
    public void start() throws MQClientException {
        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // Without a configured address, ask the API implementation to fetch one.
                    if (this.clientConfig.getNamesrvAddr() == null) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    this.mQClientAPIImpl.start();
                    this.startScheduledTask();
                    this.pullMessageService.start();
                    this.rebalanceService.start();
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    this.log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                default:
                    // Any other state: nothing to do.
                    break;
            }
        }
    }

    /**
     * Registers the periodic background tasks on {@code scheduledExecutorService}.
     *
     * <p>Every task catches {@code Exception} and logs it, so a failing run does not
     * cancel the fixed-rate schedule.
     */
    private void startScheduledTask() {
        // Only when no name server address is configured: re-fetch it every 120 s, first run after 10 s.
        if (null == this.clientConfig.getNamesrvAddr()) {
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable(){

                @Override
                public void run() {
                    try {
                        MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    catch (Exception e) {
                        MQClientInstance.this.log.error("ScheduledTask fetchNameServerAddr exception", (Throwable)e);
                    }
                }
            }, 10000L, 120000L, TimeUnit.MILLISECONDS);
        }
        // Refresh the routes of all known topics; first run after 10 ms, then at the configured poll interval.
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable(){

            @Override
            public void run() {
                try {
                    MQClientInstance.this.updateTopicRouteInfoFromNameServer();
                }
                catch (Exception e) {
                    MQClientInstance.this.log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", (Throwable)e);
                }
            }
        }, 10L, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
        // Prune offline brokers, then heartbeat; first run after 1 s, then at the configured heartbeat interval.
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable(){

            @Override
            public void run() {
                try {
                    MQClientInstance.this.cleanOfflineBroker();
                    MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
                }
                catch (Exception e) {
                    MQClientInstance.this.log.error("ScheduledTask sendHeartbeatToAllBroker exception", (Throwable)e);
                }
            }
        }, 1000L, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
        // Persist consumer offsets; first run after 10 s, then at the configured persist interval.
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable(){

            @Override
            public void run() {
                try {
                    MQClientInstance.this.persistAllConsumerOffset();
                }
                catch (Exception e) {
                    MQClientInstance.this.log.error("ScheduledTask persistAllConsumerOffset exception", (Throwable)e);
                }
            }
        }, 10000L, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
        // Let push consumers adjust their thread pools once per minute.
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable(){

            @Override
            public void run() {
                try {
                    MQClientInstance.this.adjustThreadPool();
                }
                catch (Exception e) {
                    MQClientInstance.this.log.error("ScheduledTask adjustThreadPool exception", (Throwable)e);
                }
            }
        }, 1L, 1L, TimeUnit.MINUTES);
    }

    /**
     * Returns the client id that was supplied to the constructor.
     *
     * @return this instance's client id
     */
    public String getClientId() {
        return this.clientId;
    }

    /**
     * Refreshes the route of every topic currently known to this instance.
     *
     * <p>The topic set is the union of all topics subscribed by registered consumers and
     * all topics published by registered producers; each one is refreshed individually.
     */
    public void updateTopicRouteInfoFromNameServer() {
        Set<String> topics = new HashSet<String>();

        // Topics subscribed by consumers.
        for (MQConsumerInner consumer : this.consumerTable.values()) {
            if (consumer == null) {
                continue;
            }
            Set<SubscriptionData> subscriptions = consumer.subscriptions();
            if (subscriptions == null) {
                continue;
            }
            for (SubscriptionData subscription : subscriptions) {
                topics.add(subscription.getTopic());
            }
        }

        // Topics published by producers.
        for (MQProducerInner producer : this.producerTable.values()) {
            if (producer != null) {
                topics.addAll(producer.getPublishTopicList());
            }
        }

        for (String topic : topics) {
            this.updateTopicRouteInfoFromNameServer(topic);
        }
    }

    /**
     * Returns a copy of a broker-supplied offset table with the namespace stripped from
     * every queue's topic.
     *
     * <p>When {@code namespace} is empty the entries are copied unchanged. Otherwise a new
     * {@link MessageQueue} is created for each entry; the queues in {@code offsetTable} are
     * left untouched. Changing the topic of a queue that is still a key in the caller's map
     * would alter its hash while stored, which makes later lookups in that map unreliable.
     *
     * @param offsetTable offsets keyed by message queue, as received from the broker
     * @param namespace   namespace to remove from each topic; may be {@code null} or empty
     * @return a new map with the (possibly rewritten) queues and the original offsets
     */
    public Map<MessageQueue, Long> parseOffsetTableFromBroker(Map<MessageQueue, Long> offsetTable, String namespace) {
        HashMap<MessageQueue, Long> newOffsetTable = new HashMap<MessageQueue, Long>();
        if (StringUtils.isNotEmpty(namespace)) {
            for (Map.Entry<MessageQueue, Long> entry : offsetTable.entrySet()) {
                MessageQueue source = entry.getKey();
                String plainTopic = NamespaceUtil.withoutNamespace(source.getTopic(), namespace);
                // Build a fresh key instead of mutating one that is still inside offsetTable.
                MessageQueue queue = new MessageQueue(plainTopic, source.getBrokerName(), source.getQueueId());
                newOffsetTable.put(queue, entry.getValue());
            }
        } else {
            newOffsetTable.putAll(offsetTable);
        }
        return newOffsetTable;
    }

    /**
     * Removes broker addresses that no longer appear in any known topic route.
     *
     * <p>Runs under {@code lockNamesrv}; if the lock cannot be acquired within
     * {@code LOCK_TIMEOUT_MILLIS} the round is skipped. For every broker a filtered copy of
     * its address map is built. Brokers left without any address are removed from
     * {@code brokerAddrTable}; the others have their entry replaced by the filtered copy.
     *
     * <p>If the thread is interrupted while waiting for the lock, the interruption is logged
     * and the thread's interrupt flag is restored so that callers can observe it.
     */
    private void cleanOfflineBroker() {
        try {
            if (!this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                return;
            }
            try {
                ConcurrentHashMap<String, HashMap<Long, String>> updatedTable = new ConcurrentHashMap<String, HashMap<Long, String>>();
                Iterator<Map.Entry<String, HashMap<Long, String>>> itBrokerTable = this.brokerAddrTable.entrySet().iterator();
                while (itBrokerTable.hasNext()) {
                    Map.Entry<String, HashMap<Long, String>> entry = itBrokerTable.next();
                    String brokerName = entry.getKey();

                    // Filter a copy so the live map is never observed half-pruned.
                    HashMap<Long, String> cloneAddrTable = new HashMap<Long, String>();
                    cloneAddrTable.putAll(entry.getValue());

                    Iterator<Map.Entry<Long, String>> it = cloneAddrTable.entrySet().iterator();
                    while (it.hasNext()) {
                        String addr = it.next().getValue();
                        if (!this.isBrokerAddrExistInTopicRouteTable(addr)) {
                            it.remove();
                            this.log.info("the broker addr[{} {}] is offline, remove it", brokerName, addr);
                        }
                    }

                    if (cloneAddrTable.isEmpty()) {
                        itBrokerTable.remove();
                        this.log.info("the broker[{}] name's host is offline, remove it", brokerName);
                    } else {
                        updatedTable.put(brokerName, cloneAddrTable);
                    }
                }
                if (!updatedTable.isEmpty()) {
                    this.brokerAddrTable.putAll(updatedTable);
                }
            } finally {
                this.lockNamesrv.unlock();
            }
        } catch (InterruptedException e) {
            this.log.warn("cleanOfflineBroker Exception", e);
            // Do not swallow the interruption: restore the flag for whoever runs this thread.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Asks the brokers to validate every non-tag subscription of every registered consumer.
     *
     * <p>Consumers without subscriptions are skipped, and checking continues with the next
     * consumer (a consumer with no subscriptions must not suppress the check for the others).
     * Tag-type subscriptions and topics for which no broker address is known are skipped too.
     *
     * @throws MQClientException if the broker rejects a subscription, or the check fails for
     *                           any other reason (the original exception is kept as the cause)
     */
    public void checkClientInBroker() throws MQClientException {
        for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
            Set<SubscriptionData> subscriptionInner = entry.getValue().subscriptions();
            if (subscriptionInner == null || subscriptionInner.isEmpty()) {
                // Nothing to verify for this consumer; keep checking the remaining ones.
                continue;
            }
            for (SubscriptionData subscriptionData : subscriptionInner) {
                if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
                    continue;
                }
                String addr = this.findBrokerAddrByTopic(subscriptionData.getTopic());
                if (addr == null) {
                    continue;
                }
                try {
                    this.getMQClientAPIImpl().checkClientInBroker(addr, entry.getKey(), this.clientId, subscriptionData, 3000L);
                } catch (Exception e) {
                    if (e instanceof MQClientException) {
                        throw (MQClientException)e;
                    }
                    throw new MQClientException("Check client in broker error, maybe because you use " + subscriptionData.getExpressionType() + " to filter message, but server has not been upgraded to support!This error would not affect the launch of consumer, but may has impact on message receiving if you have use the new features which are not supported by server, please check the log!", e);
                }
            }
        }
    }

    /**
     * Sends one heartbeat round and uploads filter class sources, unless another round is
     * already in progress.
     *
     * <p>The heartbeat lock is acquired without waiting; when it is busy a warning is
     * logged and the call returns. Exceptions from the round are logged, not propagated.
     */
    public void sendHeartbeatToAllBrokerWithLock() {
        if (!this.lockHeartbeat.tryLock()) {
            this.log.warn("lock heartBeat, but failed.");
            return;
        }
        try {
            this.sendHeartbeatToAllBroker();
            this.uploadFilterClassSource();
        } catch (Exception e) {
            this.log.error("sendHeartbeatToAllBroker exception", e);
        } finally {
            this.lockHeartbeat.unlock();
        }
    }

    /**
     * Asks every registered consumer to persist its consume offsets.
     */
    private void persistAllConsumerOffset() {
        for (MQConsumerInner consumer : this.consumerTable.values()) {
            consumer.persistConsumerOffset();
        }
    }

    /**
     * Lets every registered push consumer adjust its thread pool.
     *
     * <p>Consumers that are not {@link DefaultMQPushConsumerImpl} are ignored. A failure in
     * one consumer is logged and does not stop the remaining consumers from being adjusted.
     */
    public void adjustThreadPool() {
        for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
            MQConsumerInner impl = entry.getValue();
            // instanceof is false for null, so this also covers a missing consumer.
            if (!(impl instanceof DefaultMQPushConsumerImpl)) {
                continue;
            }
            try {
                ((DefaultMQPushConsumerImpl)impl).adjustThreadPool();
            } catch (Exception e) {
                // Best effort: record the failure instead of hiding it, then carry on.
                this.log.warn("adjustThreadPool exception, consumer: " + entry.getKey(), e);
            }
        }
    }

    /**
     * Refreshes the route of a single topic from the name server.
     *
     * <p>Delegates to the three-argument overload with {@code isDefault = false} and no
     * default producer.
     *
     * @param topic topic whose route should be refreshed
     * @return the result of the three-argument overload
     */
    public boolean updateTopicRouteInfoFromNameServer(String topic) {
        return this.updateTopicRouteInfoFromNameServer(topic, false, null);
    }

    /**
     * Tells whether a broker address is referenced by any stored topic route.
     *
     * @param addr broker address to look for
     * @return {@code true} if some broker entry of some route lists {@code addr}
     */
    private boolean isBrokerAddrExistInTopicRouteTable(String addr) {
        for (TopicRouteData routeData : this.topicRouteTable.values()) {
            List<BrokerData> brokerDatas = routeData.getBrokerDatas();
            for (BrokerData brokerData : brokerDatas) {
                Map<Long, String> addrs = brokerData.getBrokerAddrs();
                if (addrs != null && addrs.containsValue(addr)) {
                    return true;
                }
            }
        }
        return false;
    }

    /**
     * Sends the current heartbeat data to every known broker address.
     *
     * <p>Nothing is sent when there are neither producers nor consumers. When there are no
     * consumers, only addresses registered under broker id {@code 0} are contacted. The
     * version returned by each broker is recorded in {@code brokerVersionTable}. Success
     * is logged on every 20th round only; failures are logged and do not stop the round.
     */
    private void sendHeartbeatToAllBroker() {
        HeartbeatData heartbeatData = this.prepareHeartbeatData();
        boolean noProducers = heartbeatData.getProducerDataSet().isEmpty();
        boolean noConsumers = heartbeatData.getConsumerDataSet().isEmpty();
        if (noProducers && noConsumers) {
            this.log.warn("sending heartbeat, but no consumer and no producer");
            return;
        }
        if (this.brokerAddrTable.isEmpty()) {
            return;
        }

        long round = this.sendHeartbeatTimesTotal.getAndIncrement();
        for (Map.Entry<String, HashMap<Long, String>> brokerEntry : this.brokerAddrTable.entrySet()) {
            String brokerName = brokerEntry.getKey();
            HashMap<Long, String> addrsById = brokerEntry.getValue();
            if (addrsById == null) {
                continue;
            }
            for (Map.Entry<Long, String> addrEntry : addrsById.entrySet()) {
                Long id = addrEntry.getKey();
                String addr = addrEntry.getValue();
                if (addr == null) {
                    continue;
                }
                // Producer-only clients heartbeat to broker id 0 only.
                if (noConsumers && id != 0L) {
                    continue;
                }
                try {
                    int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000L);
                    if (!this.brokerVersionTable.containsKey(brokerName)) {
                        this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
                    }
                    this.brokerVersionTable.get(brokerName).put(addr, version);
                    if (round % 20L == 0L) {
                        this.log.info("send heart beat to broker[{} {} {}] success", new Object[]{brokerName, id, addr});
                        this.log.info(heartbeatData.toString());
                    }
                } catch (Exception e) {
                    if (this.isBrokerInNameServer(addr)) {
                        this.log.info("send heart beat to broker[{} {} {}] failed", new Object[]{brokerName, id, addr, e});
                    } else {
                        this.log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", new Object[]{brokerName, id, addr, e});
                    }
                }
            }
        }
    }

    /**
     * Uploads the filter class source of every class-filter subscription held by passive
     * ({@code CONSUME_PASSIVELY}) consumers.
     *
     * <p>Subscriptions that are not in class-filter mode, or that carry no source, are
     * skipped. A failed upload is logged and does not stop the remaining uploads.
     */
    private void uploadFilterClassSource() {
        for (MQConsumerInner consumer : this.consumerTable.values()) {
            if (ConsumeType.CONSUME_PASSIVELY == consumer.consumeType()) {
                for (SubscriptionData subscription : consumer.subscriptions()) {
                    boolean uploadable = subscription.isClassFilterMode() && subscription.getFilterClassSource() != null;
                    if (!uploadable) {
                        continue;
                    }
                    String group = consumer.groupName();
                    String filterClassName = subscription.getSubString();
                    String subscribedTopic = subscription.getTopic();
                    String source = subscription.getFilterClassSource();
                    try {
                        this.uploadFilterClassToAllFilterServer(group, filterClassName, subscribedTopic, source);
                    } catch (Exception e) {
                        this.log.error("uploadFilterClassToAllFilterServer Exception", e);
                    }
                }
            }
        }
    }

    /**
     * Refreshes the route of one topic from the name server and, if it changed, pushes the
     * new publish/subscribe information to all registered producers and consumers.
     *
     * <p>Runs under {@code lockNamesrv}. When {@code isDefault} is set and a default
     * producer is given, the route of the producer's create-topic key is fetched instead,
     * and its read/write queue counts are capped at the producer's default queue number.
     * A route counts as changed when it differs from the stored one or when any producer or
     * consumer reports that it needs an update for {@code topic}.
     *
     * <p>If the thread is interrupted (while waiting for the lock or during the remote
     * call) the interruption is logged, the interrupt flag is restored and {@code false}
     * is returned.
     *
     * @param topic            topic whose route should be refreshed
     * @param isDefault        whether to fetch the default (create-topic key) route
     * @param defaultMQProducer producer supplying the create-topic key and queue cap; only
     *                          used when {@code isDefault} is {@code true}; may be {@code null}
     * @return {@code true} if a changed route was stored and distributed, {@code false} otherwise
     * @throws IllegalStateException if the remote call fails with a {@link RemotingException}
     */
    public boolean updateTopicRouteInfoFromNameServer(String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) {
        try {
            if (!this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                this.log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", Long.valueOf(LOCK_TIMEOUT_MILLIS));
                return false;
            }
            try {
                TopicRouteData topicRouteData;
                if (isDefault && defaultMQProducer != null) {
                    topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(), 3000L);
                    if (topicRouteData != null) {
                        for (QueueData data : topicRouteData.getQueueDatas()) {
                            int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                            data.setReadQueueNums(queueNums);
                            data.setWriteQueueNums(queueNums);
                        }
                    }
                } else {
                    topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 3000L);
                }

                if (topicRouteData == null) {
                    this.log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic);
                    return false;
                }

                TopicRouteData old = this.topicRouteTable.get(topic);
                boolean changed = this.topicRouteDataIsChange(old, topicRouteData);
                if (changed) {
                    this.log.info("the topic[{}] route info changed, old[{}] ,new[{}]", new Object[]{topic, old, topicRouteData});
                } else {
                    // Same route, but a producer/consumer may still be missing its copy.
                    changed = this.isNeedUpdateTopicRouteInfo(topic);
                }
                if (!changed) {
                    return false;
                }

                // Clone before the conversions below, which may reorder the fetched data.
                TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();

                for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                    this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                }

                TopicPublishInfo publishInfo = MQClientInstance.topicRouteData2TopicPublishInfo(topic, topicRouteData);
                publishInfo.setHaveTopicRouterInfo(true);
                for (MQProducerInner producer : this.producerTable.values()) {
                    if (producer != null) {
                        producer.updateTopicPublishInfo(topic, publishInfo);
                    }
                }

                Set<MessageQueue> subscribeInfo = MQClientInstance.topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                for (MQConsumerInner consumer : this.consumerTable.values()) {
                    if (consumer != null) {
                        consumer.updateTopicSubscribeInfo(topic, subscribeInfo);
                    }
                }

                this.log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
                this.topicRouteTable.put(topic, cloneTopicRouteData);
                return true;
            } catch (MQClientException e) {
                // Failures for retry topics are not logged.
                if (!topic.startsWith("%RETRY%")) {
                    this.log.warn("updateTopicRouteInfoFromNameServer Exception", e);
                }
                return false;
            } catch (RemotingException e) {
                this.log.error("updateTopicRouteInfoFromNameServer Exception", e);
                throw new IllegalStateException(e);
            } finally {
                this.lockNamesrv.unlock();
            }
        } catch (InterruptedException e) {
            this.log.warn("updateTopicRouteInfoFromNameServer Exception", e);
            // Do not swallow the interruption: restore the flag so the caller can react to it.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Builds the heartbeat payload describing this client.
     *
     * <p>The payload carries the client id, one {@link ConsumerData} per registered
     * consumer and one {@link ProducerData} (group name only) per registered producer.
     *
     * @return a freshly populated heartbeat payload
     */
    private HeartbeatData prepareHeartbeatData() {
        HeartbeatData heartbeat = new HeartbeatData();
        heartbeat.setClientID(this.clientId);

        for (MQConsumerInner consumer : this.consumerTable.values()) {
            if (consumer == null) {
                continue;
            }
            ConsumerData consumerData = new ConsumerData();
            consumerData.setGroupName(consumer.groupName());
            consumerData.setConsumeType(consumer.consumeType());
            consumerData.setMessageModel(consumer.messageModel());
            consumerData.setConsumeFromWhere(consumer.consumeFromWhere());
            consumerData.getSubscriptionDataSet().addAll(consumer.subscriptions());
            consumerData.setUnitMode(consumer.isUnitMode());
            heartbeat.getConsumerDataSet().add(consumerData);
        }

        for (Map.Entry<String, MQProducerInner> producerEntry : this.producerTable.entrySet()) {
            if (producerEntry.getValue() == null) {
                continue;
            }
            ProducerData producerData = new ProducerData();
            producerData.setGroupName(producerEntry.getKey());
            heartbeat.getProducerDataSet().add(producerData);
        }

        return heartbeat;
    }

    /**
     * Tells whether a broker address is listed in any stored topic route.
     *
     * <p>Broker entries without an address map are skipped, matching
     * {@code isBrokerAddrExistInTopicRouteTable}. This method is called while handling a
     * heartbeat failure, so it must not throw on incomplete route data.
     *
     * @param brokerAddr broker address to look for
     * @return {@code true} if some broker entry of some route lists {@code brokerAddr}
     */
    private boolean isBrokerInNameServer(String brokerAddr) {
        for (TopicRouteData routeData : this.topicRouteTable.values()) {
            List<BrokerData> brokerDatas = routeData.getBrokerDatas();
            for (BrokerData bd : brokerDatas) {
                Map<Long, String> addrs = bd.getBrokerAddrs();
                if (addrs != null && addrs.containsValue(brokerAddr)) {
                    return true;
                }
            }
        }
        return false;
    }

    /**
     * Registers a message filter class with every filter server listed in the stored route
     * of {@code topic}.
     *
     * <p>The source is encoded as UTF-8 and its CRC32 computed; if that step fails a warning
     * is logged and registration proceeds with a {@code null} body and CRC {@code 0}. When
     * the route is missing or lists no filter servers a warning is logged instead. A failed
     * registration on one server is logged and does not stop the others.
     *
     * @param consumerGroup     consumer group owning the filter
     * @param fullClassName     name of the filter class
     * @param topic             topic the filter applies to
     * @param filterClassSource source code of the filter class
     * @throws UnsupportedEncodingException declared by the signature; encoding failures are
     *                                      caught and logged inside this method
     */
    @Deprecated
    private void uploadFilterClassToAllFilterServer(String consumerGroup, String fullClassName, String topic, String filterClassSource) throws UnsupportedEncodingException {
        byte[] body = null;
        int crc = 0;
        try {
            body = filterClassSource.getBytes("UTF-8");
            crc = UtilAll.crc32(body);
        } catch (Exception e1) {
            this.log.warn("uploadFilterClassToAllFilterServer Exception, ClassName: {} {}", fullClassName, RemotingHelper.exceptionSimpleDesc(e1));
        }

        TopicRouteData routeData = this.topicRouteTable.get(topic);
        boolean hasFilterServers = routeData != null
            && routeData.getFilterServerTable() != null
            && !routeData.getFilterServerTable().isEmpty();
        if (!hasFilterServers) {
            this.log.warn("register message class filter failed, because no filter server, ConsumerGroup: {} Topic: {} ClassName: {}", new Object[]{consumerGroup, topic, fullClassName});
            return;
        }

        for (Map.Entry<String, List<String>> serverEntry : routeData.getFilterServerTable().entrySet()) {
            List<String> filterServerAddrs = serverEntry.getValue();
            for (String fsAddr : filterServerAddrs) {
                try {
                    this.mQClientAPIImpl.registerMessageFilterClass(fsAddr, consumerGroup, topic, fullClassName, crc, body, 5000L);
                    this.log.info("register message class filter to {} OK, ConsumerGroup: {} Topic: {} ClassName: {}", new Object[]{fsAddr, consumerGroup, topic, fullClassName});
                } catch (Exception e) {
                    this.log.error("uploadFilterClassToAllFilterServer Exception", e);
                }
            }
        }
    }

    /**
     * Tells whether two topic routes differ once their queue and broker lists are sorted.
     *
     * @param olddata previously known route, may be {@code null}
     * @param nowdata freshly obtained route, may be {@code null}
     * @return {@code true} if either argument is {@code null} or the sorted clones are not
     *         equal; {@code false} if the sorted clones are equal
     */
    private boolean topicRouteDataIsChange(TopicRouteData olddata, TopicRouteData nowdata) {
        if (olddata == null || nowdata == null) {
            return true;
        }

        // Sorting is done on clones, presumably so the callers' lists keep their order.
        TopicRouteData previous = olddata.cloneTopicRouteData();
        TopicRouteData current = nowdata.cloneTopicRouteData();

        Collections.sort(previous.getQueueDatas());
        Collections.sort(previous.getBrokerDatas());
        Collections.sort(current.getQueueDatas());
        Collections.sort(current.getBrokerDatas());

        boolean unchanged = previous.equals((Object)current);
        return !unchanged;
    }

    /**
     * Asks the registered producers and consumers whether the route of {@code topic} needs
     * refreshing.
     *
     * <p>Producers are consulted first, then consumers; the search stops at the first one that
     * answers yes. {@code null} table values are skipped.
     *
     * @param topic topic to check
     * @return {@code true} if any producer reports that its publish info for the topic needs an
     *         update, or any consumer reports that its subscription info does; {@code false}
     *         otherwise
     */
    private boolean isNeedUpdateTopicRouteInfo(String topic) {
        // Typed loops: the interface methods cannot be called through an Object-typed local.
        for (MQProducerInner producer : this.producerTable.values()) {
            if (producer != null && producer.isPublishTopicNeedUpdate(topic)) {
                return true;
            }
        }
        for (MQConsumerInner consumer : this.consumerTable.values()) {
            if (consumer != null && consumer.isSubscribeTopicNeedUpdate(topic)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Shuts this client instance down once it is no longer shared.
     *
     * <p>The call returns without doing anything while any consumer or admin extension is
     * registered, or while more than one producer is registered. Otherwise, under this
     * instance's monitor and only in the {@code RUNNING} state, it shuts down the default
     * producer, marks the instance {@code SHUTDOWN_ALREADY}, stops the pull service, the
     * scheduled executor, the client API and the rebalance service, and finally removes this
     * instance from {@link MQClientManager}. In every other state nothing happens.
     */
    public void shutdown() {
        boolean stillInUse = !this.consumerTable.isEmpty()
            || !this.adminExtTable.isEmpty()
            || this.producerTable.size() > 1;
        if (stillInUse) {
            return;
        }

        synchronized (this) {
            switch (this.serviceState) {
                case RUNNING: {
                    // The order of these calls is kept exactly as it was.
                    this.defaultMQProducer.getDefaultMQProducerImpl().shutdown(false);
                    this.serviceState = ServiceState.SHUTDOWN_ALREADY;
                    this.pullMessageService.shutdown(true);
                    this.scheduledExecutorService.shutdown();
                    this.mQClientAPIImpl.shutdown();
                    this.rebalanceService.shutdown();
                    MQClientManager.getInstance().removeClientFactory(this.clientId);
                    this.log.info("the client factory [{}] shutdown OK", this.clientId);
                    break;
                }
                case CREATE_JUST:
                case SHUTDOWN_ALREADY:
                default: {
                    // Nothing to stop in any state other than RUNNING.
                    break;
                }
            }
        }
    }

    /**
     * Registers a consumer under its group name unless that group is already taken.
     *
     * @param group consumer group name; {@code null} is rejected
     * @param consumer consumer to register; {@code null} is rejected
     * @return {@code true} if the consumer was added; {@code false} if an argument was
     *         {@code null} or another consumer is already registered for {@code group}
     */
    public boolean registerConsumer(String group, MQConsumerInner consumer) {
        if (group == null || consumer == null) {
            return false;
        }
        boolean added = this.consumerTable.putIfAbsent(group, consumer) == null;
        if (!added) {
            this.log.warn("the consumer group[" + group + "] exist already.");
        }
        return added;
    }

    /**
     * Removes the consumer registered under {@code group} and then unregisters that consumer
     * group from the brokers via {@code unregisterClientWithLock(null, group)}.
     *
     * @param group consumer group name to remove
     */
    public void unregisterConsumer(String group) {
        consumerTable.remove(group);
        unregisterClientWithLock(null, group);
    }

    /**
     * Runs {@link #unregisterClient(String, String)} while holding {@code lockHeartbeat}.
     *
     * <p>Waits up to 3000 ms for the lock. If the lock cannot be obtained in time, or the wait
     * is interrupted, a warning is logged and nothing is unregistered. Exceptions thrown by the
     * unregistration itself are logged and not propagated; the lock is always released.
     *
     * @param producerGroup producer group to unregister, or {@code null}
     * @param consumerGroup consumer group to unregister, or {@code null}
     */
    private void unregisterClientWithLock(String producerGroup, String consumerGroup) {
        boolean locked;
        try {
            locked = this.lockHeartbeat.tryLock(3000L, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            this.log.warn("unregisterClientWithLock exception", e);
            return;
        }

        if (!locked) {
            this.log.warn("lock heartBeat, but failed.");
            return;
        }

        try {
            this.unregisterClient(producerGroup, consumerGroup);
        }
        catch (Exception e) {
            this.log.error("unregisterClient exception", e);
        }
        finally {
            this.lockHeartbeat.unlock();
        }
    }

    /**
     * Sends an unregister request for the given groups to every broker address held in
     * {@code brokerAddrTable}.
     *
     * <p>Brokers with a {@code null} address table and entries with a {@code null} address are
     * skipped. Each request uses a 3000 ms timeout. A failure for one address is logged and the
     * loop continues with the next one.
     *
     * @param producerGroup producer group to unregister, or {@code null}
     * @param consumerGroup consumer group to unregister, or {@code null}
     */
    private void unregisterClient(String producerGroup, String consumerGroup) {
        // Typed entries: iterating a raw HashMap's entry set as Map.Entry does not compile.
        for (Map.Entry<String, HashMap<Long, String>> brokerEntry : this.brokerAddrTable.entrySet()) {
            String brokerName = brokerEntry.getKey();
            HashMap<Long, String> oneTable = brokerEntry.getValue();
            if (oneTable == null) {
                continue;
            }
            for (Map.Entry<Long, String> addrEntry : oneTable.entrySet()) {
                String addr = addrEntry.getValue();
                if (addr == null) {
                    continue;
                }
                try {
                    this.mQClientAPIImpl.unregisterClient(addr, this.clientId, producerGroup, consumerGroup, 3000L);
                    this.log.info("unregister client[Producer: {} Consumer: {}] from broker[{} {} {}] success", new Object[]{producerGroup, consumerGroup, brokerName, addrEntry.getKey(), addr});
                }
                catch (RemotingException e) {
                    this.log.error("unregister client exception from broker: " + addr, e);
                }
                catch (InterruptedException e) {
                    this.log.error("unregister client exception from broker: " + addr, e);
                }
                catch (MQBrokerException e) {
                    this.log.error("unregister client exception from broker: " + addr, e);
                }
            }
        }
    }

    /**
     * Registers a producer under its group name unless that group is already taken.
     *
     * @param group producer group name; {@code null} is rejected
     * @param producer producer to register; {@code null} is rejected
     * @return {@code true} if the producer was added; {@code false} if an argument was
     *         {@code null} or another producer is already registered for {@code group}
     */
    public boolean registerProducer(String group, DefaultMQProducerImpl producer) {
        if (group == null || producer == null) {
            return false;
        }
        boolean added = this.producerTable.putIfAbsent(group, producer) == null;
        if (!added) {
            this.log.warn("the producer group[{}] exist already.", group);
        }
        return added;
    }

    /**
     * Removes the producer registered under {@code group} and then unregisters that producer
     * group from the brokers via {@code unregisterClientWithLock(group, null)}.
     *
     * @param group producer group name to remove
     */
    public void unregisterProducer(String group) {
        producerTable.remove(group);
        unregisterClientWithLock(group, null);
    }

    /**
     * Registers an admin extension under its group name unless that group is already taken.
     *
     * @param group admin group name; {@code null} is rejected
     * @param admin admin extension to register; {@code null} is rejected
     * @return {@code true} if the admin extension was added; {@code false} if an argument was
     *         {@code null} or another one is already registered for {@code group}
     */
    public boolean registerAdminExt(String group, MQAdminExtInner admin) {
        if (group == null || admin == null) {
            return false;
        }
        boolean added = this.adminExtTable.putIfAbsent(group, admin) == null;
        if (!added) {
            this.log.warn("the admin group[{}] exist already.", group);
        }
        return added;
    }

    /**
     * Removes the admin extension registered under {@code group}, if any.
     *
     * @param group admin group name to remove
     */
    public void unregisterAdminExt(String group) {
        adminExtTable.remove(group);
    }

    /**
     * Wakes up the rebalance service.
     */
    public void rebalanceImmediately() {
        rebalanceService.wakeup();
    }

    /**
     * Triggers a rebalance on every registered consumer.
     *
     * <p>{@code null} table values are skipped. Anything thrown by one consumer is logged and
     * does not prevent the remaining consumers from being rebalanced.
     */
    public void doRebalance() {
        for (MQConsumerInner consumer : this.consumerTable.values()) {
            if (consumer == null) {
                continue;
            }
            try {
                consumer.doRebalance();
            }
            catch (Throwable t) {
                this.log.error("doRebalance exception", t);
            }
        }
    }

    /**
     * Looks up the producer registered under {@code group}.
     *
     * @param group producer group name
     * @return the registered producer, or {@code null} if the group is not registered
     */
    public MQProducerInner selectProducer(String group) {
        return producerTable.get(group);
    }

    /**
     * Looks up the consumer registered under {@code group}.
     *
     * @param group consumer group name
     * @return the registered consumer, or {@code null} if the group is not registered
     */
    public MQConsumerInner selectConsumer(String group) {
        return consumerTable.get(group);
    }

    /**
     * Picks an address of the named broker for administrative use.
     *
     * <p>The first entry, in the address map's iteration order, whose address is not
     * {@code null} is used. The result is flagged as slave when that entry's broker id is not
     * {@code 0}.
     *
     * @param brokerName name of the broker to look up in {@code brokerAddrTable}
     * @return the chosen address together with its slave flag and broker version, or
     *         {@code null} if the broker is unknown or has no non-null address
     */
    public FindBrokerResult findBrokerAddressInAdmin(String brokerName) {
        // Typed map: entries of a raw HashMap cannot be iterated as Map.Entry.
        HashMap<Long, String> map = this.brokerAddrTable.get(brokerName);
        if (map == null) {
            return null;
        }
        for (Map.Entry<Long, String> entry : map.entrySet()) {
            String brokerAddr = entry.getValue();
            if (brokerAddr == null) {
                continue;
            }
            boolean slave = entry.getKey() != 0L;
            return new FindBrokerResult(brokerAddr, slave, this.findBrokerVersion(brokerName, brokerAddr));
        }
        return null;
    }

    /**
     * Returns the address stored under broker id {@code 0} for the named broker.
     *
     * @param brokerName name of the broker to look up in {@code brokerAddrTable}
     * @return the address mapped to id {@code 0}, or {@code null} if the broker is unknown, its
     *         address map is empty, or it has no entry for id {@code 0}
     */
    public String findBrokerAddressInPublish(String brokerName) {
        HashMap<Long, String> addrsById = this.brokerAddrTable.get(brokerName);
        if (addrsById == null || addrsById.isEmpty()) {
            return null;
        }
        return addrsById.get(0L);
    }

    /**
     * Picks an address of the named broker for consuming.
     *
     * <p>The address registered under {@code brokerId} is preferred. If there is none and
     * {@code onlyThisBroker} is {@code false}, the first entry in the address map's iteration
     * order is used instead. The result is flagged as slave when the id of the chosen entry is
     * not {@code 0}.
     *
     * @param brokerName name of the broker to look up in {@code brokerAddrTable}
     * @param brokerId preferred broker id
     * @param onlyThisBroker if {@code true}, never fall back to another id of the same broker
     * @return the chosen address with its slave flag and broker version, or {@code null} if no
     *         non-null address could be selected
     */
    public FindBrokerResult findBrokerAddressInSubscribe(String brokerName, long brokerId, boolean onlyThisBroker) {
        HashMap<Long, String> map = this.brokerAddrTable.get(brokerName);
        if (map == null || map.isEmpty()) {
            return null;
        }

        String brokerAddr = map.get(brokerId);
        boolean slave = brokerId != 0L;

        if (brokerAddr == null && !onlyThisBroker) {
            Map.Entry<Long, String> entry = map.entrySet().iterator().next();
            brokerAddr = entry.getValue();
            slave = entry.getKey() != 0L;
        }

        // A fallback entry may itself hold a null address; never report that as a hit.
        if (brokerAddr == null) {
            return null;
        }
        return new FindBrokerResult(brokerAddr, slave, this.findBrokerVersion(brokerName, brokerAddr));
    }

    /**
     * Returns the version recorded for a broker address.
     *
     * <p>Each table is read exactly once, so an entry that disappears between a
     * {@code containsKey} check and the following {@code get} can no longer cause a
     * {@link NullPointerException}.
     *
     * @param brokerName name of the broker to look up in {@code brokerVersionTable}
     * @param brokerAddr address of the broker
     * @return the recorded version, or {@code 0} if none is recorded for that name and address
     */
    public int findBrokerVersion(String brokerName, String brokerAddr) {
        HashMap<String, Integer> versionsByAddr = this.brokerVersionTable.get(brokerName);
        if (versionsByAddr == null) {
            return 0;
        }
        Integer version = versionsByAddr.get(brokerAddr);
        return version != null ? version : 0;
    }

    /**
     * Asks a broker serving {@code topic} for the client ids of a consumer group.
     *
     * <p>If no broker address is cached for the topic, the topic route is refreshed once via
     * {@code updateTopicRouteInfoFromNameServer} and the lookup is retried. The broker request
     * uses a 3000 ms timeout.
     *
     * @param topic topic used to select a broker
     * @param group consumer group whose member ids are requested
     * @return the consumer id list returned by the broker, or {@code null} if no broker address
     *         could be found or the request failed (the failure is logged)
     */
    public List<String> findConsumerIdList(String topic, String group) {
        String brokerAddr = this.findBrokerAddrByTopic(topic);
        if (brokerAddr == null) {
            this.updateTopicRouteInfoFromNameServer(topic);
            brokerAddr = this.findBrokerAddrByTopic(topic);
        }
        if (brokerAddr == null) {
            return null;
        }

        try {
            return this.mQClientAPIImpl.getConsumerIdListByGroup(brokerAddr, group, 3000L);
        }
        catch (Exception e) {
            this.log.warn("getConsumerIdListByGroup exception, " + brokerAddr + " " + group, e);
            return null;
        }
    }

    /**
     * Picks a random broker from the cached route of {@code topic} and returns the address
     * selected by {@code BrokerData.selectBrokerAddr()}.
     *
     * @param topic topic whose cached route is consulted
     * @return the selected broker address, or {@code null} if no route is cached for the topic
     *         or the route lists no broker
     */
    public String findBrokerAddrByTopic(String topic) {
        TopicRouteData topicRouteData = this.topicRouteTable.get(topic);
        if (topicRouteData == null) {
            return null;
        }
        List<BrokerData> brokers = topicRouteData.getBrokerDatas();
        if (brokers.isEmpty()) {
            return null;
        }
        int index = this.random.nextInt(brokers.size());
        BrokerData chosen = brokers.get(index % brokers.size());
        return chosen.selectBrokerAddr();
    }

    /**
     * Resets the consume offsets of a push consumer group for the queues of one topic.
     *
     * <p>If {@code group} is not registered as a {@link DefaultMQPushConsumerImpl}, this only
     * logs and returns. Otherwise the consumer is suspended, every process queue of
     * {@code topic} that appears in {@code offsetTable} is marked dropped and cleared, the
     * thread sleeps for 10 seconds, and then each such queue gets its new offset and is removed
     * from the process queue table. The consumer is always resumed at the end. A failure for one
     * queue is logged and the remaining queues are still processed.
     *
     * @param topic topic whose queues are reset
     * @param group consumer group to reset
     * @param offsetTable new offset per message queue; queues without an entry are left alone
     */
    public void resetOffset(String topic, String group, Map<MessageQueue, Long> offsetTable) {
        MQConsumerInner impl = this.consumerTable.get(group);
        if (!(impl instanceof DefaultMQPushConsumerImpl)) {
            this.log.info("[reset-offset] consumer dose not exist. group={}", group);
            return;
        }

        DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl) impl;
        try {
            consumer.suspend();

            ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = consumer.getRebalanceImpl().getProcessQueueTable();
            for (Map.Entry<MessageQueue, ProcessQueue> entry : processQueueTable.entrySet()) {
                MessageQueue mq = entry.getKey();
                if (topic.equals(mq.getTopic()) && offsetTable.containsKey(mq)) {
                    ProcessQueue pq = entry.getValue();
                    pq.setDropped(true);
                    pq.clear();
                }
            }

            // NOTE(review): presumably gives work on the dropped queues time to drain; confirm.
            try {
                TimeUnit.SECONDS.sleep(10L);
            }
            catch (InterruptedException ignored) {
                // An interrupt only cuts the pause short; the offsets are still updated below.
            }

            Iterator<MessageQueue> queues = processQueueTable.keySet().iterator();
            while (queues.hasNext()) {
                MessageQueue mq = queues.next();
                Long offset = offsetTable.get(mq);
                if (!topic.equals(mq.getTopic()) || offset == null) {
                    continue;
                }
                try {
                    consumer.updateConsumeOffset(mq, offset);
                    consumer.getRebalanceImpl().removeUnnecessaryMessageQueue(mq, processQueueTable.get(mq));
                    queues.remove();
                }
                catch (Exception e) {
                    this.log.warn("reset offset failed. group={}, {}", new Object[]{group, mq, e});
                }
            }
        }
        finally {
            consumer.resume();
        }
    }

    /**
     * Returns a copy of the offset table of a consumer group for one topic.
     *
     * @param topic topic whose offsets are requested
     * @param group consumer group to look up
     * @return the result of {@code cloneOffsetTable(topic)} on the consumer's offset store when
     *         the group is registered as a push or pull consumer; otherwise the shared empty map
     */
    public Map<MessageQueue, Long> getConsumerStatus(String topic, String group) {
        MQConsumerInner impl = this.consumerTable.get(group);
        // instanceof is false for null, so no separate null check is needed.
        if (impl instanceof DefaultMQPushConsumerImpl) {
            return ((DefaultMQPushConsumerImpl) impl).getOffsetStore().cloneOffsetTable(topic);
        }
        if (impl instanceof DefaultMQPullConsumerImpl) {
            return ((DefaultMQPullConsumerImpl) impl).getOffsetStore().cloneOffsetTable(topic);
        }
        return Collections.emptyMap();
    }

    /**
     * Returns the route currently cached for {@code topic}, without contacting any server.
     *
     * @param topic topic to look up in {@code topicRouteTable}
     * @return the cached route, or {@code null} if none is cached
     */
    public TopicRouteData getAnExistTopicRouteData(String topic) {
        return topicRouteTable.get(topic);
    }

    /**
     * @return the {@link MQClientAPIImpl} held by this instance
     */
    public MQClientAPIImpl getMQClientAPIImpl() {
        return mQClientAPIImpl;
    }

    /**
     * @return the {@link MQAdminImpl} held by this instance
     */
    public MQAdminImpl getMQAdminImpl() {
        return mQAdminImpl;
    }

    /**
     * @return the value of this instance's {@code bootTimestamp} field
     */
    public long getBootTimestamp() {
        return bootTimestamp;
    }

    /**
     * @return the scheduled executor held by this instance (the one stopped by
     *         {@link #shutdown()})
     */
    public ScheduledExecutorService getScheduledExecutorService() {
        return scheduledExecutorService;
    }

    /**
     * @return the {@link PullMessageService} held by this instance
     */
    public PullMessageService getPullMessageService() {
        return pullMessageService;
    }

    /**
     * @return the {@link DefaultMQProducer} held by this instance
     */
    public DefaultMQProducer getDefaultMQProducer() {
        return defaultMQProducer;
    }

    /**
     * @return the topic route table itself, not a copy; changes made through the returned map
     *         affect this instance
     */
    public ConcurrentMap<String, TopicRouteData> getTopicRouteTable() {
        return topicRouteTable;
    }

    /**
     * Has the push consumer registered under {@code consumerGroup} consume one message
     * directly.
     *
     * <p>Only {@link DefaultMQPushConsumerImpl} consumers are supported. Any other registered
     * consumer type is treated like an unknown group instead of failing with a
     * {@link ClassCastException}.
     *
     * @param msg message to consume
     * @param consumerGroup consumer group that should consume the message
     * @param brokerName broker name passed on to the consume service
     * @return the result reported by the consumer's consume service, or {@code null} if the
     *         group is not registered as a push consumer
     */
    public ConsumeMessageDirectlyResult consumeMessageDirectly(MessageExt msg, String consumerGroup, String brokerName) {
        MQConsumerInner mqConsumerInner = this.consumerTable.get(consumerGroup);
        // instanceof also covers the "group not registered" (null) case.
        if (!(mqConsumerInner instanceof DefaultMQPushConsumerImpl)) {
            return null;
        }
        DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl) mqConsumerInner;
        return consumer.getConsumeMessageService().consumeMessageDirectly(msg, brokerName);
    }

    /**
     * Collects the running info of a consumer group and adds client-level properties to it.
     *
     * <p>The consumer's own running info is extended with three properties: the name server
     * addresses (each followed by {@code ';'}), the consume type name, and the client version
     * description.
     *
     * @param consumerGroup consumer group to report on
     * @return the enriched running info, or {@code null} if no consumer is registered under
     *         {@code consumerGroup}
     */
    public ConsumerRunningInfo consumerRunningInfo(String consumerGroup) {
        MQConsumerInner mqConsumerInner = this.consumerTable.get(consumerGroup);
        if (mqConsumerInner == null) {
            // Unknown group: report "nothing" rather than failing with a NullPointerException.
            return null;
        }

        ConsumerRunningInfo consumerRunningInfo = mqConsumerInner.consumerRunningInfo();

        // Typed list: a raw List cannot be iterated with a String loop variable.
        List<String> nsList = this.mQClientAPIImpl.getRemotingClient().getNameServerAddressList();
        StringBuilder strBuilder = new StringBuilder();
        if (nsList != null) {
            for (String addr : nsList) {
                strBuilder.append(addr).append(";");
            }
        }
        String nsAddr = strBuilder.toString();

        consumerRunningInfo.getProperties().put("PROP_NAMESERVER_ADDR", nsAddr);
        consumerRunningInfo.getProperties().put("PROP_CONSUME_TYPE", mqConsumerInner.consumeType().name());
        consumerRunningInfo.getProperties().put("PROP_CLIENT_VERSION", MQVersion.getVersionDesc((int)MQVersion.CURRENT_VERSION));
        return consumerRunningInfo;
    }

    /**
     * @return the {@code ConsumerStatsManager} held by this instance
     */
    public ConsumerStatsManager getConsumerStatsManager() {
        return consumerStatsManager;
    }

    /**
     * @return the {@code NettyClientConfig} held by this instance
     */
    public NettyClientConfig getNettyClientConfig() {
        return nettyClientConfig;
    }

    /**
     * @return the {@link ClientConfig} held by this instance
     */
    public ClientConfig getClientConfig() {
        return clientConfig;
    }
}

