Scenarios and methods for multi-process variable synchronization
Scenario: When Python code runs in parallel across multiple processes, some variables often need to be shared between the processes. Shared variables make it easier to monitor and control task execution, for example to check task progress or to stop tasks early.
Method: To share a variable between threads, define it in the main thread, refer to it in each child thread with the global keyword, and acquire and release a lock (acquire and release, for example with threading.RLock()) whenever the variable is modified. With multiple processes this does not work: each process has its own independent address space, so a variable lives in the data area of the process that owns it and cannot be shared by ordinary means. The multiprocessing module provides Array, Manager and Value for defining shared variables, which can share numbers, strings, lists, dictionaries and instance objects between processes.
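A minimal sketch of the difference (the names increment and demo are illustrative only): a plain module-level variable modified in a child process only changes that child's own copy, while a Value and an Array created with multiprocessing live in shared memory that the parent can read after the children finish.

```python
from multiprocessing import Array, Process, Value

plain_counter = 0  # ordinary module-level variable


def increment(shared, arr, idx):
    global plain_counter
    plain_counter += 1       # only changes this child process's private copy
    with shared.get_lock():  # Value's built-in lock protects the read-modify-write
        shared.value += 1
    arr[idx] = idx + 1       # each child writes its own slot of the shared array


def demo(n=3):
    shared = Value('i', 0)
    arr = Array('i', n)      # n ints in shared memory, initialized to 0
    procs = [Process(target=increment, args=(shared, arr, i)) for i in range(n)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return plain_counter, shared.value, arr[:]


if __name__ == "__main__":
    print(demo())  # (0, 3, [1, 2, 3])
```

The parent's plain_counter stays 0 because each child incremented a separate copy, while both shared objects reflect every child's write.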
Shared integer variables
Sharing an integer between processes is a common scenario. For example, when running tasks in parallel across processes, you may want to write an execution log that records task progress. The example code is as follows:
```python
import os
from multiprocessing import Pool, Lock, Value

# Project-specific helpers (not part of the standard library):
# logging_, get_feature, get_md5, formatted_ent, get_string,
# ROOT_PATH, BASIC_PATH and PREDICT_DATE come from the author's own code base.
from utils.logger_utils import logging_

LOGGER = logging_("predict_main", os.path.join(ROOT_PATH, "./logs/details.log"))

lock = Lock()
Counter = Value('i', 0)  # shared int, initial value 0

# Read the entity names, normalize full-width parentheses and remove duplicates
with open(os.path.join(BASIC_PATH, "data/ent_name_predict.txt"), "r", encoding="utf8") as f:
    ENT_LIST = list(set(line.strip().replace("（", "(").replace("）", ")") for line in f))
TOTAL = len(ENT_LIST)


def get_one_res(data):
    res = {}
    try:
        ent_name = data
        res = get_feature(ent_name, PREDICT_DATE)
        res["updatedate"] = PREDICT_DATE
        res["uid"] = get_md5(formatted_ent(ent_name))
    except Exception as e:
        LOGGER.error(data + ":error:" + str(e))
    finally:
        # Only one process at a time may increment the counter and log it
        with lock:
            Counter.value += 1
            LOGGER.info("Execution completed: (%d / %d) Process ID: %d --------------- %s",
                        Counter.value, TOTAL, os.getpid(), data)
    return res


if __name__ == '__main__':
    pool = Pool(int(get_string("process_num")))
    res = pool.map(get_one_res, ENT_LIST)
    LOGGER.info("All execution completed, close process pool")
    pool.close()
    pool.join()
```
Run the code and check the execution log:
```text
2021-11-18 15:19:15 [predict_main] INFO [42] Execution completed: (1 / 1400) Process ID: 15 --------------- Shenzhen Shunya Investment Co., Ltd
2021-11-18 15:19:16 [predict_main] INFO [42] Execution completed: (2 / 1400) Process ID: 25 --------------- Wuhu Xinyang Investment Partnership (Limited Partnership)
2021-11-18 15:19:18 [predict_main] INFO [42] Execution completed: (3 / 1400) Process ID: 24 --------------- Baoding Longrui Real Estate Development Co., Ltd. company
2021-11-18 15:19:19 [predict_main] INFO [42] Execution completed: (4 / 1400) Process ID: 11 --------------- Yunnan Junfa Kaifeng Real Estate Development Co., Ltd.
```
Define the lock and the counter globally. Value('i', 0) defines a shared variable of type int with an initial value of 0; for a double, use Value('d', 0.0). This plays a role similar to an atomic variable in Java, with one important difference: Counter.value += 1 is a separate read and write, so it is not atomic by itself. That is why, after each task completes, the task function increments the counter with Counter.value += 1 inside a with lock: block. The pool calls this function for every element; after each parallel task finishes, the process acquires the lock and increments the counter, and because only one child process can hold the lock at a time, the processes are synchronized. Without the lock, the counter values in the log appear out of order, and because concurrent increments can overwrite each other, the final total can also come out lower than the number of tasks.
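Every Value created with the default lock=True carries its own lock, returned by get_lock(), so a separate Lock object is not strictly required. The sketch below (init_worker, work and count_tasks are illustrative names) uses that built-in lock and hands the Value to the workers through the pool's initializer. This also works when processes are started with spawn rather than fork; with spawn, module-level globals such as Counter above are re-created in each child instead of being inherited.

```python
from multiprocessing import Pool, Value

counter = None  # filled in by init_worker inside each worker process


def init_worker(shared_counter):
    # Runs once per worker: keep a reference to the shared Value
    global counter
    counter = shared_counter


def work(x):
    # Hold the Value's own lock so the read and the write happen together
    with counter.get_lock():
        counter.value += 1
    return x * x


def count_tasks(n_tasks=100, n_procs=4):
    shared = Value('i', 0)
    with Pool(n_procs, initializer=init_worker, initargs=(shared,)) as pool:
        results = pool.map(work, range(n_tasks))
    return shared.value, results


if __name__ == "__main__":
    total, _ = count_tasks()
    print(total)  # 100
```

Passing the Value directly as a pool.map argument instead would fail, since synchronized objects may only be shared with child processes through inheritance, which is what initargs provides.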
Shared Boolean variables
Here a Boolean flag is shared globally. Before running its task, each process reads the flag and checks whether it still has the expected value; if an error occurs during execution, the process changes the flag. This is mostly used so that an error in one child process ends all remaining tasks early. The code is as follows:
```python
import os
from ctypes import c_bool
from multiprocessing import Pool, Lock, Manager

lock = Lock()
ERROR = Manager().Value(c_bool, False)  # shared Boolean flag, initially False


def run(fn):
    if not ERROR.value:
        try:
            print('Execute task. PID: %d ' % os.getpid())
            1 / 0  # simulate an error in the task
        except Exception:
            with lock:
                ERROR.value = True
    else:
        print("The child process reported an error and the task ended")


if __name__ == "__main__":
    pool = Pool(10)
    # run() is called 80 times, each time with one element of range(80)
    pool.map(run, list(range(80)))
    pool.close()
    pool.join()
```
View execution output
```text
Execute task. PID: 27374
The child process reported an error and the task ended
The child process reported an error and the task ended
The child process reported an error and the task ended
...
Process finished with exit code 0
```
The shared variable is initialized to the Boolean value False. Before executing, each process reads the shared variable and checks whether it is still False; if so, it runs the task, otherwise it skips the task directly. The Boolean variable is created by calling Manager() and then calling the Value method on the returned manager. c_bool is a data type from the ctypes module; other ctypes types such as c_int and c_double can be used in the same way.
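A short sketch of how the one-letter typecodes relate to ctypes types when creating a Value (the variable names are illustrative). A ctypes type such as c_bool can also be passed to multiprocessing.Value directly, which gives a shared Boolean in shared memory without starting a Manager process; unlike a Manager proxy, such a Value still has to reach the child processes by inheritance.

```python
from ctypes import c_bool, c_double, c_int
from multiprocessing import Value

# A one-letter typecode and the matching ctypes type create the same kind of value
as_code = Value('i', 7)
as_ctype = Value(c_int, 7)

ratio = Value(c_double, 0.5)  # equivalent to Value('d', 0.5)
stop = Value(c_bool, False)   # shared Boolean flag without a Manager

stop.value = True             # plain assignment; reading .value returns a Python bool
```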
Another approach is to check the shared variable in the main process. Calling map_async instead of map keeps the main process from blocking while the child processes run; if the main process finds that the shared variable no longer has the expected value, it calls terminate() to stop the process pool and exits early.
```python
import os
import time
from ctypes import c_bool
from multiprocessing import Pool, Lock, Manager, Value

TASKS = list(range(80))

lock = Lock()
ERROR = Manager().Value(c_bool, False)  # set to True when any task fails
COUNTER = Value('i', 0)                  # number of finished tasks


def run(fn):
    try:
        time.sleep(2)
        1 / 0  # simulate an error in the task
    except Exception:
        with lock:
            ERROR.value = True
    finally:
        with lock:
            COUNTER.value += 1
            print('Execute task (%d / %d). PID: %d ' % (COUNTER.value, len(TASKS), os.getpid()))


if __name__ == "__main__":
    pool = Pool(10)
    pool.map_async(run, TASKS)  # returns immediately, does not block
    pool.close()
    print("The main process determines...")
    # Poll the shared flag once per second until every task has finished
    while COUNTER.value != len(TASKS):
        time.sleep(1)
        if ERROR.value:
            print("The child process reported an error and the main process exited early")
            pool.terminate()
            break
    pool.join()
```
The output is as follows. The main process checks the shared variable ERROR once per second; as soon as it becomes True, the main process terminates the process pool.
```text
The main process determines...
Execute task (1 / 80). PID: 4168
Execute task (2 / 80). PID: 4169
Execute task (3 / 80). PID: 4177
Execute task (4 / 80). PID: 4171
Execute task (5 / 80). PID: 4173
Execute task (6 / 80). PID: 4182
Execute task (7 / 80). PID: 4183
Execute task (8 / 80). PID: 4174
Execute task (9 / 80). PID: 4179
Execute task (10 / 80). PID: 4181
The child process reported an error and the main process exited early
Process finished with exit code 0
```
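The same early-exit pattern can also be sketched with a multiprocessing.Event in place of the Manager Boolean, and with AsyncResult.wait(timeout) and ready() in place of the task counter. This is an alternative technique, not the code above; task, init_worker and run_until_error are hypothetical names, and x == 3 stands in for a real failure.

```python
import time
from multiprocessing import Event, Pool

stop_event = None  # filled in by init_worker inside each worker process


def init_worker(event):
    global stop_event
    stop_event = event


def task(x):
    if stop_event.is_set():
        return None          # another task already failed, skip the work
    time.sleep(0.1)
    if x == 3:               # hypothetical failure condition
        stop_event.set()
        return "error"
    return x


def run_until_error(values, processes=4):
    event = Event()
    pool = Pool(processes, initializer=init_worker, initargs=(event,))
    result = pool.map_async(task, values)
    pool.close()
    # wait() with a timeout lets the main process poll without a separate counter
    while not result.ready():
        result.wait(timeout=0.5)
        if event.is_set():
            pool.terminate()
            break
    pool.join()
    return event.is_set()    # True if some task signalled an error


if __name__ == "__main__":
    print(run_until_error(list(range(80))))  # True
```

An Event only carries a single set/unset state, which is exactly what a stop signal needs, and ready() tells the main process when all tasks are done without counting them.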
A practical use case: several processes search a list for the first value that meets a condition, and once it is found, all processes stop.
```python
import os
import time
from ctypes import c_bool
from multiprocessing import Pool, Lock, Manager, Value

TASKS = list(range(80))

lock = Lock()
FOUND = Manager().Value(c_bool, False)  # set to True once a match is found
COUNTER = Value('i', 0)                  # number of finished tasks


def run(fn):
    try:
        time.sleep(2)
        res = fn + 1
        if res == 10:  # the condition being searched for
            print("The result is: {}".format(fn))
            with lock:
                FOUND.value = True
            return fn
    except Exception as e:
        print(e)
    finally:
        with lock:
            COUNTER.value += 1
            print('Execute task (%d / %d). PID: %d ' % (COUNTER.value, len(TASKS), os.getpid()))


if __name__ == "__main__":
    t1 = time.time()
    pool = Pool(10)
    pool.map_async(run, TASKS)
    pool.close()
    print("The main process determines...")
    while COUNTER.value != len(TASKS):
        time.sleep(1)
        if FOUND.value:
            print("Result found")
            pool.terminate()
            break
    pool.join()
    t2 = time.time()
    print(t2 - t1)  # elapsed time in seconds
```
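A different way to stop at the first match, sketched as an alternative rather than the method above: let the main process consume results with Pool.imap_unordered and leave the pool as soon as a match arrives, so no shared flag or counter is needed. check and find_first_match are hypothetical names, and the condition x + 1 == 10 mirrors the example above.

```python
from multiprocessing import Pool


def check(x):
    # Hypothetical predicate: return x if it satisfies the condition, else None
    return x if x + 1 == 10 else None


def find_first_match(values, processes=4):
    with Pool(processes) as pool:
        # imap_unordered yields each result as soon as its task finishes
        for result in pool.imap_unordered(check, values):
            if result is not None:
                # Returning leaves the with-block, whose exit calls
                # pool.terminate() and stops the remaining workers
                return result
    return None


if __name__ == "__main__":
    print(find_first_match(list(range(80))))  # 9
```

Because imap_unordered yields results in completion order, "first" here means the first match to finish rather than the one with the lowest index, which is also true of the shared-flag version.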
Shared dictionary and list variables
Use a Manager to create them with Manager().dict() and Manager().list(). The test code is as follows:
```python
import datetime
import time
from multiprocessing import Manager, Lock
from multiprocessing.pool import Pool

LOCK = Lock()
MANAGER = Manager()
DICT = MANAGER.dict()  # shared dictionary (not used in this test)
LIST = MANAGER.list()  # shared list


def job(ent):
    # The length check and the append/pop happen while holding the lock
    with LOCK:
        if len(LIST) < 5:
            time.sleep(1)
            LIST.append(ent)
        else:
            if len(LIST) and ent <= len(LIST) - 1:
                LIST.pop(ent)
        print("dt:{}".format(datetime.datetime.today().strftime("%Y-%m-%d %H:%M:%S")),
              "ent:{}".format(ent), "LIST:{}".format(LIST))


def job2(ent):
    # Same logic without the lock: other processes may change LIST
    # between the length check and the append/pop
    if len(LIST) < 5:
        time.sleep(1)
        LIST.append(ent)
    else:
        if len(LIST) and ent <= len(LIST) - 1:
            LIST.pop(ent)
    print("dt:{}".format(datetime.datetime.today().strftime("%Y-%m-%d %H:%M:%S")),
          "ent:{}".format(ent), "LIST:{}".format(LIST))


if __name__ == '__main__':
    pool = Pool(10)
    # Run job for the synchronized version; switch to job2 for the unsynchronized one
    pool.map(job, list(range(10)) * 2)
    pool.close()
    pool.join()
```
Running job produces the output below. It matches what a single process with a single thread would produce, because each process waits its turn at the lock before touching the list.
```text
dt:2021-11-20 22:01:28 ent:0 LIST:[0]
dt:2021-11-20 22:01:29 ent:1 LIST:[0, 1]
dt:2021-11-20 22:01:30 ent:2 LIST:[0, 1, 2]
dt:2021-11-20 22:01:31 ent:3 LIST:[0, 1, 2, 3]
dt:2021-11-20 22:01:32 ent:4 LIST:[0, 1, 2, 3, 4]
dt:2021-11-20 22:01:32 ent:5 LIST:[0, 1, 2, 3, 4]
dt:2021-11-20 22:01:32 ent:6 LIST:[0, 1, 2, 3, 4]
dt:2021-11-20 22:01:32 ent:7 LIST:[0, 1, 2, 3, 4]
dt:2021-11-20 22:01:32 ent:8 LIST:[0, 1, 2, 3, 4]
dt:2021-11-20 22:01:32 ent:9 LIST:[0, 1, 2, 3, 4]
dt:2021-11-20 22:01:32 ent:0 LIST:[1, 2, 3, 4]
dt:2021-11-20 22:01:33 ent:1 LIST:[1, 2, 3, 4, 1]
dt:2021-11-20 22:01:33 ent:2 LIST:[1, 2, 4, 1]
dt:2021-11-20 22:01:34 ent:3 LIST:[1, 2, 4, 1, 3]
dt:2021-11-20 22:01:34 ent:4 LIST:[1, 2, 4, 1]
dt:2021-11-20 22:01:35 ent:5 LIST:[1, 2, 4, 1, 5]
dt:2021-11-20 22:01:35 ent:6 LIST:[1, 2, 4, 1, 5]
dt:2021-11-20 22:01:35 ent:7 LIST:[1, 2, 4, 1, 5]
dt:2021-11-20 22:01:35 ent:8 LIST:[1, 2, 4, 1, 5]
dt:2021-11-20 22:01:35 ent:9 LIST:[1, 2, 4, 1, 5]
Process finished with exit code 0
```
If job2 is executed instead, access to the shared list is not synchronized, the results become unpredictable, and the run fails with IndexError: pop index out of range. Between one process checking len(LIST) and calling pop(), other processes may already have modified the list, so the index it checked is no longer valid.
```text
dt:2021-11-20 22:03:02 ent:0 LIST:[0, 1, 2]
dt:2021-11-20 22:03:02 ent:1 LIST:[0, 1, 2]
dt:2021-11-20 22:03:02 ent:2 LIST:[0, 1, 2]
dt:2021-11-20 22:03:02 ent:3 LIST:[0, 1, 2, 3, 5]
dt:2021-11-20 22:03:02 ent:5 LIST:[0, 1, 2, 3, 5, 4, 6]
dt:2021-11-20 22:03:02 ent:4 LIST:[0, 1, 2, 3, 5, 4, 6, 9, 8]
dt:2021-11-20 22:03:02 ent:6 LIST:[0, 1, 2, 3, 5, 4, 6, 9, 8, 7]
dt:2021-11-20 22:03:02 ent:9 LIST:[0, 1, 2, 3, 5, 4, 6, 9, 8, 7]
...
    raise self._value
IndexError: pop index out of range
```
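The dictionary created with Manager().dict() is not used in the test code. A minimal sketch of using one, assuming the goal is to count how often each entity appears (record and count_entities are hypothetical names): unlike Value, Manager proxies can be pickled, so the shared dict and a manager lock can be passed to the workers as ordinary task arguments through functools.partial. The lock is still needed because reading with get() and then assigning are two separate calls to the manager process.

```python
from functools import partial
from multiprocessing import Manager, Pool


def record(shared_dict, lock, ent):
    # get() and the assignment are separate round trips, so hold the lock
    with lock:
        shared_dict[ent] = shared_dict.get(ent, 0) + 1


def count_entities(entities, processes=4):
    with Manager() as manager:
        counts = manager.dict()
        lock = manager.Lock()
        with Pool(processes) as pool:
            pool.map(partial(record, counts, lock), entities)
        return counts.copy()  # plain dict copied out before the manager shuts down


if __name__ == "__main__":
    print(count_entities(["a", "b", "a", "c", "a"]))
```

Copying the result out before leaving the with Manager() block matters, because the proxies stop working once the manager process shuts down.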