package org.joyqueue.broker.kafka.handler;

import com.alibaba.fastjson.JSON;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.joyqueue.broker.cluster.ClusterNameService;
import org.joyqueue.broker.kafka.KafkaCommandType;
import org.joyqueue.broker.kafka.KafkaContext;
import org.joyqueue.broker.kafka.KafkaContextAware;
import org.joyqueue.broker.kafka.KafkaErrorCode;
import org.joyqueue.broker.kafka.command.TopicMetadataRequest;
import org.joyqueue.broker.kafka.command.TopicMetadataResponse;
import org.joyqueue.broker.kafka.config.KafkaConfig;
import org.joyqueue.broker.kafka.helper.KafkaClientHelper;
import org.joyqueue.broker.kafka.model.KafkaBroker;
import org.joyqueue.broker.kafka.model.KafkaPartitionMetadata;
import org.joyqueue.broker.kafka.model.KafkaTopicMetadata;
import org.joyqueue.domain.Broker;
import org.joyqueue.domain.Partition;
import org.joyqueue.domain.Subscription;
import org.joyqueue.domain.TopicConfig;
import org.joyqueue.domain.TopicName;
import org.joyqueue.network.transport.Transport;
import org.joyqueue.network.transport.command.Command;
import org.joyqueue.toolkit.delay.AbstractDelayedOperation;
import org.joyqueue.toolkit.delay.DelayedOperationKey;
import org.joyqueue.toolkit.delay.DelayedOperationManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/broker/kafka/handler/TopicMetadataRequestHandler.class */
public class TopicMetadataRequestHandler extends AbstractKafkaCommandHandler implements KafkaContextAware {
    protected static final Logger logger = LoggerFactory.getLogger(TopicMetadataRequestHandler.class);
    private KafkaConfig config;
    private ClusterNameService clusterNameService;
    private DelayedOperationManager delayPurgatory;
    private Cache<String, Map<String, TopicConfig>> appCache;

    @Override // org.joyqueue.broker.kafka.KafkaContextAware
    public void setKafkaContext(KafkaContext kafkaContext) {
        this.config = kafkaContext.getConfig();
        this.clusterNameService = kafkaContext.getBrokerContext().getClusterNameService();
        this.appCache = CacheBuilder.newBuilder().expireAfterWrite(this.config.getMetadataCacheExpireTime(), TimeUnit.MILLISECONDS).build();
        this.delayPurgatory = new DelayedOperationManager("kafka-metadata-delayed");
        this.delayPurgatory.start();
    }

    public Command handle(final Transport transport, final Command command) {
        TopicMetadataRequest topicMetadataRequest = (TopicMetadataRequest) command.getPayload();
        String parseClient = KafkaClientHelper.parseClient(topicMetadataRequest.getClientId());
        Map<String, TopicConfig> emptyMap = Collections.emptyMap();
        if (!CollectionUtils.isEmpty(topicMetadataRequest.getTopics()) || !StringUtils.isNotBlank(parseClient)) {
            emptyMap = getTopicConfigs(topicMetadataRequest.getTopics());
        } else if (this.config.getMetadataFuzzySearchEnable() && KafkaClientHelper.isMetadataFuzzySearch(topicMetadataRequest.getClientId())) {
            emptyMap = getAllTopicConfigs(parseClient);
        }
        List<KafkaBroker> topicBrokers = getTopicBrokers(emptyMap);
        List<KafkaTopicMetadata> topicMetadata = getTopicMetadata(topicMetadataRequest.getTopics(), emptyMap);
        TopicMetadataResponse topicMetadataResponse = new TopicMetadataResponse(topicMetadata, topicBrokers);
        final Command command2 = new Command(topicMetadataResponse);
        if (this.config.getLogDetail(parseClient)) {
            logger.info("get topic metadata, transport: {}, app: {}, request: {}, response: {}", new Object[]{transport, parseClient, topicMetadataRequest, topicMetadataResponse});
        }
        if (!CollectionUtils.isEmpty(topicMetadata) || !this.config.getMetadataDelayEnable()) {
            return command2;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("get topic metadata, topics: {}, address: {}, metadata: {}, app: {}", new Object[]{topicMetadataRequest.getTopics(), transport.remoteAddress(), JSON.toJSONString(topicMetadata), topicMetadataRequest.getClientId()});
        }
        this.delayPurgatory.tryCompleteElseWatch(new AbstractDelayedOperation(this.config.getMetadataDelay()) { // from class: org.joyqueue.broker.kafka.handler.TopicMetadataRequestHandler.1
            protected void onComplete() {
                transport.acknowledge(command, command2);
            }
        }, Sets.newHashSet(new DelayedOperationKey[]{new DelayedOperationKey(new Object[0])}));
        return null;
    }

    public int type() {
        return KafkaCommandType.METADATA.getCode();
    }

