/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.dataproxy.sink.mq;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
import org.apache.inlong.dataproxy.sink.mq.CacheClusterSelector;
import org.apache.inlong.dataproxy.sink.mq.MessageQueueClusterProducer;
import org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink;
import org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSinkContext;
import org.apache.inlong.dataproxy.sink.mq.PackProfile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Zone-level producer that keeps one {@link MessageQueueClusterProducer} per selected
 * cache cluster and dispatches each {@link PackProfile} to one of them in round-robin order.
 *
 * <p>Cluster producers that disappear from the selected configuration are not stopped
 * immediately: they are parked in a "deleting" map and stopped by
 * {@link #clearExpiredProducers()} once they have been parked for at least
 * {@link #MAX_RESERVED_TIME} milliseconds. If the cluster re-appears before that, the parked
 * producer is put back into use instead of creating a new one.
 *
 * <p>Moves between the "using" and "deleting" maps are guarded by the monitor of
 * {@code deletingClusterMap}.
 */
public class MessageQueueZoneProducer {
    private static final Logger logger = LoggerFactory.getLogger(MessageQueueZoneProducer.class);
    /** Minimum time in milliseconds a removed cluster producer stays parked before it is stopped. */
    private static final long MAX_RESERVED_TIME = 60000L;
    /** Pause in milliseconds between attempts of {@link #send(PackProfile)} when no producer is usable. */
    private static final long RETRY_WAIT_MS = 100L;

    private final MessageQueueZoneSink zoneSink;
    private final MessageQueueZoneSinkContext context;
    private final CacheClusterSelector cacheClusterSelector;
    /** Monotonic counter used to pick the next cluster; allowed to overflow. */
    private final AtomicInteger clusterIndex = new AtomicInteger(0);
    /**
     * Names of the clusters currently eligible for sending. The list is replaced as a whole on
     * reload and never mutated afterwards; volatile so that {@link #send(PackProfile)} observes
     * the replacement.
     */
    private volatile List<String> currentClusterNames = new ArrayList<>();
    private final ConcurrentHashMap<String, Long> usingTimeMap = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, MessageQueueClusterProducer> usingClusterMap =
            new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, MessageQueueClusterProducer> deletingClusterMap =
            new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, Long> deletingTimeMap = new ConcurrentHashMap<>();
    /** Snapshot of the topic set that was last published to the cluster producers. */
    private final Set<String> lastRefreshTopics = new HashSet<>();

    /**
     * Creates the zone producer and obtains its cluster selector from the sink context.
     *
     * @param zoneSink the owning sink; its name is used in log messages and its
     *                 "MQ cluster started" flag is updated on the first successful reload
     * @param context  the sink context used for metrics, selector creation and producer creation
     */
    public MessageQueueZoneProducer(MessageQueueZoneSink zoneSink, MessageQueueZoneSinkContext context) {
        this.zoneSink = zoneSink;
        this.context = context;
        this.cacheClusterSelector = context.createCacheClusterSelector();
    }

    /**
     * Performs the initial configuration load. Any exception is logged and not propagated.
     */
    public void start() {
        try {
            logger.info("{} start MessageQueueZoneProducer", this.zoneSink.getName());
            this.reloadMetaConfig();
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }

    /**
     * Stops every parked and every in-use cluster producer, then clears all bookkeeping maps.
     */
    public void close() {
        for (MessageQueueClusterProducer clusterProducer : this.deletingClusterMap.values()) {
            clusterProducer.stop();
        }
        for (MessageQueueClusterProducer clusterProducer : this.usingClusterMap.values()) {
            clusterProducer.stop();
        }
        this.deletingClusterMap.clear();
        this.deletingTimeMap.clear();
        this.usingClusterMap.clear();
        this.usingTimeMap.clear();
    }

    /**
     * Re-reads the cluster configuration and then the topic configuration.
     */
    public void reloadMetaConfig() {
        this.checkAndReloadClusterInfo();
        this.checkAndPublishTopics();
    }

    /**
     * Stops and forgets every parked cluster producer that has been parked for at least
     * {@link #MAX_RESERVED_TIME} milliseconds.
     */
    public void clearExpiredProducers() {
        if (this.deletingClusterMap.isEmpty()) {
            return;
        }
        Set<String> expired = new HashSet<>();
        synchronized (this.deletingClusterMap) {
            long curTime = System.currentTimeMillis();
            for (Map.Entry<String, Long> entry : this.deletingTimeMap.entrySet()) {
                if (curTime - entry.getValue() >= MAX_RESERVED_TIME) {
                    expired.add(entry.getKey());
                }
            }
            if (expired.isEmpty()) {
                return;
            }
            for (String clusterName : expired) {
                this.deletingTimeMap.remove(clusterName);
                MessageQueueClusterProducer tmpProducer = this.deletingClusterMap.remove(clusterName);
                if (tmpProducer != null) {
                    tmpProducer.stop();
                }
            }
        }
        logger.info("{} cleared expired cluster producer {}", this.zoneSink.getName(), expired);
    }

    /**
     * Sends the profile through the next cluster producer in round-robin order.
     *
     * <p>The call does not return until a usable producer has been found: while the cluster
     * list is empty, or the chosen name has no producer, a metric is recorded and the method
     * waits {@link #RETRY_WAIT_MS} milliseconds before trying again.
     *
     * @param profile the packed messages to send
     * @return the result reported by the selected cluster producer
     */
    public boolean send(PackProfile profile) {
        while (true) {
            // Read the volatile field once so size() and get() see the same list instance.
            List<String> tmpClusters = this.currentClusterNames;
            if (tmpClusters == null || tmpClusters.isEmpty()) {
                this.context.fileMetricIncSumStats("sink.cluster.empty");
                this.sleepSomeTime(RETRY_WAIT_MS);
                continue;
            }
            // floorMod keeps the index non-negative after the counter overflows;
            // Math.abs(Integer.MIN_VALUE) is negative and would yield an invalid index.
            int index = Math.floorMod(this.clusterIndex.getAndIncrement(), tmpClusters.size());
            String clusterName = tmpClusters.get(index);
            if (clusterName == null) {
                this.context.fileMetricIncSumStats("sink.cluster.unmatched");
                this.sleepSomeTime(RETRY_WAIT_MS);
                continue;
            }
            MessageQueueClusterProducer clusterProducer = this.usingClusterMap.get(clusterName);
            if (clusterProducer != null) {
                return clusterProducer.send(profile);
            }
            this.context.fileMetricIncWithDetailStats("sink.cluster.producer.null", clusterName);
            this.sleepSomeTime(RETRY_WAIT_MS);
        }
    }

    /**
     * Reconciles the running cluster producers with the currently selected cluster configuration.
     *
     * <p>Steps: (1) classify each selected cluster as already in use, revivable from the parked
     * map, or new; (2) create and start producers for new clusters; (3) publish the new name
     * list; (4) park producers whose cluster is no longer selected. When anything changed the
     * result is logged, and on the first change the sink and the config manager are told that
     * the MQ cluster is started/ready. An empty selection leaves the current state untouched.
     * Any failure is logged and not propagated.
     */
    private void checkAndReloadClusterInfo() {
        try {
            List<CacheClusterConfig> allConfigList = ConfigManager.getInstance().getCachedCLusterConfig();
            List<CacheClusterConfig> newConfigList = this.cacheClusterSelector.select(allConfigList);
            if (newConfigList == null || newConfigList.isEmpty()) {
                return;
            }
            boolean changed = false;
            List<String> lastClusterNames = new ArrayList<>();
            List<CacheClusterConfig> addedItems = new ArrayList<>();
            synchronized (this.deletingClusterMap) {
                for (CacheClusterConfig clusterConfig : newConfigList) {
                    if (clusterConfig == null) {
                        continue;
                    }
                    String clusterName = clusterConfig.getClusterName();
                    if (this.usingTimeMap.containsKey(clusterName)) {
                        lastClusterNames.add(clusterName);
                        continue;
                    }
                    if (this.deletingTimeMap.containsKey(clusterName)) {
                        // The cluster came back before its parked producer expired: reuse it.
                        this.deletingTimeMap.remove(clusterName);
                        MessageQueueClusterProducer parked = this.deletingClusterMap.remove(clusterName);
                        if (parked == null) {
                            addedItems.add(clusterConfig);
                            continue;
                        }
                        this.usingClusterMap.put(clusterName, parked);
                        this.usingTimeMap.put(clusterName, System.currentTimeMillis());
                        lastClusterNames.add(clusterName);
                        continue;
                    }
                    addedItems.add(clusterConfig);
                }
            }
            if (!addedItems.isEmpty()) {
                changed = true;
                long curTime = System.currentTimeMillis();
                for (CacheClusterConfig clusterConfig : addedItems) {
                    MessageQueueClusterProducer tmpCluster =
                            new MessageQueueClusterProducer(this.zoneSink.getName(), clusterConfig, this.context);
                    tmpCluster.start();
                    String clusterName = clusterConfig.getClusterName();
                    this.usingClusterMap.put(clusterName, tmpCluster);
                    this.usingTimeMap.put(clusterName, curTime);
                    lastClusterNames.add(clusterName);
                }
            }
            if (!lastClusterNames.equals(this.currentClusterNames)) {
                this.currentClusterNames = lastClusterNames;
                changed = true;
            }
            Set<String> needRmvs = new HashSet<>();
            synchronized (this.deletingClusterMap) {
                for (String clusterName : this.usingClusterMap.keySet()) {
                    if (!lastClusterNames.contains(clusterName)) {
                        needRmvs.add(clusterName);
                    }
                }
                if (!needRmvs.isEmpty()) {
                    changed = true;
                    long curTime = System.currentTimeMillis();
                    for (String clusterName : needRmvs) {
                        MessageQueueClusterProducer removed = this.usingClusterMap.remove(clusterName);
                        this.usingTimeMap.remove(clusterName);
                        if (removed == null) {
                            continue;
                        }
                        // Park instead of stopping; clearExpiredProducers() stops it later.
                        this.deletingClusterMap.put(clusterName, removed);
                        this.deletingTimeMap.put(clusterName, curTime);
                    }
                }
            }
            if (!changed) {
                return;
            }
            if (this.zoneSink.isMqClusterStarted()) {
                logger.info("{} reload cluster info, current cluster are {}, removed {}, created {}",
                        this.zoneSink.getName(), lastClusterNames, needRmvs, addedItems);
            } else {
                this.zoneSink.setMQClusterStarted();
                ConfigManager.getInstance().setMqClusterReady();
                logger.info("{} reload cluster info, and updated sink status, current cluster are {}, removed {}, created {}",
                        this.zoneSink.getName(), lastClusterNames, needRmvs, addedItems);
            }
        } catch (Throwable e) {
            // Trailing Throwable with no matching placeholder is logged by SLF4J with its stack trace.
            logger.error("{} reload cluster info failure", this.zoneSink.getName(), e);
        }
    }

    /**
     * Publishes the current topic set to every in-use cluster producer when it differs from the
     * set published last time. A missing or empty topic set is ignored.
     */
    private void checkAndPublishTopics() {
        Set<String> curTopicSet = ConfigManager.getInstance().getAllTopicNames();
        if (curTopicSet == null || curTopicSet.isEmpty() || this.lastRefreshTopics.equals(curTopicSet)) {
            return;
        }
        logger.info("{} reload topics changed, current topics are {}, last topics are {}",
                this.zoneSink.getName(), curTopicSet, this.lastRefreshTopics);
        // Replace the snapshot rather than accumulating into it: with a pure addAll, a removed
        // topic would stay in the snapshot forever, the equality check above would never
        // succeed again, and every reload would republish.
        this.lastRefreshTopics.clear();
        this.lastRefreshTopics.addAll(curTopicSet);
        for (MessageQueueClusterProducer clusterProducer : this.usingClusterMap.values()) {
            clusterProducer.publishTopic(curTopicSet);
        }
    }

    /**
     * Sleeps for the given time, returning early if the thread is interrupted.
     *
     * @param millis time to sleep in milliseconds
     */
    private void sleepSomeTime(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ignored) {
            // Deliberately best-effort: the interrupt status is not restored because the retry
            // loop in send() would otherwise spin, with every following sleep failing at once.
        }
    }
}

