1. The Python client packages for Redis, MongoDB, and MySQL all provide batch insert operations, but the caller has to split a workload of, say, 1,000,001 tasks into batches of 1,000 and also handle the remainder left over after the last full batch. That is fine if you do it once, but it becomes tedious when you have to do it for many different tasks.
Take the example below; the same applies to both Redis and MongoDB. You have to build the batch list yourself, outside the client library, and after the loop finishes you must not forget the leftover tasks that did not fill a whole batch.
`city_items` is an iterator with a fairly large number of elements, so it cannot easily be divided into equal chunks up front. Every time, you end up writing a pile of batching code plus remainder handling, as follows:
# Manual batching example: push city tasks to a Redis set 2000 at a time,
# then flush whatever is left over after the loop.
for city_item in city_items:
    task_dict = OrderedDict([
        ('city_cn', city_item.get('city')),
        ('city_en', city_item.get('cityEn')),
        ('is_international', is_international),
        ('url', url_city),
    ])
    self.logger.debug(task_dict)
    task_dict_list.append(task_dict)

    # A full batch has accumulated: send it through one pipeline round trip.
    if len(task_dict_list) == 2000:
        self.logger.debug('execute 2000 city task insertion')
        with self.redis_local_db7.pipeline(transaction=False) as p:
            for task_dict in task_dict_list:
                p.sadd(self.start_urls_key, json.dumps(task_dict))
            p.execute()
        task_dict_list.clear()

# Remainder handling: the tasks that never filled a complete batch.
task_dict_list_lenth = len(task_dict_list)
if task_dict_list_lenth > 0:
    self.logger.debug('execute {} city tasks to insert'.format(task_dict_list_lenth))
    with self.redis_local_db7.pipeline(transaction=False) as p:
        for task_dict in task_dict_list:
            p.sadd(self.start_urls_key, json.dumps(task_dict))
        p.execute()
    task_dict_list.clear()

self.logger.debug(total_city_count)
2. A simpler approach looks like this: outside the class you submit one task at a time by calling a single task-submission API, and the class automatically aggregates multiple tasks into a batch. If you want fast processing, you must insert many tasks per batch rather than use multiple threads that each insert one task at a time. The difference in efficiency between the two is very large, especially when writing to a remote host over the public network.
Below is a simple batch-operation API for the three major databases; see the unit tests for how to use it. The batching it implements is built on the native batch-operation APIs of the Redis, MongoDB, and MySQL clients themselves.
# coding=utf8
"""
@author:Administrator
@file: bulk_operation.py
@time: 2018/08/27

Easier batch operations for the three major databases.

Callers submit one operation at a time through ``add_task``; each helper
buffers the operations in a queue and a background worker writes them in
batches using the database client's own bulk API.
"""
import abc
import atexit
import time
import unittest
from queue import Queue, Empty
from typing import Union

from pymongo import UpdateOne, InsertOne, collection, MongoClient
import redis

from app.utils_ydf import torndb_for_python3
from app.utils_ydf import LoggerMixin, decorators, LogManager, MongoMixin  # NOQA


class RedisOperation:
    """A single Redis command; exists mainly to standardize the task format."""

    def __init__(self, operation_name: str, key: str, value: str):
        """
        :param operation_name: redis operation name, such as sadd, lpush, etc.
        :param key: redis key
        :param value: the value to write to the redis key
        """
        self.operation_name = operation_name
        self.key = key
        self.value = value


