Sharing a thread pool whose number of threads scales automatically.
When there are few tasks, it does not start many threads; when there are many tasks, it starts more. When no task arrives for a long time, it reduces the number of threads to a certain number.
Java's ThreadPoolExecutor can do this, but Python's cannot, so Python's pool is modified here into a thread pool with these characteristics.
"""
A thread pool that can automatically adjust the number of threads in real time.

Worker threads are started on demand while tasks are being submitted and
retire themselves again when they stay idle for too long.
"""
import atexit
import queue
import sys
import threading
import time
import weakref

from app.utils_ydf import LoggerMixin, nb_print, LoggerLevelSetterMixin

# Set to True by the atexit hook so that workers stop when the interpreter exits.
_shutdown = False
# Maps every live worker thread to the work queue it consumes from.
_threads_queues = weakref.WeakKeyDictionary()


def _python_exit():
    """Wake up and join every registered worker thread at interpreter exit."""
    global _shutdown
    _shutdown = True
    items = list(_threads_queues.items())
    # First hand every queue a ``None`` sentinel, then wait for the workers.
    for _, work_queue in items:
        work_queue.put(None)
    for thread, _ in items:
        thread.join()


atexit.register(_python_exit)


class _WorkItem(LoggerMixin):
    """A submitted callable together with its positional and keyword arguments."""

    def __init__(self, fn, args, kwargs):
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    @property
    def _fn_name(self):
        """Name of the callable; falls back to ``repr`` when it has no ``__name__``."""
        # functools.partial objects and some callable instances have no __name__.
        return getattr(self.fn, '__name__', repr(self.fn))

    def run(self):
        """Call the wrapped function; log (instead of propagating) any error."""
        # noinspection PyBroadException
        try:
            self.fn(*self.args, **self.kwargs)
        except BaseException as exc:
            # Deliberately broad: a failing task must not kill its worker thread.
            self.logger.exception(
                f'An error occurred in the function {self._fn_name}, '
                f'the error reason is {type(exc)} {exc}')

    def __str__(self):
        return f'{(self._fn_name, self.args, self.kwargs)}'


class CustomThreadPoolExecutor(LoggerMixin, LoggerLevelSetterMixin):
    """Thread pool that grows while tasks arrive and shrinks when idle.

    Unlike ``concurrent.futures.ThreadPoolExecutor``, ``submit`` does not
    return a future; the result of the submitted callable is discarded.
    """

    def __init__(self, max_workers=None, thread_name_prefix=''):
        """
        The parameter names and count are kept consistent with the official
        concurrent.futures.ThreadPoolExecutor and the revised
        BoundedThreadPoolExecutor.

        :param max_workers: upper bound on worker threads, also used as the
            capacity of the work queue; a falsy value means 4.
        :param thread_name_prefix: stored for signature compatibility; it is
            not applied to the worker threads by this class.
        """
        self._max_workers = max_workers or 4
        self._min_workers = 5
        self._thread_name_prefix = thread_name_prefix
        # Size the queue from the resolved value: queue.Queue(None) would raise
        # TypeError on the first put() when max_workers is left at its default.
        self.work_queue = queue.Queue(self._max_workers)
        self._threads = weakref.WeakSet()
        # Guards membership changes of self._threads (spawn, retire, snapshot).
        self._threads_lock = threading.Lock()
        self._lock_compute_threads_free_count = threading.Lock()
        self.threads_free_count = 0
        self._shutdown = False
        self._shutdown_lock = threading.Lock()

    def set_min_workers(self, min_workers=5):
        """Set the idle-thread threshold used for growing/shrinking; returns self."""
        self._min_workers = min_workers
        return self

    def change_threads_free_count(self, change_num):
        """Add ``change_num`` (may be negative) to the idle-thread counter."""
        with self._lock_compute_threads_free_count:
            self.threads_free_count += change_num

    def submit(self, func, *args, **kwargs):
        """Queue ``func(*args, **kwargs)`` for execution by a worker thread.

        Blocks while the bounded work queue is full.

        :raises RuntimeError: if the pool has already been shut down.
        """
        with self._shutdown_lock:
            if self._shutdown:
                raise RuntimeError('Cannot add new tasks to the thread pool')
        self.work_queue.put(_WorkItem(func, args, kwargs))
        self._adjust_thread_count()

    def _adjust_thread_count(self):
        """Start one more worker if idle threads are scarce and the cap allows it."""
        self.logger.debug((self.threads_free_count, len(self._threads),
                           len(_threads_queues), get_current_threads_num()))
        # The check and the spawn happen under one lock so that concurrent
        # submit() calls cannot push the pool past max_workers.
        with self._threads_lock:
            if (self.threads_free_count < self._min_workers
                    and len(self._threads) < self._max_workers):
                t = _CustomThread(self).set_log_level(self.logger.level)
                # Daemon thread: every worker loops forever waiting for tasks,
                # so non-daemon workers would keep the process alive after the
                # main thread has finished.
                t.daemon = True
                t.start()
                self._threads.add(t)
                _threads_queues[t] = self.work_queue

    def shutdown(self, wait=True):
        """Refuse new tasks and tell the workers to stop.

        :param wait: when true, block until the worker threads have finished.
        """
        with self._shutdown_lock:
            self._shutdown = True
            self.work_queue.put(None)
        if wait:
            # Join a snapshot: workers remove themselves from the live set when
            # they retire, which would break iteration over it.
            with self._threads_lock:
                workers = list(self._threads)
            current = threading.current_thread()
            for t in workers:
                # A thread cannot join itself (shutdown called from a task).
                if t is not current:
                    t.join()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown(wait=True)
        return False


