Sharing a Python thread pool whose number of threads scales automatically. Tested.

This post shares a thread pool with a scalable thread pool size.

When there are few tasks, the pool does not start many threads; when there are many tasks, it starts more. When no task has arrived for a long time, it reduces the number of threads down to a configured minimum.

Java's ThreadPoolExecutor can do this, but Python's cannot, so the Python thread pool is modified here to have these characteristics.

"""
A thread pool that can automatically adjust the number of threads in real time.

"""

import atexit
import queue
import sys
import threading
import time
import weakref

from app.utils_ydf import LoggerMixin, nb_print, LoggerLevelSetterMixin

# noinspection PyShadowingBuiltins
# print = nb_print

# Set to True once interpreter shutdown has begun. Worker threads read this
# flag when they pull a ``None`` wake-up item off their queue.
_shutdown = False
# Maps each registered worker thread to the queue it consumes from. The keys
# are weak references, so an entry goes away with its thread object.
_threads_queues = weakref.WeakKeyDictionary()


def _python_exit():
    """Signal shutdown to every registered worker thread and wait for them.

    Registered with ``atexit``. Sets the module-level ``_shutdown`` flag,
    puts one ``None`` wake-up item on each registered queue, then joins each
    registered thread.
    """
    # ``global`` is required here: without it the assignment below would only
    # create a local variable and the workers would never see the flag.
    global _shutdown
    _shutdown = True
    # Snapshot the mapping first; entries may vanish while we iterate.
    items = list(_threads_queues.items())
    for _, q in items:
        q.put(None)
    for t, _ in items:
        t.join()


atexit.register(_python_exit)


class _WorkItem(LoggerMixin):
    """A queued unit of work: a callable plus the arguments to call it with."""

    def __init__(self, fn, args, kwargs):
        """Store the callable and its positional / keyword arguments.

        :param fn: the callable to execute.
        :param args: tuple of positional arguments for ``fn``.
        :param kwargs: dict of keyword arguments for ``fn``.
        """
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def _fn_name(self):
        """Return a printable name for ``fn``.

        Not every callable has ``__name__`` (``functools.partial`` objects and
        instances that define ``__call__`` do not). Falling back to ``repr``
        keeps error reporting from raising AttributeError itself.
        """
        return getattr(self.fn, '__name__', repr(self.fn))

    def run(self):
        """Call ``fn(*args, **kwargs)``; log any error instead of raising it."""
        # BaseException is caught deliberately so that no error raised by the
        # task escapes this method; the failure is logged with its traceback.
        # NOTE(review): ``self.logger`` is presumably provided by LoggerMixin.
        # noinspection PyBroadException
        try:
            self.fn(*self.args, **self.kwargs)
        except BaseException as exc:
            self.logger.exception(f'An error occurred in the function {self._fn_name()}, the error reason is {type(exc)} {exc}')

    def __str__(self):
        """Return the string form of ``(function name, args, kwargs)``."""
        return f'{(self._fn_name(), self.args, self.kwargs)}'


class CustomThreadPoolExecutor(LoggerMixin, LoggerLevelSetterMixin):
    """Thread pool that starts worker threads on demand.

    A new worker is started on ``submit`` while fewer than ``_min_workers``
    workers are idle and fewer than ``_max_workers`` workers exist. Workers
    decide for themselves when to retire (see ``_CustomThread``).
    """

    def __init__(self, max_workers=None, thread_name_prefix=''):
        """
        It is best to be compatible with the official concurrent.futures.ThreadPoolExecutor and the revised BoundedThreadPoolExecutor. The name and number of input parameters are consistent.
        :param max_workers: upper bound on worker threads; a falsy value means 4.
        :param thread_name_prefix: stored for signature compatibility; not used by this class.
        """
        self._max_workers = max_workers or 4
        self._min_workers = 5
        self._thread_name_prefix = thread_name_prefix
        # Bound the queue by the *resolved* limit. Passing the raw argument
        # would give ``queue.Queue`` a maxsize of None when the default is
        # used, and the first ``put`` would fail comparing ``None > 0``.
        self.work_queue = queue.Queue(self._max_workers)
        # Weak references, so a finished thread object is not kept alive here.
        self._threads = weakref.WeakSet()
        self._lock_compute_threads_free_count = threading.Lock()
        # Number of workers currently idle; workers adjust it through
        # change_threads_free_count().
        self.threads_free_count = 0
        self._shutdown = False
        self._shutdown_lock = threading.Lock()

    def set_min_workers(self, min_workers=5):
        """Set the idle-worker threshold and return ``self`` for chaining."""
        self._min_workers = min_workers
        return self

    def change_threads_free_count(self, change_num):
        """Add ``change_num`` (may be negative) to the idle-worker counter under its lock."""
        with self._lock_compute_threads_free_count:
            self.threads_free_count += change_num

    def submit(self, func, *args, **kwargs):
        """Queue ``func(*args, **kwargs)`` for execution.

        Blocks while the bounded work queue is full. Returns ``None``.

        :raises RuntimeError: if ``shutdown`` has already been called.
        """
        with self._shutdown_lock:
            if self._shutdown:
                raise RuntimeError('Cannot add new tasks to the thread pool')
        self.work_queue.put(_WorkItem(func, args, kwargs))
        self._adjust_thread_count()

    def _adjust_thread_count(self):
        """Start one more worker if too few are idle and the limit allows it."""
        self.logger.debug((self.threads_free_count, len(self._threads), len(_threads_queues), get_current_threads_num()))
        if self.threads_free_count < self._min_workers and len(self._threads) < self._max_workers:
            t = _CustomThread(self).set_log_level(self.logger.level)
            # Workers loop forever waiting for tasks, so they must be daemon
            # threads; otherwise the program could not end after the main
            # thread finishes. ``setDaemon`` is deprecated, so the ``daemon``
            # attribute is set directly.
            t.daemon = True
            t.start()
            self._threads.add(t)
            _threads_queues[t] = self.work_queue

    def shutdown(self, wait=True):
        """Refuse new tasks and wake the workers so they can exit.

        :param wait: when true, join the worker threads before returning.
        """
        with self._shutdown_lock:
            self._shutdown = True
            # One ``None`` is enough: each worker that sees it while the pool
            # is shut down puts it back for the next worker before exiting.
            self.work_queue.put(None)
        if wait:
            # Iterate a snapshot: retiring workers remove themselves from
            # ``_threads``, which would break iteration over the live set.
            for t in list(self._threads):
                t.join()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown(wait=True)
        # Never suppress an exception raised inside the ``with`` body.
        return False


