package com.kalvan.mq.consumer;

import cn.hutool.core.lang.Assert;
import com.alibaba.fastjson.JSON;
import com.kalvan.mq.MqCondition;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.annotation.PreDestroy;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

@Conditional({MqCondition.class})
@Configuration
/* loaded from: input_file:com/kalvan/mq/consumer/MqConsumerManager.class */
public class MqConsumerManager implements InitializingBean, ApplicationContextAware {
    private static final Logger log = LoggerFactory.getLogger(MqConsumerManager.class);
    private Map<String, String> topics;

    @Value("${rocketmq.namesrvAddr}")
    private String namesrvAddr;
    private ApplicationContext applicationContext;
    private Map<String, DefaultMQPushConsumer> consumerMap = new HashMap();
    private Map<String, Map<String, MsgHandler>> msgHandlerCache = new HashMap();

    private void createMqConsumer(List<MsgHandlerMetaData> list) {
        if (StringUtils.isEmpty(this.namesrvAddr)) {
            log.info("mq地址未配置，不创建消费者");
            return;
        }
        Iterator<Map.Entry<String, ConsumerConfig>> it = mergeTopic(list).entrySet().iterator();
        while (it.hasNext()) {
            ConsumerConfig value = it.next().getValue();
            log.info("创建MQ消费者：{}", JSON.toJSON(value));
            String topic = value.getTopic();
            String tag = value.getTag();
            try {
                DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer(String.format("TM_%s_GROUP", topic));
                defaultMQPushConsumer.setNamesrvAddr(this.namesrvAddr);
                defaultMQPushConsumer.subscribe(topic, tag);
                defaultMQPushConsumer.setMessageModel(value.getMessageModel());
                defaultMQPushConsumer.setConsumeThreadMin(value.getThreadMin());
                defaultMQPushConsumer.setConsumeThreadMax(value.getThreadMax());
                defaultMQPushConsumer.registerMessageListener((list2, consumeConcurrentlyContext) -> {
                    if (CollectionUtils.isEmpty(list2)) {
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                    int i = 0;
                    while (i < list2.size()) {
                        try {
                            try {
                                MessageExt messageExt = (MessageExt) list2.get(i);
                                log.debug("MQ消息：{}", JSON.toJSON(messageExt));
                                String topic2 = messageExt.getTopic();
                                String tags = messageExt.getTags();
                                String str = new String(messageExt.getBody(), StandardCharsets.UTF_8);
                                Map<String, MsgHandler> map = this.msgHandlerCache.get(topic2);
                                if (CollectionUtils.isEmpty(map)) {
                                    log.warn("主题[{}]不存在消息处理器", topic2);
                                } else {
                                    MsgHandler msgHandler = map.get("*");
                                    if (msgHandler != null) {
                                        log.warn("[{}-{}]存在“*”消息处理器", topic2, tags);
                                        if (!msgHandler.handle(str)) {
                                            ConsumeConcurrentlyStatus consumeConcurrentlyStatus = ConsumeConcurrentlyStatus.RECONSUME_LATER;
                                            if (i < list2.size()) {
                                                consumeConcurrentlyContext.setAckIndex(i + 1);
                                            }
                                            return consumeConcurrentlyStatus;
                                        }
                                    }
                                    MsgHandler msgHandler2 = map.get(tags);
                                    if (msgHandler2 == null) {
                                        log.warn("未找到消息处理器[{}-{}]", topic2, tags);
                                    } else if (!msgHandler2.handle(str)) {
                                        ConsumeConcurrentlyStatus consumeConcurrentlyStatus2 = ConsumeConcurrentlyStatus.RECONSUME_LATER;
                                        if (i < list2.size()) {
                                            consumeConcurrentlyContext.setAckIndex(i + 1);
                                        }
                                        return consumeConcurrentlyStatus2;
                                    }
                                }
                                i++;
                            } catch (Throwable th) {
                                log.error("消费数据异常", th);
                                ConsumeConcurrentlyStatus consumeConcurrentlyStatus3 = ConsumeConcurrentlyStatus.RECONSUME_LATER;
                                if (i < list2.size()) {
                                    consumeConcurrentlyContext.setAckIndex(i + 1);
                                }
                                return consumeConcurrentlyStatus3;
                            }
                        } catch (Throwable th2) {
                            if (i < list2.size()) {
                                consumeConcurrentlyContext.setAckIndex(i + 1);
                            }
                            throw th2;
                        }
                    }
                    if (i < list2.size()) {
                        consumeConcurrentlyContext.setAckIndex(i + 1);
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                });
                defaultMQPushConsumer.start();
                this.consumerMap.put(topic, defaultMQPushConsumer);
            } catch (MQClientException e) {
                log.error("consumer 创建异常", e);
            }
        }
    }

    private Map<String, ConsumerConfig> mergeTopic(List<MsgHandlerMetaData> list) {
        HashMap hashMap = new HashMap(10);
        for (MsgHandlerMetaData msgHandlerMetaData : list) {
            String str = msgHandlerMetaData.topic();
            ConsumerConfig consumerConfig = (ConsumerConfig) hashMap.get(str);
            if (consumerConfig == null) {
                consumerConfig = new ConsumerConfig();
            }
            String tag = msgHandlerMetaData.tag();
            if (!StringUtils.isEmpty(tag)) {
                StringBuilder sb = new StringBuilder(consumerConfig.getTag() == null ? "" : consumerConfig.getTag());
                if (sb.length() > 0) {
                    sb.append("||");
                }
                sb.append(tag);
                consumerConfig.setTag(sb.toString());
            }
            consumerConfig.setMessageModel(msgHandlerMetaData.consumerMessageModel());
            consumerConfig.setThreadMax(msgHandlerMetaData.consumerThreadMax());
            consumerConfig.setThreadMin(msgHandlerMetaData.consumerThreadMin());
            consumerConfig.setTopic(msgHandlerMetaData.topic());
            hashMap.put(str, consumerConfig);
        }
        return hashMap;
    }

    public void afterPropertiesSet() {
        List<MsgHandlerMetaData> loadMsgHandler = loadMsgHandler();
        if (CollectionUtils.isEmpty(loadMsgHandler)) {
            return;
        }
        createMqConsumer(loadMsgHandler);
    }

    private List<MsgHandlerMetaData> loadMsgHandler() {
        ArrayList arrayList = new ArrayList();
        Assert.notNull(this.applicationContext, "applicationContext 为空，消息处理器初始化失败", new Object[0]);
        Map beansOfType = this.applicationContext.getBeansOfType(MsgHandler.class);
        if (beansOfType.isEmpty()) {
            log.warn("未发现消息处理器");
            return arrayList;
        }
        Iterator it = beansOfType.entrySet().iterator();
        while (it.hasNext()) {
            MsgHandler msgHandler = (MsgHandler) ((Map.Entry) it.next()).getValue();
            MsgHandlerMetaData msgHandlerMetaData = (MsgHandlerMetaData) msgHandler.getClass().getAnnotation(MsgHandlerMetaData.class);
            if (msgHandlerMetaData != null) {
                arrayList.add(msgHandlerMetaData);
                String str = msgHandlerMetaData.topic();
                String tag = msgHandlerMetaData.tag();
                if (StringUtils.isEmpty(str) || StringUtils.isEmpty(tag)) {
                    log.warn("消息处理器Topic、tag都不能为空[{}-{}]", str, tag);
                } else {
                    Map<String, MsgHandler> map = this.msgHandlerCache.get(str);
                    if (map == null) {
                        map = new HashMap(20);
                    }
                    for (String str2 : tag.split("\\|")) {
                        map.put(str2, msgHandler);
                    }
                    this.msgHandlerCache.put(str, map);
                }
            }
        }
        return arrayList;
    }

    public void setApplicationContext(ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }

    @PreDestroy
    public void stop() {
        if (CollectionUtils.isEmpty(this.consumerMap)) {
            return;
        }
        Iterator<Map.Entry<String, DefaultMQPushConsumer>> it = this.consumerMap.entrySet().iterator();
        while (it.hasNext()) {
            DefaultMQPushConsumer value = it.next().getValue();
            value.shutdown();
            log.info("关闭MQ消费者：{}{", value.getInstanceName());
        }
    }

    public Map<String, String> getTopics() {
        return this.topics;
    }

    public String getNamesrvAddr() {
        return this.namesrvAddr;
    }

    public Map<String, DefaultMQPushConsumer> getConsumerMap() {
        return this.consumerMap;
    }

    public ApplicationContext getApplicationContext() {
        return this.applicationContext;
    }

    public Map<String, Map<String, MsgHandler>> getMsgHandlerCache() {
        return this.msgHandlerCache;
    }

    public void setTopics(Map<String, String> map) {
        this.topics = map;
    }

    public void setNamesrvAddr(String str) {
        this.namesrvAddr = str;
    }

    public void setConsumerMap(Map<String, DefaultMQPushConsumer> map) {
        this.consumerMap = map;
    }

    public void setMsgHandlerCache(Map<String, Map<String, MsgHandler>> map) {
        this.msgHandlerCache = map;
    }
}
