PythonNote042—pymysql usage

A brief introduction to basic pymysql operations: insert, update, delete, and query.

Insert

Create the table first, then write data into it.
Unlike queries, insert, update, and delete statements only take effect after an explicit commit. For the underlying reason, see ref.1.
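As a minimal sketch of the alternative (assuming the imports and the user table created below): pymysql turns autocommit off by default, so you either call con.commit() yourself or opt into autocommit when connecting.

con = pymysql.connect(host='localhost',
                      port=3306,
                      user='root',
                      password='12345',
                      db='ai',
                      charset="utf8",
                      autocommit=True)  # every statement commits immediately
with con.cursor() as cur:
    # no explicit con.commit() needed in autocommit mode
    cur.execute("insert into user(name, age) values(%s, %s)", ('Xiao Ming', 14))
con.close()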

import pandas as pd
import pymysql
import time
import warnings
warnings.filterwarnings("ignore")

Create table

con = pymysql.connect(host='localhost',
                      port=3306,
                      user='root',
                      password='12345',
                      db='ai',
                      charset="utf8")
# Create a cursor (results come back as tuples by default; DictCursor returns dicts)
cur = con.cursor(cursor=pymysql.cursors.DictCursor)
create_sql = """
create table user(
    id int NOT NULL AUTO_INCREMENT,
    `name` varchar(50) NOT NULL,
    `age` int NOT NULL,
    PRIMARY KEY (`id`)
)ENGINE=InnoDB DEFAULT CHARSET=utf8;
"""
try:
    # execute the create-table statement
    cur.execute(create_sql)
    # commit the change
    con.commit()
except Exception:
    # roll back on error
    print("An error occurred, rolling back")
    con.rollback()

# Close the database connection
con.close()
con = pymysql.connect(host='localhost',
                      port=3306,
                      user='root',
                      password='12345',
                      db='ai',
                      charset="utf8")
# Create a cursor (results come back as tuples by default; DictCursor returns dicts)
cur = con.cursor(cursor=pymysql.cursors.DictCursor)
sql = """
desc user;
"""
try:
    # execute the desc statement
    cur.execute(sql)
    get_df = pd.DataFrame(cur.fetchall())
    print(get_df)
except Exception:
    # roll back on error
    con.rollback()
# close the cursor
cur.close()
# Close the database connection
con.close()
  Field         Type  Null  Key Default           Extra
0    id          int    NO  PRI    None  auto_increment
1  name  varchar(50)    NO          None
2   age          int    NO          None

Insert data

con = pymysql.connect(host='localhost',
                      port=3306,
                      user='root',
                      password='12345',
                      db='ai',
                      charset="utf8")
# Create a cursor (results come back as tuples by default; DictCursor returns dicts)
cur = con.cursor(cursor=pymysql.cursors.DictCursor)
row_nums = 500000
sql = "insert into user(name, age)values('Xiao Ming', 14)"
try:
    # execute the insert statement once per row
    t1 = time.time()
    for i in range(row_nums):
        cur.execute(sql)
    con.commit()  # commit
    t2 = time.time()
    print(f"Loop writing time: {t2 - t1}")
except Exception:
    # roll back on error
    con.rollback()
# close the cursor
cur.close()
# Close the database connection
con.close()
Loop writing time: 39.632535457611084

Batch write

con = pymysql.connect(host='localhost',
                      port=3306,
                      user='root',
                      password='12345',
                      db='ai',
                      charset="utf8")
# Create a cursor (results come back as tuples by default; DictCursor returns dicts)
cur = con.cursor(cursor=pymysql.cursors.DictCursor)
row_nums = 500000
sql = "insert into user(name, age) values(%s,%s)"
cities = [
    ('Xiao Ming', 14) for i in range(row_nums)
]

try:
    t1 = time.time()
    # cities is the parameter sequence: each element supplies the fields of
    # one insert row and can be a tuple or a list
    cur.executemany(sql, cities)  # batch execution
    con.commit()  # commit
    t2 = time.time()
    print(f"Batch writing time: {t2 - t1}")
except Exception:
    # roll back on error
    con.rollback()
# close the cursor
cur.close()
# Close the database connection
con.close()
Batch writing time: 5.722973823547363

Batch writing has a clear speed advantage. Note the space in front of values in "insert into user(name, age) values(%s,%s)"; see ref.2 for why executemany needs the statement in this form.
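Under the hood, executemany recognizes this kind of INSERT ... VALUES statement and expands it into a single multi-row INSERT, so one statement goes over the wire instead of row_nums round trips. A rough illustrative sketch of the equivalent manual rewrite (not the author's code), assuming an open con/cur as in the blocks above:

# hypothetical manual version of what executemany effectively builds
rows = [('Xiao Ming', 14), ('Xiao Hong', 15)]
# one "(%s,%s)" placeholder group per row of parameters
values_part = ",".join(["(%s,%s)"] * len(rows))
multi_sql = "insert into user(name, age) values " + values_part
# flatten [(a, b), (c, d)] -> [a, b, c, d] to match the placeholders
args = [v for row in rows for v in row]
cur.execute(multi_sql, args)
con.commit()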

pyspark batch write

When the data volume is very large, you can use Spark's foreachPartition operator to write partitions in parallel.

import pandas as pd
import time
import pymysql
import functools
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import *
def get_or_create_hudi(app_name):
    spark = SparkSession \
        .builder\
        .appName(app_name) \
        .config("spark.driver.maxResultSize", "10g") \
        .config("spark.sql.execution.arrow.enabled", "true") \
        .config("spark.dynamicAllocation.enabled", "false") \
        .config("spark.sql.crossJoin.enabled", "true") \
        .config("spark.kryoserializer.buffer.max", "512m") \
        .config("spark.io.compression.codec", "snappy") \
        .config("spark.sql.hive.convertMetastoreParquet", "false") \
        .config("spark.hadoop.dfs.namenode.acls.enabled", "false") \
        .config("spark.sql.hive.convertMetastoreParquet", "false") \
        .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
        .enableHiveSupport() \
        .getOrCreate()
    spark.sparkContext.setLogLevel('ERROR')
    print("\
")
    print("\
")
    return spark
def insert2mysql_partrdd(part, db_param="", value_cols=['name', 'age'], batch=40000):
    """
    Write one RDD partition into MySQL in batches.

    @param part: iterator over the rows of this partition
    @param db_param: mysql configuration information (unused here; the connection is hard-coded)
    @param value_cols: names of the columns to insert
    @param batch: number of rows per batch insert
    @return:
    """
    con = pymysql.connect(host='localhost',
                          port=3306,
                          user='root',
                          password='12345',
                          db='ai',
                          charset="utf8")
    cur = con.cursor(cursor=pymysql.cursors.DictCursor)
    cnt = 0
    batch_list = []
    sql = "insert into user(name, age) values(%s,%s)"
    for row in part:
        # collect the fields of this row; this per-row list building may be
        # slow -- is there a better way to optimize it?
        batch_list.append([row[i] for i in value_cols])
        cnt = cnt + 1
        if cnt > 0 and cnt % batch == 0:
            cur.executemany(sql, batch_list)
            con.commit()  # commit
            batch_list = []
            print(f"{cnt - batch}-{cnt} rows inserted into MySQL!")

    # if the last batch is smaller than a full batch, flush the remainder
    if cnt % batch != 0:
        cur.executemany(sql, batch_list)
        con.commit()  # commit
        print(f"{cnt - cnt % batch}-{cnt} rows inserted into MySQL!")

    if cnt > 0:
        print(f"data sample - last row: {row}")
        print(f"cnt: {cnt}")
    else:
        print("This partition has no data")

    cur.close()
    con.close()
row_nums = 500000

df = pd.DataFrame({"name": ['Xiao Ming'] * row_nums, 'age': [14] * row_nums})
spark = get_or_create_hudi("test")
spark_df = spark.createDataFrame(df).repartition(10)
t1 = time.time()
spark_df.rdd.foreachPartition(
    functools.partial(insert2mysql_partrdd, batch=50000))
t2 = time.time()
print(f"spark batch write time: {t2 - t1}")
spark batch write time: 8.034992456436157
  • It does not seem to be any faster here
  • The advantage should show up with a larger data volume
  • Running Spark on a single machine may also affect the result

Delete

We just wrote about a million rows; now delete most of them

con = pymysql.connect(host='localhost',
                      port=3306,
                      user='root',
                      password='12345',
                      db='ai',
                      charset="utf8")
# Create a cursor (results come back as tuples by default; DictCursor returns dicts)
cur = con.cursor(cursor=pymysql.cursors.DictCursor)
sql = """
delete from user where id>10
"""
try:
    # execute the delete statement
    cur.execute(sql)
    # commit the change
    con.commit()
except Exception:
    # roll back on error
    print("An error occurred, rolling back")
    con.rollback()

# Close the database connection
con.close()
con = pymysql.connect(host='localhost',
                      port=3306,
                      user='root',
                      password='12345',
                      db='ai',
                      charset="utf8")
# Create a cursor (results come back as tuples by default; DictCursor returns dicts)
cur = con.cursor(cursor=pymysql.cursors.DictCursor)
sql = """
select count(*) as cnt from user
"""
try:
    # execute the count query
    cur.execute(sql)
    get_df = pd.DataFrame(cur.fetchall())
    print(get_df)
    # queries do not need a commit
    # con.commit()
except Exception:
    # roll back on error
    print("An error occurred, rolling back")
    con.rollback()

# Close the database connection
con.close()
   cnt
0   10

10 rows remain
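As a side note, the filter value can also be bound as a %s parameter instead of being spliced into the SQL string, so the driver escapes it for you. A minimal sketch, assuming the same connection settings as above:

con = pymysql.connect(host='localhost', port=3306, user='root',
                      password='12345', db='ai', charset="utf8")
cur = con.cursor(cursor=pymysql.cursors.DictCursor)
try:
    # the driver binds and escapes the value for the placeholder
    cur.execute("delete from user where id > %s", (10,))
    con.commit()
except Exception:
    con.rollback()
cur.close()
con.close()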

Query

Combined with pandas, the query result can be converted directly into a DataFrame

con = pymysql.connect(host='localhost',
                      port=3306,
                      user='root',
                      password='12345',
                      db='ai',
                      charset="utf8")
# Create a cursor (results come back as tuples by default; DictCursor returns dicts)
cur = con.cursor(cursor=pymysql.cursors.DictCursor)
sql = """
select * from user limit 100
"""
try:
    # execute the select statement
    cur.execute(sql)
    get_df = pd.DataFrame(cur.fetchall())
    print(get_df)
    # queries do not need a commit
    # con.commit()
except Exception:
    # roll back on error
    print("An error occurred, rolling back")
    con.rollback()

# Close the database connection
con.close()
   id       name  age
0   1  Xiao Ming   14
1   2  Xiao Ming   14
2   3  Xiao Ming   14
3   4  Xiao Ming   14
4   5  Xiao Ming   14
5   6  Xiao Ming   14
6   7  Xiao Ming   14
7   8  Xiao Ming   14
8   9  Xiao Ming   14
9  10  Xiao Ming   14
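Alternatively, pandas can run the query and build the DataFrame in one call. A minimal sketch using pd.read_sql; note that pandas officially targets SQLAlchemy connectables, so passing a raw pymysql connection works but may emit a UserWarning:

con = pymysql.connect(host='localhost', port=3306, user='root',
                      password='12345', db='ai', charset="utf8")
# fetch the rows and construct the DataFrame in one step
get_df = pd.read_sql("select * from user limit 100", con)
print(get_df)
con.close()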

Update

con = pymysql.connect(host='localhost',
                      port=3306,
                      user='root',
                      password='12345',
                      db='ai',
                      charset="utf8")
# Create a cursor (results come back as tuples by default; DictCursor returns dicts)
cur = con.cursor(cursor=pymysql.cursors.DictCursor)
sql = """
update user set name = 'Xiaohong' where id<=5
"""
try:
    # execute the update statement
    cur.execute(sql)
    # commit the change
    con.commit()
except Exception:
    # roll back on error
    print("An error occurred, rolling back")
    con.rollback()

# Close the database connection
con.close()
con = pymysql.connect(host='localhost',
                      port=3306,
                      user='root',
                      password='12345',
                      db='ai',
                      charset="utf8")
# Create a cursor (results come back as tuples by default; DictCursor returns dicts)
cur = con.cursor(cursor=pymysql.cursors.DictCursor)
sql = """
select * from user limit 100
"""
try:
    # execute the select statement
    cur.execute(sql)
    get_df = pd.DataFrame(cur.fetchall())
    print(get_df)
    # queries do not need a commit
    # con.commit()
except Exception:
    # roll back on error
    print("An error occurred, rolling back")
    con.rollback()

# Close the database connection
con.close()
   id       name  age
0   1   Xiaohong   14
1   2   Xiaohong   14
2   3   Xiaohong   14
3   4   Xiaohong   14
4   5   Xiaohong   14
5   6  Xiao Ming   14
6   7  Xiao Ming   14
7   8  Xiao Ming   14
8   9  Xiao Ming   14
9  10  Xiao Ming   14

Ref

[1] https://www.runoob.com/python3/python3-mysql.html
[2] https://www.modb.pro/db/184700

2023-07-28 Typhoon, heavy rain in Jiangning District, Nanjing