Initial method:
import pymysql def update_invoice_fields(order_sn, name=None, units=None, class_name=None, tax_rate=None): """ Update invoice related fields based on order number Parameters: order_sn (str): order number name (str): product name units (str): commodity units class_name (str): tax class name tax_rate (float): tax rate Returns: int: number of rows updated """ # connect to MySQL database conn = pymysql. connect( host='rmics.com', port=3306, user='yui', password='123456', db='test', charset='utf8' ) # Create a cursor object for executing SQL statements cursor = conn. cursor() # Construct SQL statement sql1 = """ UPDATE crm_invoice a JOIN crm_invoice_info b ON a.id = b.invoice_id JOIN( SELECT *, COUNT(*) AS cnt FROM crm_invoice_info GROUP BY invoice_id HAVING cnt = 1 )tmp ON tmp.id = b.id JOIN order_goods n ON b.rec_id = n.rec_id JOIN goods d ON n.goods_id = d.goods_id """ sql = """ WHERE a.ordersn = %(ordersn)s """ if class_name and tax_rate: sql + = "AND EXISTS(SELECT 1 FROM tax_categories WHERE class_name = %(class_name)s AND tax_rate = %(tax_rate)s) " elif class_name: sql + = "AND d.class_name = %(class_name)s" elif tax_rate: sql + = "AND EXISTS(SELECT 1 FROM tax_categories WHERE class_name = b.class AND tax_rate = %(tax_rate)s) " set_fields = [] if name: set_fields.append("b.NAME = %(name)s") set_fields.append("d.invoice_name = %(name)s") if units: set_fields.append("b.units = %(units)s") set_fields.append("d.unit = %(units)s") if class_name and tax_rate: set_fields.append("b.class = %(class_name)s") set_fields.append( "b.class_code = (SELECT class_code FROM tax_categories WHERE class_name = %(class_name)s AND tax_rate = %(tax_rate)s LIMIT 1)") set_fields.append( "b.tax_rate = (SELECT tax_rate FROM tax_categories WHERE class_name = %(class_name)s AND tax_rate = %(tax_rate)s LIMIT 1)") set_fields.append("d.class_name = %(class_name)s") set_fields.append( "d.class_code = (SELECT class_code FROM tax_categories WHERE class_name = %(class_name)s AND tax_rate = %(tax_rate)s LIMIT 1)") if not set_fields: # If there is no SET to update the field, return directly return 0 info = "SET " + ", ".join(set_fields) sql = sql1 + info + sql print("executed sql is equal to =========", sql) # Execute the SQL statement params = {<!-- -->"order_sn": order_sn, "name": name, "units": units, "class_name": class_name, "tax_rate": tax_rate} cursor. execute(sql, params) # commit changes conn.commit() # Get the number of updated rows rowcount = cursor. rowcount print("Number of rows executed successfully rowcount===", rowcount) # Close the cursor and database connection cursor. close() conn. close() # return the number of updated rows return rowcount update_invoice_fields("20220110134064", name="", units="", class_name="daily necessities", tax_rate=13) # update_invoice_fields("20220110134064", name="mask")
Package 1:
import pymysql class BaseSQL: """ SQL statement builder base class """ def __init__(self, db_config, table, columns): """ Initialize connection database and table information Args: db_config (dict): database connection information table (dict): related table and column names columns (list): list of fields to be updated """ self.conn = pymysql.connect(**db_config) self. table = table self. columns = columns def _build_where(self, where=None): """ Construct WHERE conditional statement Args: where (dict): query condition Returns: str: WHERE condition statement dict: query parameters in the WHERE condition """ if not where: return '', {<!-- -->} where_conditions = [] params = {<!-- -->} for key, value in where.items(): if value is None: continue if isinstance(value, (list, tuple)): where_conditions.append(f"{<!-- -->key} IN ({<!-- -->','.join('%s' for _ in value)})" ) params[key] = value else: where_conditions.append(f"{<!-- -->key} = %({<!-- -->key})s") params[key] = value if not where_conditions: return '', {<!-- -->} where_clause = "WHERE " + " AND ".join(where_conditions) return where_clause, params def _build_set(self, data): """ Constructs a SET update field statement Args: data (dict): fields and values to be updated Returns: str: SET update field statement dict: SET to update the value in the field """ set_fields = [] params = {<!-- -->} for field, value in data.items(): if value is not None: set_fields.append(f"{<!-- -->self.columns[field]} = %({<!-- -->field})s") params[field] = value if not set_fields: return '', {<!-- -->} set_clause = "SET " + ", ".join(set_fields) return set_clause, params def execute(self, sql, params=None): """ Execute the SQL statement, return the execution result and the number of affected rows. Args: sql (str): SQL statement params (dict): parameters in the SQL statement Returns: tuple: execution result and number of affected rows """ try: with self.conn.cursor() as cursor: row_count = cursor. execute(sql, params) result = cursor. fetchall() self.conn.commit() return result, row_count except Exception as e: self.conn.rollback() raise e @staticmethod def build_params(data=None, where=None): """ Construct parameters in SQL statements Args: data (dict): fields and values to be updated where (dict): query condition Returns: dict: parameters in the SQL statement """ params = {<!-- -->} if where: params. update(where) if data: params. update(data) return params class InvoiceUpdater(BaseSQL): """ Update invoice related fields """ def __init__(self, db_config, order_sn, name=None, units=None, class_name=None, tax_rate=None): """ Initialize connection database and parameter information Args: db_config (dict): database connection information order_sn (str): order number name (str): product name units (str): commodity units class_name (str): tax class name tax_rate (float): tax rate """ super().__init__(db_config, table=TABLES, columns=COLUMNS) self. order_sn = order_sn self.name = name self.units = units self. class_name = class_name self.tax_rate = tax_rate def update(self): """ Update invoice related fields Returns: int: number of rows updated """ where = {<!-- -->"a.order_sn": self.order_sn} if self.class_name and self.tax_rate: where["SELECT 1 FROM tax_categories WHERE class_name = %(class_name)s AND tax_rate = %(tax_rate)s"] = None elif self. class_name: where["d.class_name"] = self.class_name elif self.tax_rate: where["SELECT 1 FROM tax_categories WHERE class_name = b.class AND tax_rate = %(tax_rate)s"] = None data = {<!-- -->} if self.name: data[COLUMNS['name']] = self.name data[COLUMNS['invoice_name']] = self.name if self.units: data[COLUMNS['units']] = self.units data[COLUMNS['unit']] = self.units if self.class_name and self.tax_rate: data[COLUMNS['class']] = self. class_name data[COLUMNS['class_code']] = "SELECT class_code FROM tax_categories WHERE class_name = %(class_name)s AND tax_rate = %(tax_rate)s LIMIT 1" data[COLUMNS['tax_rate']] = "SELECT tax_rate FROM tax_categories WHERE class_name = %(class_name)s AND tax_rate = %(tax_rate)s LIMIT 1" data[COLUMNS['class_name']] = self. class_name data[COLUMNS['class_code']] = "SELECT class_code FROM tax_categories WHERE class_name = %(class_name)s AND tax_rate = %(tax_rate)s LIMIT 1" where_clause, where_params = self._build_where(where) set_clause, set_params = self._build_set(data) params = self.build_params(where=where_params, data=set_params) if not set_clause: # If there is no SET to update the field, return directly return 0 sql = f""" UPDATE {<!-- -->self.table['invoice']} AS a JOIN {<!-- -->self.table['invoice_info']} AS b ON a.{<!-- -->COLUMNS['invoice_id']} = b.{<!-- -->COLUMNS['invoice_id']} JOIN ( SELECT *, COUNT(*) AS cnt FROM {<!-- -->self.table['invoice_info']} GROUP BY {<!-- -->COLUMNS['invoice_id']} HAVING cnt = 1 ) AS tmp ON tmp.{<!-- -->COLUMNS['invoice_id']} = b.{<!-- -->COLUMNS['invoice_id']} JOIN {<!-- -->self.table['order_goods']} AS n ON b.{<!-- -->COLUMNS['rec_id']} = n.{<!-- -->COLUMNS['rec_id']} JOIN {<!-- -->self.table['goods']} AS d ON n.{<!-- -->COLUMNS['goods_id']} = d.{<!-- -->COLUMNS['goods_id']} {<!-- -->set_clause} {<!-- -->where_clause} """ result, row_count = self. execute(sql, params) return row_count
Package Three:
import pymysql import logging from datetime import datetime # Database connection information DB_CONFIG = {<!-- --> "host": "rmics.com", "port": 3306, "user": "yui", "password": "123456", "db": "test", "charset": "utf8", } # log configuration information LOG_CONFIG = {<!-- --> "filename": "update_invoice_fields.log", "level": logging. INFO, "format": "[%(asctime)s] [%(levelname)s] %(message)s", "datefmt": "%Y-%m-%d %H:%M:%S", } # The corresponding relationship between the field name and the column name to be updated COLUMNS = {<!-- --> "name": "b.NAME", "units": "b.units", "class_name": "b. class", "tax_rate": "b.tax_rate", "invoice_name": "d.invoice_name", "class_code": "d. class_code", "class_name2": "d. class_name", "unit": "d.unit", "tax_rate2": "b.tax_rate", "class": "b. class", "invoice_id": "id", "rec_id": "rec_id", "goods_id": "goods_id", } # Correspondence between related table names and column names TABLES = {<!-- --> "invoice": "crm_invoice", "invoice_info": "crm_invoice_info", "order_goods": "military_order_goods", "goods": "goods", } # Basic service class, encapsulating database and log functions class BaseService: """ Basic service class, encapsulating database and log functions """ def __init__(self, db_config=DB_CONFIG, logger_name="BaseService"): """ Initialize database connection, logger Args: db_config (dict): database connection information logger_name (str): logger name """ self.conn = pymysql.connect(**db_config) self. logger = logging. getLogger(logger_name) self.logger.setLevel(LOG_CONFIG["level"]) formatter = logging.Formatter(LOG_CONFIG["format"], LOG_CONFIG["datefmt"]) handler = logging. FileHandler(LOG_CONFIG["filename"]) handler. setFormatter(formatter) self. logger. addHandler(handler) def _build_where(self, where): """ Construct the WHERE clause Args: where (dict): dictionary of WHERE conditions Returns: str: WHERE clause dict: WHERE condition parameter """ where_clause = "" where_params = {<!-- -->} for key, value in where.items(): if isinstance(value, str): # Direct concatenation of strings where_clause + = f" AND {<!-- -->key} = %(where_{<!-- -->key})s" where_params[f"where_{<!-- -->key}"] = value elif isinstance(value, dict): # Concatenate subqueries sub_clause, sub_params = self._build_where(value) where_clause + = f" AND ({<!-- -->sub_clause})" where_params. update(sub_params) elif isinstance(value, list): # Concatenate IN clauses where_clause + = f" AND {<!-- -->key} IN %(where_{<!-- -->key})s" where_params[f"where_{<!-- -->key}"] = value elif value is None: # Concatenate IS NULL clauses where_clause += f" AND {<!-- -->key} IS NULL" else: raise ValueError(f"Invalid WHERE condition: {<!-- -->key}={<!-- -->value}") if where_clause: where_clause = "WHERE " + where_clause[5:] return where_clause, where_params def _build_set(self, data): """ Construct the SET clause Args: data (dict): update field dictionary Returns: str: SET clause dict: Update field parameters """ set_clause = "" set_params = {<!-- -->} for key, value in data.items(): if isinstance(value, str): # Direct concatenation of strings set_clause + = f", {<!-- -->key} = %(set_{<!-- -->key})s" set_params[f"set_{<!-- -->key}"] = value elif isinstance(value, dict): # Concatenate subqueries for sub_key, sub_value in value.items(): set_clause + = f", {<!-- -->key} = ({<!-- -->sub_value})" set_params.update({<!-- -->sub_key: sub_value}) elif isinstance(value, list): # Currently does not support update of list type raise ValueError(f"Unsupported data type for SET clause: {<!-- -->key}={<!-- -->value}") else: # Direct splicing of numeric types set_clause + = f", {<!-- -->key} = {<!-- -->value}" if set_clause: set_clause = "SET " + set_clause[2:] return set_clause, set_params def execute(self, sql, params=None): """ Execute SQL statement Args: sql (str): SQL statement params (dict): SQL statement parameters Returns: tuple: query result and number of affected rows """ cursor = self.conn.cursor() try: cursor. execute(sql, params) self.conn.commit() return cursor.fetchall(), cursor.rowcount except Exception as e: self.logger.error(f"Failed to execute SQL query: {<!-- -->sql}, params={<!-- -->params}, error={<!-- -->str (e)}") raise ValueError(str(e)) from e finally: cursor. close() # Update invoice related fields class InvoiceUpdater(BaseService): """ Update invoice related fields """ def __init__(self, order_sn=None, name=None, units=None, class_name=None, tax_rate=None, **kwargs): """ Initialize the order number and field information to be updated Args: order_sn (str): order number name (str): invoice name units (str): units class_name (str): tax class name tax_rate (float): tax rate """ super().__init__(db_config=DB_CONFIG, logger_name="InvoiceUpdater") self. order_sn = order_sn self.name = name self.units = units self. class_name = class_name self.tax_rate = tax_rate def execute(self): """ Update invoice related fields Returns: int: number of rows updated """ try: where = {<!-- -->"a.order_sn": self.order_sn} if self.class_name and self.tax_rate: where[ "SELECT 1 FROM tax_categories WHERE class_name = %(class_name)s AND tax_rate = %(tax_rate)s"] = None elif self. class_name: where["d.class_name"] = self.class_name elif self.tax_rate: where["SELECT 1 FROM tax_categories WHERE class_name = b.class AND tax_rate = %(tax_rate)s"] = None data = {<!-- -->} if self.name: data["name"] = self.name data["invoice_name"] = self.name if self.units: data["units"] = self.units data["unit"] = self.units if self.class_name and self.tax_rate: data["class"] = self. class_name data["class_code"] = "SELECT class_code FROM tax_categories WHERE class_name = %(class_name)s AND tax_rate = %(tax_rate)s LIMIT 1" data["tax_rate"] = "SELECT tax_rate FROM tax_categories WHERE class_name = %(class_name)s AND tax_rate = %(tax_rate)s LIMIT 1" data["class_name2"] = self. class_name data["class_code2"] = "SELECT class_code FROM tax_categories WHERE class_name = %(class_name)s AND tax_rate = %(tax_rate)s LIMIT 1" where_clause, where_params = self._build_where(where) set_clause, set_params = self._build_set(data) params = {<!-- -->**where_params, **set_params} if not set_clause: # If there is no SET to update the field, return directly return 0 sql = f""" UPDATE {<!-- -->TABLES['invoice']} AS a JOIN {<!-- -->TABLES['invoice_info']} AS b ON a.{<!-- -->COLUMNS['invoice_id']} = b.{<!-- -- >COLUMNS['invoice_id']} JOIN ( SELECT *, COUNT(*) AS cnt FROM {<!-- -->TABLES['order_goods']} WHERE {<!-- -->COLUMNS['rec_id']} IS NOT NULL GROUP BY {<!-- -->COLUMNS['goods_id']} HAVING cnt = 1 ) AS c ON b.{<!-- -->COLUMNS['goods_id']} = c.{<!-- -->COLUMNS['goods_id']} JOIN {<!-- -->TABLES['goods']} AS d ON c.{<!-- -->COLUMNS['rec_id']} = d.{<!-- -- >COLUMNS['rec_id']} {<!-- -->where_clause} {<!-- -->set_clause} """ result, count = self. execute(sql, params=params) self.logger.info(f"Updated {<!-- -->count} rows for ordersn={<!-- -->self.ordersn}") return count except Exception as e: self.logger.error(f"Failed to update invoice fields for ordersn={<!-- -->self.ordersn}: {<!-- -->str(e)}") raise ValueError(str(e)) from e if __name__ == "__main__": updater = InvoiceUpdater( order_sn="123456", name="Test Invoice", units="unit", class_name="Office Supplies", tax_rate=0.16 ) updater. execute()