Python multi-process and multi-thread

1 Python multithreading

1.1 GIL

In other languages, multiple threads can execute simultaneously on a multi-core CPU. In Python, however, only one thread can execute at any given moment, whether the CPU is single-core or multi-core. The root cause is the GIL, the Global Interpreter Lock. It stems from an early design decision in Python, made for the sake of data safety. Before a thread can execute, it must first obtain the GIL. We can regard the GIL as a “passport”: in a Python process there is only one GIL, and a thread that cannot get the passport is not allowed onto the CPU to execute.

At present, there are many interpreters for Python, such as:

  • CPython: CPython is a Python interpreter implemented in C language. As an official implementation, it is the most widely used Python interpreter.
  • PyPy: PyPy is an interpreter implemented in RPython, a statically typed subset of Python. It features just-in-time (JIT) compilation and supports multiple backends (C, CLI, JVM). PyPy is designed to improve performance while staying as compatible as possible with CPython.
  • Jython: Jython is an implementation that compiles Python code into Java bytecode and runs on the JVM (Java Virtual Machine). Additionally, it can import and use any Java class as if it were a Python module.
  • IronPython: IronPython is a Python implementation for the .NET Framework. It can use Python and .NET framework libraries, and can also expose Python code to other languages in the .NET framework.

The GIL is an implementation detail of CPython: Jython and IronPython have no GIL. (Note that PyPy, which closely mimics CPython, does implement its own GIL.)
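You can check which interpreter is running your code with the standard library, which is useful when reasoning about whether the GIL discussion above applies:

```python
import platform

# Report which Python implementation is executing this code.
# The GIL discussion above applies to CPython specifically.
impl = platform.python_implementation()
print(impl)  # e.g. "CPython", "PyPy", "Jython", or "IronPython"
```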

Every time the GIL is released, the waiting threads compete for the lock and a thread switch occurs, which consumes resources. This lock contention and switching overhead is why a CPU-bound multi-threaded program can take even longer than its single-threaded equivalent.

And because of the GIL lock, a process in Python can only execute one thread at the same time (the thread that gets the GIL can execute), which is the fundamental reason why Python’s multi-threading efficiency is not high on multi-core CPUs.
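A minimal sketch can make this concrete: the same CPU-bound countdown is run twice sequentially and then split across two threads. On CPython, the threaded version is typically no faster, because only one thread can hold the GIL at a time (exact timings will vary by machine):

```python
import threading
import time

def count_down(n):
    # Pure CPU work: no I/O, so the GIL is only released periodically
    while n > 0:
        n -= 1

N = 2_000_000

# Sequential: do the work twice in one thread
start = time.perf_counter()
count_down(N)
count_down(N)
seq = time.perf_counter() - start

# Threaded: split the same total work across two threads
start = time.perf_counter()
t1 = threading.Thread(target=count_down, args=(N,))
t2 = threading.Thread(target=count_down, args=(N,))
t1.start()
t2.start()
t1.join()
t2.join()
par = time.perf_counter() - start

# On CPython the threaded run is usually no faster (often slower),
# since the two threads take turns holding the GIL
print(f"sequential: {seq:.2f}s  threaded: {par:.2f}s")
```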

1.2 Create multi-thread

Python provides two modules for multithreading: `_thread` (called `thread` in Python 2) and `threading`. The former is a low-level module that is rarely used in application-level development; `threading` is the higher-level interface built on top of it.

  • Method 1: Directly use threading.Thread()
import threading
 
# This function name can be freely defined
def run(n):
    print("current task:", n)
 
if __name__ == "__main__":
    t1 = threading.Thread(target=run, args=("thread 1",))
    t2 = threading.Thread(target=run, args=("thread 2",))
    t1.start()
    t2.start()
  • Method 2: Inherit threading.Thread to customize the thread class and rewrite the run method
import threading
 
class MyThread(threading.Thread):
    def __init__(self, n):
        super(MyThread, self).__init__()  # the parent class's __init__ must be called
        self.n = n

    def run(self):
        print("current task:", self.n)

if __name__ == "__main__":
    t1 = MyThread("thread 1")
    t2 = MyThread("thread 2")

    t1.start()
    t2.start()

1.3 join function

In Python, the join() method waits for a thread to finish executing: the calling thread (here, the main thread) blocks until the thread on which join() was called completes its task, and only then continues.

Here is an example that demonstrates the join() method in action:

import threading
import time

def task():
    print("Thread started")
    time.sleep(2)  # simulate a time-consuming operation
    print("Thread finished")

if __name__ == "__main__":
    t = threading.Thread(target=task)
    t.start()
    print("Main thread continues executing")
    t.join() # Wait for thread t to finish executing
    print("Main thread resumed after thread t finished")

In the above example, we created a child thread t and started it in the main thread. The child thread executes the task() function, in which a time-consuming operation time.sleep(2) is simulated. The main thread continues execution immediately after starting the child thread, and prints “Main thread continues executing”.

However, we call t.join() in the next step of the main thread. This means that the main thread will wait for the child thread to finish executing before continuing. In this example, the main thread blocks until the child thread finishes executing.

When the child thread completes the task, “Thread finished” will be printed out. At this time, the main thread will continue to execute and print out “Main thread resumed after thread t finished”.

By calling t.join(), we ensure that the main thread waits for the child thread to finish before continuing, which is useful for situations that require coordination and synchronization between threads.

1.4 Thread synchronization and mutex

Data is shared between threads. When multiple threads operate on a certain shared data, thread safety issues need to be considered. In Python’s threading module, the Lock class is a tool for creating lock objects that allow synchronization between threads.

Here is an example of using the Lock class:

import threading

shared_resource = 0 # shared resource
lock = threading.Lock() # create a lock object

def increment():
    global shared_resource
    for _ in range(100000):
        lock.acquire() # acquire a lock
        shared_resource += 1
        lock.release() # release the lock

if __name__ == "__main__":
    threads = []
    for _ in range(5):
        t = threading.Thread(target=increment)
        threads.append(t)
        t.start()

    for t in threads:
        t.join()

    print("Final value of shared_resource:", shared_resource)

In the above example, we created a shared resource shared_resource with an initial value of 0. We also create a Lock object lock.

Then, we define an increment function that increments the value of the shared resource many times in a loop. Before each increment, we acquire the lock with lock.acquire() to ensure the current thread has exclusive access to the shared resource. After incrementing, we release the lock with lock.release().

In the main program, we create 5 threads and add them to a thread list. Then, we start these threads and wait for them to finish executing.

Finally, we print out the final shared_resource value. Since locks are used to protect access to shared resources, each thread will sequentially acquire locks and increase the value of shared resources, thereby ensuring the correctness of the final result.

By using the Lock class, we can achieve safe access and modification of shared resources in a multi-threaded environment, avoiding the occurrence of race conditions.

1.4.1 Race condition

Race condition refers to the uncertainty and unpredictability of multiple concurrently executing threads or processes when accessing shared resources or performing operations. Race conditions can cause programs to produce incorrect results or behave abnormally in a multithreaded environment.

A race condition occurs when multiple threads or processes access a shared resource or perform operations simultaneously, without proper synchronization and in an indeterminate order. Specifically, when multiple threads or processes read and write a shared resource, their interleaved execution may cause them to interfere with each other, overwrite each other's updates, or produce inconsistent results.

Here is a simple example of a race condition:

import threading

counter = 0

def increment():
    global counter
    for _ in range(100000):
        counter += 1

if __name__ == "__main__":
    threads = []
    for _ in range(5):
        t = threading.Thread(target=increment)
        threads.append(t)
        t.start()

    for t in threads:
        t.join()

    print("Final value of counter:", counter)

In the above example, we created a global counter counter and defined an increment function that increments the value of the counter using a loop. We created 5 threads, and each thread will call the increment function.

A race condition occurs due to multiple threads writing (incrementing) the counter at the same time. Because the order of execution between threads is indeterminate, the following situations may occur:

  • Thread A reads the value of counter as 0.
  • Thread B reads the value of counter as 0.
  • Thread A increments its copy and writes 1 back to counter.
  • Thread B increments its stale copy and also writes 1 back.
  • The end result is that counter holds 1 instead of the expected 2.

This example demonstrates the problem of race conditions, where multiple threads concurrently modify a shared resource leading to non-deterministic end results.

https://blog.csdn.net/zong596568821xp/article/details/99678390

1.5 Reentrant lock (recursive lock)

To allow the same thread to acquire the same lock more than once, Python provides the reentrant lock (RLock). An RLock maintains a Lock and a counter internally; the counter records how many times acquire() has been called, so the owning thread can acquire the lock repeatedly. Only after that thread has released every acquisition can other threads obtain the lock. The specific usage is as follows:
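A minimal sketch of RLock usage: one function holding the lock calls another function that acquires the same lock again. With a plain Lock this would deadlock; with an RLock the same thread simply increments the internal counter:

```python
import threading

lock = threading.RLock()

def inner():
    with lock:  # second acquire by the same thread: counter goes to 2
        print("inner section reached")

def outer():
    with lock:  # first acquire: counter goes to 1
        inner()  # the same thread re-acquires without deadlocking

outer()  # prints "inner section reached"
```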

2 Python multi-process

2.1 Create multiple processes

To perform multi-process operations in Python, you need the multiprocessing library, whose Process class is very similar to the Thread class of the threading module. So just look at the code to get familiar with multi-process usage.

  • Method 1: Use Process directly, the code is as follows:
from multiprocessing import Process
 
def show(name):
    print("Process name is " + name)
 
if __name__ == "__main__":
    proc = Process(target=show, args=('subprocess',))
    proc.start()
    proc.join()
  • Method 2: Inherit Process to customize the process class, rewrite the run method, the code is as follows:
from multiprocessing import Process
import time
 
class MyProcess(Process):
    def __init__(self, name):
        super(MyProcess, self).__init__()
        self.name = name  # Process.name must be a string

    def run(self):
        print('process name: ' + self.name)
        time.sleep(1)

if __name__ == '__main__':
    procs = []
    for i in range(3):
        p = MyProcess(str(i))  # pass a string, since Process.name requires one
        p.start()
        procs.append(p)
    # join every process, not just the last one created
    for p in procs:
        p.join()

2.2 Multi-process communication

No data is shared between processes by default. If communication between processes is required, it is implemented with the Queue or Pipe classes from the multiprocessing module.

  • Queue

Queue is a process-safe queue that can transfer data between multiple processes. Its two main methods are put() and get().

put() inserts an item into the queue. It takes two optional parameters, block and timeout. If block is True (the default) and timeout is a positive number, the method blocks for up to timeout seconds until the queue has free space; if it times out, a queue.Full exception is raised. If block is False and the queue is full, queue.Full is raised immediately.

get() reads and removes an item from the queue, and likewise takes the optional parameters block and timeout. If block is True (the default) and timeout is a positive number, and no item becomes available within the wait time, a queue.Empty exception is raised. If block is False there are two cases: if an item is available, it is returned immediately; otherwise queue.Empty is raised immediately.

The specific usage is as follows:

from multiprocessing import Process, Queue
 
def put(queue):
    queue.put('Queue Usage')
 
if __name__ == '__main__':
    queue = Queue()
    pro = Process(target=put, args=(queue,))
    pro.start()
    print(queue.get())
    pro.join()
  • Pipe

Pipe transfers data between processes through a pipe rather than by sharing data, somewhat like a socket. Pipe() returns two connection objects representing the two ends of the pipe, each with send() and recv() methods. If two processes try to read from or write to the same end at the same time, the data in the pipe may be corrupted. Usage is as follows:

from multiprocessing import Process, Pipe
 
def show(conn):
    conn.send('Pipe Usage')
    conn.close()
 
if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    pro = Process(target=show, args=(child_conn,))
    pro.start()
    print(parent_conn.recv())
    pro.join()

2.3 Process pool

To create many processes, we don't have to create them one by one by hand; the Pool class can manage them for us. Its commonly used methods include apply(), apply_async(), map(), close(), terminate(), and join().

See the sample code for specific usage:

#coding: utf-8
import multiprocessing
import time
 
def func(msg):
    print("msg:", msg)
    time.sleep(3)
    print("end")

if __name__ == "__main__":
    # The pool keeps at most `processes` workers running; as one task
    # finishes, a queued task is handed to the freed worker
    pool = multiprocessing.Pool(processes=3)
    for i in range(5):
        msg = "hello %d" % i
        # Non-blocking: the child processes do not hold up the main process,
        # which runs straight on to pool.join()
        pool.apply_async(func, (msg,))

        # Blocking alternative: wait for each child before continuing
        # pool.apply(func, (msg,))

    print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
    # close() must be called before join(), otherwise an error occurs
    pool.close()
    # After close(), no new tasks can be submitted; join() waits for all
    # child processes to finish
    pool.join()
    print("Sub-process(es) done.")

As above, once the pool is created, pool.apply_async(func, ...) returns immediately even when the number of submitted tasks exceeds the pool's size; the 5 submitted tasks are placed in a queue.
After multiprocessing.Pool(processes=3) executes, 3 worker processes are created, though not yet assigned tasks. No matter how many tasks are submitted, only 3 worker processes actually exist, so at most 3 tasks run in parallel.
When a worker finishes a task, it becomes idle and the pool hands it the next queued task on a first-in-first-out basis.
When all tasks are done, the worker processes become zombie processes until they are reaped; if the main process has not ended, the system will not reclaim these resources automatically, so join() must be called to reap them.
join() makes the main process wait for the child processes to finish so their resources can be reclaimed; without it, the children are forcibly killed when the main program exits, whether or not they have finished.
If the maximum number of processes is not specified when creating a Pool, it defaults to the number of CPU cores on the machine.
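Besides apply() and apply_async(), Pool also offers map(), which distributes an iterable of inputs across the workers. A minimal sketch:

```python
from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == "__main__":
    # map() blocks until all results are ready and preserves input order
    with Pool(processes=3) as pool:
        results = pool.map(square, range(5))
    print(results)  # [0, 1, 4, 9, 16]
```

Using the pool as a context manager (`with Pool(...) as pool:`) calls terminate() automatically on exit, so the explicit close()/join() pair is not needed for this simple case.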

3 Choose multi-thread or multi-process?

On this issue, first look at what type of program your program belongs to. There are generally two types: CPU-intensive and I/O-intensive.

  • CPU-intensive: The program is more focused on calculations and requires frequent use of the CPU for calculations. For example, scientific computing programs, machine learning programs, etc.
  • I/O-intensive: As the name implies, the program requires frequent input and output operations. A crawler program is a typical I/O-intensive program.

If the program is CPU-intensive, multi-process is recommended. Multithreading is better suited to I/O-intensive programs: while a thread is blocked waiting on I/O, the GIL is released, so other threads can run in the meantime.
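The two choices can be sketched with the standard concurrent.futures module, which wraps processes and threads behind the same interface (the `io_task` below is a hypothetical placeholder standing in for real network or disk work):

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def cpu_task(n):
    # CPU-bound: pure computation, so multiple processes help
    return sum(i * i for i in range(n))

def io_task(url):
    # I/O-bound placeholder: real code would block on the network here,
    # during which the GIL is released, so threads work well
    return "fetched " + url

if __name__ == "__main__":
    # CPU-bound work: spread across processes to use multiple cores
    with ProcessPoolExecutor() as procs:
        print(list(procs.map(cpu_task, [10_000] * 4)))

    # I/O-bound work: threads are cheaper and sufficient
    with ThreadPoolExecutor(max_workers=4) as threads:
        print(list(threads.map(io_task, ["a", "b", "c"])))
```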