package com.github.myzhan.locust4j.runtime;

import com.github.myzhan.locust4j.AbstractTask;
import com.github.myzhan.locust4j.Locust;
import com.github.myzhan.locust4j.message.Message;
import com.github.myzhan.locust4j.rpc.Client;
import com.github.myzhan.locust4j.stats.Stats;
import com.github.myzhan.locust4j.utils.Utils;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/myzhan/locust4j/runtime/Runner.class */
/**
 * Worker-side runner that talks to the master through a {@link Client}.
 *
 * <p>After {@link #getReady()} three background jobs are running on an internal executor:
 * a {@link Receiver} that feeds incoming messages into {@link #onMessage(Message)}, a
 * {@link Sender} that forwards stats reports, and a {@link Heartbeat}. Task threads live in a
 * separate pool that is created by {@link #startHatching(int, int)} and destroyed by
 * {@link #stop()}.
 *
 * <p>Threading note: {@code state} and {@code numClients} are written while handling messages
 * (on the receiver thread when driven by {@link Receiver}) and read by the sender and heartbeat
 * threads, hence they are {@code volatile}. The class is otherwise not synchronized.
 */
public class Runner {
    private static final Logger logger = LoggerFactory.getLogger(Runner.class);

    /** Current lifecycle state; {@code null} until {@link #getReady()} is called. */
    private volatile RunnerState state;
    private List<AbstractTask> tasks;
    private Client rpcClient;
    /** Pool running the user tasks; {@code null} while no hatch is active. */
    private ExecutorService taskExecutor;
    /** Pool running the receiver, sender and heartbeat jobs; {@code null} before {@link #getReady()}. */
    private ExecutorService executor;
    private Stats stats;
    /** Number of task submissions performed by the current/last hatch. */
    protected volatile int numClients = 0;
    private int hatchRate = 0;
    /** Sequence used to name worker threads; reset on every hatch. */
    private final AtomicInteger threadNumber = new AtomicInteger();
    protected String nodeID = Utils.getNodeID();

    /**
     * Periodically sends a {@code "heartbeat"} message carrying the lower-cased runner state.
     * Exits when its thread is interrupted while sleeping.
     */
    public class Heartbeat implements Runnable {
        /** Pause between two heartbeats, in milliseconds. */
        private static final int HEARTBEAT_INTERVAL = 1000;
        private final Runner runner;

        private Heartbeat(Runner runner) {
            this.runner = runner;
        }

        @Override
        public void run() {
            Thread.currentThread().setName(Thread.currentThread().getName() + "heartbeat");
            while (true) {
                try {
                    Thread.sleep(HEARTBEAT_INTERVAL);
                    Map<String, Object> data = new HashMap<String, Object>(1);
                    data.put("state", this.runner.state.name().toLowerCase());
                    this.runner.rpcClient.send(new Message("heartbeat", data, this.runner.nodeID));
                } catch (InterruptedException e) {
                    // Interrupted by executor shutdown: keep the flag set and leave the loop.
                    Thread.currentThread().interrupt();
                    return;
                } catch (Exception e) {
                    // A failed heartbeat must not kill the loop; log and try again next tick.
                    Runner.logger.error("Error in running the heartbeat", e);
                }
            }
        }
    }

    /**
     * Pulls messages from the RPC client and dispatches them to {@link Runner#onMessage(Message)}.
     * Errors are logged and the loop continues; the loop ends once the thread's interrupt flag
     * is observed (e.g. after {@link Runner#quit()} shuts the executor down).
     */
    public class Receiver implements Runnable {
        private final Runner runner;

        private Receiver(Runner runner) {
            this.runner = runner;
        }