class _CustomThread(threading.Thread, LoggerMixin, LoggerLevelSetterMixin):
    """Worker thread that consumes work items from its executor's queue.

    The thread retires itself in two situations: after waiting 60 seconds
    without a task while the pool has more idle workers than its minimum, or
    once it has executed 50 work items.
    """

    def __init__(self, executorx: CustomThreadPoolExecutor):
        """Bind the worker to the executor whose queue it will consume."""
        super().__init__()
        self._run_times = 0  # how many work items this thread has executed
        self._executorx = executorx

    def _remove_thread(self, stop_resson=''):
        """Log why the thread stops, then drop it from the pool bookkeeping.

        :param stop_resson: human-readable reason, included in the debug log.
        """
        pool = self._executorx
        # noinspection PyUnresolvedReferences
        self.logger.debug(f'stop thread {self._ident}, trigger condition is {stop_resson}')
        # A thread only retires while counted as idle, so give that count back.
        pool.change_threads_free_count(-1)
        pool._threads.remove(self)
        _threads_queues.pop(self)

    # noinspection PyProtectedMember
    def run(self):
        """Consume the queue until the thread retires or the pool shuts down."""
        pool = self._executorx
        # noinspection PyUnresolvedReferences
        self.logger.debug(f'new start thread {self._ident}')
        pool.change_threads_free_count(1)
        while True:
            try:
                task = pool.work_queue.get(block=True, timeout=60)
            except queue.Empty:
                # Idle for a full minute: retire only if the pool would still
                # keep more than the minimum number of idle workers.
                if pool.threads_free_count <= pool._min_workers:
                    continue
                self._remove_thread(f'the current thread has no tasks for more than 60 seconds, the number of threads in the thread pool that are not in the working state is {pool.threads_free_count}, which exceeds the specified number {pool._min_workers}')
                break

            if task is None:
                # ``None`` is a wake-up item; it only ends the thread when a
                # shutdown is actually in progress.
                if _shutdown or pool._shutdown:
                    # Pass the wake-up on so the next worker exits as well.
                    pool.work_queue.put(None)
                    break
                continue

            # Count this thread as busy for the duration of the task.
            pool.change_threads_free_count(-1)
            task.run()
            # Release the item now rather than holding it during the next wait.
            del task
            pool.change_threads_free_count(1)
            self._run_times += 1
            if self._run_times == 50:
                self._remove_thread('runs more than 50 times, destroy thread')
                break


# @decorators.tomorrow_threads(20)
def show_current_threads_num(sleep_time=60, process_name='', block=False):
    """Print the process's active thread count repeatedly, forever.

    :param sleep_time: seconds to sleep between two reports.
    :param process_name: label used in the message; ``sys.argv[0]`` when empty.
    :param block: run the reporting loop in the calling thread (never
        returns) instead of in a background daemon thread.
    """
    if process_name == '':
        process_name = sys.argv[0]

    def _show_current_threads_num():
        # Endless report loop; it only stops when the process ends.
        while True:
            nb_print(f'The number of threads of the {process_name} process is --> {threading.active_count()}')
            time.sleep(sleep_time)

    if not block:
        # Daemon thread, so the reporter never keeps the process alive.
        reporter = threading.Thread(target=_show_current_threads_num, daemon=True)
        reporter.start()
        return
    _show_current_threads_num()


def get_current_threads_num():
    """Return the number of currently alive threads in this process."""
    alive_threads = threading.active_count()
    return alive_threads


if __name__ == '__main__':
    # Manual demo: submit short tasks at a slow, steady rate and watch how
    # many threads the pool actually starts.
    from app.utils_ydf import decorators, BoundedThreadPoolExecutor


    # @decorators. keep_circulating(1)
    def f1(a):
        """Demo task: sleep 0.2 seconds, then print the argument."""
        time. sleep(0.2)
        nb_print(f'{a}...')
        # raise Exception('Throw an error test')


    # show_current_threads_num()
    # Up to 200 workers; set_min_workers() is called with its default of 5.
    # NOTE(review): log level 10 presumably corresponds to logging.DEBUG - confirm.
    pool = CustomThreadPoolExecutor(200).set_log_level(10).set_min_workers()
    # pool = BoundedThreadPoolExecutor(200) # Swap in to compare against the earlier BoundedThreadPoolExecutor
    # Report the process's thread count every 5 seconds from a background thread.
    show_current_threads_num(sleep_time=5)
    for i in range(300):
        # The pause simulates a light load. f1 is short, so only a few threads
        # are needed to keep up; not opening more than that is one advantage
        # of CustomThreadPoolExecutor over BoundedThreadPoolExecutor.
        time.sleep(0.3)
        pool. submit(f1, str(i))

    nb_print(6666)
    # pool. shutdown(wait=True)
    pool. submit(f1, 'yyyy')

    # The loop below keeps the main thread alive forever. Comment it out to
    # test how the program exits when the main thread finishes.
    while True:
        time. sleep(10)