Locust single-machine multi-core stress test, and data communication processing of master-slave nodes

1. Background

This is an interface performance test done 2 months ago, about the single-machine multi-core operation of the locust script, and the data communication between the master and slave nodes.

Let me briefly explain the background first. Before the APP is launched, the login interface needs to be performance tested. After evaluation, I gave priority to locust for script development. This time I used locust’s single-machine multi-core running capability, but it also involves data communication between master and slave nodes. There are very few effective documents available for reference, so it is more reliable to cross the river by reading the official documents yourself.

By the way, the best tutorial for learning about frameworks is actually the official documents and framework source code. Here is the link to the locust official documentation. If you need it, you can learn by yourself: https://docs.locust.io/en/ stable/what-is-locust.html

2. Code writing

In fact, a major focus of writing script code is how to process test data. Different testing requirements require different processing of test data. For example, in this request, the mobile phone number cannot be repeated. In addition, considering the long-term load pressure, the amount of data must be sufficient.

Finally, the test data still needs to be processed, so the test number range I used is not a real number range. After the test is completed, the mobile phone number in the corresponding number range can be queried to clean up the relevant business data.

1. Code Overview

It’s still the same as before, first attach all the code, and then break down its structure to explain.

import random
import time
from collections import deque

from locust import HttpUser, task, run_single_user, TaskSet, events
from locust.runners import WorkerRunner, MasterRunner

CURRENT_TIMESTAMP = str(round(time.time() * 1000))
RANDOM = str(random.randint(10000000, 99999999))
MOBILE_HEADER = {
    "skip-request-expired": "true",
    "skip-auth": "true",
    "skip-sign": "true",
    "os": "IOS",
    "device-id": "198EA6A4677649018708B400F3DF69FB",
    "nonce": RANDOM,
    "sign": "12333",
    "version": "1.2.0",
    "timestamp": CURRENT_TIMESTAMP,
    "Content-Type": "application/json"
}

last_mobile = ""
worker_mobile_deque = deque()


# 13300120000, 13300160000 New user registration number segment

@events.test_start.add_listener
def on_test_start(environment, **_kwargs):
    if not isinstance(environment.runner, WorkerRunner):
        mobile_list = []
        for i in range(13300120000, 13300160000):
            mobile_list.append(i)
        mobile_list_length = len(mobile_list)
        print("The list has been generated, total quantity:", mobile_list_length)
        worker_count = environment.runner.worker_count
        chunk_size = int(mobile_list_length / worker_count)
        print(f"The average number of mobile phone numbers assigned to each worker: {chunk_size}")

        for i, worker in enumerate(environment.runner.clients):
            start_index = i * chunk_size
            if i + 1 < worker_count:
                end_index = start_index + chunk_size
            else:
                end_index = len(mobile_list)
            data = mobile_list[start_index:end_index]
            environment.runner.send_message("mobile_list", data, worker)


def setup_mobile_list(environment, msg, **kwargs):
    len_msg_data = len(msg.data)
    print(f"The data number segment sent by the master received by the worker: {msg.data[0]} ~ {msg.data[len_msg_data-1]}")
    global worker_mobile_deque
    worker_mobile_deque = deque(msg.data)


@events.init.add_listener
def on_locust_init(environment, **_kwargs):
    if not isinstance(environment.runner, MasterRunner):
        environment.runner.register_message('mobile_list', setup_mobile_list)


