Python: Multi-process synchronization shared global variables (locks, counters, atomic booleans)

Scenarios and methods for multi-process variable synchronization

Scenario: when running tasks in parallel with Python multiprocessing, variables sometimes need to be shared between processes. Shared variables give better control over task execution, e.g. checking task progress or stopping all tasks early.
Method: with multiple threads, it is enough to define a variable in the main thread, reference it in each worker thread via the global keyword, and guard reads and writes with threading.RLock() (acquire and release the lock around each operation). With multiple processes, however, each process has an independent address space, so a variable living in one process's data segment cannot be shared the usual way. The multiprocessing module provides the Array, Value, and Manager classes to define shared variables, which support sharing numbers, strings, lists, dictionaries, and instance objects between processes.
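Before the full examples below, a minimal sketch (names are placeholders) of why a plain global does not work across processes while multiprocessing.Value does: a forked child works on a copy of ordinary globals, so its increments never reach the parent.

```python
from multiprocessing import Process, Value

plain_counter = 0  # ordinary global: each child process gets its own copy


def worker(shared):
    global plain_counter
    plain_counter += 1        # changes only this child's copy
    with shared.get_lock():   # a Value carries its own lock
        shared.value += 1     # lands in shared memory, visible to the parent


if __name__ == '__main__':
    shared_counter = Value('i', 0)
    procs = [Process(target=worker, args=(shared_counter,)) for _ in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(plain_counter)         # 0 (the children's copies are discarded)
    print(shared_counter.value)  # 4
```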

Shared integer variables

Sharing an integer across processes is a common scenario, e.g. recording task progress in an execution log while running tasks in parallel. Example code:

import os
from multiprocessing import Pool, Lock, Value

from utils.logger_utils import logging_  # project-specific logging helper

# ROOT_PATH, BASIC_PATH, PREDICT_DATE, get_string, get_feature, get_md5 and
# formatted_ent come from elsewhere in the project.
LOGGER = logging_("predict_main", os.path.join(ROOT_PATH, "./logs/details.log"))
lock = Lock()
Counter = Value('i', 0)
ENT_LIST = list(set([line.strip().replace("（", "(").replace("）", ")")
                     for line in open(os.path.join(BASIC_PATH, "data/ent_name_predict.txt"), "r", encoding="utf8").readlines()]))
TOTAL = len(ENT_LIST)

def get_one_res(data):
    global TOTAL, lock, Counter
    res = {}
    try:
        ent_name = data
        res = get_feature(ent_name, PREDICT_DATE)
        res["updatedate"] = PREDICT_DATE
        res["uid"] = get_md5(formatted_ent(ent_name))
    except Exception as e:
        LOGGER.error("%s:error:%s", data, e)
    finally:
        with lock:
            Counter.value += 1
        LOGGER.info("Execution completed: (%d / %d) Process ID: %d --------------- %s", Counter.value, TOTAL, os.getpid(), data)
    return res


if __name__ == '__main__':
    pool = Pool(int(get_string("process_num")))
    res = pool.map(get_one_res, ENT_LIST)
    LOGGER.info("All execution completed, close process pool")
    pool.close()
    pool.join()

Run it and inspect the execution log:

2021-11-18 15:19:15 [predict_main] INFO [42] Execution completed: (1 / 1400) Process ID: 15 --------------- Shenzhen Shunya Investment Co., Ltd
2021-11-18 15:19:16 [predict_main] INFO [42] Execution completed: (2 / 1400) Process ID: 25 --------------- Wuhu Xinyang Investment Partnership (Limited Partnership)
2021-11-18 15:19:18 [predict_main] INFO [42] Execution completed: (3 / 1400) Process ID: 24 --------------- Baoding Longrui Real Estate Development Co., Ltd.
2021-11-18 15:19:19 [predict_main] INFO [42] Execution completed: (4 / 1400) Process ID: 11 --------------- Yunnan Junfa Kaifeng Real Estate Development Co., Ltd.

The lock and counter are defined globally. Value('i', 0) defines a shared variable of type int with an initial value of 0; to define a double, use Value('d', 0). Together with the lock, this behaves like an atomic variable in Java. Inside the worker function, the lock is entered via a with block and Counter.value += 1 runs after the task completes, incrementing the count by 1. Finally, the worker is submitted to the process pool. After each parallel task finishes, the lock is taken and the counter incremented; only one child process holds the lock at a time, which keeps the processes synchronized. Without the lock, the counts in the log can appear out of order, and because += is a separate read and write, concurrent updates can even be lost.
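Since the code above depends on project-specific helpers (logging_, get_feature, and so on), here is a self-contained sketch of the same lock-plus-counter progress pattern; the function name work and the squaring task are placeholders.

```python
import os
from multiprocessing import Pool, Lock, Value

lock = Lock()
counter = Value('i', 0)
TOTAL = 20


def work(item):
    result = item * item  # stand-in for the real task
    with lock:            # serialize the increment and the progress line
        counter.value += 1
        print("Execution completed: (%d / %d) PID: %d" % (counter.value, TOTAL, os.getpid()))
    return result


if __name__ == '__main__':
    with Pool(4) as pool:
        results = pool.map(work, range(TOTAL))
    print(sorted(results)[:3])  # [0, 1, 4]
```

The module-level lock and counter are inherited by the pool workers when processes are forked (the default on Linux); on platforms that spawn workers, pass them in via the pool's initializer instead.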

Shared Boolean variables

In this scenario a Boolean flag is kept globally. Before each task runs, the flag is read to check that the state is still as expected; if a task raises an error, it flips the flag. This is mostly used to let one failing child process end all remaining tasks early. The code is as follows:

from multiprocessing import Pool, Lock, Manager
from ctypes import c_bool
import os

lock = Lock()
ERROR = Manager().Value(c_bool, False)


def run(fn):
    global lock, ERROR
    if not ERROR.value:
        try:
            print('Execute task. PID: %d' % (os.getpid()))
            1 / 0  # deliberately raise an error
        except Exception:
            with lock:
                ERROR.value = True
    else:
        print("The child process reported an error and the task ended")


if __name__ == "__main__":
    pool = Pool(10)
    # run() is called 80 times, once per element of the list
    pool.map(run, list(range(80)))
    pool.close()
    pool.join()

View execution output

Execute task. PID: 27374
The child process reported an error and the task ended
The child process reported an error and the task ended
The child process reported an error and the task ended
...
Process finished with exit code 0

The shared variable is initialized to a Boolean with value False. Before executing, each process first reads the shared variable: if it is still False the task runs, otherwise execution is skipped. The Boolean is created by instantiating the Manager class and calling its Value method; c_bool is a data type from the ctypes module, which also provides c_int, c_long, c_double, and so on.
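For reference, a short sketch of the ctypes types most often passed to Value (the 'i' and 'd' typecodes used earlier are shorthand for c_int and c_double):

```python
from ctypes import c_bool, c_int, c_double
from multiprocessing import Value

flag = Value(c_bool, False)    # Boolean flag; Manager().Value(c_bool, False) is the proxy-based form
count = Value(c_int, 0)        # equivalent to Value('i', 0)
ratio = Value(c_double, 0.0)   # equivalent to Value('d', 0.0)

flag.value = True
print(flag.value, count.value, ratio.value)  # True 0 0.0
```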

Another approach is to check the shared variable in the main process: call map_async so the main process is not blocked by the children, and if the main process sees that the global flag no longer matches expectations, exit early and call terminate to shut down the process pool.

from multiprocessing import Pool, Lock, Manager, Value
from ctypes import c_bool
import os
import time

lock = Lock()
ERROR = Manager().Value(c_bool, False)
COUNTER = Value('i', 0)


def run(fn):
    global lock, ERROR
    try:
        time.sleep(2)
        1 / 0  # deliberately raise an error
    except Exception:
        with lock:
            ERROR.value = True
    finally:
        with lock:
            COUNTER.value += 1
            print('Execute task (%d / %d). PID: %d' % (COUNTER.value, 80, os.getpid()))


if __name__ == "__main__":
    pool = Pool(10)
    pool.map_async(run, list(range(80)))
    pool.close()
    print("The main process determines...")
    while COUNTER.value != len(list(range(80))):
        time.sleep(1)
        if ERROR.value:
            print("The child process reported an error and the main process exited early")
            pool.terminate()
            break
    pool.join()

The output is as follows: the main process checks the global ERROR flag every second, and once it becomes True it terminates the process pool.

The main process determines...
Execute task (1 / 80). PID: 4168
Execute task (2 / 80). PID: 4169
Execute task (3 / 80). PID: 4177
Execute task (4 / 80). PID: 4171
Execute task (5 / 80). PID: 4173
Execute task (6 / 80). PID: 4182
Execute task (7 / 80). PID: 4183
Execute task (8 / 80). PID: 4174
Execute task (9 / 80). PID: 4179
Execute task (10 / 80). PID: 4181
The child process reported an error and the main process exited early

Process finished with exit code 0

A practical example: multiple processes search a list for the first value that meets a requirement, and once it is found, all processes exit.

from multiprocessing import Pool, Lock, Manager, Value
from ctypes import c_bool
import os
import time

lock = Lock()
FOUND = Manager().Value(c_bool, False)
COUNTER = Value('i', 0)


def run(fn):
    global lock, FOUND
    try:
        time.sleep(2)
        res = fn + 1
        if res == 10:
            print("The result is: {}".format(fn))
            with lock:
                FOUND.value = True
            return fn
    except Exception as e:
        print(e)
    finally:
        with lock:
            COUNTER.value += 1
            print('Execute task (%d / %d). PID: %d' % (COUNTER.value, 80, os.getpid()))


if __name__ == "__main__":
    t1 = time.time()
    pool = Pool(10)
    pool.map_async(run, list(range(80)))
    pool.close()
    print("The main process determines...")
    while COUNTER.value != len(list(range(80))):
        time.sleep(1)
        if FOUND.value:
            print("Result found")
            pool.terminate()
            break
    pool.join()
    t2 = time.time()
    print(t2 - t1)

Shared dictionary and array variables

Shared dictionaries and lists are created with the Manager class: Manager().dict() and Manager().list(). The test code is as follows:

from multiprocessing.pool import Pool
from multiprocessing import Manager, Lock
import time
import datetime

LOCK = Lock()
DICT = Manager().dict()
LIST = Manager().list()


def job(ent):
    with LOCK:
        if len(LIST) < 5:
            time.sleep(1)
            LIST.append(ent)
        else:
            if len(LIST) and ent <= len(LIST) - 1:
                LIST.pop(ent)
        print("dt:{}".format(datetime.datetime.today().strftime("%Y-%m-%d %H:%M:%S")), "ent:{}".format(ent),
              "LIST:{}".format(LIST))


def job2(ent):
    if len(LIST) < 5:
        time.sleep(1)
        LIST.append(ent)
    else:
        if len(LIST) and ent <= len(LIST) - 1:
            LIST.pop(ent)
    print("dt:{}".format(datetime.datetime.today().strftime("%Y-%m-%d %H:%M:%S")), "ent:{}".format(ent),
          "LIST:{}".format(LIST))


if __name__ == '__main__':
    pool = Pool(10)
    pool.map(job2, list(range(10)) * 2)  # switch job2 to job to run the locked version
    pool.close()
    pool.join()

Running job produces the output below. The result is identical to what a single process or single thread would produce, in order, because each process queues up outside the lock.

dt:2021-11-20 22:01:28 ent:0 LIST:[0]
dt:2021-11-20 22:01:29 ent:1 LIST:[0, 1]
dt:2021-11-20 22:01:30 ent:2 LIST:[0, 1, 2]
dt:2021-11-20 22:01:31 ent:3 LIST:[0, 1, 2, 3]
dt:2021-11-20 22:01:32 ent:4 LIST:[0, 1, 2, 3, 4]
dt:2021-11-20 22:01:32 ent:5 LIST:[0, 1, 2, 3, 4]
dt:2021-11-20 22:01:32 ent:6 LIST:[0, 1, 2, 3, 4]
dt:2021-11-20 22:01:32 ent:7 LIST:[0, 1, 2, 3, 4]
dt:2021-11-20 22:01:32 ent:8 LIST:[0, 1, 2, 3, 4]
dt:2021-11-20 22:01:32 ent:9 LIST:[0, 1, 2, 3, 4]
dt:2021-11-20 22:01:32 ent:0 LIST:[1, 2, 3, 4]
dt:2021-11-20 22:01:33 ent:1 LIST:[1, 2, 3, 4, 1]
dt:2021-11-20 22:01:33 ent:2 LIST:[1, 2, 4, 1]
dt:2021-11-20 22:01:34 ent:3 LIST:[1, 2, 4, 1, 3]
dt:2021-11-20 22:01:34 ent:4 LIST:[1, 2, 4, 1]
dt:2021-11-20 22:01:35 ent:5 LIST:[1, 2, 4, 1, 5]
dt:2021-11-20 22:01:35 ent:6 LIST:[1, 2, 4, 1, 5]
dt:2021-11-20 22:01:35 ent:7 LIST:[1, 2, 4, 1, 5]
dt:2021-11-20 22:01:35 ent:8 LIST:[1, 2, 4, 1, 5]
dt:2021-11-20 22:01:35 ent:9 LIST:[1, 2, 4, 1, 5]

Process finished with exit code 0

If job2 is run instead, access to the shared variable is not synchronized, the list is updated in an uncontrolled order, and an IndexError: pop index out of range is raised. The reason is that other processes mutate the list between the length check and the pop, so the index is stale by the time it is used.

dt:2021-11-20 22:03:02 ent:0 LIST:[0, 1, 2]
dt:2021-11-20 22:03:02 ent:1 LIST:[0, 1, 2]
dt:2021-11-20 22:03:02 ent:2 LIST:[0, 1, 2]
dt:2021-11-20 22:03:02 ent:3 LIST:[0, 1, 2, 3, 5]
dt:2021-11-20 22:03:02 ent:5 LIST:[0, 1, 2, 3, 5, 4, 6]
dt:2021-11-20 22:03:02 ent:4 LIST:[0, 1, 2, 3, 5, 4, 6, 9, 8]
dt:2021-11-20 22:03:02 ent:6 LIST:[0, 1, 2, 3, 5, 4, 6, 9, 8, 7]
dt:2021-11-20 22:03:02 ent:9 LIST:[0, 1, 2, 3, 5, 4, 6, 9, 8, 7]
...
    raise self._value
IndexError: pop index out of range
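The failure in job2 is a classic check-then-act race: each individual Manager proxy call (len, append, pop) is process-safe on its own, but the pair of calls is not. Holding one lock across both the check and the mutation, as job does, makes them atomic. A minimal sketch of the safe pattern (safe_pop is a placeholder name):

```python
from multiprocessing import Manager, Lock, Pool

LOCK = Lock()
SHARED = Manager().list(range(5))


def safe_pop(i):
    with LOCK:  # check and act under the same lock, so the index stays valid
        if len(SHARED) > i:
            return SHARED.pop(i)
    return None


if __name__ == '__main__':
    with Pool(4) as pool:
        pool.map(safe_pop, [0] * 10)  # pop index 0 until the list is empty
    print(list(SHARED))  # []
```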
