Using concurrent.futures to execute multi-process tasks in Python

As computing hardware such as GPUs and supercomputing platforms becomes more and more capable, the gains in computing speed and scale come not from algorithmic innovation but largely from distributed and parallel computing. Here we introduce a simple multi-process implementation that ships with Python, the concurrent.futures module, and we also discuss how to configure multi-process resources effectively.

1. A concurrent.futures usage example

concurrent.futures is a multi-process module included in Python's standard library, so no additional installation is required. First, here is a sample program without multiprocessing:

# sleep.py

import time

def sleep(seconds):
    time.sleep(seconds)

if __name__ == '__main__':
    times = [1] * 10
    time0 = time.time()
    for t in times:
        sleep(t)
    time1 = time.time()
    print('The time cost is: {}s'.format(time1 - time0))

This program simply sleeps for a total of 10 s. Here is its execution result:

[dechin@dechin-manjaro concurrent]$ python3 sleep.py
The time cost is: 10.014754295349121s

Measuring the time, we find it is slightly longer than 10 s. The extra time includes the overhead of running the Python program itself and also depends on the timing method used; in general this small gap can be ignored.
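As a side note, for benchmarks the standard library also provides time.perf_counter(), a monotonic high-resolution clock that is generally preferred over time.time() for measuring elapsed intervals. A minimal sketch (not part of the original code):

```python
import time

start = time.perf_counter()
time.sleep(0.2)
elapsed = time.perf_counter() - start

# elapsed will be slightly above 0.2 s due to scheduling overhead
print('The time cost is: {}s'.format(elapsed))
```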

Now suppose the sleep function were not sleeping for 1 s but performing a computation that takes 1 s, and that we have many CPU cores available and want to speed up the overall calculation. This is where multiprocessing comes in. Here is the code modified to use multiple processes:

# concurrent_sleep.py

import concurrent.futures
import time

def sleep(seconds):
    time.sleep(seconds)

if __name__ == '__main__':
    times = [1] * 10
    time0 = time.time()
    with concurrent.futures.ProcessPoolExecutor() as executor:
        executor.map(sleep, times)
    time1 = time.time()
    print('The time cost is: {}s'.format(time1 - time0))

The modification is straightforward: the for loop in the original code is replaced by a call to executor.map. Here is the execution result:

[dechin@dechin-manjaro concurrent]$ python3 concurrent_sleep.py
The time cost is: 2.0304219722747803s

The running time drops from 10 s to about 2 s, so the multi-process version brings a very significant speedup. Why the result is 2 s rather than 1 s or 3 s will be explained in the next section.

2. Optimal configuration for multiple processes

How much speedup a multi-process solution achieves depends largely on the hardware. In theory, with n CPU cores we can achieve close to n-fold acceleration, but in most cases the algorithm or the task itself keeps the actual speedup below that bound. First, let's use ipython to check how many CPUs the local machine has:

[dechin@dechin-manjaro concurrent]$ ipython
Python 3.8.5 (default, Sep 4 2020, 07:30:14)
Type 'copyright', 'credits' or 'license' for more information
IPython 7.19.0 -- An enhanced Interactive Python. Type '?' for help.
 
In [1]: import psutil
 
In [2]: psutil.cpu_count(logical=False)
Out[2]: 4
 
In [3]: psutil.cpu_count(logical=True)
Out[3]: 8

Here we use the psutil library rather than the more common os or multiprocessing because it can distinguish logical cores from physical cores. This machine has 4 physical cores, each corresponding to 2 logical cores, for a total of 8 logical cores. In theory, then, we can run up to 8 workers concurrently. Let's verify this by adjusting some parameters:
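If psutil is not installed, the standard library can at least report the logical core count, though it cannot tell physical and logical cores apart, which is exactly why the text above reaches for psutil. A quick stdlib-only check:

```python
import os
import multiprocessing

# Both report the number of logical cores; neither API can
# distinguish physical cores from logical (hyper-threaded) ones.
logical = os.cpu_count()
print(logical == multiprocessing.cpu_count())
```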

import concurrent.futures
import time
import sys

def sleep(seconds):
    time.sleep(seconds)

if __name__ == '__main__':
    if len(sys.argv) > 2 and sys.argv[1] == '-t':
        times = [1] * int(sys.argv[2])  # number of 1 s tasks from the command line
    else:
        times = [1] * 10  # fallback when no -t option is given

    time0 = time.time()
    with concurrent.futures.ProcessPoolExecutor() as executor:
        executor.map(sleep, times)
    time1 = time.time()
    print('The time cost is: {}s'.format(time1 - time0))

In this example, for convenience, the total number of sleep tasks is read from the command line via sys.argv. Note that the arguments arrive as strings and must be converted to integers. The execution and result are as follows:

[dechin@dechin-manjaro concurrent]$ python3 concurrent_sleep.py -t 16

The time cost is: 2.0304934978485107s

The 16 one-second tasks, which would take 16 s sequentially, finish in about 2 s with multiprocessing, matching the expected speedup from 8 logical cores. But as noted earlier, whether the full logical-core speedup is reached also depends on the task itself. For example, if the workload is split into 17 sub-tasks, then after each of the 8 logical cores runs two sleep tasks, one core must run a third task while the other cores wait for it to finish. The execution result:

[dechin@dechin-manjaro concurrent]$ python3 concurrent_sleep.py -t 17

The time cost is: 3.0313029289245605s
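The pattern behind these timings can be written as a simple formula: with w workers and n equal tasks of duration t, the pool runs the tasks in ceil(n / w) "waves", so the wall time is roughly ceil(n / w) * t. A small illustration (the helper name is our own):

```python
import math

def expected_time(n_tasks, n_workers, task_seconds=1):
    # Each wave runs up to n_workers tasks in parallel; total wall time
    # is the number of waves times the per-task duration.
    return math.ceil(n_tasks / n_workers) * task_seconds

print(expected_time(16, 8))  # 16 tasks on 8 workers: 2 waves -> 2
print(expected_time(17, 8))  # 17 tasks on 8 workers: 3 waves -> 3
```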

This result also matches our expectation: executing the first 16 tasks takes 2 s, and then we must wait one more second for the remaining task, for a total of 3 s. If max_workers is not configured, the pool defaults to the system's logical core count. In practice, however, we must consider other constraints, such as balancing memory against process count (for memory-heavy tasks, launching a process per core may exhaust memory). Only within all the system's constraints can the hardware reach its best performance. The following code shows how to configure the number of worker processes:

import concurrent.futures
import time
import sys

def sleep(seconds):
    time.sleep(seconds)

if __name__ == '__main__':
    if len(sys.argv) > 2 and sys.argv[1] == '-t':
        times = [1] * int(sys.argv[2])
    else:
        times = [1] * 10

    time0 = time.time()
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        executor.map(sleep, times)
    time1 = time.time()
    print('The time cost is: {}s'.format(time1 - time0))

The configuration is also easy: pass max_workers to the ProcessPoolExecutor constructor. Here we cap the worker count at 4 and rerun the use cases above:

[dechin@dechin-manjaro concurrent]$ python3 concurrent_sleep.py -t 16
The time cost is: 4.032958030700684s
[dechin@dechin-manjaro concurrent]$ python3 concurrent_sleep.py -t 17

The time cost is: 5.032677173614502s

For the 16-task run, the 4 workers run in parallel, so it completes in 4 s. The 17-task run needs one extra second, for a total of 5 s.
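One possible way to put the memory-versus-process-count balance into code is a small sizing heuristic. Note that this helper and its numbers are our own assumptions for illustration, not part of concurrent.futures:

```python
import os

def pick_max_workers(per_task_mb, total_mb=16 * 1024, headroom_mb=1024):
    # Hypothetical heuristic: leave some memory headroom for the OS,
    # cap the worker count by how many per-task footprints fit in the
    # remaining budget, and never exceed the logical core count or
    # drop below one worker.
    budget_mb = max(total_mb - headroom_mb, per_task_mb)
    by_memory = budget_mb // per_task_mb
    return max(1, min(os.cpu_count() or 1, by_memory))
```

For example, with 16 GB of memory and tasks that each need about 4 GB, the heuristic caps the pool at no more than 3 workers even on an 8-core machine.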

3. Getting return values

Tasks that run independently and need no communication are the ideal case. More often, though, we want to collect the return value of each task and use those values downstream. With concurrent.futures, executor.map returns an iterator over the return values of all tasks, in input order, which is convenient:

import concurrent.futures
import time
import sys

def sleep(seconds):
    time.sleep(seconds)
    return seconds
if __name__ == '__main__':
    if len(sys.argv) > 2 and sys.argv[1] == '-t':
        times = [1] * int(sys.argv[2])
    else:
        times = [1] * 10

    time0 = time.time()
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        results = executor.map(sleep, times)
    print('The total sleep cpu time is: {}s'.format(sum(results)))
    time1 = time.time()
    print('The time cost is: {}s'.format(time1 - time0))

In this use case, the iterator returned by map is stored in results and then summed. In this simple example, the sum of the return values equals the total requested sleep time. The execution result:

[dechin@dechin-manjaro concurrent]$ python3 concurrent_sleep.py -t 16
The total sleep cpu time is: 16s

The time cost is: 4.034112930297852s

As you can see, all return values are successfully obtained.

Summary

Multiprocessing is an optimization technique independent of the algorithmic task itself. Through Python's built-in concurrent.futures module, we can easily run tasks across multiple processes to speed up existing code. We have also given some reference schemes for configuring multi-process resources, which are applicable to GPU- and supercomputing-related workloads.