/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.agent.plugin.sinks.dataproxy;

import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.constant.CommonConstants;
import org.apache.inlong.agent.core.AgentStatusManager;
import org.apache.inlong.agent.message.file.SenderMessage;
import org.apache.inlong.agent.metrics.AgentMetricItem;
import org.apache.inlong.agent.metrics.AgentMetricItemSet;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.message.SequentialID;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.ThreadUtils;
import org.apache.inlong.common.metric.MetricItemSet;
import org.apache.inlong.common.metric.MetricRegister;
import org.apache.inlong.common.msg.MsgType;
import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
import org.apache.inlong.sdk.dataproxy.sender.MsgSendCallback;
import org.apache.inlong.sdk.dataproxy.sender.tcp.InLongTcpMsgSender;
import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpEventInfo;
import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSender;
import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Sender {
    private static final Logger LOGGER = LoggerFactory.getLogger(Sender.class);
    private static final SequentialID SEQUENTIAL_ID = SequentialID.getInstance();
    public static final int RESEND_QUEUE_WAIT_MS = 10;
    private List<TcpMsgSender> senders = new ArrayList<TcpMsgSender>();
    private AtomicLong senderIndex = new AtomicLong(0L);
    private LinkedBlockingQueue<AgentSenderCallback> resendQueue;
    private static final ThreadPoolExecutor EXECUTOR_SERVICE = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 1L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), (ThreadFactory)new AgentThreadFactory("sender-manager"));
    private ThreadFactory SHARED_FACTORY;
    private static final AtomicLong METRIC_INDEX = new AtomicLong(0L);
    private final String managerAddr;
    private final int totalAsyncBufSize;
    private final int aliveConnectionNum;
    private final boolean isCompress;
    private final int msgType;
    private final long maxSenderTimeout;
    private final int maxSenderRetry;
    private final long retrySleepTime;
    private final String inlongGroupId;
    private final int maxSenderPerGroup;
    private final String sourcePath;
    private final boolean proxySend;
    private volatile boolean shutdown = false;
    private AgentMetricItemSet metricItemSet;
    private Map<String, String> dimensions;
    private int ioThreadNum;
    private boolean enableBusyWait;
    private String authSecretId;
    private String authSecretKey;
    protected int batchFlushInterval;
    protected InstanceProfile profile;
    private volatile boolean resendRunning = false;
    private volatile boolean started = false;
    private static final AgentConfiguration agentConf = AgentConfiguration.getAgentConf();
    private long auditVersion;

    public Sender(InstanceProfile profile, String inlongGroupId, String sourcePath) {
        this.profile = profile;
        this.auditVersion = Long.parseLong(profile.get("task.auditVersion"));
        this.managerAddr = agentConf.get("agent.manager.addr");
        this.proxySend = profile.getBoolean("task.proxySend", false);
        this.totalAsyncBufSize = agentConf.getInt("proxy.total.async.proxy.size", 204800);
        this.aliveConnectionNum = agentConf.getInt("proxy.alive.connection.num", 10);
        this.isCompress = agentConf.getBoolean("proxy.is.compress", true);
        this.maxSenderPerGroup = agentConf.getInt("proxy.max.sender.per.group", 3);
        this.msgType = agentConf.getInt("proxy.msgType", 7);
        this.maxSenderTimeout = agentConf.getInt("proxy.sender.maxTimeout", 60);
        this.maxSenderRetry = agentConf.getInt("proxy.sender.maxRetry", 5);
        this.retrySleepTime = agentConf.getLong("proxy.retry.sleep", 500L);
        this.ioThreadNum = agentConf.getInt("client.iothread.num", CommonConstants.DEFAULT_PROXY_CLIENT_IO_THREAD_NUM);
        this.enableBusyWait = agentConf.getBoolean("client.enable.busy.wait", false);
        this.batchFlushInterval = agentConf.getInt("proxy.batch.flush.interval", 1);
        this.authSecretId = agentConf.get("agent.manager.auth.secretId");
        this.authSecretKey = agentConf.get("agent.manager.auth.secretKey");
        this.sourcePath = sourcePath;
        this.inlongGroupId = inlongGroupId;
        this.dimensions = new HashMap<String, String>();
        this.dimensions.put("pluginId", this.getClass().getSimpleName());
        String metricName = String.join((CharSequence)"-", this.getClass().getSimpleName(), String.valueOf(METRIC_INDEX.incrementAndGet()));
        this.metricItemSet = new AgentMetricItemSet(metricName);
        MetricRegister.register((MetricItemSet)this.metricItemSet);
        this.resendQueue = new LinkedBlockingQueue();
    }

    public void Start() throws Exception {
        this.createMessageSender();
        EXECUTOR_SERVICE.execute(this.flushResendQueue());
        this.started = true;
    }

    public void Stop() {
        LOGGER.info("stop send manager");
        this.shutdown = true;
        if (!this.started) {
            return;
        }
        while (this.resendRunning) {
            AgentUtils.silenceSleepInMs((long)1L);
        }
        this.closeMessageSender();
        LOGGER.info("stop send manager end");
    }

    private void closeMessageSender() {
        Long start = AgentUtils.getCurrentTime();
        this.senders.forEach(sender -> {
            if (sender != null) {
                sender.close();
            }
        });
        LOGGER.info("close sender elapse {} ms instance {}", (Object)(AgentUtils.getCurrentTime() - start), (Object)this.profile.getInstanceId());
    }

    private AgentMetricItem getMetricItem(Map<String, String> otherDimensions) {
        HashMap<String, String> dimensions = new HashMap<String, String>();
        dimensions.put("pluginId", this.getClass().getSimpleName());
        dimensions.putAll(otherDimensions);
        return (AgentMetricItem)this.metricItemSet.findMetricItem(dimensions);
    }

    private AgentMetricItem getMetricItem(String groupId, String streamId) {
        HashMap<String, String> dims = new HashMap<String, String>();
        dims.put("inlongGroupId", groupId);
        dims.put("inlongStreamId", streamId);
        return this.getMetricItem(dims);
    }

    private void createMessageSender() throws Exception {
        TcpMsgSenderConfig proxyClientConfig = new TcpMsgSenderConfig(this.managerAddr, this.inlongGroupId, this.authSecretId, this.authSecretKey);
        proxyClientConfig.setMaxInFlightSizeInKb(this.totalAsyncBufSize);
        proxyClientConfig.setAliveConnections(this.aliveConnectionNum);
        proxyClientConfig.setRequestTimeoutMs(this.maxSenderTimeout * 1000L);
        proxyClientConfig.setNettyWorkerThreadNum(this.ioThreadNum);
        proxyClientConfig.setEnableEpollBusyWait(this.enableBusyWait);
        proxyClientConfig.setSdkMsgType(MsgType.valueOf((int)this.msgType));
        proxyClientConfig.setEnableDataCompress(this.isCompress);
        this.SHARED_FACTORY = new DefaultThreadFactory("agent-sender-manager-" + this.sourcePath, Thread.currentThread().isDaemon());
        boolean hasError = false;
        ProcessResult procResult = null;
        for (int i = 0; i < this.maxSenderPerGroup; ++i) {
            InLongTcpMsgSender sender2 = new InLongTcpMsgSender(proxyClientConfig, this.SHARED_FACTORY);
            procResult = new ProcessResult();
            if (!sender2.start(procResult)) {
                hasError = true;
                break;
            }
            this.senders.add((TcpMsgSender)sender2);
        }
        if (hasError) {
            this.senders.forEach(sender -> sender.close());
            throw new ProxySdkException("Start sender failure, " + procResult);
        }
    }

    public void sendBatch(SenderMessage message) {
        while (!this.shutdown && !this.resendQueue.isEmpty()) {
            AgentUtils.silenceSleepInMs((long)this.retrySleepTime);
        }
        if (!this.shutdown) {
            this.sendBatchWithRetryCount(message, 0);
        }
    }

    private void sendBatchWithRetryCount(SenderMessage message, int retry) {
        boolean suc = false;
        while (!suc && !this.shutdown) {
            try {
                AgentSenderCallback cb = new AgentSenderCallback(message, retry);
                AuditUtils.add((int)AuditUtils.AUDIT_ID_AGENT_TRY_SEND, (String)message.getGroupId(), (String)message.getStreamId(), (long)message.getDataTime(), (int)message.getMsgCnt(), (long)message.getTotalSize(), (long)this.auditVersion);
                AuditUtils.add((int)AuditUtils.AUDIT_ID_AGENT_TRY_SEND_REAL_TIME, (String)message.getGroupId(), (String)message.getStreamId(), (long)AgentUtils.getCurrentTime(), (int)message.getMsgCnt(), (long)message.getTotalSize(), (long)this.auditVersion);
                this.asyncSendByMessageSender(cb, message.getDataList(), message.getGroupId(), message.getStreamId(), message.getDataTime(), SEQUENTIAL_ID.getNextUuid(), message.getExtraMap(), this.proxySend);
                this.getMetricItem((String)message.getGroupId(), (String)message.getStreamId()).pluginSendCount.addAndGet(message.getMsgCnt());
                suc = true;
            }
            catch (Exception exception) {
                AuditUtils.add((int)AuditUtils.AUDIT_ID_AGENT_SEND_EXCEPTION, (String)message.getGroupId(), (String)message.getStreamId(), (long)message.getDataTime(), (int)message.getMsgCnt(), (long)message.getTotalSize(), (long)this.auditVersion);
                AuditUtils.add((int)AuditUtils.AUDIT_ID_AGENT_SEND_EXCEPTION_REAL_TIME, (String)message.getGroupId(), (String)message.getStreamId(), (long)AgentUtils.getCurrentTime(), (int)message.getMsgCnt(), (long)message.getTotalSize(), (long)this.auditVersion);
                suc = false;
                if (retry > this.maxSenderRetry) {
                    if (retry % 10 == 0) {
                        LOGGER.error("max retry reached, sample log Exception caught", (Throwable)exception);
                    }
                } else {
                    LOGGER.error("Exception caught", (Throwable)exception);
                }
                ++retry;
                AgentUtils.silenceSleepInMs((long)this.retrySleepTime);
                ThreadUtils.threadThrowableHandler((Thread)Thread.currentThread(), (Throwable)exception);
            }
        }
    }

    private void asyncSendByMessageSender(MsgSendCallback cb, List<byte[]> bodyList, String groupId, String streamId, long dataTime, String msgUUID, Map<String, String> extraAttrMap, boolean isProxySend) throws Exception {
        ProcessResult procResult = new ProcessResult();
        int index = (int)Math.abs(this.senderIndex.getAndAdd(1L) % (long)this.maxSenderPerGroup);
        boolean isSuccess = isProxySend ? this.senders.get(index).asyncSendMsgWithSinkAck(new TcpEventInfo(groupId, streamId, dataTime, msgUUID, extraAttrMap, bodyList), cb, procResult) : this.senders.get(index).asyncSendMessage(new TcpEventInfo(groupId, streamId, dataTime, msgUUID, extraAttrMap, bodyList), cb, procResult);
        if (!isSuccess) {
            throw new ProxySdkException("Send message failure, " + procResult);
        }
    }

    private Runnable flushResendQueue() {
        return () -> {
            AgentThreadFactory.nameThread((String)("flushResendQueue-" + this.profile.getTaskId() + "-" + this.profile.getInstanceId()));
            LOGGER.info("start flush resend queue {}:{}", (Object)this.inlongGroupId, (Object)this.sourcePath);
            this.resendRunning = true;
            while (!this.shutdown) {
                try {
                    AgentSenderCallback callback = this.resendQueue.poll(10L, TimeUnit.MILLISECONDS);
                    if (callback == null) continue;
                    SenderMessage message = callback.message;
                    AuditUtils.add((int)AuditUtils.AUDIT_ID_AGENT_RESEND, (String)message.getGroupId(), (String)message.getStreamId(), (long)message.getDataTime(), (int)message.getMsgCnt(), (long)message.getTotalSize(), (long)this.auditVersion);
                    AuditUtils.add((int)AuditUtils.AUDIT_ID_AGENT_RESEND_REAL_TIME, (String)message.getGroupId(), (String)message.getStreamId(), (long)AgentUtils.getCurrentTime(), (int)message.getMsgCnt(), (long)message.getTotalSize(), (long)this.auditVersion);
                    this.sendBatchWithRetryCount(callback.message, callback.retry + 1);
                }
                catch (Exception e) {
                    LOGGER.error("error caught", (Throwable)e);
                    ThreadUtils.threadThrowableHandler((Thread)Thread.currentThread(), (Throwable)e);
                }
                finally {
                    AgentUtils.silenceSleepInMs((long)this.batchFlushInterval);
                }
            }
            LOGGER.info("stop flush resend queue {}:{}", (Object)this.inlongGroupId, (Object)this.sourcePath);
            this.resendRunning = false;
        };
    }

    private void putInResendQueue(AgentSenderCallback batchMessageCallBack) {
        try {
            this.resendQueue.put(batchMessageCallBack);
        }
        catch (Throwable throwable) {
            LOGGER.error("putInResendQueue e = {}", throwable);
        }
    }

    private class AgentSenderCallback
    implements MsgSendCallback {
        private final int retry;
        private final SenderMessage message;
        private final int msgCnt;

        AgentSenderCallback(SenderMessage message, int retry) {
            this.message = message;
            this.retry = retry;
            this.msgCnt = message.getDataList().size();
        }

        public void onMessageAck(ProcessResult result) {
            String groupId = this.message.getGroupId();
            String streamId = this.message.getStreamId();
            String taskId = this.message.getTaskId();
            String instanceId = this.message.getInstanceId();
            long dataTime = this.message.getDataTime();
            if (result.isSuccess()) {
                this.message.getOffsetAckList().forEach(ack -> ack.setHasAck(Boolean.valueOf(true)));
                ((Sender)Sender.this).getMetricItem((String)groupId, (String)streamId).pluginSendSuccessCount.addAndGet(this.msgCnt);
                AuditUtils.add((int)AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, (String)groupId, (String)streamId, (long)dataTime, (int)this.message.getMsgCnt(), (long)this.message.getTotalSize(), (long)Sender.this.auditVersion);
                AuditUtils.add((int)AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS_REAL_TIME, (String)groupId, (String)streamId, (long)AgentUtils.getCurrentTime(), (int)this.message.getMsgCnt(), (long)this.message.getTotalSize(), (long)Sender.this.auditVersion);
                AgentStatusManager.sendPackageCount.addAndGet(this.message.getMsgCnt());
                AgentStatusManager.sendDataLen.addAndGet(this.message.getTotalSize());
            } else {
                LOGGER.error("send groupId {}, streamId {}, taskId {}, instanceId {}, dataTime {} fail with times {}, error {}", new Object[]{groupId, streamId, taskId, instanceId, dataTime, this.retry, result});
                ((Sender)Sender.this).getMetricItem((String)groupId, (String)streamId).pluginSendFailCount.addAndGet(this.msgCnt);
                Sender.this.putInResendQueue(new AgentSenderCallback(this.message, this.retry));
                AuditUtils.add((int)AuditUtils.AUDIT_ID_AGENT_SEND_FAILED, (String)groupId, (String)streamId, (long)dataTime, (int)this.message.getMsgCnt(), (long)this.message.getTotalSize(), (long)Sender.this.auditVersion);
                AuditUtils.add((int)AuditUtils.AUDIT_ID_AGENT_SEND_FAILED_REAL_TIME, (String)groupId, (String)streamId, (long)AgentUtils.getCurrentTime(), (int)this.message.getMsgCnt(), (long)this.message.getTotalSize(), (long)Sender.this.auditVersion);
            }
        }

        public void onException(Throwable e) {
            ((Sender)Sender.this).getMetricItem((String)this.message.getGroupId(), (String)this.message.getStreamId()).pluginSendFailCount.addAndGet(this.msgCnt);
            LOGGER.error("exception caught", e);
        }
    }
}

