/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.sdk.dataproxy;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.inlong.sdk.dataproxy.MsgSenderFactory;
import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
import org.apache.inlong.sdk.dataproxy.common.ProxyClientConfig;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
import org.apache.inlong.sdk.dataproxy.network.PkgCacheQuota;
import org.apache.inlong.sdk.dataproxy.sender.BaseSender;
import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.sender.http.InLongHttpMsgSender;
import org.apache.inlong.sdk.dataproxy.sender.tcp.InLongTcpMsgSender;
import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base factory that creates, caches and releases InLong message senders.
 *
 * <p>Senders are cached in two maps: one keyed by the group meta-config key and one keyed by
 * the cluster-id key. Every structural change to these caches (create, remove, release-all)
 * is made while holding the write lock of {@code senderCacheLock}; the fast-path look-ups in
 * the {@code gen*} methods and {@link #getMsgSenderCount()} read the maps without the lock.
 */
public class BaseMsgSenderFactory {
    private static final Logger logger = LoggerFactory.getLogger(BaseMsgSenderFactory.class);
    // Gates the warn logs below through shouldPrint().
    // NOTE(review): the meaning of the three constructor arguments is defined by LogCounter - confirm there.
    private static final LogCounter exptCounter = new LogCounter(10L, 100000L, 60000L);
    private static final String CLOSE_GROUP_SENDER_FAILURE_FMT =
            "MsgSenderFactory({}) close groupId({})'s sender failure";
    private static final String CLOSE_CLUSTER_SENDER_FAILURE_FMT =
            "MsgSenderFactory({}) close clusterId({})'s sender failure";

    private final MsgSenderFactory msgSenderFactory;
    private final String factoryNo;
    private final PkgCacheQuota pkgCacheQuota;
    private final ReentrantReadWriteLock senderCacheLock = new ReentrantReadWriteLock();
    private final Map<String, BaseSender> groupIdSenderMap = new ConcurrentHashMap<>();
    private final Map<String, BaseSender> clusterIdSenderMap = new ConcurrentHashMap<>();

    /**
     * Creates the factory and its package cache quota.
     *
     * @param msgSenderFactory        the owning factory handed to every sender built here
     * @param factoryNo               identifier of this factory, used in log output and for the quota
     * @param factoryPkgCntPermits    package-count permits passed to the quota
     * @param factoryPkgSizeKbPermits package-size (KB) permits passed to the quota
     */
    protected BaseMsgSenderFactory(MsgSenderFactory msgSenderFactory, String factoryNo,
            int factoryPkgCntPermits, int factoryPkgSizeKbPermits) {
        this.msgSenderFactory = msgSenderFactory;
        this.factoryNo = factoryNo;
        // NOTE(review): the leading 'true' and trailing '200' are passed through unchanged;
        // their meaning is defined by PkgCacheQuota - confirm there.
        this.pkgCacheQuota = new PkgCacheQuota(true, factoryNo, factoryPkgCntPermits, factoryPkgSizeKbPermits, 200);
        logger.info("MsgSenderFactory({}) started, factoryPkgCntPermits={}, factoryPkgSizeKbPermits={}",
                this.factoryNo, factoryPkgCntPermits, factoryPkgSizeKbPermits);
    }

    /**
     * Closes every cached sender (groupId and clusterId) and empties both caches.
     *
     * <p>Both caches are released while holding the write lock so that no sender can be
     * added or removed between the two release passes.
     */
    public void close() {
        long startTime = System.currentTimeMillis();
        logger.info("MsgSenderFactory({}) is closing", this.factoryNo);
        int totalSenderCnt;
        this.senderCacheLock.writeLock().lock();
        try {
            totalSenderCnt = this.releaseAllGroupIdSenders(this.groupIdSenderMap);
            totalSenderCnt += this.releaseAllClusterIdSenders(this.clusterIdSenderMap);
        } finally {
            this.senderCacheLock.writeLock().unlock();
        }
        logger.info("MsgSenderFactory({}) closed, release {} inlong senders, cost {} ms",
                this.factoryNo, totalSenderCnt, System.currentTimeMillis() - startTime);
    }

    /**
     * Removes the given sender from the cache it was registered in.
     *
     * <p>The call is ignored when the sender is {@code null} or was not created by this
     * factory's {@code MsgSenderFactory}. A sender with a non-null factory cluster-id key is
     * looked up in the clusterId cache, otherwise in the groupId cache. The cached entry is
     * only removed when its sender id equals the id of {@code msgSender}.
     *
     * @param msgSender the sender to remove
     */
    public void removeClient(BaseSender msgSender) {
        if (msgSender == null
                || msgSender.getSenderFactory() == null
                || msgSender.getSenderFactory() != this.msgSenderFactory) {
            return;
        }
        String senderId = msgSender.getSenderId();
        boolean removed;
        this.senderCacheLock.writeLock().lock();
        try {
            String clusterIdKey = msgSender.getFactoryClusterIdKey();
            if (clusterIdKey == null) {
                removed = removeSenderIfSame(this.groupIdSenderMap, msgSender.getMetaConfigKey(), msgSender);
            } else {
                removed = removeSenderIfSame(this.clusterIdSenderMap, clusterIdKey, msgSender);
            }
        } finally {
            this.senderCacheLock.writeLock().unlock();
        }
        if (removed) {
            logger.info("MsgSenderFactory({}) removed sender({})", this.factoryNo, senderId);
        }
    }

    /**
     * @return the package cache quota created for this factory
     */
    public PkgCacheQuota getPkgCacheQuota() {
        return this.pkgCacheQuota;
    }

    /**
     * @return the number of cached senders, i.e. the sum of the sizes of both caches
     */
    public int getMsgSenderCount() {
        return this.groupIdSenderMap.size() + this.clusterIdSenderMap.size();
    }

    /**
     * Returns the TCP sender cached under the configuration's group meta-config key,
     * building, starting and caching a new one when none exists.
     *
     * @param configure         the TCP sender configuration, must not be {@code null}
     * @param selfDefineFactory thread factory handed to a newly built sender
     * @return the cached or newly started sender
     * @throws ProxySdkException if the configuration is {@code null}, the remote configuration
     *                           query fails, or the sender cannot be built or started
     */
    public InLongTcpMsgSender genTcpSenderByGroupId(TcpMsgSenderConfig configure, ThreadFactory selfDefineFactory)
            throws ProxySdkException {
        this.validProxyConfigNotNull(configure);
        String metaConfigKey = configure.getGroupMetaConfigKey();
        // Fast path: reuse an already cached sender without taking the lock.
        InLongTcpMsgSender messageSender = (InLongTcpMsgSender) this.groupIdSenderMap.get(metaConfigKey);
        if (messageSender != null) {
            return messageSender;
        }
        // The remote query runs before the lock is taken; its result is handed to start().
        ProcessResult procResult = new ProcessResult();
        this.qryProxyMetaConfigure(configure, procResult);
        this.senderCacheLock.writeLock().lock();
        try {
            // Look again under the lock: another thread may have cached a sender meanwhile.
            messageSender = (InLongTcpMsgSender) this.groupIdSenderMap.get(metaConfigKey);
            if (messageSender != null) {
                return messageSender;
            }
            messageSender = new InLongTcpMsgSender(configure, selfDefineFactory, this.msgSenderFactory, null);
            if (!messageSender.start(procResult)) {
                messageSender.close();
                throw new ProxySdkException("Failed to start groupId sender: " + procResult);
            }
            this.groupIdSenderMap.put(metaConfigKey, messageSender);
            logger.info("MsgSenderFactory({}) generated a new groupId({}) sender({})",
                    this.factoryNo, metaConfigKey, messageSender.getSenderId());
            return messageSender;
        } catch (Throwable ex) {
            if (exptCounter.shouldPrint()) {
                logger.warn("MsgSenderFactory({}) build groupId sender({}) exception",
                        this.factoryNo, metaConfigKey, ex);
            }
            throw buildFailure("Failed to build groupId sender: " + ex.getMessage(), ex);
        } finally {
            this.senderCacheLock.writeLock().unlock();
        }
    }

    /**
     * Returns the HTTP sender cached under the configuration's group meta-config key,
     * building, starting and caching a new one when none exists.
     *
     * @param configure the HTTP sender configuration, must not be {@code null}
     * @return the cached or newly started sender
     * @throws ProxySdkException if the configuration is {@code null}, the remote configuration
     *                           query fails, or the sender cannot be built or started
     */
    public InLongHttpMsgSender genHttpSenderByGroupId(HttpMsgSenderConfig configure) throws ProxySdkException {
        this.validProxyConfigNotNull(configure);
        String metaConfigKey = configure.getGroupMetaConfigKey();
        // Fast path: reuse an already cached sender without taking the lock.
        InLongHttpMsgSender messageSender = (InLongHttpMsgSender) this.groupIdSenderMap.get(metaConfigKey);
        if (messageSender != null) {
            return messageSender;
        }
        // The remote query runs before the lock is taken; its result is handed to start().
        ProcessResult procResult = new ProcessResult();
        this.qryProxyMetaConfigure(configure, procResult);
        this.senderCacheLock.writeLock().lock();
        try {
            // Look again under the lock: another thread may have cached a sender meanwhile.
            messageSender = (InLongHttpMsgSender) this.groupIdSenderMap.get(metaConfigKey);
            if (messageSender != null) {
                return messageSender;
            }
            messageSender = new InLongHttpMsgSender(configure, this.msgSenderFactory, null);
            if (!messageSender.start(procResult)) {
                messageSender.close();
                throw new ProxySdkException("Failed to start groupId sender: " + procResult);
            }
            this.groupIdSenderMap.put(metaConfigKey, messageSender);
            logger.info("MsgSenderFactory({}) generated a new groupId({}) sender({})",
                    this.factoryNo, metaConfigKey, messageSender.getSenderId());
            return messageSender;
        } catch (Throwable ex) {
            if (exptCounter.shouldPrint()) {
                logger.warn("MsgSenderFactory({}) build groupId sender({}) exception",
                        this.factoryNo, metaConfigKey, ex);
            }
            throw buildFailure("Failed to build groupId sender: " + ex.getMessage(), ex);
        } finally {
            this.senderCacheLock.writeLock().unlock();
        }
    }

    /**
     * Returns the TCP sender cached under the cluster-id key derived from the queried proxy
     * configuration, building, starting and caching a new one when none exists.
     *
     * <p>The remote configuration is queried on every call because the cluster id that forms
     * the cache key comes from the query result.
     *
     * @param configure         the TCP sender configuration, must not be {@code null}
     * @param selfDefineFactory thread factory handed to a newly built sender
     * @return the cached or newly started sender
     * @throws ProxySdkException if the configuration is {@code null}, the remote configuration
     *                           query fails or yields no entry, or the sender cannot be built
     *                           or started
     */
    public InLongTcpMsgSender genTcpSenderByClusterId(TcpMsgSenderConfig configure, ThreadFactory selfDefineFactory)
            throws ProxySdkException {
        this.validProxyConfigNotNull(configure);
        ProcessResult procResult = new ProcessResult();
        ProxyConfigEntry proxyConfigEntry = this.qryProxyMetaConfigure(configure, procResult);
        if (proxyConfigEntry == null) {
            // Fail with the declared exception type instead of a NullPointerException below.
            throw new ProxySdkException("Failed to build cluster sender: queried proxy config entry is null");
        }
        String clusterIdKey = ProxyUtils.buildClusterIdKey(
                configure.getDataRptProtocol(), configure.getRegionName(), proxyConfigEntry.getClusterId());
        // Fast path: reuse an already cached sender without taking the lock.
        InLongTcpMsgSender messageSender = (InLongTcpMsgSender) this.clusterIdSenderMap.get(clusterIdKey);
        if (messageSender != null) {
            return messageSender;
        }
        this.senderCacheLock.writeLock().lock();
        try {
            // Look again under the lock: another thread may have cached a sender meanwhile.
            messageSender = (InLongTcpMsgSender) this.clusterIdSenderMap.get(clusterIdKey);
            if (messageSender != null) {
                return messageSender;
            }
            messageSender = new InLongTcpMsgSender(configure, selfDefineFactory, this.msgSenderFactory, clusterIdKey);
            if (!messageSender.start(procResult)) {
                messageSender.close();
                throw new ProxySdkException("Failed to start cluster sender: " + procResult);
            }
            this.clusterIdSenderMap.put(clusterIdKey, messageSender);
            logger.info("MsgSenderFactory({}) generated a new clusterId({}) sender({})",
                    this.factoryNo, clusterIdKey, messageSender.getSenderId());
            return messageSender;
        } catch (Throwable ex) {
            if (exptCounter.shouldPrint()) {
                logger.warn("MsgSenderFactory({}) build cluster sender({}) exception",
                        this.factoryNo, clusterIdKey, ex);
            }
            throw buildFailure("Failed to build cluster sender: " + ex.getMessage(), ex);
        } finally {
            this.senderCacheLock.writeLock().unlock();
        }
    }

    /**
     * Returns the HTTP sender cached under the cluster-id key derived from the queried proxy
     * configuration, building, starting and caching a new one when none exists.
     *
     * <p>The remote configuration is queried on every call because the cluster id that forms
     * the cache key comes from the query result.
     *
     * @param configure the HTTP sender configuration, must not be {@code null}
     * @return the cached or newly started sender
     * @throws ProxySdkException if the configuration is {@code null}, the remote configuration
     *                           query fails or yields no entry, or the sender cannot be built
     *                           or started
     */
    public InLongHttpMsgSender genHttpSenderByClusterId(HttpMsgSenderConfig configure) throws ProxySdkException {
        this.validProxyConfigNotNull(configure);
        ProcessResult procResult = new ProcessResult();
        ProxyConfigEntry proxyConfigEntry = this.qryProxyMetaConfigure(configure, procResult);
        if (proxyConfigEntry == null) {
            // Fail with the declared exception type instead of a NullPointerException below.
            throw new ProxySdkException("Failed to build cluster sender: queried proxy config entry is null");
        }
        String clusterIdKey = ProxyUtils.buildClusterIdKey(
                configure.getDataRptProtocol(), configure.getRegionName(), proxyConfigEntry.getClusterId());
        // Fast path: reuse an already cached sender without taking the lock.
        InLongHttpMsgSender messageSender = (InLongHttpMsgSender) this.clusterIdSenderMap.get(clusterIdKey);
        if (messageSender != null) {
            return messageSender;
        }
        this.senderCacheLock.writeLock().lock();
        try {
            // Look again under the lock: another thread may have cached a sender meanwhile.
            messageSender = (InLongHttpMsgSender) this.clusterIdSenderMap.get(clusterIdKey);
            if (messageSender != null) {
                return messageSender;
            }
            messageSender = new InLongHttpMsgSender(configure, this.msgSenderFactory, clusterIdKey);
            if (!messageSender.start(procResult)) {
                messageSender.close();
                throw new ProxySdkException("Failed to start cluster sender: " + procResult);
            }
            this.clusterIdSenderMap.put(clusterIdKey, messageSender);
            logger.info("MsgSenderFactory({}) generated a new clusterId({}) sender({})",
                    this.factoryNo, clusterIdKey, messageSender.getSenderId());
            return messageSender;
        } catch (Throwable ex) {
            if (exptCounter.shouldPrint()) {
                logger.warn("MsgSenderFactory({}) build cluster sender({}) exception",
                        this.factoryNo, clusterIdKey, ex);
            }
            throw buildFailure("Failed to build cluster sender: " + ex.getMessage(), ex);
        } finally {
            this.senderCacheLock.writeLock().unlock();
        }
    }

    /**
     * Queries the remote group configuration and, when report encryption is enabled on the
     * client configuration, the remote encrypt configuration as well.
     *
     * @param proxyConfig the client configuration used to build the query manager
     * @param procResult  receives the outcome of the queries
     * @return the payload left in {@code procResult} by the successful group query, cast to
     *         {@link ProxyConfigEntry}
     * @throws ProxySdkException if either query reports failure
     */
    private ProxyConfigEntry qryProxyMetaConfigure(ProxyClientConfig proxyConfig, ProcessResult procResult)
            throws ProxySdkException {
        ProxyConfigManager inlongMetaQryMgr = new ProxyConfigManager(proxyConfig);
        if (!inlongMetaQryMgr.getGroupIdConfigure(true, procResult)) {
            throw new ProxySdkException("Failed to query remote group config: " + procResult);
        }
        // Capture the entry now: the encrypt query below receives the same procResult and may
        // replace its payload. NOTE(review): confirm what getEncryptConfigure stores on success.
        ProxyConfigEntry proxyConfigEntry = (ProxyConfigEntry) procResult.getRetData();
        if (proxyConfig.isEnableReportEncrypt() && !inlongMetaQryMgr.getEncryptConfigure(true, procResult)) {
            throw new ProxySdkException("Failed to query remote encrypt config: " + procResult);
        }
        return proxyConfigEntry;
    }

    /**
     * Removes the entry stored under {@code key} when it holds a sender whose id equals the
     * id of {@code msgSender}.
     *
     * @return {@code true} if an entry was removed
     */
    private static boolean removeSenderIfSame(Map<String, BaseSender> senderMap, String key, BaseSender msgSender) {
        BaseSender cachedSender = senderMap.get(key);
        if (cachedSender == null || !cachedSender.getSenderId().equals(msgSender.getSenderId())) {
            return false;
        }
        return senderMap.remove(key) != null;
    }

    /** Closes and clears all senders of the groupId cache; returns how many were released. */
    private int releaseAllGroupIdSenders(Map<String, BaseSender> senderMap) {
        return releaseAllSenders(senderMap, CLOSE_GROUP_SENDER_FAILURE_FMT);
    }

    /** Closes and clears all senders of the clusterId cache; returns how many were released. */
    private int releaseAllClusterIdSenders(Map<String, BaseSender> senderMap) {
        return releaseAllSenders(senderMap, CLOSE_CLUSTER_SENDER_FAILURE_FMT);
    }

    /**
     * Closes every sender in {@code senderMap}, then clears the map.
     *
     * <p>A failure while closing one sender is logged (subject to {@code exptCounter}) and does
     * not stop the loop; the sender is still counted as released.
     *
     * @param senderMap        the cache to release
     * @param failureLogFormat log format used when closing a sender throws
     * @return the number of non-null entries processed
     */
    private int releaseAllSenders(Map<String, BaseSender> senderMap, String failureLogFormat) {
        int totalSenderCnt = 0;
        for (Map.Entry<String, BaseSender> entry : senderMap.entrySet()) {
            if (entry == null || entry.getKey() == null || entry.getValue() == null) {
                continue;
            }
            try {
                entry.getValue().close();
            } catch (Throwable ex) {
                if (exptCounter.shouldPrint()) {
                    logger.warn(failureLogFormat, this.factoryNo, entry.getKey(), ex);
                }
            }
            ++totalSenderCnt;
        }
        senderMap.clear();
        return totalSenderCnt;
    }

    /**
     * Builds a {@link ProxySdkException} with the given message and keeps {@code cause} as its
     * cause, so the original stack trace survives even when the warn log is suppressed.
     */
    private static ProxySdkException buildFailure(String message, Throwable cause) {
        ProxySdkException failure = new ProxySdkException(message);
        // Attached via initCause so that only the message-only constructor is required.
        failure.initCause(cause);
        return failure;
    }

    /**
     * @throws ProxySdkException if {@code configure} is {@code null}
     */
    private void validProxyConfigNotNull(ProxyClientConfig configure) throws ProxySdkException {
        if (configure == null) {
            throw new ProxySdkException("configure is null!");
        }
    }
}

