A brief walkthrough of the basic pymysql operations: insert, delete, query, and update (the classic CRUD set).
Insert
Create the table first, then write data to the table
Unlike queries, insert, update, and delete operations require a commit before they take effect; see ref.1 for the underlying principle.
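For reference, here is a minimal sketch of that rule in one place (my addition; it assumes the `user` table created below already exists): the insert only becomes visible after commit, while the select can be fetched directly.

```python
import pymysql

con = pymysql.connect(host='localhost', port=3306, user='root',
                      password='12345', db='ai', charset='utf8')
with con.cursor() as cur:
    # a write: not durable until commit
    cur.execute("insert into user(name, age) values(%s, %s)", ('Xiao Ming', 14))
    con.commit()
    # a query: no commit needed, just fetch
    cur.execute("select count(*) from user")
    print(cur.fetchone())
con.close()
```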
```python
import pandas as pd
import pymysql
import time
import warnings

warnings.filterwarnings("ignore")
```
Create table
```python
con = pymysql.connect(host='localhost', port=3306, user='root',
                      password='12345', db='ai', charset="utf8")
# Create a cursor (rows are returned as tuples by default; switch to dict)
cur = con.cursor(cursor=pymysql.cursors.DictCursor)

create_sql = """
create table user(
    id int NOT NULL AUTO_INCREMENT,
    `name` varchar(50) NOT NULL,
    `age` int NOT NULL,
    PRIMARY KEY (`id`)
)ENGINE=InnoDB DEFAULT CHARSET=utf8;
"""

try:
    # execute the sql statement and commit
    cur.execute(create_sql)
    con.commit()
except:
    # roll back on error
    print("An error occurred, rolling back")
    con.rollback()

# close the database connection
con.close()
```
```python
con = pymysql.connect(host='localhost', port=3306, user='root',
                      password='12345', db='ai', charset="utf8")
# Create a cursor (rows are returned as tuples by default; switch to dict)
cur = con.cursor(cursor=pymysql.cursors.DictCursor)

sql = """
desc user;
"""

try:
    cur.execute(sql)
    get_df = pd.DataFrame(cur.fetchall())
    print(get_df)
    # con.commit()  # not needed for queries
except:
    # roll back on error
    con.rollback()

# close the cursor and the connection
cur.close()
con.close()
```
```
  Field         Type Null  Key Default           Extra
0    id          int   NO  PRI    None  auto_increment
1  name  varchar(50)   NO         None
2   age          int   NO         None
```
Insert data
```python
con = pymysql.connect(host='localhost', port=3306, user='root',
                      password='12345', db='ai', charset="utf8")
# Create a cursor (rows are returned as tuples by default; switch to dict)
cur = con.cursor(cursor=pymysql.cursors.DictCursor)

row_nums = 500000
sql = "insert into user(name, age) values('Xiao Ming', 14)"

try:
    t1 = time.time()
    # execute the sql statement row by row
    for i in range(row_nums):
        cur.execute(sql)
    con.commit()  # commit once at the end
    t2 = time.time()
    print(f"Loop write time: {t2 - t1}")
except:
    # roll back on error
    con.rollback()

# close the cursor and the connection
cur.close()
con.close()
```
Loop write time: 39.632535457611084
Batch write
```python
con = pymysql.connect(host='localhost', port=3306, user='root',
                      password='12345', db='ai', charset="utf8")
# Create a cursor (rows are returned as tuples by default; switch to dict)
cur = con.cursor(cursor=pymysql.cursors.DictCursor)

row_nums = 500000
sql = "insert into user(name, age) values(%s,%s)"
# each element supplies the parameters for one row of the insert;
# elements can be tuples or lists
cities = [('Xiao Ming', 14) for i in range(row_nums)]

try:
    t1 = time.time()
    cur.executemany(sql, cities)  # batch execution
    con.commit()
    t2 = time.time()
    print(f"Batch write time: {t2 - t1}")
except:
    # roll back on error
    con.rollback()

# close the cursor and the connection
cur.close()
con.close()
```
Batch write time: 5.722973823547363
Batch writing has a clear speed advantage. Note the space before values in "insert into user(name, age) values(%s,%s)": executemany only rewrites the statement into a single multi-row insert when the SQL matches pymysql's internal pattern; see ref.2 for the details.
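As a quick sanity check (my addition; RE_INSERT_VALUES is an internal detail of pymysql and may move or change between versions), you can test whether a statement matches the pattern executemany uses for the multi-row rewrite:

```python
from pymysql.cursors import RE_INSERT_VALUES  # internal, not a public API

sql = "insert into user(name, age) values(%s,%s)"
# If this matches, executemany() folds all parameter sets into one
# multi-row INSERT; otherwise it executes the statement once per row.
print(bool(RE_INSERT_VALUES.match(sql)))
```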
pyspark batch write
When the data volume is huge, you can use Spark's foreachPartition operator to write the partitions in parallel.
```python
import pandas as pd
import time
import pymysql
import functools
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import *
```
```python
def get_or_create_hudi(app_name):
    spark = SparkSession \
        .builder \
        .appName(app_name) \
        .config("spark.driver.maxResultSize", "10g") \
        .config("spark.sql.execution.arrow.enabled", "true") \
        .config("spark.dynamicAllocation.enabled", "false") \
        .config("spark.sql.crossJoin.enabled", "true") \
        .config("spark.kryoserializer.buffer.max", "512m") \
        .config("spark.io.compression.codec", "snappy") \
        .config("spark.sql.hive.convertMetastoreParquet", "false") \
        .config("spark.hadoop.dfs.namenode.acls.enabled", "false") \
        .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
        .enableHiveSupport() \
        .getOrCreate()
    spark.sparkContext.setLogLevel('ERROR')
    return spark
```
```python
def insert2mysql_partrdd(part, db_param="", value_cols=['name', 'age'], batch=40000):
    """
    @param part: the partition iterator
    @param db_param: mysql configuration info (unused placeholder here)
    @param value_cols: columns to insert
    @param batch: number of rows per batch insert
    @return:
    """
    con = pymysql.connect(host='localhost', port=3306, user='root',
                          password='12345', db='ai', charset="utf8")
    cur = con.cursor(cursor=pymysql.cursors.DictCursor)
    cnt = 0
    batch_list = []
    sql = "insert into user(name, age) values(%s,%s)"
    for row in part:
        # This per-row list build may be slow... is there a better way to optimize it?
        batch_list.append([row[i] for i in value_cols])
        cnt = cnt + 1
        if cnt > 0 and cnt % batch == 0:
            cur.executemany(sql, batch_list)
            con.commit()
            batch_list = []
            print(f"rows {cnt - batch}-{cnt} inserted into MySql!")
    # if the final chunk is smaller than a full batch, push it as well
    if cnt % batch != 0:
        cur.executemany(sql, batch_list)
        con.commit()
        print(f"rows {cnt - cnt % batch}-{cnt} inserted into MySql!")
    if cnt > 0:
        print(f"data sample - row: {row}")
        print(f"cnt: {cnt}")
    else:
        print("This partition has no data")
    cur.close()
    con.close()
```
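On the "is there a better way to optimize it" comment inside the loop: one idea (untested, my suggestion rather than the original author's) is to build the column extractor once with operator.itemgetter instead of indexing the row in a list comprehension on every iteration. pyspark Rows support row['col'] indexing, so a plain dict stands in for one here:

```python
from operator import itemgetter

value_cols = ['name', 'age']          # same columns as insert2mysql_partrdd
get_values = itemgetter(*value_cols)  # built once, outside the loop

# stand-in for a partition of pyspark Rows, which also support row['col']
part = [{'name': 'Xiao Ming', 'age': 14}, {'name': 'Xiaohong', 'age': 15}]
batch_list = [get_values(row) for row in part]
print(batch_list)  # [('Xiao Ming', 14), ('Xiaohong', 15)]
```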
```python
row_nums = 500000
df = pd.DataFrame({"name": ['Xiao Ming'] * row_nums, 'age': [14] * row_nums})
spark = get_or_create_hudi("test")
spark_df = spark.createDataFrame(df).repartition(10)
```
```python
t1 = time.time()
spark_df.rdd.foreachPartition(
    functools.partial(insert2mysql_partrdd, batch=50000))
t2 = time.time()
print(f"spark batch write time: {t2 - t1}")
```
spark batch write time: 8.034992456436157
- It doesn't seem to be any faster here.
- It may pay off when the data volume is much larger.
- Also, running Spark on a single machine likely skews the comparison.
- Spark's built-in JDBC writer is another option, sketched below.
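For comparison, Spark also ships a JDBC sink that handles batching itself. A minimal sketch, assuming the MySQL Connector/J jar is on the Spark classpath (the driver class name and batchsize below are my assumptions to illustrate the API, not settings from the original run):

```python
# write the same spark_df through Spark's built-in JDBC sink
(spark_df.write
    .format("jdbc")
    .option("url", "jdbc:mysql://localhost:3306/ai")
    .option("driver", "com.mysql.cj.jdbc.Driver")  # assumes Connector/J is available
    .option("dbtable", "user")
    .option("user", "root")
    .option("password", "12345")
    .option("batchsize", 50000)  # rows per JDBC batch, analogous to the manual batching above
    .mode("append")
    .save())
```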
Delete
We just generated about a million rows of data; let's delete most of them.
```python
con = pymysql.connect(host='localhost', port=3306, user='root',
                      password='12345', db='ai', charset="utf8")
# Create a cursor (rows are returned as tuples by default; switch to dict)
cur = con.cursor(cursor=pymysql.cursors.DictCursor)

sql = """
delete from user where id>10
"""

try:
    # execute the sql statement and commit
    cur.execute(sql)
    con.commit()
except:
    # roll back on error
    print("An error occurred, rolling back")
    con.rollback()

# close the database connection
con.close()
```
```python
con = pymysql.connect(host='localhost', port=3306, user='root',
                      password='12345', db='ai', charset="utf8")
# Create a cursor (rows are returned as tuples by default; switch to dict)
cur = con.cursor(cursor=pymysql.cursors.DictCursor)

sql = """
select count(*) as cnt from user
"""

try:
    cur.execute(sql)
    get_df = pd.DataFrame(cur.fetchall())
    print(get_df)
    # con.commit()  # not needed for queries
except:
    # roll back on error
    print("An error occurred, rolling back")
    con.rollback()

# close the database connection
con.close()
```
```
   cnt
0   10
```
10 rows of data remain.
Query
Combined with pandas, the query results can be converted straight into a DataFrame.
```python
con = pymysql.connect(host='localhost', port=3306, user='root',
                      password='12345', db='ai', charset="utf8")
# Create a cursor (rows are returned as tuples by default; switch to dict)
cur = con.cursor(cursor=pymysql.cursors.DictCursor)

sql = """
select * from user limit 100
"""

try:
    cur.execute(sql)
    get_df = pd.DataFrame(cur.fetchall())
    print(get_df)
    # con.commit()  # not needed for queries
except:
    # roll back on error
    print("An error occurred, rolling back")
    con.rollback()

# close the database connection
con.close()
```
```
   id       name  age
0   1  Xiao Ming   14
1   2  Xiao Ming   14
2   3  Xiao Ming   14
3   4  Xiao Ming   14
4   5  Xiao Ming   14
5   6  Xiao Ming   14
6   7  Xiao Ming   14
7   8  Xiao Ming   14
8   9  Xiao Ming   14
9  10  Xiao Ming   14
```
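pandas can also run the query itself via read_sql, which wraps the execute/fetchall/DataFrame steps into one call. A minimal sketch (note that recent pandas versions warn when given a raw pymysql connection instead of a SQLAlchemy connectable):

```python
import pandas as pd
import pymysql

con = pymysql.connect(host='localhost', port=3306, user='root',
                      password='12345', db='ai', charset='utf8')
# read_sql executes the statement, fetches all rows, and builds the DataFrame
get_df = pd.read_sql("select * from user limit 100", con)
print(get_df)
con.close()
```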
Update
```python
con = pymysql.connect(host='localhost', port=3306, user='root',
                      password='12345', db='ai', charset="utf8")
# Create a cursor (rows are returned as tuples by default; switch to dict)
cur = con.cursor(cursor=pymysql.cursors.DictCursor)

sql = """
update user set name = 'Xiaohong' where id<=5
"""

try:
    # execute the sql statement and commit
    cur.execute(sql)
    con.commit()
except:
    # roll back on error
    print("An error occurred, rolling back")
    con.rollback()

# close the database connection
con.close()
```
```python
con = pymysql.connect(host='localhost', port=3306, user='root',
                      password='12345', db='ai', charset="utf8")
# Create a cursor (rows are returned as tuples by default; switch to dict)
cur = con.cursor(cursor=pymysql.cursors.DictCursor)

sql = """
select * from user limit 100
"""

try:
    cur.execute(sql)
    get_df = pd.DataFrame(cur.fetchall())
    print(get_df)
    # con.commit()  # not needed for queries
except:
    # roll back on error
    print("An error occurred, rolling back")
    con.rollback()

# close the database connection
con.close()
```
```
   id       name  age
0   1   Xiaohong   14
1   2   Xiaohong   14
2   3   Xiaohong   14
3   4   Xiaohong   14
4   5   Xiaohong   14
5   6  Xiao Ming   14
6   7  Xiao Ming   14
7   8  Xiao Ming   14
8   9  Xiao Ming   14
9  10  Xiao Ming   14
```
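As with the batch inserts, the update can be parameterized instead of hard-coding values into the SQL string; the driver escapes the parameters, which also guards against SQL injection. A small sketch in the same shape as the code above:

```python
import pymysql

con = pymysql.connect(host='localhost', port=3306, user='root',
                      password='12345', db='ai', charset='utf8')
cur = con.cursor(cursor=pymysql.cursors.DictCursor)

# %s placeholders are filled in and escaped by the driver
cur.execute("update user set name = %s where id <= %s", ('Xiaohong', 5))
con.commit()  # updates must be committed, like any other write

cur.close()
con.close()
```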
Ref
[1] https://www.runoob.com/python3/python3-mysql.html
[2] https://www.modb.pro/db/184700
2023-07-28: typhoon day, heavy rain in Jiangning District, Nanjing