Simpler batch operations for the three major databases, redis, mongodb, and mysql, with an automatic aggregator that collects individual tasks into batches.

1. The python packages for redis, mongodb, and mysql all provide batch insert operations, but you have to split the work yourself: for example, 1,000,001 tasks become one small batch for every 1000, plus the leftover remainder that never fills a whole batch. Doing this once is fine, but doing it for many different tasks quickly becomes tedious.

For example, redis and mongo are the same in this respect: you have to build a batch list outside the client, and after the loop finishes you must not forget the leftover tasks that never reached a full batch.

city_items is an iterator whose length is fairly large and not easy to split evenly up front, so every time you end up writing batching code plus remainder handling, like this:

    for city_item in city_items:
        task_dict = OrderedDict()
        task_dict['city_cn'] = city_item.get('city')
        task_dict['city_en'] = city_item.get('cityEn')
        task_dict['is_international'] = is_international
        task_dict['url'] = url_city
        self.logger.debug(task_dict)
        task_dict_list.append(task_dict)

        if len(task_dict_list) == 2000:
            self.logger.debug('execute 2000 city task insertion')
            with self.redis_local_db7.pipeline(transaction=False) as p:
                for task_dict in task_dict_list:
                    p.sadd(self.start_urls_key, json.dumps(task_dict))
                p.execute()
            task_dict_list.clear()

    task_dict_list_length = len(task_dict_list)
    if task_dict_list_length > 0:
        self.logger.debug('execute {} city tasks to insert'.format(task_dict_list_length))
        with self.redis_local_db7.pipeline(transaction=False) as p:
            for task_dict in task_dict_list:
                p.sadd(self.start_urls_key, json.dumps(task_dict))
            p.execute()
        task_dict_list.clear()
    self.logger.debug(total_city_count)

2. The simpler approach should work like this: from outside the class you only submit individual tasks through a single task-submission api, and the class automatically aggregates multiple tasks into a batch internally. If you want speed, you must insert many tasks per round trip rather than using multi-threading with each thread inserting one task at a time; the efficiency difference between the two is enormous, especially when writing to a remote public-network ip.
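As a rough sketch of that calling style (reusing the names from the snippet above, such as self.redis_local_db7 and self.start_urls_key, together with the RedisBulkWriteHelper and RedisOperation classes defined in the listing below), the city-task loop collapses to one add_task call per item; all batching and remainder handling move inside the helper:

    redis_helper = RedisBulkWriteHelper(self.redis_local_db7, threshold=2000)
    for city_item in city_items:
        task_dict = OrderedDict()
        task_dict['city_cn'] = city_item.get('city')
        task_dict['city_en'] = city_item.get('cityEn')
        task_dict['is_international'] = is_international
        task_dict['url'] = url_city
        # one call per task; the helper aggregates them into pipelines of 2000
        # and flushes whatever is left on a timer and again at program exit
        redis_helper.add_task(RedisOperation('sadd', self.start_urls_key, json.dumps(task_dict)))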

Below is a simple batch operation API for the three major databases, exercised via unittest. The batch operations it implements are built on the native batch apis of redis, mongo, and mysql themselves.

# coding=utf8
"""
@author:Administrator
@file: bulk_operation.py
@time: 2018/08/27

Easier batch operations for the three major databases
"""
import atexit
from typing import Union
import abc
import time
from queue import Queue, Empty
import unittest
from pymongo import UpdateOne, InsertOne, collection, MongoClient
import redis
from app.utils_ydf import torndb_for_python3
from app.utils_ydf import LoggerMixin, decorators, LogManager, MongoMixin # NOQA


class RedisOperation:
    """Redis operation, this kind of function is mainly to standardize the format"""

    def __init__(self, operation_name: str, key: str, value: str):
        """
        :param operation_name: the redis operation name, such as sadd, lpush, etc.
        :param key: the redis key
        :param value: the value of the redis key
        """
        self.operation_name = operation_name
        self.key = key
        self.value = value


class BaseBulkHelper(LoggerMixin, metaclass=abc.ABCMeta):
    """Batch manipulation abstract base class"""
    bulk_helper_map = {}

    def __new__(cls, base_object, *args, **kwargs):
        if str(base_object) not in cls.bulk_helper_map: # Add str because some types of instances cannot be hashed as dictionary keys
            self = super().__new__(cls)
            return self
        else:
            return cls.bulk_helper_map[str(base_object)]

    def __init__(self, base_object: Union[collection.Collection, redis.Redis, torndb_for_python3.Connection], threshold: int = 100, is_print_log: bool = True):
        if str(base_object) not in self.bulk_helper_map:
            self._custom_init(base_object, threshold, is_print_log)
            self.bulk_helper_map[str(base_object)] = self

    def _custom_init(self, base_object, threshold, is_print_log):
        self.base_object = base_object
        self._threshold = threshold
        self._is_print_log = is_print_log
        self._to_be_request_queue = Queue(threshold * 2)
        self._current_time = time.time()
        atexit.register(self.__do_something_before_exit) # Execute the registered function before the program automatically ends
        self._main_thread_has_exit = False
        self.__excute_bulk_operation_in_other_thread()
        self.logger.debug(f'{self.__class__} is instantiated')

    def add_task(self, base_operation: Union[UpdateOne, InsertOne, RedisOperation, tuple]):
        """Add a single operation that needs to be performed, and the program automatically aggregates old batch operations"""
        self._to_be_request_queue.put(base_operation)

    @decorators.tomorrow_threads(10)
    def __excute_bulk_operation_in_other_thread(self):
        while True:
            if self._to_be_request_queue.qsize() >= self._threshold or time.time() > self._current_time + 10:
                self._do_bulk_operation()
            if self._main_thread_has_exit and self._to_be_request_queue.qsize() == 0:
                break
            time.sleep(10 ** -4)

    @abc.abstractmethod
    def _do_bulk_operation(self):
        raise NotImplementedError

    def __do_something_before_exit(self):
        self._main_thread_has_exit = True
        self.logger.critical(f'Execute [{str(self.base_object)}] remaining tasks before the program automatically ends')


class MongoBulkWriteHelper(BaseBulkHelper):
    """
    Simpler mongo batch insert: operations can be submitted one at a time and are automatically aggregated into one batch before being written, which is many times faster.
    """

    def _do_bulk_operation(self):
        if self._to_be_request_queue.qsize() > 0:
            t_start = time.time()
            count = 0
            request_list = []
            for _ in range(self._threshold):
                try:
                    request = self._to_be_request_queue.get_nowait()
                    count += 1
                    request_list.append(request)
                except Empty:
                    pass
            if request_list:
                self.base_object.bulk_write(request_list, ordered=False)
            if self._is_print_log:
                self.logger.info(f'[{str(self.base_object)}] The number of tasks inserted in batches is {count} and the time consumed is {round(time.time() - t_start,6)}')
            self._current_time = time.time()


class RedisBulkWriteHelper(BaseBulkHelper):
    """redis batch insert, more convenient to operate non-divisible batches"""

    def _do_bulk_operation(self):
        if self._to_be_request_queue.qsize() > 0:
            t_start = time.time()
            count = 0
            pipeline = self.base_object.pipeline()
            for _ in range(self._threshold):
                try:
                    request = self._to_be_request_queue.get_nowait()
                    count += 1
                except Empty:
                    pass
                else:
                    getattr(pipeline, request.operation_name)(request.key, request.value)
            pipeline.execute()
            pipeline.reset()
            if self._is_print_log:
                self.logger.info(f'[{str(self.base_object)}] The number of tasks inserted in batches is {count} and the time consumed is {round(time.time() - t_start,6)}')
            self._current_time = time.time()


class MysqlBulkWriteHelper(BaseBulkHelper):
    """mysql batch operation"""

    def __new__(cls, base_object: torndb_for_python3.Connection, *, sql_short: str = None, threshold: int = 100, is_print_log: bool = True):
        # print(cls.bulk_helper_map)
        if str(base_object) + sql_short not in cls.bulk_helper_map: # Adding str is because some types of instances cannot be hashed as dictionary keys
            self = object.__new__(cls)
            return self
        else:
            return cls.bulk_helper_map[str(base_object) + sql_short]

    def __init__(self, base_object: torndb_for_python3.Connection, *, sql_short: str = None, threshold: int = 100, is_print_log: bool = True):
        if str(base_object) + sql_short not in self.bulk_helper_map:
            super()._custom_init(base_object, threshold, is_print_log)
            self.sql_short = sql_short
            self.bulk_helper_map[str(self.base_object) + sql_short] = self

    def _do_bulk_operation(self):
        if self._to_be_request_queue.qsize() > 0:
            t_start = time.time()
            count = 0
            values_list = []
            for _ in range(self._threshold):
                try:
                    request = self._to_be_request_queue.get_nowait()
                    count += 1
                    values_list.append(request)
                except Empty:
                    pass
            if values_list:
                real_count = self.base_object.executemany_rowcount(self.sql_short, values_list)
                if self._is_print_log:
                    self.logger.info(f'[{str(self.base_object)}] The number of tasks inserted in batches is {real_count} and the time consumed is {round(time.time() - t_start,6)}')
                self._current_time = time.time()


class _Test(unittest.TestCase, LoggerMixin):
    @unittest.skip
    def test_mongo_bulk_write(self):
        # col = MongoMixin().mongo_16_client.get_database('test').get_collection('ydf_test2')
        col = MongoClient().get_database('test').get_collection('ydf_test2')
        with decorators.TimerContextManager():
            for i in range(50000 + 13):
                # time.sleep(0.01)
                item = {'_id': i, 'field1': i * 2}
                mongo_helper = MongoBulkWriteHelper(col, 10000, is_print_log=True)
                mongo_helper.add_task(UpdateOne({'_id': item['_id']}, {'$set': item}, upsert=True))

    @unittest.skip
    def test_redis_bulk_write(self):
        with decorators.TimerContextManager():
            r = redis.Redis(password='123456')
            # redis_helper = RedisBulkWriteHelper(r, 100) # Can be placed outside
            for i in range(100003):
                # time.sleep(0.2)
                redis_helper = RedisBulkWriteHelper(r, 2000) # can also be infinitely instantiated here
                redis_helper.add_task(RedisOperation('sadd', 'key1', str(i)))
    # @unittest.skip
    def test_mysql_bulk_write(self):
        mysql_conn = torndb_for_python3.Connection(host='localhost', database='test', user='root', password='123456', charset='utf8')
        with decorators.TimerContextManager():
            # mysql_helper = MysqlBulkWriteHelper(mysql_conn, sql_short='INSERT INTO test.table_2 (column_1, column_2) VALUES (%s,%s)', threshold=200) # It is best to write outside the loop
            for i in range(100000 + 9):
                mysql_helper = MysqlBulkWriteHelper(mysql_conn, sql_short='INSERT INTO test.table_2 (column_1, column_2) VALUES (%s,%s)', threshold=20000)  # supports repeated instantiation, so it does not matter that this is accidentally written inside the loop
                mysql_helper.add_task((i, i * 2))


if __name__ == '__main__':
    unittest.main()
   

The calling convention is identical for all three databases: instantiate the corresponding helper and submit each task through its add_task method, as in the condensed sketch below.
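This sketch is distilled from the unittest above; the connection parameters, keys, and table/collection names are simply the placeholder values used in those tests:

    # redis: one RedisOperation per task
    r = redis.Redis(password='123456')
    redis_helper = RedisBulkWriteHelper(r, threshold=2000)
    redis_helper.add_task(RedisOperation('sadd', 'key1', 'value1'))

    # mongo: one pymongo InsertOne / UpdateOne per task
    col = MongoClient().get_database('test').get_collection('ydf_test2')
    mongo_helper = MongoBulkWriteHelper(col, threshold=10000)
    mongo_helper.add_task(InsertOne({'field1': 1}))

    # mysql: one value tuple per task, matching the placeholders in sql_short
    mysql_conn = torndb_for_python3.Connection(host='localhost', database='test', user='root', password='123456', charset='utf8')
    mysql_helper = MysqlBulkWriteHelper(mysql_conn, sql_short='INSERT INTO test.table_2 (column_1, column_2) VALUES (%s,%s)', threshold=20000)
    mysql_helper.add_task((1, 2))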

Screenshot of mysql batch operation

3. The code mainly uses the template method pattern, the flyweight pattern, and the proxy pattern.

The template method pattern saves code: it makes it easy to extend batch operations to other database types while writing very few new methods, as sketched below. The strategy pattern could be used instead.
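As a minimal sketch of such an extension (the elasticsearch-style client and its bulk() call here are hypothetical and not part of the code above), only _do_bulk_operation has to be written; the queue, threshold, background thread, and flush-on-exit behaviour all come from BaseBulkHelper:

    class ElasticsearchBulkWriteHelper(BaseBulkHelper):
        """hypothetical subclass: batch writes for another storage backend"""

        def _do_bulk_operation(self):
            if self._to_be_request_queue.qsize() > 0:
                actions = []
                for _ in range(self._threshold):
                    try:
                        actions.append(self._to_be_request_queue.get_nowait())
                    except Empty:
                        pass
                if actions:
                    # self.base_object is assumed to be a client exposing a bulk() method;
                    # purely illustrative, swap in whatever batch api the backend offers
                    self.base_object.bulk(actions)
                self._current_time = time.time()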

The flyweight pattern means users do not have to be careful to initialize the helper in one suitable place and then reuse that one object everywhere; instances can be "created" anywhere, including inside a for loop, and the same underlying object is returned each time (see the sketch below).
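A small sketch of that behaviour (the redis connection is a placeholder): because __new__ and __init__ look the instance up in bulk_helper_map, repeated construction is cheap and always hands back the same shared aggregator:

    r = redis.Redis(password='123456')
    helper_a = RedisBulkWriteHelper(r, threshold=2000)
    helper_b = RedisBulkWriteHelper(r, threshold=2000)  # looked up in bulk_helper_map, not re-initialized
    print(helper_a is helper_b)  # True: both names point to the same flyweight instance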

With the proxy pattern, users never call the official pipeline, executemany, or bulk_write methods of the three databases directly; the helper objects call those official interfaces on the user's behalf.