class VcodeLoginUser(TaskSet):
    # wait_time = between(5, 5)

    @task
    def vcode_login(self):
        test_mobile = worker_mobile_deque.popleft()
        print("Currently obtained mobile phone number:", test_mobile)
        # print("Current queue size:", len(worker_mobile_deque))
        global last_mobile
        last_mobile = test_mobile
        with self.client.post("/g/sendMobileVcode",
                              headers=MOBILE_HEADER,
                              json={"busiType": "login", "mobile": str(test_mobile)}) as send_response:
            try:
                send_response_json = send_response.json()
                if send_response_json["message"] == "success":
                    params = {"mobile": str(test_mobile), "vcode": "111111"}
                    # print(test_mobile, "Login request parameters:", params)
                    with self.client.post("/g/vcodeLogin",
                                          json=params,
                                          headers=MOBILE_HEADER,
                                          catch_response=True) as login_response:
                        # print(login_response.json)
                        login_response_json = login_response.json()
                        if login_response_json["message"] != "success":
                            login_response.failure("message not equal success")
                        elif login_response_json["code"] != 0:
                            login_response.failure("code not equal 0")
                        elif login_response_json["data"]["rId"] == "":
                            login_response.failure("rid is null")
                        elif login_response_json["data"]["mobile"] != str(test_mobile):
                            login_response.failure("mobile is error, enter mobile phone number {}, return mobile phone number {}"
                                                   .format(test_mobile, login_response.json()["data"]["mobile"]))
                        # print(test_mobile, "Request result:", login_response.json())
                else:
                    send_response.failure("{} send code fail".format(test_mobile))
            except Exception as e:
                send_response.failure("send code fail {}".format(e))

    @events.test_stop.add_listener
    def on_test_stop(environment, **kwargs):
        print("Script ends")
        print("Current queue size:", len(worker_mobile_deque))
        print("Last mobile phone number:", last_mobile)


class LocustLogin(HttpUser):
    tasks = [VcodeLoginUser]
    host = "https://qa.test.com"


if __name__ == '__main__':
    run_single_user(LocustLogin)
2. Code disassembly-add necessary assertions

First of all, the large structure of the http request script developed based on locust remains unchanged. It is still composed of two large blocks: HttpUser and TaskSet. I will not explain them here, everyone. You will understand after reading the official documentation.

Next is the class VcodeLoginUser. You can see that the detailed actions of a single user are defined in it. Note that necessary assertions should be added here. Otherwise, it is not enough to rely on the framework’s non-200 error assertion.

For example, here I focus on several necessary fields after successful login: code, rId, mobile. These must comply with assertions.

Sure enough, during the stress test, a problem that would occur under concurrency conditions was discovered: the mobile phone number entered was a, and the mobile phone number returned by the interface was b. The greater the concurrency, the more errors there are. If I only assert code=0, then this problem is not easy to find. Although the codes returned by the interface are all successful, there are already errors in the business.

...
        with self.client.post("/g/sendMobileVcode",
                              headers=MOBILE_HEADER,
                              json={"busiType": "login", "mobile": str(test_mobile)}) as send_response:
            try:
                send_response_json = send_response.json()
                if send_response_json["message"] == "success":
                    params = {"mobile": str(test_mobile), "vcode": "111111"}
                    # print(test_mobile, "Login request parameters:", params)
                    with self.client.post("/g/vcodeLogin",
                                          json=params,
                                          headers=MOBILE_HEADER,
                                          catch_response=True) as login_response:
                        # print(login_response.json)
                        login_response_json = login_response.json()
                        if login_response_json["message"] != "success":
                            login_response.failure("message not equal success")
                        elif login_response_json["code"] != 0:
                            login_response.failure("code not equal 0")
                        elif login_response_json["data"]["rId"] == "":
                            login_response.failure("rid is null")
                        elif login_response_json["data"]["mobile"] != str(test_mobile):
                            login_response.failure("mobile is error, enter mobile phone number {}, return mobile phone number {}"
                                                   .format(test_mobile, login_response.json()["data"]["mobile"]))
                        # print(test_mobile, "Request result:", login_response.json())
                else:
                    send_response.failure("{} send code fail".format(test_mobile))
            except Exception as e:
                send_response.failure("send code fail {}".format(e))
...
3. Code disassembly-single machine multi-core processing

Next is the focus, how to use multiple CPUs on a single machine. I ignored this at first, but later I found that the load could not be increased. When I opened the resource monitor, I found that only one CPU was running at full load.

The diagram here is for reference only. My win laptop is 12c.

Because Locust is a single process and cannot fully utilize multi-core CPUs, we need to open a master process on the press, and then open multiple slave processes to form a single-machine distributed system. System is enough.

The way to turn it on is also very simple:

# Start master
locust -f locustfile.py --master

# Start slave
locust -f locustfile.py --slave

Here when we open the slave node, we can open multiple corresponding command line windows. We did not take a screenshot at that time, so we borrowed pictures from the Internet to illustrate:

After turning it on, your web interface can see the number of currently started nodes in real time.

4. Code disassembly – processing master-slave node data communication

It is very easy to turn on the master and slave nodes, but the test data needs to be processed accordingly.

