Xugu Database: Generating Tens of Millions of Rows of Test Data with Python

1. Background

When you need to generate tens of millions of rows of test data, hand-writing SQL and driving the concurrency with JMeter is cumbersome. Can a single tool satisfy these needs in one pass and double as a disk performance test?

2. Principle

First write a batch of seed data into a staging table, PRODUCTS_TEST; then create a stored procedure that reads from the staging table and inserts into the target table PRODUCTS; finally, execute that stored procedure from multiple threads or processes.

Statement to create the main table:

create table sysdba.products(
product_no varchar(50) not null,
product_name varchar(200),
product_introduce varchar(4000),
manufacture_date date,
sell_dates varchar(50),
address varchar(200),
product_type varchar(50)
)HOTSPOT 20;

Statement to create the staging table:

create table PRODUCTS_TEST as select * from products p;

Stored procedure to batch-insert random seed data into the staging table:

CREATE OR REPLACE PROCEDURE SP_insert_356_DATA IS
i number;
begin
 for i in 1..1000 loop
INSERT INTO SYSDBA.PRODUCTS_TEST
            VALUES (sys_uuid AS product_no -- generate a UUID for the key column
                   , DBMS_RANDOM.STRING('x', 8) AS product_name -- 8 random alphanumeric characters
                   , CASE TRUNC(DBMS_RANDOM.VALUE(1, 6)) -- TRUNC(DBMS_RANDOM.VALUE(1, 6)) yields an integer 1..5
                         when 1 then 'Snack Gift Pack A'
                         when 2 then 'Snack Gift Pack B'
                         when 3 then 'Snack Gift Pack C'
                         when 4 then 'Snack Gift Pack D'
                         when 5 then 'Snack Gift Pack E'
                         ELSE 'Snack Gift Pack E'
                         END AS PRODUCT_INTRODUCE
                    , to_date('2017-01-01 00:00:00','yyyy-mm-dd hh24:mi:ss') + i AS MANUFACTURE_DATE
                    , to_date('2018-04-01 00:00:00','yyyy-mm-dd hh24:mi:ss') + i AS SELL_DATES
                       -- to_char(sysdate, 'J') gives the number of days since January 1, 4712 BC (the Julian day)
                   , CASE TRUNC(DBMS_RANDOM.VALUE(1, 6)) -- randomly pick one of 5 values
                         when 1 then 'Beijing'
                         when 2 then 'Shanghai'
                         when 3 then 'Shenzhen'
                         when 4 then 'Guangzhou'
                         when 5 then 'Chengdu'
                         else 'Wuhan'
                         END address
                   ,CASE TRUNC(DBMS_RANDOM.VALUE(1, 6)) -- randomly pick one of 5 values
                         when 1 then 'food'
                         when 2 then 'ornaments'
                         when 3 then 'car'
                         when 4 then 'hardware'
                         when 5 then 'military industry'
                         else 'seafood'
                         END);
end loop;
end SP_insert_356_DATA;

Execute the following stored procedure from multiple threads:

-- e.g. drive this with ten JMeter threads
CREATE OR REPLACE PROCEDURE SP_insert_DATA IS
BEGIN
FOR X IN 1..1000
        LOOP
INSERT INTO SYSDBA.PRODUCTS
SELECT sys_uuid as PRODUCT_NO,PRODUCT_NAME,PRODUCT_INTRODUCE,MANUFACTURE_DATE,SELL_DATES,ADDRESS,PRODUCT_TYPE
FROM
SYSDBA.PRODUCTS_TEST;
COMMIT;
        END LOOP;
END SP_insert_DATA;

3. Python code implementation

Note: worker processes cannot pickle database connections, cursors, and similar objects.
The pickle module converts Python objects into byte streams that can be transferred and reconstructed across processes, but not every object can be serialized. In particular, objects that wrap underlying system resources, such as database connections and sockets, generally do not support pickling because their state cannot be meaningfully restored after deserialization; each process must open its own connection instead.
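
To make the note concrete, here is a minimal sketch (not part of the original tool) showing that a socket-backed object cannot be pickled; an xgcondb connection fails the same way, which is why each worker process below opens its own connection through get_cur():

import pickle
import socket

# Sockets wrap OS-level resources, so pickle refuses to serialize them
s = socket.create_connection(("example.com", 80))
try:
    pickle.dumps(s)
except TypeError as e:
    print(e)  # cannot pickle 'socket' object
finally:
    s.close()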

import argparse
import concurrent.futures
import multiprocessing
import queue
import re
import sys
import time
from multiprocessing import freeze_support

import xgcondb


# Each worker process opens its own connection here; connection objects themselves cannot be pickled
def get_cur(db_host, db_port, db_user, db_pwd, db_name):
    conn = xgcondb.connect(host=db_host, port=db_port, database=db_name, user=db_user, password=db_pwd, charset="utf8")
    cur = conn.cursor()
    return cur


# Argument parsing is factored out up front so that spawned worker processes do not re-run it and report errors
def parse_args():
    parser = argparse.ArgumentParser(
        # description='This is a database environment collection tool',
        prefix_chars='-'
    )
    # Add positional parameters
    # parser.add_argument('input_file', help='path to input file')
    # Add optional parameters
    # parser.add_argument('-o', '--output', help='Path to output file')
    parser.add_argument('-H', '--host', help='Enter the database ip address')
    parser.add_argument('-P', '--port', help='Enter the database port number', type=int, default=5138)
    parser.add_argument('-u', '--user', help='Enter database user')
    parser.add_argument('-p', '--pwd', help='Enter database password')
    parser.add_argument('-d', '--database_name', help='Enter database name')
    # Add flag parameter
    parser.add_argument('-v', '--verbose', action='store_true', help='Whether to display detailed information')
    args = parser.parse_args()
    # Access the parsed parameters
    # input_file = args.input_file
    # output_file = args.output
    host = args.host
    port = args.port
    user = args.user
    password = args.pwd
    db = args.database_name
    verbose = args.verbose

    # Here you can perform corresponding operations based on the parsed parameters.
    if len(sys.argv) == 1:
        host = input("Please enter ip: ")
        port = int(input("Please enter the port: "))
        user = input("Please enter user: ")
        password = input("Please enter password: ")
        db = input("Please enter the database: ")
    if verbose:
        print("Show details")
    if not host:
        parser.print_help()
        raise Exception('No IP entered!!!\n')
    if not port:
        parser.print_help()
        raise Exception('No port entered!!!\n')
    if not user:
        parser.print_help()
        raise Exception('No user entered!!!\n')
    if not password:
        parser.print_help()
        raise Exception('No password entered!!!\n')
    if not db:
        parser.print_help()
        raise Exception('No database entered!!!\n')
    # if host and port and user and password and db:
    #     print(f'host: {host} port: {port} user: {user} password: {password} db: {db}\n')

    return host, port, user, password, db


class ConnectionPool:
    def __init__(self, max_connections, connection_params):
        self.max_connections = max_connections
        self.connection_params = connection_params
        self._pool = queue.Queue(maxsize=max_connections)
        self._initialize_pool()

    def _initialize_pool(self):
        for _ in range(self.max_connections):
            connection = self._create_connection()
            self._pool.put(connection)

    def _create_connection(self):
        try:
            return xgcondb.connect(**self.connection_params)
        except Exception as e:
            print(e)

    def get_connection(self):
        try:
            return self._pool.get(block=False)
        except queue.Empty:
            raise Exception("Connection pool is empty. Try again later.")

    def release_connection(self, connection):
        self._pool.put(connection)

    def close_all_connections(self):
        while not self._pool.empty():
            connection = self._pool.get()
            connection.close()

    def executor(self, sql):
        conn = self.get_connection()
        cursor = conn.cursor()
        try:
            cursor.execute(sql)
        except Exception as e:
            conn.rollback()
            print(f'Execution exception: {sql}, {e}')
            self.release_connection(conn)
            return None
        result = None
        if cursor.description:  # a result set exists, i.e. a SELECT
            rows = cursor.fetchall()
            column_names = [desc[0] for desc in cursor.description]
            result = [dict(zip(column_names, row)) for row in rows]
        # Commit and release on every success path, not only when rows came back
        conn.commit()
        self.release_connection(conn)
        return result

    def call_proc(self, name, *args):
        conn = self.get_connection()
        cursor = conn.cursor()
        try:
            # print(name, tuple(args), tuple(1 for i in range(len(args))))
            if len(args):
                cursor.callproc(name, tuple(args), tuple(1 for i in range(len(args))))
            else:
                cursor.callproc(name)
        except Exception as e:
            conn.rollback()
            print(f'Execution exception: {name}, {e}')
            self.release_connection(conn)
            return None
        conn.commit()
        self.release_connection(conn)
        return None

    def execute_func(self, name, *args):
        conn = self.get_connection()
        cursor = conn.cursor()
        try:
            data = cursor.callfunc(name, tuple(args), tuple(1 for i in range(len(args))))
        except Exception as e:
            conn.rollback()
            print(f'Execution exception: {name}, {e}')
            self.release_connection(conn)
            return None
        conn.commit()
        self.release_connection(conn)
        return data
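
    # Optional refinement (not in the original): a context manager that
    # guarantees a pooled connection is returned even if the caller raises;
    # the methods above instead release manually on every code path.
    # Usage:
    #     with pool.connection() as conn:
    #         conn.cursor().execute('select 1')
    def connection(self):
        import contextlib

        @contextlib.contextmanager
        def managed(pool):
            conn = pool.get_connection()
            try:
                yield conn
            finally:
                pool.release_connection(conn)

        return managed(self)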


# Note: the stored procedure source sent from here must not contain comments


def drop_tb(table):
    sql = f"drop table if exists {<!-- -->table} cascade"
    pool.executor(sql)


def add_index():
    sql1 = 'alter table SYSDBA.EVALUATIONS add constraint EVALUATIONS_PK primary key(EVALUATION_NO);'
    sql2 = 'alter table SYSDBA.LOGISTICS add constraint INDEX_KEY primary key(LOGISTICS_NO);'
    sql3 = 'alter table SYSDBA.ORDERS add constraint INDEX_KEY primary key(ORDER_NO);'
    sql4 = 'alter table SYSDBA.PRODUCTS add constraint PRODUCT_KEY primary key(PRODUCT_NO);'
    sql5 = 'alter table SYSDBA.PURCHASES add constraint PURCHASE_KEY primary key(PURCHASE_NO);'

    sql6 = 'create index "EVALUATIONS_PRODUCT_NO_IDX" on SYSDBA.EVALUATIONS("PRODUCT_NO") indextype is btree global ;'
    sql7 = 'create index "PEODUCT_NO_KEY" on SYSDBA.LOGISTICS("PRODUCT_NO") indextype is btree global ;'
    sql8 = 'create index "PRODECT_KEY" on SYSDBA.ORDERS("PRODUCT_NO") indextype is btree global ;'
    sql9 = 'create index "PRODUCT_MANUF_DATE_INDEX" on SYSDBA.PRODUCTS("MANUFACTURE_DATE") indextype is btree global ;'
    sql10 = 'create index "PRODECT_NO_KEY" on SYSDBA.PURCHASES("PRODUCT_NO") indextype is btree global ;'
    sqls = [sql1, sql2, sql3, sql4, sql5, sql6, sql7, sql8, sql9, sql10]
    for i in sqls:
        pool.executor(i)


def create_products_tb(hotspot):
    sql2 = f"""
    create table if not exists sysdba.products(
    product_no varchar(50) not null,
    product_name varchar(200),
    product_introduce varchar(4000),
    manufacture_date date,
    sell_dates varchar(50),
    address varchar(200),
    product_type varchar(50)
    )HOTSPOT {hotspot};
    """
    pool.executor(sql2)


def create_evaluations_tb(hotspot):
    sql2 = f'''
    create table sysdba.evaluations(
    evaluation_no varchar(50) not null,
    product_no varchar(50),
    product_batche varchar(50),
    product_evaluation varchar(4000),
    evaluation_type varchar(50),
    evaluation_date date,
    deal varchar(200),
    product_bathe clob
    ) HOTSPOT {hotspot};
    '''
    pool.executor(sql2)


def create_logistics_tb(hotspot):
    sql = f'''
    create table sysdba.logistics(
    logistics_no varchar(50) not null,
    product_no varchar(50),
    recipient_name varchar(50),
    sender_name varchar(50),
    order_no varchar(50),
    notes varchar(4000),
    send_date date,
    reach_date date,
    logistics_type varchar(50)
    ) HOTSPOT {hotspot};
    '''
    pool.executor(sql)


def create_order_tb(hotspot):
    sql = f"""
    create table sysdba.orders(
    order_no varchar(50) not null,
    employee_no varchar(50),
    order_name varchar(50),
    order_num integer,
    order_date date,
    order_address varchar(200),
    notes varchar(4000),
    product_no varchar(50),
    order_type varchar(50)
    ) HOTSPOT {hotspot};
    """
    pool.executor(sql)


def create_purchases_tb(hotspot):
    sql = f'''
    create table sysdba.purchases(
    purchase_no varchar(50) not null,
    product_no varchar(50),
    purchase_date date,
    purchase_num integer,
    purchase_price integer,
    factory varchar(200),
    address varchar(200)
    ) HOTSPOT {hotspot};
    '''
    pool.executor(sql)


def create_temp_proc(num):
    """
    :param num: Insert data into the temporary table once
    :return:
    """
    sql = f"""
    CREATE OR REPLACE PROCEDURE SP_insert_356_DATA() IS
    i number;
    begin
     for i in 1..{num} loop
    INSERT INTO SYSDBA.PRODUCTS_TEST
                VALUES (sys_uuid AS product_no
                       , DBMS_RANDOM.STRING('x', 8) AS product_name
                       , CASE TRUNC(DBMS_RANDOM.VALUE(1, 6))
                             when 1 then 'Snack Gift Pack A'
                             when 2 then 'Snack Gift Pack B'
                             when 3 then 'Snack Gift Pack C'
                             when 4 then 'Snack Gift Pack D'
                             when 5 then 'Snack Gift Pack E'
                             ELSE 'Snack Gift Pack E'
                             END AS product_introduce
                        , to_date('2017-01-01 00:00:00','yyyy-mm-dd hh24:mi:ss') + i AS manufacture_date
                        , to_date('2018-04-01 00:00:00','yyyy-mm-dd hh24:mi:ss') + i AS sell_dates
                       , CASE TRUNC(DBMS_RANDOM.VALUE(1, 6))
                             when 1 then 'Beijing'
                             when 2 then 'Shanghai'
                             when 3 then 'Shenzhen'
                             when 4 then 'Guangzhou'
                             when 5 then 'Chengdu'
                             else 'Wuhan'
                             END address
                       ,CASE TRUNC(DBMS_RANDOM.VALUE(1, 6))
                             when 1 then 'food'
                             when 2 then 'ornaments'
                             when 3 then 'car'
                             when 4 then 'hardware'
                             when 5 then 'military industry'
                             else 'seafood'
                             END);
    end loop;
    commit;
    end SP_insert_356_DATA;
    """
    # The staging table itself is created by create_product_test(); here it is emptied and refilled
    pool.executor("truncate table SYSDBA.PRODUCTS_TEST")
    # show('SYSDBA.PRODUCTS_TEST')
    pool.executor(sql)
    pool.call_proc('SP_insert_356_DATA')


def create_product_proc(num):
    sql = f"""
    create or replace procedure sp_insert_data is
    begin
    for x in 1..{num} loop
    insert into sysdba.products
        select sys_uuid as product_no,product_name,product_introduce,manufacture_date,sysdate,address,product_type from sysdba.products_test s;
    commit;
     end loop;
    end sp_insert_data;
    """
    pool.executor(sql)


def create_product_test():
    sql = 'create table if not exists PRODUCTS_TEST as select * from products;'
    pool.executor(sql)


def show(table):
    cur = get_cur(db_host, db_port, db_user, db_pwd, db_name)
    sql = f'select count(*) from {table}'
    cur.execute(sql)
    row = cur.fetchone()
    print(f'{table} : {row}')


# Multi-process execution using a process pool (alternative to multi_process below)
def mul_proc_executor():
    with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
        # with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        # Submit tasks with parameters to the pool
        args = ['p_test' for i in range(5)]
        results = executor.map(execute_proc_args, args)
        for result in results:
            print(result)
        # Alternative: submit futures individually and collect the results
        # results = [executor.submit(execute_proc_args, arg) for arg in args]
        # for future in concurrent.futures.as_completed(results):
        #     result = future.result()
        #     print(f"Task result: {result}")


def parse_str(input_string):
    # input_string = "123 45.67 abc 89.0 def 10 0"
    # Regular expression matching floats, integers, and words (alternatives separated by |)
    pattern = re.compile(r".*?(\d+\.\d+|\d+|\w+)")
    matches = re.findall(pattern, input_string)
    # print(matches)
    res = []
    for match in matches:
        if "." in match:
            res.append(float(match))
        elif match.isdigit() or (match[0] in "+-" and match[1:].isdigit()):
            res.append(int(match))
        else:
            res.append(match)
    return res
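
# Illustrative example (not in the original):
#     parse_str("123 45.67 abc") -> [123, 45.67, 'abc']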


def rebuild_tables():
    """
    Rebuild the tables: drop them first, then recreate them
    :return:
    """
    tables = ['products', 'evaluations', 'logistics', 'orders', 'purchases', 'products_test']
    tables = [f'{db_user}.{i}' for i in tables]
    for table in tables:
        drop_tb(table)

    create_products_tb(20)
    create_evaluations_tb(20)
    create_logistics_tb(20)
    create_order_tb(20)
    create_purchases_tb(20)
    create_product_test()


def execute_proc_args(name):
    cur = get_cur(db_host, db_port, db_user, db_pwd, db_name)
    # print(cur.callproc("test_in", (20,), (1,)))
    cur.callproc(name, (200000,), (1,))


def execute_proc(name, db_host, db_port, db_user, db_pwd, db_name, *args):
    cur = get_cur(db_host, db_port, db_user, db_pwd, db_name)
    if len(args):
        cur.callproc(name, tuple(args), tuple(1 for _ in range(len(args))))
    else:
        cur.callproc(name)


# Launch n processes, each executing the stored procedure over its own connection
def multi_process(n, proc_name, db_host, db_port, db_user, db_pwd, db_name, *args):
    processes = []
    for i in range(n):
        process = multiprocessing.Process(target=execute_proc,
                                          args=(proc_name, db_host, db_port, db_user, db_pwd, db_name, *args))
        processes.append(process)
    for process in processes:
        process.start()
    # Wait for all processes to complete
    for process in processes:
        process.join()
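
# Usage sketch (mirrors once_proc below): ten processes, each calling the
# stored procedure once over its own connection:
#     multi_process(10, 'sp_insert_data', db_host, db_port, db_user, db_pwd, db_name)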


def once_proc():
    tmp_n = int(input("Please enter the number of staging table rows: "))
    proc_nums = int(input("Please enter how many times the staging table is copied into the target table: "))
    parallel_n = int(input("Please enter the number of concurrent processes: "))
    create_temp_proc(tmp_n)
    create_product_proc(proc_nums)
    start = time.time()
    multi_process(parallel_n, 'sp_insert_data', db_host, db_port, db_user, db_pwd, db_name)
    end = time.time() - start
    show('products')
    show('products_test')
    # Total rows written = staging rows x copy repetitions x concurrent processes
    print(f'took {end:.2f} seconds', f'tps: {(tmp_n * parallel_n * proc_nums / end):.2f} rows/s')


if __name__ == '__main__':
    print(xgcondb.version())
    freeze_support()
    # db_host = '10.28.23.174'
    #db_port = 5138
    # db_user = 'SYSDBA'
    # db_pwd = 'SYSDBA'
    # db_name = 'SYSTEM'
    db_host, db_port, db_user, db_pwd, db_name = parse_args()
    pool = ConnectionPool(
        max_connections=100,
        connection_params={
            "user": db_user,
            "password": db_pwd,
            "host": db_host,
            "port": db_port,
            "database": db_name,
            "charset": 'utf8',
        },
    )
    pool.executor('set max_loop_num to 0')
    pool.executor('set max_trans_modify to 0')
    # Purpose: copy batches (e.g. 10,000 rows) from the staging table into the target table
    rebuild_tables()
    while True:
        once_proc()
        flag = input("Whether it is necessary to clear the table and rebuild it (default is not to rebuild), please enter Y/N: ")
        if flag == 'Y' or flag == 'y':
            rebuild_tables()
            print('Table has been rebuilt')
        q = input('\
Press q to exit…or continue')
        if q == 'q' or q == 'Q':
            break
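
An example invocation (the script name make_data.py is hypothetical; substitute whatever the file is saved as):

python make_data.py -H 10.28.23.174 -P 5138 -u SYSDBA -p SYSDBA -d SYSTEM

Run with no arguments, the script falls back to interactive prompts for the connection details.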

4. Packaging the script into a tool
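
A typical approach, assuming PyInstaller and the same hypothetical file name, is to freeze the script into a single executable:

pyinstaller -F make_data.py

The freeze_support() call at the top of the __main__ block is what lets multiprocessing work inside a frozen Windows executable, so the script is already prepared for this kind of packaging.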


5. Performance monitoring during data generation


[Figure: performance monitoring during data generation]

6. Statistical analysis

Testing on a 4-node cluster showed that the larger the staging-table batch, the better the throughput: with 120 million rows inserted, TPS held steady at about 300,000 rows/s. Comparing runs with smaller and larger row counts, this cluster's single-table insert performance stabilizes at around 300,000 rows/s; at that rate, 120 million rows take roughly 400 seconds.