class BaseBulkHelper(LoggerMixin, metaclass=abc.ABCMeta):
    """Abstract base class for the batch helpers.

    Instances are shared per ``str(base_object)``: constructing a helper
    again for the same object returns the existing instance, so it is safe
    to instantiate inside a loop. Arguments passed on such repeated
    constructions (``threshold``, ``is_print_log``) are ignored.
    """

    # Shared registry of live helpers, keyed by a string built from the
    # wrapped object (a string because some objects are not hashable).
    bulk_helper_map = {}

    def __new__(cls, base_object, *args, **kwargs):
        key = str(base_object)
        if key in cls.bulk_helper_map:
            return cls.bulk_helper_map[key]
        return super().__new__(cls)

    def __init__(self, base_object: Union[collection.Collection, redis.Redis, torndb_for_python3.Connection],
                 threshold: int = 100, is_print_log: bool = True):
        """
        :param base_object: the client object the batches are written through
        :param threshold: number of queued tasks that triggers a batch write
        :param is_print_log: whether to log every batch write
        """
        key = str(base_object)
        # __init__ also runs when __new__ returned a registered instance;
        # only initialize the first time.
        if key not in self.bulk_helper_map:
            self._custom_init(base_object, threshold, is_print_log)
            self.bulk_helper_map[key] = self

    def _custom_init(self, base_object, threshold, is_print_log):
        """Set up the queue, register the exit hook and start the worker."""
        self.base_object = base_object
        self._threshold = threshold
        self._is_print_log = is_print_log
        # Bounded queue: add_task blocks once 2 * threshold tasks are pending.
        self._to_be_request_queue = Queue(threshold * 2)
        self._current_time = time.time()
        # Run the registered function before the program ends.
        atexit.register(self.__do_something_before_exit)
        self._main_thread_has_exit = False
        self.__excute_bulk_operation_in_other_thread()
        self.logger.debug(f'{self.__class__} is instantiated')

    def add_task(self, base_operation: Union[UpdateOne, InsertOne, RedisOperation, tuple]):
        """Queue one operation; queued operations are written in batches automatically."""
        self._to_be_request_queue.put(base_operation)

    # NOTE(review): presumably runs this method on a background thread pool -
    # confirm against app.utils_ydf.decorators.
    @decorators.tomorrow_threads(10)
    def __excute_bulk_operation_in_other_thread(self):
        """Worker loop: write a batch when enough tasks are queued or enough time has passed."""
        while True:
            batch_is_full = self._to_be_request_queue.qsize() >= self._threshold
            idle_too_long = time.time() > self._current_time + 10
            # Once the exit hook has fired, write leftovers immediately rather
            # than waiting for the 10 second idle timeout.
            # NOTE(review): an exception raised by _do_bulk_operation leaves
            # this loop; nothing here restarts it.
            if batch_is_full or idle_too_long or self._main_thread_has_exit:
                self._do_bulk_operation()
            if self._main_thread_has_exit and self._to_be_request_queue.qsize() == 0:
                break
            time.sleep(10 ** -4)

    def _drain_batch(self) -> list:
        """Take up to ``threshold`` queued tasks without blocking and return them."""
        batch = []
        for _ in range(self._threshold):
            try:
                batch.append(self._to_be_request_queue.get_nowait())
            except Empty:
                # Queue is empty; further attempts would only raise again.
                break
        return batch

    def _log_batch(self, count, t_start):
        """Log the size and duration of one batch write when logging is enabled."""
        if self._is_print_log:
            self.logger.info(f'[{str(self.base_object)}] The number of tasks inserted in batches is {count} and the time consumed is {round(time.time() - t_start, 6)}')

    @abc.abstractmethod
    def _do_bulk_operation(self):
        """Write one batch of queued tasks through the client's bulk API."""
        raise NotImplementedError

    def __do_something_before_exit(self):
        """Exit hook: tell the worker loop to write what is left and stop."""
        self._main_thread_has_exit = True
        self.logger.critical(f'Execute [{str(self.base_object)}] remaining tasks before the program automatically ends')


class MongoBulkWriteHelper(BaseBulkHelper):
    """
    Simpler MongoDB batch writes: submit single operations and they are
    aggregated into one ``bulk_write`` call per batch.
    """

    def _do_bulk_operation(self):
        if self._to_be_request_queue.qsize() > 0:
            t_start = time.time()
            request_list = self._drain_batch()
            if request_list:
                self.base_object.bulk_write(request_list, ordered=False)
            self._log_batch(len(request_list), t_start)
            self._current_time = time.time()


class RedisBulkWriteHelper(BaseBulkHelper):
    """Redis batch writes; removes the need to hand-roll remainder batches."""

    def _do_bulk_operation(self):
        if self._to_be_request_queue.qsize() > 0:
            t_start = time.time()
            request_list = self._drain_batch()
            if request_list:
                # The context manager resets the pipeline even if execute() raises.
                with self.base_object.pipeline() as pipeline:
                    for request in request_list:
                        getattr(pipeline, request.operation_name)(request.key, request.value)
                    pipeline.execute()
            self._log_batch(len(request_list), t_start)
            self._current_time = time.time()