Because the mobile phone number used for my test login cannot be repeated, I must ensure that the mobile phone numbers generated by codes running simultaneously on different slave nodes cannot be repeated.

I continued to browse the official documents and found that my needs can be achieved by adding event listener.

Here I added three listeners to handle different things:

  • @events.init.add_listener: Executed when locust is running and initialized

  • @events.test_start.add_listener: Executed when the test code starts running

  • @events.test_stop.add_listener: Executed when the test code ends running

@events.test_start.add_listener First, in @events.test_start.add_listener, I mainly deal with the generation of full data and evenly distribute these mobile phone numbers to the generated slaves node.

@events.test_start.add_listener
def on_test_start(environment, **_kwargs):
    if not isinstance(environment.runner, WorkerRunner):
        mobile_list = []
        for i in range(13300120000, 13300160000):
            mobile_list.append(i)
        mobile_list_length = len(mobile_list)
        print("The list has been generated, total quantity:", mobile_list_length)
        worker_count = environment.runner.worker_count
        chunk_size = int(mobile_list_length / worker_count)
        print(f"The average number of mobile phone numbers assigned to each worker: {chunk_size}")

        for i, worker in enumerate(environment.runner.clients):
            start_index = i * chunk_size
            if i + 1 < worker_count:
                end_index = start_index + chunk_size
            else:
                end_index = len(mobile_list)
            data = mobile_list[start_index:end_index]
            environment.runner.send_message("mobile_list", data, worker)

Note that mobile_list defined in the last line here needs to define a corresponding function to receive this data.

def setup_mobile_list(environment, msg, **kwargs):
    len_msg_data = len(msg.data)
    print(f"The data number segment sent by the master received by the worker: {msg.data[0]} ~ {msg.data[len_msg_data-1]}")
    global worker_mobile_deque
    worker_mobile_deque = deque(msg.data)

In this way, different slave node scripts are assigned different mobile phone number segments, which solves the problem of duplication of test data.

In addition, I define another global variable worker_mobile_deque, so that the data received by different slave nodes can be put into the queue. When running, it is taken from the queue, using one less one until the data in the queue is run out.

@events.init.add_listenerThe next step is to register the data fields and processing functions defined above in @events.init.add_listener.

@events.init.add_listener
def on_locust_init(environment, **_kwargs):
    if not isinstance(environment.runner, MasterRunner):
        environment.runner.register_message('mobile_list', setup_mobile_list)

@events.test_stop.add_listenerFinally, you can do some post-processing in @events.test_stop.add_listener. For the sake of simplicity, I just recorded and output the test results. Which number range is reached, so that when I run the script next time, I can start from the later data, maximizing the use of test data and not wasting it.

 @events.test_stop.add_listener
    def on_test_stop(environment, **kwargs):
        print("Script ends")
        print("Current queue size:", len(worker_mobile_deque))
        print("Last mobile phone number:", last_mobile)

3. Summary

After the script is debugged, it can run stably. The next step is the testing process, which tests the server’s single-node and multi-node load capabilities, horizontal expansion capabilities, dynamic expansion of services, and long-term high load testing. From the perspective of testing, observe the test report and the status of various service indicators. It’s just that when it comes to the development side, I haven’t been involved in much tuning and analysis work. But these are probably still common questions, and I will share them separately when I have the opportunity later.

From a usage perspective, locust is my favorite. Compared with jemter, it is much lighter, the code flexibility is also very high, and the load capacity of a single machine is also very good. This is much better than jemter. My project does not require a very high volume, so I only used 8c for a single machine. If there are friends who need very high concurrency, locust also supports multi-machine distribution to further expand concurrency capabilities.

Take action, it is better to be on the road than to wait and see all the time. In the future, you will definitely thank yourself for working hard now! If you want to learn and improve but can’t find the information and there is no one to answer your questions, please join the group in time: 786229024. There are various test and development materials and technologies in which you can communicate together.

Finally: The complete software testing video tutorial below has been compiled and uploaded. Friends who need it can get it by themselves [Guaranteed 100% Free]

Software testing interview document

We must study to find a high-paying job. The following interview questions are the latest interview materials from first-tier Internet companies such as Alibaba, Tencent, Byte, etc., and some Byte bosses have given authoritative answers. After finishing this set I believe everyone can find a satisfactory job based on the interview information.