    protected Map<String, TopicConfig> getAllTopicConfigs(final String str) {
        if (!this.config.getMetadataCacheEnable()) {
            return doGetAllTopicConfigs(str);
        }
        try {
            return (Map) this.appCache.get(str, new Callable<Map<String, TopicConfig>>() { // from class: org.joyqueue.broker.kafka.handler.TopicMetadataRequestHandler.2
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.util.concurrent.Callable
                public Map<String, TopicConfig> call() throws Exception {
                    return TopicMetadataRequestHandler.this.doGetAllTopicConfigs(str);
                }
            });
        } catch (ExecutionException e) {
            logger.error("getAllTopicConfigs exception, clientId: {}", str, e);
            return Collections.emptyMap();
        }
    }

    protected Map<String, TopicConfig> doGetAllTopicConfigs(String str) {
        String[] split = str.split("\\.");
        Map topicConfigByApp = this.clusterNameService.getTopicConfigByApp(str, Subscription.Type.CONSUMPTION);
        Map topicConfigByApp2 = this.clusterNameService.getTopicConfigByApp(split[0], Subscription.Type.PRODUCTION);
        HashMap newHashMap = Maps.newHashMap();
        if (MapUtils.isNotEmpty(topicConfigByApp)) {
            for (Map.Entry entry : topicConfigByApp.entrySet()) {
                newHashMap.put(((TopicName) entry.getKey()).getFullName(), entry.getValue());
            }
        }
        if (MapUtils.isNotEmpty(topicConfigByApp2)) {
            for (Map.Entry entry2 : topicConfigByApp2.entrySet()) {
                newHashMap.put(((TopicName) entry2.getKey()).getFullName(), entry2.getValue());
            }
        }
        return newHashMap;
    }

    protected Map<String, TopicConfig> getTopicConfigs(List<String> list) {
        return this.clusterNameService.getTopicConfigs(list);
    }

    protected List<KafkaBroker> getTopicBrokers(Map<String, TopicConfig> map) {
        HashSet newHashSet = Sets.newHashSet();
        Iterator<Map.Entry<String, TopicConfig>> it = map.entrySet().iterator();
        while (it.hasNext()) {
            Iterator it2 = it.next().getValue().fetchAllBroker().entrySet().iterator();
            while (it2.hasNext()) {
                Broker broker = (Broker) ((Map.Entry) it2.next()).getValue();
                newHashSet.add(new KafkaBroker(broker.getId().intValue(), broker.getIp(), broker.getPort()));
            }
        }
        return Lists.newArrayList(newHashSet);
    }

    protected List<KafkaTopicMetadata> getTopicMetadata(List<String> list, Map<String, TopicConfig> map) {
        LinkedList newLinkedList = Lists.newLinkedList();
        if (CollectionUtils.isEmpty(list)) {
            for (Map.Entry<String, TopicConfig> entry : map.entrySet()) {
                newLinkedList.add(new KafkaTopicMetadata(entry.getKey(), getPartitionMetadata(entry.getValue()), KafkaErrorCode.NONE.getCode()));
            }
        } else {
            for (String str : list) {
                TopicConfig topicConfig = map.get(str);
                if (topicConfig != null) {
                    newLinkedList.add(new KafkaTopicMetadata(str, getPartitionMetadata(topicConfig), KafkaErrorCode.NONE.getCode()));
                } else {
                    newLinkedList.add(new KafkaTopicMetadata(str, Collections.emptyList(), KafkaErrorCode.TOPIC_AUTHORIZATION_FAILED.getCode()));
                }
            }
        }
        return newLinkedList;
    }

    protected List<KafkaPartitionMetadata> getPartitionMetadata(TopicConfig topicConfig) {
        LinkedList newLinkedList = Lists.newLinkedList();
        for (Partition partition : topicConfig.fetchPartitionMetadata()) {
            short code = KafkaErrorCode.NONE.getCode();
            KafkaBroker kafkaBroker = null;
            LinkedList newLinkedList2 = Lists.newLinkedList();
            LinkedList newLinkedList3 = Lists.newLinkedList();
            if (partition.getLeader() != null) {
                kafkaBroker = new KafkaBroker(partition.getLeader().getId().intValue(), partition.getLeader().getIp(), partition.getLeader().getPort());
            } else {
                code = KafkaErrorCode.LEADER_NOT_AVAILABLE.getCode();
            }
            if (CollectionUtils.isNotEmpty(partition.getReplicas())) {
                for (Broker broker : partition.getReplicas()) {
                    newLinkedList2.add(new KafkaBroker(broker.getId().intValue(), broker.getIp(), broker.getPort()));
                }
            }
            if (CollectionUtils.isNotEmpty(partition.getIsrs())) {
                for (Broker broker2 : partition.getIsrs()) {
                    newLinkedList3.add(new KafkaBroker(broker2.getId().intValue(), broker2.getIp(), broker2.getPort()));
                }
            }
            newLinkedList.add(new KafkaPartitionMetadata(partition.getPartitionId(), kafkaBroker, newLinkedList2, newLinkedList3, code));
        }
        return newLinkedList;
    }
}