class MysqlBulkWriteHelper(BaseBulkHelper):
    """MySQL batch writes; one helper exists per (connection, SQL statement) pair."""

    @staticmethod
    def _make_key(base_object, sql_short) -> str:
        """Build the registry key, rejecting a missing SQL statement up front."""
        if not isinstance(sql_short, str):
            raise TypeError('sql_short must be the SQL statement string used for executemany')
        # A string key because some objects cannot be hashed as dictionary keys.
        return str(base_object) + sql_short

    def __new__(cls, base_object: torndb_for_python3.Connection, *, sql_short: str = None, threshold: int = 100,
                is_print_log: bool = True):
        key = cls._make_key(base_object, sql_short)
        if key in cls.bulk_helper_map:
            return cls.bulk_helper_map[key]
        return object.__new__(cls)

    def __init__(self, base_object: torndb_for_python3.Connection, *, sql_short: str = None, threshold: int = 100,
                 is_print_log: bool = True):
        """
        :param base_object: the MySQL connection the batches are written through
        :param sql_short: parameterized SQL statement run once per batch (required)
        :param threshold: number of queued tasks that triggers a batch write
        :param is_print_log: whether to log every batch write
        """
        key = self._make_key(base_object, sql_short)
        if key not in self.bulk_helper_map:
            # Assign the statement before _custom_init starts the worker.
            self.sql_short = sql_short
            super()._custom_init(base_object, threshold, is_print_log)
            self.bulk_helper_map[key] = self

    def _do_bulk_operation(self):
        if self._to_be_request_queue.qsize() > 0:
            t_start = time.time()
            values_list = self._drain_batch()
            real_count = 0  # stays 0 when nothing was drained
            if values_list:
                real_count = self.base_object.executemany_rowcount(self.sql_short, values_list)
            self._log_batch(real_count, t_start)
            self._current_time = time.time()


class _Test(unittest.TestCase, LoggerMixin):
    @unittest.skip('requires a local MongoDB server')
    def test_mongo_bulk_write(self):
        # col = MongoMixin().mongo_16_client.get_database('test').get_collection('ydf_test2')
        col = MongoClient().get_database('test').get_collection('ydf_test2')
        with decorators.TimerContextManager():
            for i in range(50000 + 13):
                # time.sleep(0.01)
                item = {'_id': i, 'field1': i * 2}
                mongo_helper = MongoBulkWriteHelper(col, 10000, is_print_log=True)
                mongo_helper.add_task(UpdateOne({'_id': item['_id']}, {'$set': item}, upsert=True))

    @unittest.skip('requires a local Redis server')
    def test_redis_bulk_write(self):
        with decorators.TimerContextManager():
            r = redis.Redis(password='123456')
            # redis_helper = RedisBulkWriteHelper(r, 100)  # Can be placed outside
            for i in range(100003):
                # time.sleep(0.2)
                redis_helper = RedisBulkWriteHelper(r, 2000)  # can also be instantiated here repeatedly
                redis_helper.add_task(RedisOperation('sadd', 'key1', str(i)))

    # @unittest.skip
    def test_mysql_bulk_write(self):
        mysql_conn = torndb_for_python3.Connection(host='localhost', database='test', user='root', password='123456', charset='utf8')
        with decorators.TimerContextManager():
            # mysql_helper = MysqlBulkWriteHelper(mysql_conn, sql_short='INSERT INTO test.table_2 (column_1, column_2) VALUES (%s,%s)', threshold=200)  # It is best to write outside the loop
            for i in range(100000 + 9):
                # Repeated instantiation is supported, so writing this inside the loop by accident does not matter.
                mysql_helper = MysqlBulkWriteHelper(mysql_conn, sql_short='INSERT INTO test.table_2 (column_1, column_2) VALUES (%s,%s)', threshold=20000, )
                mysql_helper.add_task((i, i * 2))


if __name__ == '__main__':
    unittest.main()
The three database helpers are used in exactly the same way: call the `add_task` method to submit a single task.
Screenshot of mysql batch operation
3. The code mainly uses the template method pattern, the flyweight pattern, and the proxy pattern.
The template method pattern saves code: extending batch operations to another database type requires writing only a few methods. The strategy pattern could be used instead.
With the flyweight pattern, users do not have to take care to initialize the helper at one suitable place in the code and then keep reusing that object. Instances can be created anywhere, including inside a for loop.
With the proxy pattern, users do not need to call the official pipeline, executemany, and bulk_write methods of the three databases directly; the helper objects call these official interfaces themselves.