/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.sdk.dataproxy.example;

import java.nio.ByteBuffer;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.codec.binary.StringUtils;
import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
import org.apache.inlong.sdk.dataproxy.sender.MsgSendCallback;
import org.apache.inlong.sdk.dataproxy.sender.http.HttpEventInfo;
import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSender;
import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpEventInfo;
import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSender;
import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;

public class ExampleUtils {
    private static final SecureRandom cntRandom = new SecureRandom(Long.toString(System.nanoTime()).getBytes());

    public static void sendTcpMessages(TcpMsgSender msgSender, boolean isSync, boolean isMultiItem, String groupId, String streamId, int reqCnt, int baseBodyLen, int msgItemCnt, ProcessResult procResult) {
        boolean sucCnt = false;
        int curCount = 0;
        byte[] itemBody = ExampleUtils.buildBoydData(baseBodyLen);
        ArrayList<byte[]> multiBodys = new ArrayList<byte[]>();
        for (int i = 0; i < msgItemCnt; ++i) {
            multiBodys.add(itemBody);
        }
        HashMap<String, String> localAttrs = new HashMap<String, String>();
        if (isSync) {
            if (isMultiItem) {
                while (curCount++ < reqCnt) {
                    TcpEventInfo eventInfo;
                    try {
                        if (curCount > 1) {
                            localAttrs.clear();
                            localAttrs.put("index", String.valueOf(curCount));
                        }
                        Map<String, String> filteredAttrs = ProxyUtils.getValidAttrs(localAttrs);
                        eventInfo = new TcpEventInfo(groupId, streamId, System.currentTimeMillis(), filteredAttrs, multiBodys);
                    }
                    catch (Throwable ex) {
                        System.out.println("Build tcp event failure, ex=" + ex);
                        continue;
                    }
                    if (!msgSender.sendMessage(eventInfo, procResult)) {
                        System.out.println("Sync request index=" + curCount + ", process result=" + procResult);
                        continue;
                    }
                    ++curCount;
                }
            } else {
                while (curCount++ < reqCnt) {
                    TcpEventInfo eventInfo;
                    try {
                        if (curCount > 1) {
                            localAttrs.clear();
                            localAttrs.put("index", String.valueOf(curCount));
                            localAttrs.put("multi", String.valueOf(false));
                        }
                        eventInfo = new TcpEventInfo(groupId, streamId, System.currentTimeMillis(), localAttrs, itemBody);
                    }
                    catch (Throwable ex) {
                        System.out.println("Build tcp event failure, ex=" + ex);
                        continue;
                    }
                    if (!msgSender.sendMessage(eventInfo, procResult)) {
                        System.out.println("Sync request index=" + curCount + ", process result=" + procResult);
                        continue;
                    }
                    ++curCount;
                }
            }
        } else if (isMultiItem) {
            while (curCount++ < reqCnt) {
                TcpEventInfo eventInfo;
                try {
                    if (curCount > 1) {
                        localAttrs.clear();
                        localAttrs.put("index", String.valueOf(curCount));
                        localAttrs.put("multi", String.valueOf(true));
                    }
                    eventInfo = new TcpEventInfo(groupId, streamId, System.currentTimeMillis(), localAttrs, multiBodys);
                }
                catch (Throwable ex) {
                    System.out.println("Build multiple tcp event failure, ex=" + ex);
                    continue;
                }
                if (!msgSender.asyncSendMessage(eventInfo, new MyMsgSendBack(curCount), procResult)) {
                    System.out.println("Async request index=" + curCount + ", post result=" + procResult);
                    continue;
                }
                ++curCount;
            }
        } else {
            while (curCount++ < reqCnt) {
                TcpEventInfo eventInfo;
                try {
                    eventInfo = new TcpEventInfo(groupId, streamId, System.currentTimeMillis(), null, itemBody);
                }
                catch (Throwable ex) {
                    System.out.println("Build tcp event failure, ex=" + ex);
                    continue;
                }
                if (!msgSender.asyncSendMessage(eventInfo, new MyMsgSendBack(curCount), procResult)) {
                    System.out.println("Async request index=" + curCount + ", post result=" + procResult);
                    continue;
                }
                ++curCount;
            }
        }
    }

