Python built-in module multiprocessing: processes

1. Introduction

A process is the basic unit of resource allocation in an operating system: each time a process starts, the operating system allocates resources (such as memory) to it so that it can run. Each process runs independently in its own memory space, so processes do not interfere with one another.
Note: Process locks work much like thread locks, so this article covers them only briefly. For details, see the article on the Python built-in module threading.

2. Basic use
import multiprocessing
import time
import os

start_time = time.time()


def process1(text):
    print('process1:', os.getpid())
    time.sleep(1)
    print(text)


def process2(text):
    print('process2:', os.getpid())
    time.sleep(2)
    print(text)


if __name__ == '__main__':
    t1 = multiprocessing.Process(target=process1, args=('process1_sleep',))
    t2 = multiprocessing.Process(target=process2, args=('process2_sleep',))

    t1.start()
    t2.start()

    t1.join()  # Wait for the process to end
    t2.join()  # Wait for the process to end

    end_time = time.time()

    print('The main process ends', end_time - start_time)
3. Methods
  1. multiprocessing.Process(target, name, args, kwargs) creates a process. target: the callable to run in the new process; name: the name of the process; args: positional arguments for target, passed as a tuple; kwargs: keyword arguments for target, passed as a dictionary
  • process.start() starts the process
  • process.join(timeout) blocks the caller until the process ends or until timeout seconds have passed; use it when the main process must wait for a child task to finish before continuing
  • process.terminate(): terminates the child process immediately, whether or not its task has completed (comparable to how daemon threads are stopped when a program exits)
  • process.is_alive() returns whether the process is still alive
  • process.name: the process name; it can be read or set, and if no name is given, one is assigned automatically
  • process.daemon: daemon flag, which must be set before start(). When the main process exits, it terminates its daemonic child processes, so the program ends even if they are still running. Calling join() on a daemonic process makes the main process wait for it, which defeats the purpose of the flag
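The Process attributes and methods listed above can be tried together in a minimal sketch. The names slow_task and run_and_terminate are hypothetical helpers invented for this illustration, not part of the original article.

```python
import multiprocessing
import time


def slow_task(seconds):
    """Hypothetical worker that simply sleeps for the given number of seconds."""
    time.sleep(seconds)


def run_and_terminate():
    """Start a long-running child, stop it early, and report what happened."""
    p = multiprocessing.Process(target=slow_task, args=(60,), name="sleeper")
    p.daemon = True            # the daemon flag must be set before start()
    p.start()
    alive_before = p.is_alive()
    p.join(timeout=0.2)        # wait at most 0.2 s; the child is still sleeping
    p.terminate()              # stop the child without waiting for its task
    p.join()                   # wait for the terminated child to be cleaned up
    return p.name, alive_before, p.is_alive()


if __name__ == "__main__":
    print(run_and_terminate())
```

When run as a script, this should report the name "sleeper", that the child was alive right after start(), and that it is no longer alive after terminate() and join().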
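  2. multiprocessing.Lock() Synchronization lock: at most one process can hold the lock at a time. A process that tries to acquire a lock held by another process blocks until the holder releases it, so the code between acquire() and release() runs in only one process at a time
  • lock.locked(): returns whether the lock is currently held. Note: on multiprocessing locks this method exists only in recent Python versions (it appears in the official documentation from Python 3.14); older versions do not provide it
  • lock.acquire(block=True, timeout=None): acquires the lock; timeout is the maximum number of seconds to wait, for example lock.acquire(timeout=1)
  • lock.release(): releases the lock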
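As a minimal sketch of why a lock is needed, the following example has several processes increment one shared counter. The worker name add_many is hypothetical, and multiprocessing.Value is used here as an assumed way to share an integer between processes; the original article does not cover it.

```python
import multiprocessing


def add_many(counter, lock, times):
    """Increment a shared counter; the lock protects each read-modify-write."""
    for _ in range(times):
        with lock:                 # acquire() on entry, release() on exit
            counter.value += 1     # not atomic on its own, hence the lock


if __name__ == "__main__":
    lock = multiprocessing.Lock()
    counter = multiprocessing.Value("i", 0)   # shared C int, initial value 0
    workers = [
        multiprocessing.Process(target=add_many, args=(counter, lock, 1000))
        for _ in range(4)
    ]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print(counter.value)           # 4000, because no increment is lost
```

Without the lock, two processes could read the same old value and one of the increments would be lost.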
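  3. multiprocessing.RLock() Recursive lock: the process that holds an RLock can acquire it again without blocking, which a Lock does not allow. The lock must be released once for each acquire
  • rlock.locked(): returns whether the lock is currently held (like Lock.locked(), only available in recent Python versions)
  • rlock.acquire(block=True, timeout=None): acquires the lock; timeout is the maximum number of seconds to wait, for example rlock.acquire(timeout=1)
  • rlock.release(): releases the lock once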
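The difference between Lock and RLock can be seen inside a single process. This is a small sketch; nested_acquire is a hypothetical helper written for this illustration.

```python
import multiprocessing


def nested_acquire(lock, depth):
    """Try to acquire the same lock `depth` times in a row without blocking."""
    acquired = 0
    for _ in range(depth):
        if lock.acquire(block=False):   # returns False instead of waiting
            acquired += 1
    for _ in range(acquired):           # release once per successful acquire
        lock.release()
    return acquired


if __name__ == "__main__":
    print(nested_acquire(multiprocessing.RLock(), 3))  # 3: the owner may re-acquire
    print(nested_acquire(multiprocessing.Lock(), 3))   # 1: a plain Lock is not reentrant
```

With a blocking acquire, the second attempt on a plain Lock would deadlock the process, which is why the sketch uses block=False.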
  4. multiprocessing.Condition() Condition lock: built on a recursive lock by default, it adds the ability to suspend a process until another process notifies it. Use wait() and notify() to control how many waiting processes resume
  • cond.acquire(timeout=1): acquires the underlying lock; timeout is the maximum number of seconds to wait
  • cond.release(): releases the underlying lock
  • cond.wait(timeout=None): releases the lock and puts the current process into the “waiting” state. The process resumes, reacquiring the lock, only when it is notified or the timeout expires. While it waits, the operating system is free to schedule other processes. The lock must be held when wait() is called
  • cond.wait_for(predicate, timeout=None): puts the current process into the “waiting” state until predicate returns a true value or the timeout expires. While it waits, the operating system is free to schedule other processes. Note: predicate must be a callable whose result is interpreted as a bool
  • cond.notify(n=1): wakes up one process that is currently “waiting”, or up to n processes if n is given
  • cond.notify_all(): wakes up all processes that are currently “waiting”
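The wait/notify pattern can be sketched as follows. The function names wait_until_ready and set_ready, and the shared flag built with multiprocessing.Value, are assumptions made for this illustration.

```python
import multiprocessing


def wait_until_ready(cond, ready, results):
    """Wait until the shared flag is 1 (or 5 s pass), then report its value."""
    with cond:                                   # the lock must be held to wait
        cond.wait_for(lambda: ready.value == 1, timeout=5)
        results.put(ready.value)


def set_ready(cond, ready):
    """Set the shared flag and wake up every waiting process."""
    with cond:
        ready.value = 1
        cond.notify_all()


if __name__ == "__main__":
    cond = multiprocessing.Condition()
    ready = multiprocessing.Value("i", 0)
    results = multiprocessing.Queue()
    waiter = multiprocessing.Process(
        target=wait_until_ready, args=(cond, ready, results)
    )
    waiter.start()
    set_ready(cond, ready)
    print(results.get(timeout=10))               # value seen by the waiter
    waiter.join()
```

Using wait_for() with a predicate avoids a lost wake-up: if the flag is already set when the waiter acquires the lock, it continues immediately instead of waiting for a notification that has already been sent.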
  5. multiprocessing.Event() Event lock: built on a condition lock. Unlike a condition lock, it can only release all waiting processes at once; it cannot release a chosen number of them
  • event.clear(): sets the event to the red light state, so that processes calling wait() are suspended
  • event.is_set(): returns the current state of the event: False for a red light, True for a green light
  • event.set(): sets the event to the green light state, so that all processes blocked in wait() resume running
  • event.wait(timeout=None): puts the current process into the “waiting” state until the event turns green or the timeout expires; if the event is already green, it returns immediately. It returns True if the event is set and False if the timeout expired. While the process waits, the operating system is free to schedule other processes
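A short sketch of the red light and green light behaviour follows. The worker name wait_for_green is hypothetical; it records the return value of event.wait() in a queue so the main process can inspect it.

```python
import multiprocessing


def wait_for_green(event, results, timeout):
    """Wait for the event and record whether it was set before the timeout."""
    results.put(event.wait(timeout))


if __name__ == "__main__":
    event = multiprocessing.Event()          # starts in the red light state
    results = multiprocessing.Queue()
    workers = [
        multiprocessing.Process(target=wait_for_green, args=(event, results, 5))
        for _ in range(3)
    ]
    for w in workers:
        w.start()
    event.set()                              # green light: all waiters resume
    print([results.get(timeout=10) for _ in workers])
    for w in workers:
        w.join()
```

A single call to set() releases all three workers, which is the difference from notify(n) on a condition lock.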
  6. multiprocessing.Semaphore(value) Semaphore lock: keeps an internal counter that starts at value (default 1), so up to value processes can hold it at the same time; this allows blocked processes to be released in batches
  • sem.acquire(block=True, timeout=None): decrements the internal counter by 1; if the counter is already 0, blocks until another process calls release() or the timeout expires. Note that the first parameter is named block, not blocking as in threading
  • sem.release(): increments the internal counter by 1 and lets one process blocked in acquire() continue
  7. multiprocessing.BoundedSemaphore(value) Bounded semaphore: a semaphore that also checks that the internal counter never exceeds its initial value; a release() that would exceed it raises ValueError
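The counter behaviour of a semaphore can be observed in a single process with non-blocking acquires. This is a sketch only; try_acquire_all is a hypothetical helper.

```python
import multiprocessing


def try_acquire_all(sem, attempts):
    """Try to take the semaphore `attempts` times without blocking."""
    return [sem.acquire(block=False) for _ in range(attempts)]


if __name__ == "__main__":
    sem = multiprocessing.Semaphore(2)    # internal counter starts at 2
    print(try_acquire_all(sem, 3))        # [True, True, False]
    sem.release()                         # counter goes from 0 back to 1
    print(sem.acquire(block=False))       # True
```

In a real program, each worker process would wrap its critical section in "with sem:", so that at most two workers run that section at the same time.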
  8. multiprocessing.Queue(maxsize): process queue; maxsize is the maximum number of items the queue can hold
  • queue.qsize() returns the approximate number of items in the queue (it may raise NotImplementedError on platforms such as macOS)
  • queue.empty() returns True if the queue is empty, otherwise False
  • queue.full() returns True if the queue is full, otherwise False
  • queue.get(block, timeout) removes and returns an item from the queue. block: if True (the default), wait until an item is available; if False, raise queue.Empty immediately when the queue is empty. timeout: when block is True, the maximum number of seconds to wait before raising queue.Empty
  • queue.get_nowait() is equivalent to queue.get(block=False)
  • queue.put(value, block, timeout) writes into the queue. value: the value to write; block: if True (the default), wait for a free slot when the queue is full; if False, raise queue.Full immediately when the queue is full. timeout: when block is True, the maximum number of seconds to wait before raising queue.Full
  • queue.put_nowait(value) is equivalent to queue.put(value, block=False)
  • queue.close() closes the queue, indicating that the current process will put no more data into it
    '''
    Processes run independently, so global variables are not shared between them. To exchange data, use a queue (or another inter-process communication mechanism).
    In the following example, the change that task1 makes to the global num is not visible in other processes, but the data that task1 puts into the queue can be read by them
    '''
    
    import multiprocessing
    
    num = 1
    
    def task1(q):
        global num
        num = 2  # changes num only inside this child process
        q.put(1)
        q.put(2)

    def task2(q):
        print(f'task2: num={num}')  # still 1: task1's change is not visible here
        print('queue:', q.get())



    if __name__ == '__main__':
        process_q = multiprocessing.Queue()

        process1 = multiprocessing.Process(target=task1, args=(process_q,))
        process2 = multiprocessing.Process(target=task2, args=(process_q,))

        process1.start()
        process2.start()

        process1.join()
        process2.join()

        # task2 consumed one item; the main process reads the remaining one
        print('queue:', process_q.get())
        print(f'global variable: num={num}')
    
  9. multiprocessing.SimpleQueue() Simplified queue without task tracking; it has only the empty(), get() and put() methods, plus close() since Python 3.9
  • queue.empty() returns True if the queue is empty, otherwise False
  • queue.get() removes and returns an item from the queue, blocking until one is available; unlike Queue.get(), it takes no block or timeout parameters
  • queue.put(value) writes value into the queue; unlike Queue.put(), it takes no block or timeout parameters
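A minimal sketch of SimpleQueue in a producer and consumer setup follows. The helper names produce and drain, and the use of None as an end-of-data marker, are assumptions made for this illustration.

```python
import multiprocessing


def produce(q, items):
    """Put every item into the queue, followed by None as an end marker."""
    for item in items:
        q.put(item)
    q.put(None)


def drain(q):
    """Read items until the None end marker arrives and return them."""
    items = []
    while True:
        item = q.get()          # blocks until an item is available
        if item is None:
            return items
        items.append(item)


if __name__ == "__main__":
    q = multiprocessing.SimpleQueue()
    producer = multiprocessing.Process(target=produce, args=(q, [1, 2, 3]))
    producer.start()
    print(drain(q))             # [1, 2, 3]
    producer.join()
```

Because get() has no timeout parameter, an end marker (or a separate signal) is the usual way to tell the consumer that no more data will arrive.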
  10. multiprocessing.JoinableQueue(maxsize): a process queue that can block until all of its items have been processed; maxsize is the maximum number of items the queue can hold
  • queue.qsize() returns the approximate number of items in the queue
  • queue.empty() returns True if the queue is empty, otherwise False
  • queue.full() returns True if the queue is full, otherwise False
  • queue.get(block, timeout) removes and returns an item from the queue. block: if True (the default), wait until an item is available; if False, raise queue.Empty immediately when the queue is empty. timeout: when block is True, the maximum number of seconds to wait before raising queue.Empty
  • queue.get_nowait() is equivalent to queue.get(block=False)
  • queue.put(value, block, timeout) writes into the queue. value: the value to write; block: if True (the default), wait for a free slot when the queue is full; if False, raise queue.Full immediately when the queue is full. timeout: when block is True, the maximum number of seconds to wait before raising queue.Full
  • queue.put_nowait(value) is equivalent to queue.put(value, block=False)
  • queue.join() blocks the calling process until every item that has been put into the queue has been retrieved and marked as done with task_done()
  • queue.task_done() signals that a previously retrieved item has been fully processed; it decrements the count of unfinished tasks by 1, and join() unblocks when the count reaches 0
  • queue.join_thread() joins the background thread of the queue; it can only be used after close() and blocks until all buffered data has been flushed to the underlying pipe
  • queue.cancel_join_thread() prevents join_thread() from blocking, so the process does not automatically join the background thread when it exits
  • queue.close() closes the queue, indicating that the current process will no longer put objects into the queue
    import multiprocessing
    import time
    import random
    
    
    def task1(q):
        for i in range(10):
            q.put(i)
            time.sleep(random.randint(1, 3))
            q.join()  # block until task2 has called task_done() for this item
            print('data processing completed')


    def task2(q):
        while True:
            print('task2 queue:', q.get())
            q.task_done()  # mark the retrieved item as processed


    if __name__ == '__main__':
        process_q = multiprocessing.JoinableQueue()

        process1 = multiprocessing.Process(target=task1, args=(process_q,))
        process2 = multiprocessing.Process(target=task2, args=(process_q,))

        # task2 loops forever, so run it as a daemon: it is terminated when the main process exits
        process2.daemon = True

        process1.start()
        process2.start()

        process1.join()

        print('The main process ends')
    
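  11. multiprocessing.Pipe(duplex=True): process pipe, used for communication between two processes. It returns a pair of Connection objects (conn1, conn2) that represent the two ends of the pipe. By default the pipe is bidirectional, so each end can both send and receive; with duplex=False, conn1 can only be used to receive messages and conn2 can only be used to send messages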
    from multiprocessing import Process, Pipe
    import time
    
    
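    def task(i, pip):
        # Each end of the default (duplex) pipe can both receive and send
        while True:
            a = pip.recv()
            print(f"Process {i} received a {a}")
            time.sleep(0.5)
            pip.send(a + 1)


    if __name__ == '__main__':
        pip_start, pip_end = Pipe()
        Process(target=task, args=(1, pip_start)).start()
        Process(target=task, args=(2, pip_end)).start()
        # Start the exchange; the two processes then pass the counter back and forth until interrupted
        pip_start.send(1)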
    
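  12. multiprocessing.current_process() returns the Process object for the current process
  13. multiprocessing.active_children() returns a list of the current process's child processes that are still alive
  14. multiprocessing.parent_process() returns the Process object for the parent of the current process, or None in the main process
  15. multiprocessing.cpu_count() returns the number of CPUs in the system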
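These module-level helper functions can be combined into a small inspection routine. This is a sketch; describe and report are hypothetical helpers, and parent_process() assumes Python 3.8 or later.

```python
import multiprocessing
import os


def describe():
    """Return a few facts about the process that calls this function."""
    me = multiprocessing.current_process()
    parent = multiprocessing.parent_process()   # None in the main process
    return {
        "name": me.name,
        "pid": me.pid,
        "parent": parent.name if parent else None,
        "cpus": multiprocessing.cpu_count(),
        "children": len(multiprocessing.active_children()),
    }


def report(q):
    """Send this process's description back through a queue."""
    q.put(describe())


if __name__ == "__main__":
    print(describe())                           # facts about the main process
    q = multiprocessing.SimpleQueue()
    child = multiprocessing.Process(target=report, args=(q,), name="worker-1")
    child.start()
    print(q.get())                              # facts reported by the child
    child.join()
```

The pid reported by each process matches os.getpid() in that process, and the child's "parent" entry is the name of the main process.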
4. Process pool

Processes in a multiprocessing.Pool() process pool should communicate through multiprocessing.Manager().Queue(); an ordinary multiprocessing.Queue() cannot be passed to pool workers as a task argument.
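A minimal sketch of pool workers writing to a manager queue follows. The worker name square_to_queue is hypothetical; it receives the number and the queue packed into one tuple because pool.map() passes a single argument to each call.

```python
import multiprocessing


def square_to_queue(args):
    """Put the square of n into the shared queue and return n."""
    n, q = args
    q.put(n * n)
    return n


if __name__ == "__main__":
    with multiprocessing.Manager() as manager:
        q = manager.Queue()                     # a proxy object that can be pickled
        with multiprocessing.Pool(2) as pool:
            pool.map(square_to_queue, [(n, q) for n in range(4)])
        print(sorted(q.get() for _ in range(4)))  # [0, 1, 4, 9]
```

The results are sorted before printing because the order in which the workers write to the queue is not guaranteed.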

  1. multiprocessing.Pool()
    from multiprocessing import Process,Pool
    import time
    
    def task(n):
        time.sleep(1)
        print(n * n)



    if __name__ == '__main__':
        task_list = [1, 2, 3, 4, 5, 6]
        print('order:')
        start_time = time.time()
        for i in task_list:
            # start() followed immediately by join() runs the tasks one at a time
            process = Process(target=task, args=(i,))
            process.start()
            process.join()
        end_time = time.time()
        print('sequential execution time', end_time - start_time)

        print('multi-process process pool:')
        start_time = time.time()
        pool = Pool(6)
        pool.map(task, task_list)
        pool.close()
        pool.join()
        end_time = time.time()
        print('multi-process execution time', end_time - start_time)
    
  • pool.map(func, iterable) works much like the built-in map function, but runs the calls in the pool's worker processes; it blocks until all results are ready and returns them as a list
  • pool.map_async(func, iterable) works like pool.map() but is non-blocking: it returns an AsyncResult object immediately, and the results are retrieved with its get() method
  • pool.close() closes the process pool so that it no longer accepts new tasks
  • pool.terminate() stops the worker processes immediately and does not process the remaining tasks
  • pool.join() The main process blocks and waits for the worker processes to exit. join() must be called after close() or terminate()
  • is_alive() is a method of Process objects; a Pool object does not have an is_alive() method
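The difference between the blocking map() and the non-blocking map_async() can be sketched as follows; square is a hypothetical worker function.

```python
import multiprocessing


def square(n):
    """Hypothetical worker: return the square of n."""
    return n * n


if __name__ == "__main__":
    with multiprocessing.Pool(processes=2) as pool:
        pending = pool.map_async(square, [1, 2, 3])   # returns immediately
        # ... the main process is free to do other work here ...
        print(pending.get(timeout=10))                # [1, 4, 9]
        print(pool.map(square, [4, 5]))               # [16, 25], blocks until done
```

Leaving the with block terminates the pool, so all results should be collected before the block ends.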
  2. concurrent.futures module: provides a thread pool (ThreadPoolExecutor) and a process pool (ProcessPoolExecutor) through a common interface
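As a brief sketch of the concurrent.futures interface, the following example runs a hypothetical worker function, cube, in a process pool. Swapping ProcessPoolExecutor for ThreadPoolExecutor gives a thread pool with the same calls.

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def cube(n):
    """Hypothetical worker: return n to the third power."""
    return n ** 3


if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=2) as executor:
        print(list(executor.map(cube, [1, 2, 3])))   # [1, 8, 27]
        future = executor.submit(cube, 4)            # schedule a single call
        print(future.result())                       # 64
```

executor.map() returns results in input order, while submit() returns a Future whose result() method blocks until the call has finished.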