/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.tubemq.example;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.inlong.tubemq.client.config.TubeClientConfig;
import org.apache.inlong.tubemq.client.exception.TubeClientException;
import org.apache.inlong.tubemq.client.factory.MessageSessionFactory;
import org.apache.inlong.tubemq.client.factory.TubeMultiSessionFactory;
import org.apache.inlong.tubemq.client.producer.MessageProducer;
import org.apache.inlong.tubemq.client.producer.MessageSentCallback;
import org.apache.inlong.tubemq.client.producer.MessageSentResult;
import org.apache.inlong.tubemq.corebase.utils.MixedUtils;
import org.apache.inlong.tubemq.corebase.utils.Tuple2;
import org.apache.inlong.tubemq.example.MsgSendReceiveStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MAMessageProducerExample {
    private static final Logger logger = LoggerFactory.getLogger(MAMessageProducerExample.class);
    private static final MsgSendReceiveStats msgSendStats = new MsgSendReceiveStats(true);
    private static final Map<Integer, Tuple2<MessageSessionFactory, Set<MessageProducer>>> sessionFactoryProducerMap = new HashMap<Integer, Tuple2<MessageSessionFactory, Set<MessageProducer>>>();
    private static final AtomicLong totalSentCnt = new AtomicLong(0L);
    private static ExecutorService sendExecutorService;

    public static void main(String[] args) throws Throwable {
        String masterServers = args[0];
        String pubTopicAndFilterItems = args[1];
        long msgCount = Long.parseLong(args[2]);
        int pkgSize = 1024;
        if (args.length > 3) {
            pkgSize = MixedUtils.mid((int)Integer.parseInt(args[3]), (int)1, (int)0x100000);
        }
        int clientCnt = 2;
        if (args.length > 4) {
            clientCnt = MixedUtils.mid((int)Integer.parseInt(args[4]), (int)1, (int)100);
        }
        int sessionFactoryCnt = 10;
        if (args.length > 5) {
            sessionFactoryCnt = MixedUtils.mid((int)Integer.parseInt(args[5]), (int)1, (int)20);
        }
        Map topicAndFiltersMap = MixedUtils.parseTopicParam((String)pubTopicAndFilterItems);
        TubeClientConfig clientConfig = new TubeClientConfig(masterServers);
        for (int i = 0; i < sessionFactoryCnt; ++i) {
            sessionFactoryProducerMap.put(i, (Tuple2<MessageSessionFactory, Set<MessageProducer>>)new Tuple2((Object)new TubeMultiSessionFactory(clientConfig), new HashSet()));
        }
        sendExecutorService = Executors.newFixedThreadPool(clientCnt, new ThreadFactory(){

            @Override
            public Thread newThread(Runnable runnable) {
                return new Thread(runnable);
            }
        });
        Thread statisticThread = new Thread((Runnable)msgSendStats, "Sent Statistic Thread");
        statisticThread.start();
        byte[] bodyData = MixedUtils.buildTestData((int)pkgSize);
        List buildTopicFilterTuples = MixedUtils.buildTopicFilterTupleList((Map)topicAndFiltersMap);
        for (int indexId = 0; indexId < clientCnt; ++indexId) {
            Tuple2<MessageSessionFactory, Set<MessageProducer>> sessionProducerMap = sessionFactoryProducerMap.get(indexId % sessionFactoryCnt);
            MessageProducer producer = ((MessageSessionFactory)sessionProducerMap.getF0()).createProducer();
            producer.publish(topicAndFiltersMap.keySet());
            ((Set)sessionProducerMap.getF1()).add(producer);
            sendExecutorService.submit(new Sender(indexId, producer, bodyData, buildTopicFilterTuples, msgCount));
        }
        try {
            long needSentCnt = msgCount * (long)clientCnt;
            while (totalSentCnt.get() < needSentCnt) {
                logger.info("Sending task is running, total = {}, finished = {}", (Object)needSentCnt, (Object)totalSentCnt.get());
                Thread.sleep(30000L);
            }
        }
        catch (Throwable e) {
            logger.error("Throwable: ", e);
        }
        sendExecutorService.shutdownNow();
        for (int i = 0; i < sessionFactoryCnt; ++i) {
            ((MessageSessionFactory)sessionFactoryProducerMap.get(i).getF0()).shutdown();
        }
        msgSendStats.stopStats();
        logger.info("Sending task is finished, total sent {} messages", (Object)totalSentCnt.get());
    }

    private static class DefaultSendCallback
    implements MessageSentCallback {
        private DefaultSendCallback() {
        }

        public void onMessageSent(MessageSentResult result) {
            totalSentCnt.incrementAndGet();
            if (result.isSuccess()) {
                msgSendStats.addMsgCount(result.getMessage().getTopic(), 1);
            } else {
                logger.error("Send message failed!" + result.getErrMsg());
            }
        }

        public void onException(Throwable e) {
            totalSentCnt.incrementAndGet();
            logger.error("Send message error!", e);
        }
    }

    public static class Sender
    implements Runnable {
        private final int indexId;
        private final MessageProducer producer;
        private final byte[] bodyData;
        private final long msgCount;
        private final List<Tuple2<String, String>> topicFilterTuples;

        public Sender(int indexId, MessageProducer producer, byte[] bodyData, List<Tuple2<String, String>> topicFilterTuples, long msgCount) {
            this.indexId = indexId;
            this.producer = producer;
            this.bodyData = bodyData;
            this.msgCount = msgCount;
            this.topicFilterTuples = topicFilterTuples;
        }

        @Override
        public void run() {
            long sentCount = 0L;
            int roundIndex = 0;
            int targetCnt = this.topicFilterTuples.size();
            while (this.msgCount < 0L || sentCount < this.msgCount) {
                roundIndex = (int)(sentCount++ % (long)targetCnt);
                Tuple2<String, String> target = this.topicFilterTuples.get(roundIndex);
                try {
                    this.producer.sendMessage(MixedUtils.buildMessage((String)((String)target.getF0()), (String)((String)target.getF1()), (byte[])this.bodyData, (long)sentCount), (MessageSentCallback)new DefaultSendCallback());
                }
                catch (InterruptedException | TubeClientException e) {
                    logger.error("Send message failed!", e);
                }
                MixedUtils.coolSending((long)sentCount);
            }
            try {
                this.producer.shutdown();
            }
            catch (Throwable e) {
                logger.error("producer shutdown error: ", e);
            }
            logger.info("The message sending task(" + this.indexId + ") has been completed!");
        }
    }
}