    public static void sendHttpMessages(HttpMsgSender msgSender, boolean isSync, boolean isMultiItem, String groupId, String streamId, int reqCnt, int baseBodyLen, int msgItemCnt, ProcessResult procResult) {
        boolean sucCnt = false;
        int curCount = 0;
        String itemBody = ExampleUtils.getRandomString(baseBodyLen);
        ArrayList<String> multiBodys = new ArrayList<String>();
        for (int i = 0; i < msgItemCnt; ++i) {
            multiBodys.add(itemBody);
        }
        if (isSync) {
            if (isMultiItem) {
                while (curCount++ < reqCnt) {
                    HttpEventInfo eventInfo;
                    try {
                        eventInfo = new HttpEventInfo(groupId, streamId, System.currentTimeMillis(), multiBodys);
                    }
                    catch (Throwable ex) {
                        System.out.println("Build multiple http event failure, ex=" + ex);
                        continue;
                    }
                    if (!msgSender.syncSendMessage(eventInfo, procResult)) {
                        System.out.println("Sync request index=" + curCount + ", process result=" + procResult);
                        continue;
                    }
                    ++curCount;
                }
            } else {
                while (curCount++ < reqCnt) {
                    HttpEventInfo eventInfo;
                    try {
                        eventInfo = new HttpEventInfo(groupId, streamId, System.currentTimeMillis(), itemBody);
                    }
                    catch (Throwable ex) {
                        System.out.println("Build single http event failure, ex=" + ex);
                        continue;
                    }
                    if (!msgSender.syncSendMessage(eventInfo, procResult)) {
                        System.out.println("Sync request index=" + curCount + ", process result=" + procResult);
                        continue;
                    }
                    ++curCount;
                }
            }
        } else if (isMultiItem) {
            while (curCount++ < reqCnt) {
                HttpEventInfo eventInfo;
                try {
                    eventInfo = new HttpEventInfo(groupId, streamId, System.currentTimeMillis(), multiBodys);
                }
                catch (Throwable ex) {
                    System.out.println("Build multiple http event failure, ex=" + ex);
                    continue;
                }
                if (!msgSender.asyncSendMessage(eventInfo, new MyMsgSendBack(curCount), procResult)) {
                    System.out.println("Async request index=" + curCount + ", post result=" + procResult);
                    continue;
                }
                ++curCount;
            }
        } else {
            while (curCount++ < reqCnt) {
                HttpEventInfo eventInfo;
                try {
                    eventInfo = new HttpEventInfo(groupId, streamId, System.currentTimeMillis(), itemBody);
                }
                catch (Throwable ex) {
                    System.out.println("Build single http event failure, ex=" + ex);
                    continue;
                }
                if (!msgSender.asyncSendMessage(eventInfo, new MyMsgSendBack(curCount), procResult)) {
                    System.out.println("Async request: index=" + curCount + ", post result=" + procResult);
                    continue;
                }
                ++curCount;
            }
        }
    }

    private static String getRandomString(int length) {
        String strBase = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < length; ++i) {
            int number = cntRandom.nextInt(strBase.length());
            sb.append(strBase.charAt(number));
        }
        return sb.toString();
    }

    private static byte[] buildBoydData(int bodySize) {
        byte[] itemBaseData = StringUtils.getBytesUtf8((String)"inglong tcp test data!");
        ByteBuffer dataBuffer = ByteBuffer.allocate(bodySize);
        while (dataBuffer.hasRemaining()) {
            int offset = dataBuffer.arrayOffset();
            dataBuffer.put(itemBaseData, offset, Math.min(dataBuffer.remaining(), itemBaseData.length));
        }
        dataBuffer.flip();
        return dataBuffer.array();
    }

    private static class MyMsgSendBack
    implements MsgSendCallback {
        private final int msgId;

        public MyMsgSendBack(int msgId) {
            this.msgId = msgId;
        }

        @Override
        public void onMessageAck(ProcessResult result) {
        }

        @Override
        public void onException(Throwable ex) {
            System.out.println("msgId=" + this.msgId + ", send exception=" + ex);
        }
    }
}