class _CustomThread(threading.Thread, LoggerMixin, LoggerLevelSetterMixin):
    """Worker thread that pulls work items from its executor's queue."""

    def __init__(self, executorx: CustomThreadPoolExecutor):
        super().__init__()
        self._executorx = executorx
        # Number of work items this thread has executed so far.
        self._run_times = 0

    # noinspection PyProtectedMember
    def _remove_thread(self, stop_resson=''):
        """Unregister this thread from the executor and the global registry."""
        self.logger.debug(f'stop thread {self.ident}, trigger condition is {stop_resson}')
        self._executorx.change_threads_free_count(-1)
        with self._executorx._threads_lock:
            # Tolerant removal: the thread may not be registered (yet).
            self._executorx._threads.discard(self)
            _threads_queues.pop(self, None)

    # noinspection PyProtectedMember
    def run(self):
        """Consume work items until retired, shut down, or worn out."""
        executor = self._executorx
        self.logger.debug(f'new start thread {self.ident}')
        executor.change_threads_free_count(1)
        while True:
            try:
                work_item = executor.work_queue.get(block=True, timeout=60)
            except queue.Empty:
                # Idle for 60 seconds: retire only if more threads are idle
                # than the configured minimum, otherwise keep waiting.
                if executor.threads_free_count > executor._min_workers:
                    self._remove_thread(
                        f'the current thread has no tasks for more than 60 seconds, '
                        f'the number of threads in the thread pool that are not in the working state is '
                        f'{executor.threads_free_count}, '
                        f'which exceeds the specified number {executor._min_workers}')
                    break
                continue
            if work_item is not None:
                executor.change_threads_free_count(-1)
                try:
                    work_item.run()
                finally:
                    # Drop the reference and mark this thread idle again.
                    del work_item
                    executor.change_threads_free_count(1)
                self._run_times += 1
                if self._run_times == 50:
                    # Each thread is retired after 50 tasks; a later submit()
                    # starts a replacement when needed.
                    self._remove_thread('runs more than 50 times, destroy thread')
                    break
                continue
            # ``None`` is the stop sentinel: pass it on so the next worker
            # sees it too, then exit.
            if _shutdown or executor._shutdown:
                executor.work_queue.put(None)
                break


def show_current_threads_num(sleep_time=60, process_name='', block=False):
    """Print the number of active threads every ``sleep_time`` seconds.

    :param sleep_time: pause between two reports, in seconds.
    :param process_name: label used in the report; defaults to ``sys.argv[0]``.
    :param block: report in the calling thread (never returns) instead of in
        a background daemon thread.
    """
    process_name = sys.argv[0] if process_name == '' else process_name

    def _show_current_threads_num():
        while True:
            nb_print(f'The number of threads of the {process_name} process is --> {threading.active_count()}')
            time.sleep(sleep_time)

    if block:
        _show_current_threads_num()
    else:
        t = threading.Thread(target=_show_current_threads_num, daemon=True)
        t.start()


def get_current_threads_num():
    """Return the number of currently active threads in this process."""
    return threading.active_count()


if __name__ == '__main__':
    # Only needed for the commented-out comparison variants below.
    from app.utils_ydf import decorators, BoundedThreadPoolExecutor


    # @decorators.keep_circulating(1)
    def f1(a):
        time.sleep(0.2)
        nb_print(f'{a}...')
        # raise Exception('Throw an error test')


    pool = CustomThreadPoolExecutor(200).set_log_level(10).set_min_workers()
    # Test and compare against the originally written BoundedThreadPoolExecutor:
    # pool = BoundedThreadPoolExecutor(200)
    show_current_threads_num(sleep_time=5)
    for i in range(300):
        # The interval simulates sparse tasks: only a few threads are needed
        # to handle f1 because f1 is short, so there is no need to open many
        # threads. This is one of the advantages of CustomThreadPoolExecutor
        # over BoundedThreadPoolExecutor.
        time.sleep(0.3)
        pool.submit(f1, str(i))

    nb_print(6666)
    # pool.shutdown(wait=True)
    pool.submit(f1, 'yyyy')

    # The loop below blocks the exit of the main thread. Comment it out to
    # test how the program exits together with the main thread.
    while True:
        time.sleep(10)