/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.sdk.dirtydata;

import com.google.common.base.Preconditions;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.inlong.sdk.dataproxy.MsgSenderSingleFactory;
import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
import org.apache.inlong.sdk.dataproxy.sender.MsgSendCallback;
import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpEventInfo;
import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSender;
import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
import org.apache.inlong.sdk.dirtydata.DirtyMessageWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InlongSdkDirtySender {
    private static final Logger log = LoggerFactory.getLogger(InlongSdkDirtySender.class);
    private String inlongGroupId;
    private String inlongStreamId;
    private String inlongManagerAddr;
    private int inlongManagerPort;
    private String authId;
    private String authKey;
    private boolean ignoreErrors;
    private int maxRetryTimes;
    private int maxCallbackSize;
    private boolean closed;
    private LinkedBlockingQueue<DirtyMessageWrapper> dirtyDataQueue;
    private TcpMsgSender sender;
    private MsgSenderSingleFactory messageSenderFactory;
    private Executor executor;

    public void init() throws Exception {
        Preconditions.checkNotNull((Object)this.inlongGroupId, (Object)"inlongGroupId cannot be null");
        Preconditions.checkNotNull((Object)this.inlongStreamId, (Object)"inlongStreamId cannot be null");
        Preconditions.checkNotNull((Object)this.inlongManagerAddr, (Object)"inlongManagerAddr cannot be null");
        Preconditions.checkNotNull((Object)this.authId, (Object)"authId cannot be null");
        Preconditions.checkNotNull((Object)this.authKey, (Object)"authKey cannot be null");
        TcpMsgSenderConfig proxyClientConfig = new TcpMsgSenderConfig(false, this.inlongManagerAddr, this.inlongManagerPort, this.inlongGroupId, this.authId, this.authKey);
        this.messageSenderFactory = new MsgSenderSingleFactory();
        this.sender = this.messageSenderFactory.genTcpSenderByClusterId(proxyClientConfig);
        this.dirtyDataQueue = new LinkedBlockingQueue(this.maxCallbackSize);
        this.executor = Executors.newSingleThreadExecutor();
        this.executor.execute(this::doSendDirtyMessage);
        log.info("init InlongSdkDirtySink successfully, target group={}, stream={}", (Object)this.inlongGroupId, (Object)this.inlongStreamId);
    }

    public void sendDirtyMessage(DirtyMessageWrapper messageWrapper) throws InterruptedException {
        this.dirtyDataQueue.put(messageWrapper);
    }

    public boolean sendDirtyMessageAsync(DirtyMessageWrapper messageWrapper) {
        return this.dirtyDataQueue.offer(messageWrapper);
    }

    private void doSendDirtyMessage() {
        ProcessResult procResult = new ProcessResult();
        while (!this.closed) {
            try {
                DirtyMessageWrapper messageWrapper = this.dirtyDataQueue.poll();
                if (messageWrapper == null) {
                    Thread.sleep(100L);
                    continue;
                }
                messageWrapper.increaseRetry();
                if (messageWrapper.getRetryTimes() > this.maxRetryTimes) {
                    log.error("failed to send dirty message after {} times, dirty data ={}", (Object)this.maxRetryTimes, (Object)messageWrapper);
                    continue;
                }
                if (this.sender.asyncSendMessage(new TcpEventInfo(this.inlongGroupId, this.inlongStreamId, System.currentTimeMillis(), null, messageWrapper.format().getBytes()), (MsgSendCallback)new LogCallBack(messageWrapper), procResult)) continue;
                this.dirtyDataQueue.offer(messageWrapper);
            }
            catch (Throwable t) {
                log.error("failed to send inlong dirty message", t);
                if (this.ignoreErrors) continue;
                throw new RuntimeException("writing dirty message to inlong sdk failed", t);
            }
        }
    }

    public void close() {
        this.closed = true;
        this.dirtyDataQueue.clear();
        if (this.messageSenderFactory != null) {
            this.messageSenderFactory.shutdownAll();
        }
    }

    private static boolean $default$closed() {
        return false;
    }

    InlongSdkDirtySender(String inlongGroupId, String inlongStreamId, String inlongManagerAddr, int inlongManagerPort, String authId, String authKey, boolean ignoreErrors, int maxRetryTimes, int maxCallbackSize, boolean closed, LinkedBlockingQueue<DirtyMessageWrapper> dirtyDataQueue, TcpMsgSender sender, MsgSenderSingleFactory messageSenderFactory, Executor executor) {
        this.inlongGroupId = inlongGroupId;
        this.inlongStreamId = inlongStreamId;
        this.inlongManagerAddr = inlongManagerAddr;
        this.inlongManagerPort = inlongManagerPort;
        this.authId = authId;
        this.authKey = authKey;
        this.ignoreErrors = ignoreErrors;
        this.maxRetryTimes = maxRetryTimes;
        this.maxCallbackSize = maxCallbackSize;
        this.closed = closed;
        this.dirtyDataQueue = dirtyDataQueue;
        this.sender = sender;
        this.messageSenderFactory = messageSenderFactory;
        this.executor = executor;
    }

    public static InlongSdkDirtySenderBuilder builder() {
        return new InlongSdkDirtySenderBuilder();
    }

    public static class InlongSdkDirtySenderBuilder {
        private String inlongGroupId;
        private String inlongStreamId;
        private String inlongManagerAddr;
        private int inlongManagerPort;
        private String authId;
        private String authKey;
        private boolean ignoreErrors;
        private int maxRetryTimes;
        private int maxCallbackSize;
        private boolean closed$set;
        private boolean closed$value;
        private LinkedBlockingQueue<DirtyMessageWrapper> dirtyDataQueue;
        private TcpMsgSender sender;
        private MsgSenderSingleFactory messageSenderFactory;
        private Executor executor;

        InlongSdkDirtySenderBuilder() {
        }

        public InlongSdkDirtySenderBuilder inlongGroupId(String inlongGroupId) {
            this.inlongGroupId = inlongGroupId;
            return this;
        }

        public InlongSdkDirtySenderBuilder inlongStreamId(String inlongStreamId) {
            this.inlongStreamId = inlongStreamId;
            return this;
        }

        public InlongSdkDirtySenderBuilder inlongManagerAddr(String inlongManagerAddr) {
            this.inlongManagerAddr = inlongManagerAddr;
            return this;
        }

        public InlongSdkDirtySenderBuilder inlongManagerPort(int inlongManagerPort) {
            this.inlongManagerPort = inlongManagerPort;
            return this;
        }

        public InlongSdkDirtySenderBuilder authId(String authId) {
            this.authId = authId;
            return this;
        }

        public InlongSdkDirtySenderBuilder authKey(String authKey) {
            this.authKey = authKey;
            return this;
        }

        public InlongSdkDirtySenderBuilder ignoreErrors(boolean ignoreErrors) {
            this.ignoreErrors = ignoreErrors;
            return this;
        }

        public InlongSdkDirtySenderBuilder maxRetryTimes(int maxRetryTimes) {
            this.maxRetryTimes = maxRetryTimes;
            return this;
        }

        public InlongSdkDirtySenderBuilder maxCallbackSize(int maxCallbackSize) {
            this.maxCallbackSize = maxCallbackSize;
            return this;
        }

        public InlongSdkDirtySenderBuilder closed(boolean closed) {
            this.closed$value = closed;
            this.closed$set = true;
            return this;
        }

        public InlongSdkDirtySenderBuilder dirtyDataQueue(LinkedBlockingQueue<DirtyMessageWrapper> dirtyDataQueue) {
            this.dirtyDataQueue = dirtyDataQueue;
            return this;
        }

        public InlongSdkDirtySenderBuilder sender(TcpMsgSender sender) {
            this.sender = sender;
            return this;
        }

        public InlongSdkDirtySenderBuilder messageSenderFactory(MsgSenderSingleFactory messageSenderFactory) {
            this.messageSenderFactory = messageSenderFactory;
            return this;
        }

        public InlongSdkDirtySenderBuilder executor(Executor executor) {
            this.executor = executor;
            return this;
        }

        public InlongSdkDirtySender build() {
            boolean closed$value = this.closed$value;
            if (!this.closed$set) {
                closed$value = InlongSdkDirtySender.$default$closed();
            }
            return new InlongSdkDirtySender(this.inlongGroupId, this.inlongStreamId, this.inlongManagerAddr, this.inlongManagerPort, this.authId, this.authKey, this.ignoreErrors, this.maxRetryTimes, this.maxCallbackSize, closed$value, this.dirtyDataQueue, this.sender, this.messageSenderFactory, this.executor);
        }

        public String toString() {
            return "InlongSdkDirtySender.InlongSdkDirtySenderBuilder(inlongGroupId=" + this.inlongGroupId + ", inlongStreamId=" + this.inlongStreamId + ", inlongManagerAddr=" + this.inlongManagerAddr + ", inlongManagerPort=" + this.inlongManagerPort + ", authId=" + this.authId + ", authKey=" + this.authKey + ", ignoreErrors=" + this.ignoreErrors + ", maxRetryTimes=" + this.maxRetryTimes + ", maxCallbackSize=" + this.maxCallbackSize + ", closed$value=" + this.closed$value + ", dirtyDataQueue=" + this.dirtyDataQueue + ", sender=" + this.sender + ", messageSenderFactory=" + this.messageSenderFactory + ", executor=" + this.executor + ")";
        }
    }

    class LogCallBack
    implements MsgSendCallback {
        private final DirtyMessageWrapper wrapper;

        public LogCallBack(DirtyMessageWrapper wrapper) {
            this.wrapper = wrapper;
        }

        public void onMessageAck(ProcessResult result) {
            if (!result.isSuccess()) {
                InlongSdkDirtySender.this.dirtyDataQueue.offer(this.wrapper);
            }
        }

        public void onException(Throwable e) {
            log.error("failed to send inlong dirty message", e);
            if (!InlongSdkDirtySender.this.ignoreErrors) {
                throw new RuntimeException("writing dirty message to inlong sdk failed", e);
            }
        }

        public DirtyMessageWrapper getWrapper() {
            return this.wrapper;
        }
    }
}