        @Override
        public void run() {
            Thread.currentThread().setName(Thread.currentThread().getName() + "receive-from-client");
            // NOTE(review): whether a blocked recv() itself reacts to interruption depends on the
            // Client implementation, which is not visible here; the flag is checked between calls.
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    this.runner.onMessage(Runner.this.rpcClient.recv());
                } catch (Exception e) {
                    Runner.logger.error("Error while receiving a message", e);
                }
            }
        }
    }

    /**
     * Takes stats reports from the stats queue and forwards them as {@code "stats"} messages,
     * adding the current {@code user_count}. Reports taken while the runner is Ready or Stopped
     * are dropped. Exits when interrupted while waiting on the queue.
     */
    public class Sender implements Runnable {
        private final Runner runner;

        private Sender(Runner runner) {
            this.runner = runner;
        }

        @Override
        public void run() {
            Thread.currentThread().setName(Thread.currentThread().getName() + "send-to-client");
            while (true) {
                try {
                    Map<String, Object> report = this.runner.stats.getMessageToRunnerQueue().take();
                    RunnerState current = this.runner.state;
                    if (current != RunnerState.Ready && current != RunnerState.Stopped) {
                        report.put("user_count", this.runner.numClients);
                        this.runner.rpcClient.send(new Message("stats", report, this.runner.nodeID));
                    }
                } catch (InterruptedException e) {
                    // Interrupted by executor shutdown: keep the flag set and leave the loop.
                    Thread.currentThread().interrupt();
                    return;
                } catch (Exception e) {
                    Runner.logger.error("Error in running the sender", e);
                }
            }
        }
    }

    /** @return the current state, or {@code null} if {@link #getReady()} has not been called */
    public RunnerState getState() {
        return this.state;
    }

    /** @return the node id included in every message this runner sends */
    public String getNodeID() {
        return this.nodeID;
    }

    /** Sets the RPC client used for all communication; must be set before {@link #getReady()}. */
    public void setRPCClient(Client client) {
        this.rpcClient = client;
    }

    /** Sets the stats collector whose queues this runner reads from and writes to. */
    public void setStats(Stats stats) {
        this.stats = stats;
    }

    /** Sets the tasks that will be distributed over the worker threads on hatch. */
    public void setTasks(List<AbstractTask> list) {
        this.tasks = list;
    }

    /**
     * Submits task instances to the task pool, split across tasks by weight.
     *
     * <p>Each task gets {@code round(spawnCount * weight / totalWeight)} submissions, or an even
     * integer share when all weights are zero. Within one task, submission pauses for a second
     * every {@code hatchRate}-th submission. If the thread is interrupted during such a pause the
     * interrupt flag is restored and spawning stops early.
     *
     * @param spawnCount total number of clients requested
     */
    private void spawnWorkers(int spawnCount) {
        logger.debug("Hatching and swarming {} clients at the rate {} clients/s...", spawnCount, this.hatchRate);

        float weightSum = 0.0f;
        for (AbstractTask task : this.tasks) {
            weightSum += task.getWeight();
        }

        for (AbstractTask task : this.tasks) {
            int amount;
            if (0.0f == weightSum) {
                amount = spawnCount / this.tasks.size();
            } else {
                amount = Math.round(spawnCount * (task.getWeight() / weightSum));
            }
            logger.debug("Allocating {} threads to task, which name is {}", amount, task.getName());

            for (int n = 1; n <= amount; n++) {
                // hatchRate > 0 guards the modulo against division by zero.
                if (this.hatchRate > 0 && n % this.hatchRate == 0) {
                    try {
                        Thread.sleep(1000L);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        logger.error("Interrupted while hatching, aborting the spawn", e);
                        return;
                    }
                }
                this.numClients++;
                this.taskExecutor.submit(task);
            }
        }
    }

    /**
     * Resets stats and counters, creates a fresh fixed-size task pool and spawns the workers.
     *
     * @param i  number of clients to spawn; also the size of the task pool
     * @param i2 hatch rate (clients per second) used to pace the spawning
     * @throws IllegalArgumentException if {@code i} is not positive (raised by
     *         {@link ThreadPoolExecutor})
     */
    protected void startHatching(int i, int i2) {
        this.stats.getClearStatsQueue().offer(true);
        Stats.getInstance().wakeMeUp();
        this.hatchRate = i2;
        this.numClients = 0;
        this.threadNumber.set(0);
        this.taskExecutor = new ThreadPoolExecutor(i, i, 0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
                @Override
                public Thread newThread(Runnable runnable) {
                    Thread thread = new Thread(runnable);
                    thread.setName("locust4j-worker#" + Runner.this.threadNumber.getAndIncrement());
                    return thread;
                }
            });
        spawnWorkers(i);
    }

    /** Reports the number of spawned clients to the master via a {@code "hatch_complete"} message. */
    protected void hatchComplete() {
        Map<String, Object> data = new HashMap<String, Object>(1);
        data.put("count", this.numClients);
        try {
            this.rpcClient.send(new Message("hatch_complete", data, this.nodeID));
        } catch (IOException e) {
            logger.error("Error while sending a message about the completed hatch", e);
        }
    }

    /**
     * Sends a {@code "quit"} message and shuts down the receiver/sender/heartbeat executor.
     * The executor is shut down even when sending fails, so its threads are not leaked.
     */
    public void quit() {
        try {
            this.rpcClient.send(new Message("quit", null, this.nodeID));
        } catch (IOException e) {
            logger.error("Error while sending a message about quiting", e);
        } finally {
            if (this.executor != null) {
                this.executor.shutdownNow();
            }
        }
    }

    /**
     * Interrupts all task threads, waits up to one second for them to finish and drops the pool.
     * Does nothing when no pool exists.
     */
    private void shutdownThreadPool() {
        ExecutorService pool = this.taskExecutor;
        if (pool == null) {
            return;
        }
        pool.shutdownNow();
        try {
            pool.awaitTermination(1L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.error("Error while waiting for termination", e);
        }
        this.taskExecutor = null;
    }

    /** Stops all running tasks by tearing down the task pool. */
    protected void stop() {
        shutdownThreadPool();
    }

    /**
     * Checks that a hatch message carries usable {@code hatch_rate} and {@code num_clients}
     * values. Both must be present, parseable and strictly positive ({@code hatch_rate} is
     * truncated to an int first, so fractional rates below 1 are rejected).
     *
     * @return {@code true} if the message can be acted upon; {@code false} (after a debug log)
     *         otherwise
     */
    private boolean hatchMessageIsValid(Message message) {
        Map<String, Object> data = message.getData();
        Object rawRate = (data == null) ? null : data.get("hatch_rate");
        Object rawClients = (data == null) ? null : data.get("num_clients");
        if (rawRate == null || rawClients == null) {
            logger.debug("Invalid message (hatch_rate: {}, num_clients: {}) from master, ignored.", rawRate, rawClients);
            return false;
        }

        int rate;
        int clients;
        try {
            rate = Float.valueOf(rawRate.toString()).intValue();
            clients = Integer.valueOf(rawClients.toString()).intValue();
        } catch (NumberFormatException e) {
            logger.debug("Invalid message (hatch_rate: {}, num_clients: {}) from master, ignored.", rawRate, rawClients);
            return false;
        }

        // Non-positive values would make the task pool constructor throw after the state change.
        if (rate > 0 && clients > 0) {
            return true;
        }
        logger.debug("Invalid message (hatch_rate: {}, num_clients: {}) from master, ignored.", rate, clients);
        return false;
    }

    /**
     * Handles an already validated hatch message: announces {@code "hatching"}, spawns the
     * workers and reports completion. A failure to send the announcement is logged and does not
     * prevent hatching.
     */
    private void onHatchMessage(Message message) {
        Float rate = Float.valueOf(message.getData().get("hatch_rate").toString());
        int clients = Integer.valueOf(message.getData().get("num_clients").toString()).intValue();
        try {
            this.rpcClient.send(new Message("hatching", null, this.nodeID));
        } catch (IOException e) {
            logger.error("Error while sending a message about hatching", e);
        }
        startHatching(clients, rate.intValue());
        hatchComplete();
    }

    /** Starts the global rate limiter if one is configured. */
    private void startRateLimiter() {
        if (null != Locust.getInstance().getRateLimiter()) {
            Locust.getInstance().getRateLimiter().start();
        }
    }

    /** Stops the global rate limiter if one is configured. */
    private void stopRateLimiter() {
        if (null != Locust.getInstance().getRateLimiter()) {
            Locust.getInstance().getRateLimiter().stop();
        }
    }

    /**
     * Dispatches a message from the master according to the current state.
     *
     * <ul>
     *   <li>Types other than {@code hatch}, {@code stop}, {@code quit} are logged and ignored.</li>
     *   <li>{@code quit} terminates the JVM via {@code System.exit(0)}.</li>
     *   <li>Ready / Stopped: a valid {@code hatch} starts hatching and the rate limiter.</li>
     *   <li>Hatching / Running: a valid {@code hatch} stops the current tasks and hatches again
     *       (the rate limiter is left untouched); {@code stop} stops tasks and the rate limiter,
     *       then reports {@code client_stopped} and {@code client_ready}.</li>
     * </ul>
     */
    public void onMessage(Message message) {
        String type = message.getType();
        boolean isHatch = "hatch".equals(type);
        boolean isStop = "stop".equals(type);
        boolean isQuit = "quit".equals(type);

        if (!isHatch && !isStop && !isQuit) {
            logger.error("Got {} message from master, which is not supported, please report an issue to locust4j.", type);
            return;
        }
        if (isQuit) {
            logger.debug("Got quit message from master, shutting down...");
            System.exit(0);
        }

        RunnerState current = this.state;

        if (current == RunnerState.Hatching || current == RunnerState.Running) {
            if (isHatch && hatchMessageIsValid(message)) {
                stop();
                this.state = RunnerState.Hatching;
                onHatchMessage(message);
                this.state = RunnerState.Running;
                return;
            }
            if (isStop) {
                stop();
                stopRateLimiter();
                this.state = RunnerState.Stopped;
                logger.debug("Recv stop message from master, all the workers are stopped");
                try {
                    this.rpcClient.send(new Message("client_stopped", null, this.nodeID));
                    this.rpcClient.send(new Message("client_ready", null, this.nodeID));
                    // Only advertise Ready locally once the master has been told.
                    this.state = RunnerState.Ready;
                } catch (IOException e) {
                    logger.error("Error while switching from the state stopped to ready", e);
                }
            }
            return;
        }

        boolean idle = current == RunnerState.Ready || current == RunnerState.Stopped;
        if (idle && isHatch && hatchMessageIsValid(message)) {
            this.state = RunnerState.Hatching;
            onHatchMessage(message);
            startRateLimiter();
            this.state = RunnerState.Running;
        }
    }

    /**
     * Starts the background jobs and announces this worker to the master.
     *
     * <p>The receiver is started before {@code "client_ready"} is sent; the sender and heartbeat
     * are started afterwards. A failure to send the announcement is logged only.
     */
    public void getReady() {
        this.executor = new ThreadPoolExecutor(3, 3, 0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
                @Override
                public Thread newThread(Runnable runnable) {
                    return new Thread(runnable);
                }
            });
        this.state = RunnerState.Ready;
        this.executor.submit(new Receiver(this));
        try {
            this.rpcClient.send(new Message("client_ready", null, this.nodeID));
        } catch (IOException e) {
            logger.error("Error while sending a message that the system is ready", e);
        }
        this.executor.submit(new Sender(this));
        this.executor.submit(new Heartbeat(this));
    }
}
