/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.testclient;

import com.google.common.util.concurrent.RateLimiter;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.regex.Pattern;
import org.apache.pulsar.cli.converters.picocli.ByteUnitToLongConverter;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageAcknowledger;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.testclient.CmdBase;
import org.apache.pulsar.testclient.PerfClientUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import picocli.CommandLine;

@CommandLine.Command(name="simulation-client", description={"Simulate client load by maintaining producers and consumers for topics."})
public class LoadSimulationClient
extends CmdBase {
    private static final Logger log = LoggerFactory.getLogger(LoadSimulationClient.class);

    // Command codes read as the first byte of every controller message (see handle(byte, ...)).

    /** Change the payload size and rate of the trade unit registered for one topic. */
    public static final byte CHANGE_COMMAND = 0;
    /** Set the stop flag of the trade unit registered for one topic. */
    public static final byte STOP_COMMAND = 1;
    /** Register a new trade unit for a topic and start it on the executor. */
    public static final byte TRADE_COMMAND = 2;
    /** Change the payload size and rate of every trade unit whose topic matches a tenant/group. */
    public static final byte CHANGE_GROUP_COMMAND = 3;
    /** Set the stop flag of every trade unit whose topic matches a tenant/group. */
    public static final byte STOP_GROUP_COMMAND = 4;
    /** Report back whether a trade unit is registered for a topic. */
    public static final byte FIND_COMMAND = 5;

    /** Runs controller connection handlers and trade-unit loops; assigned in run(). */
    private ExecutorService executor;
    /** Payload buffers keyed by their size in bytes, shared with every trade unit. */
    private final Map<Integer, byte[]> payloadCache = new ConcurrentHashMap<Integer, byte[]>();
    /** Trade units keyed by topic name; entries are added by the trade command. */
    private final Map<String, TradeUnit> topicsToTradeUnits = new ConcurrentHashMap<String, TradeUnit>();
    /** Admin client used to create namespaces before a trade unit starts; assigned in run(). */
    private PulsarAdmin admin;
    /** Pulsar client handed to every trade unit; assigned in run(). */
    private PulsarClient client;
    /** TCP port on which start() accepts controller connections. */
    @CommandLine.Option(names={"--port"}, description={"Port to listen on for controller"}, required=true)
    public int port;
    /** URL that run() passes to both the admin client and the Pulsar client. */
    @CommandLine.Option(names={"--service-url"}, description={"Pulsar Service URL"}, required=true)
    public String serviceURL;
    /** Client memory limit in bytes, as passed to the client builder in run(). */
    @CommandLine.Option(names={"-ml", "--memory-limit"}, description={"Configure the Pulsar client memory limit (eg: 32M, 64M)"}, converter={ByteUnitToLongConverter.class})
    public long memoryLimit = 0L;
    /** Listener installed on every consumer; it asynchronously acknowledges each received message. */
    private static final MessageListener<byte[]> ackListener = MessageAcknowledger::acknowledgeAsync;

    /**
     * Serves one controller connection: reads command bytes until the peer closes the stream and
     * dispatches each one to {@code handle(byte, DataInputStream, DataOutputStream)}.
     *
     * <p>The socket and both of its streams are closed when this method returns or throws, so a
     * finished or failed connection no longer leaks a file descriptor.
     *
     * @param socket accepted controller connection; this method takes ownership and closes it
     * @throws Exception if reading from the socket or handling a command fails
     */
    private void handle(Socket socket) throws Exception {
        try (Socket connection = socket;
             DataInputStream inputStream = new DataInputStream(connection.getInputStream());
             // One output stream for the whole connection instead of a new wrapper per command.
             DataOutputStream outputStream = new DataOutputStream(connection.getOutputStream())) {
            int command;
            // read() returns -1 once the controller has closed its side of the connection.
            while ((command = inputStream.read()) != -1) {
                this.handle((byte) command, inputStream, outputStream);
            }
        }
    }

    /**
     * Reads the per-topic options of a command from the stream, in wire order: the topic name
     * (modified UTF-8), the payload size (int) and the message rate (double).
     *
     * @param tradeConf configuration object that receives the decoded values
     * @param inputStream stream positioned right after the command byte
     * @throws Exception if the stream cannot be read
     */
    private void decodeProducerOptions(TradeConfiguration tradeConf, DataInputStream inputStream) throws Exception {
        final String topicName = inputStream.readUTF();
        tradeConf.topic = topicName;

        final int payloadSize = inputStream.readInt();
        tradeConf.size = payloadSize;

        final double messageRate = inputStream.readDouble();
        tradeConf.rate = messageRate;
    }

    /**
     * Reads the group-selection options of a command from the stream, in wire order: the tenant
     * name followed by the group name (both modified UTF-8).
     *
     * @param tradeConf configuration object that receives the decoded values
     * @param inputStream stream positioned right after the command byte
     * @throws Exception if the stream cannot be read
     */
    private void decodeGroupOptions(TradeConfiguration tradeConf, DataInputStream inputStream) throws Exception {
        final String tenantName = inputStream.readUTF();
        tradeConf.tenant = tenantName;

        final String groupName = inputStream.readUTF();
        tradeConf.group = groupName;
    }

    /**
     * Decodes the arguments of one controller command and applies it.
     *
     * <p>Wire format after the command byte (the case labels are the values of the
     * {@code *_COMMAND} constants of this class):
     * <ul>
     *   <li>0 (CHANGE_COMMAND): topic, size, rate - updates the unit registered for the topic, if any.</li>
     *   <li>1 (STOP_COMMAND): topic - sets the stop flag of the unit registered for the topic, if any.</li>
     *   <li>2 (TRADE_COMMAND): topic, size, rate - registers a new unit and starts it on the executor.</li>
     *   <li>3 (CHANGE_GROUP_COMMAND): tenant, group, size, rate - updates every unit whose topic matches.</li>
     *   <li>4 (STOP_GROUP_COMMAND): tenant, group - sets the stop flag of every unit whose topic matches.</li>
     *   <li>5 (FIND_COMMAND): topic - writes a boolean telling whether a unit is registered.</li>
     * </ul>
     *
     * @param command command byte already read from the stream
     * @param inputStream stream from which the command arguments are read
     * @param outputStream stream on which the find command writes its answer
     * @throws IllegalArgumentException if the command byte is not one of the codes above
     * @throws Exception if the streams cannot be read or written, or a trade unit cannot be created
     */
    private void handle(byte command, DataInputStream inputStream, DataOutputStream outputStream) throws Exception {
        TradeConfiguration tradeConf = new TradeConfiguration();
        tradeConf.command = command;
        switch (command) {
            case 0: { // CHANGE_COMMAND
                this.decodeProducerOptions(tradeConf, inputStream);
                // Single lookup instead of containsKey + get.
                TradeUnit unit = this.topicsToTradeUnits.get(tradeConf.topic);
                if (unit != null) {
                    unit.change(tradeConf);
                }
                break;
            }
            case 1: { // STOP_COMMAND
                tradeConf.topic = inputStream.readUTF();
                TradeUnit unit = this.topicsToTradeUnits.get(tradeConf.topic);
                if (unit != null) {
                    unit.stop.set(true);
                }
                break;
            }
            case 2: { // TRADE_COMMAND
                this.decodeProducerOptions(tradeConf, inputStream);
                TradeUnit tradeUnit = new TradeUnit(tradeConf, this.client, this.payloadCache);
                // NOTE(review): a unit already registered for this topic is replaced here without
                // being stopped - confirm whether the controller can ever re-send a trade command.
                this.topicsToTradeUnits.put(tradeConf.topic, tradeUnit);
                // execute() rather than submit(): submit() would park any failure inside a Future
                // that nobody reads, whereas execute() lets it reach the thread's uncaught-exception
                // handler.
                this.executor.execute(() -> this.runTradeUnit(tradeConf.topic, tradeUnit));
                break;
            }
            case 3: { // CHANGE_GROUP_COMMAND
                this.decodeGroupOptions(tradeConf, inputStream);
                tradeConf.size = inputStream.readInt();
                tradeConf.rate = inputStream.readDouble();
                Pattern groupTopics = groupTopicPattern(tradeConf);
                for (Map.Entry<String, TradeUnit> entry : this.topicsToTradeUnits.entrySet()) {
                    if (groupTopics.matcher(entry.getKey()).matches()) {
                        entry.getValue().change(tradeConf);
                    }
                }
                break;
            }
            case 4: { // STOP_GROUP_COMMAND
                this.decodeGroupOptions(tradeConf, inputStream);
                Pattern groupTopics = groupTopicPattern(tradeConf);
                for (Map.Entry<String, TradeUnit> entry : this.topicsToTradeUnits.entrySet()) {
                    if (groupTopics.matcher(entry.getKey()).matches()) {
                        entry.getValue().stop.set(true);
                    }
                }
                break;
            }
            case 5: { // FIND_COMMAND
                outputStream.writeBoolean(this.topicsToTradeUnits.containsKey(inputStream.readUTF()));
                outputStream.flush();
                break;
            }
            default: {
                throw new IllegalArgumentException("Unrecognized command code received: " + command);
            }
        }
    }

    /**
     * Builds the pattern selecting the topics of a tenant/group pair. The tenant and group are
     * quoted so that characters such as {@code '.'} in their names match literally, and the
     * pattern is compiled once per command rather than once per registered topic.
     */
    private static Pattern groupTopicPattern(TradeConfiguration tradeConf) {
        return Pattern.compile(".*://" + Pattern.quote(tradeConf.tenant) + "/.*/"
                + Pattern.quote(tradeConf.group) + "-.*/.*");
    }

    /**
     * Creates the namespace of {@code topic} if necessary and then runs the trade unit until it
     * stops; any failure is rethrown unchecked with the topic name attached.
     */
    private void runTradeUnit(String topic, TradeUnit tradeUnit) {
        try {
            // The namespace is the part between the "persistent://" prefix and the last '/'.
            String namespace = topic.substring("persistent://".length(), topic.lastIndexOf('/'));
            try {
                this.admin.namespaces().createNamespace(namespace);
            }
            catch (PulsarAdminException.ConflictException ignored) {
                // Deliberately ignored so that a conflict on namespace creation (presumably
                // "already exists") does not prevent the unit from starting.
            }
            tradeUnit.start();
        }
        catch (Exception ex) {
            throw new RuntimeException("Trade unit for topic " + topic + " failed", ex);
        }
    }

    /**
     * Creates the command, passing the command name {@code "simulation-client"} to the
     * {@link CmdBase} constructor.
     *
     * @throws PulsarClientException declared by the signature; nothing in this constructor's own
     *         body throws it
     */
    public LoadSimulationClient() throws PulsarClientException {
        super("simulation-client");
    }

    /**
     * Builds the admin client, the Pulsar client and the worker executor from the parsed
     * command-line options, prints the JVM information and then enters the accept loop of
     * {@link #start()}.
     *
     * @throws Exception if a client cannot be built or the accept loop fails
     */
    @Override
    public void run() throws Exception {
        final int ioThreadCount = Runtime.getRuntime().availableProcessors();

        this.admin = PulsarAdmin.builder()
                .serviceHttpUrl(this.serviceURL)
                .build();

        this.client = PulsarClient.builder()
                .memoryLimit(this.memoryLimit, SizeUnit.BYTES)
                .serviceUrl(this.serviceURL)
                .connectionsPerBroker(4)
                .ioThreads(ioThreadCount)
                .statsInterval(0L, TimeUnit.SECONDS)
                .build();

        final ThreadFactory workerThreads = new DefaultThreadFactory("test-client");
        this.executor = Executors.newCachedThreadPool(workerThreads);

        PerfClientUtils.printJVMInformation(log);
        this.start();
    }

    /**
     * Opens the controller port and serves connections forever, handing each accepted socket to
     * the executor.
     *
     * <p>This method only returns by throwing; the server socket is closed when that happens.
     * A failure while serving one connection is logged and does not affect the accept loop.
     *
     * @throws Exception if the server socket cannot be opened or accepting a connection fails
     */
    public void start() throws Exception {
        try (ServerSocket serverSocket = new ServerSocket(this.port)) {
            while (true) {
                log.info("Listening for controller command...");
                Socket socket = serverSocket.accept();
                log.info("Connected to {}", socket.getInetAddress().getHostName());
                this.executor.execute(() -> {
                    try {
                        this.handle(socket);
                    }
                    catch (Exception ex) {
                        // Previously rethrown into a Future that was never inspected, so the
                        // failure vanished; report it instead.
                        log.error("Error while handling controller connection from {}", socket.getInetAddress(), ex);
                    }
                });
            }
        }
    }

    /**
     * Mutable holder for the arguments decoded from one controller command. Which fields are
     * filled in depends on the command; the others keep the defaults below.
     */
    private static class TradeConfiguration {
        /** Command byte this configuration was decoded for; -1 until assigned. */
        public byte command = -1;

        /** Topic name for the per-topic commands. */
        public String topic;

        /** Tenant name for the group commands. */
        public String tenant;

        /** Group name for the group commands. */
        public String group;

        /** Payload size in bytes. */
        public int size = 1024;

        /** Rate handed to the trade unit's rate limiter. */
        public double rate = 100.0;
    }

    /**
     * Traffic generator for a single topic: one consumer that acknowledges everything it receives
     * and one producer that sends the current payload at the current rate until {@link #stop} is
     * set.
     */
    private static class TradeUnit {
        /** Delay between two attempts to create a producer. */
        private static final long PRODUCER_RETRY_DELAY_MS = 10000L;

        /** Pending subscription started by the constructor; resolved in {@link #start()}. */
        final Future<Consumer<byte[]>> consumerFuture;
        /** Set to {@code true} by the command handler to end the send loop. */
        final AtomicBoolean stop;
        /** Paces the send loop; its rate is updated by {@link #change}. */
        final RateLimiter rateLimiter;
        /** Payload sent with every message; swapped by {@link #change}. */
        final AtomicReference<byte[]> payload;
        final PulsarClient client;
        final String topic;
        /** Shared cache of payload buffers keyed by size. */
        final Map<Integer, byte[]> payloadCache;

        /**
         * Prepares the unit and starts subscribing to the topic asynchronously.
         *
         * @param tradeConf supplies the topic, the payload size and the rate
         * @param client Pulsar client used for the consumer and the producers
         * @param payloadCache shared cache from which the payload buffer is taken
         * @throws Exception declared for callers; buffer allocation or rate-limiter creation may
         *         reject the configured size or rate
         */
        public TradeUnit(TradeConfiguration tradeConf, PulsarClient client, Map<Integer, byte[]> payloadCache) throws Exception {
            this.client = client;
            this.topic = tradeConf.topic;
            this.payloadCache = payloadCache;
            this.stop = new AtomicBoolean(false);
            // Apply size and rate before subscribing, so that a rejected value does not leave a
            // subscription behind that nothing would ever close.
            this.payload = new AtomicReference<>(payloadCache.computeIfAbsent(tradeConf.size, byte[]::new));
            this.rateLimiter = RateLimiter.create(tradeConf.rate);
            this.consumerFuture = client.newConsumer()
                    .topic(tradeConf.topic)
                    .subscriptionName("Subscriber-" + tradeConf.topic)
                    .messageListener(ackListener)
                    .subscribeAsync();
        }

        /**
         * Applies a new rate and payload size to the running unit.
         *
         * @param tradeConf supplies the new rate and payload size; other fields are not read
         */
        public void change(TradeConfiguration tradeConf) {
            this.rateLimiter.setRate(tradeConf.rate);
            this.payload.set(this.payloadCache.computeIfAbsent(tradeConf.size, byte[]::new));
        }

        /**
         * Creates a producer for the topic, retrying every {@value #PRODUCER_RETRY_DELAY_MS} ms
         * until creation succeeds.
         *
         * @return a newly created producer
         * @throws Exception if the thread is interrupted while waiting between attempts
         */
        private Producer<byte[]> getNewProducer() throws Exception {
            while (true) {
                try {
                    return this.client.newProducer().topic(this.topic).sendTimeout(0, TimeUnit.SECONDS).create();
                }
                catch (Exception e) {
                    // Creation failed: wait and try again on the next iteration.
                    Thread.sleep(PRODUCER_RETRY_DELAY_MS);
                }
            }
        }

        /**
         * Runs the send loop until {@link #stop} is set. When a send fails, the current producer
         * is closed and replaced by a new one. The last producer and the consumer are closed on
         * every exit path, including when the unit was stopped before the loop began or when this
         * method throws.
         *
         * @throws Exception if the subscription failed or the thread is interrupted while a
         *         producer is being re-created
         */
        public void start() throws Exception {
            Producer<byte[]> producer = this.getNewProducer();
            Consumer<byte[]> consumer = null;
            try {
                consumer = this.consumerFuture.get();
                while (!this.stop.get()) {
                    // Cleared by the failure callback so the inner loop ends and a new producer
                    // is obtained.
                    AtomicBoolean healthy = new AtomicBoolean(true);
                    Function<Throwable, MessageId> onSendFailure = e -> {
                        healthy.set(false);
                        return null;
                    };
                    while (!this.stop.get() && healthy.get()) {
                        producer.sendAsync(this.payload.get()).exceptionally(onSendFailure);
                        this.rateLimiter.acquire();
                    }
                    if (!this.stop.get()) {
                        // The producer failed: replace it and keep going.
                        producer.closeAsync();
                        producer = this.getNewProducer();
                    }
                }
            }
            finally {
                producer.closeAsync();
                if (consumer != null) {
                    consumer.closeAsync();
                }
            }
        }
    }
}

