pymysql+ dynamic package update statement

Initial method:

import pymysql


def update_invoice_fields(order_sn, name=None, units=None, class_name=None, tax_rate=None):
    """
    Update invoice related fields based on order number

    Parameters:
        order_sn (str): order number
        name (str): product name
        units (str): commodity units
        class_name (str): tax class name
        tax_rate (float): tax rate

    Returns:
        int: number of rows updated
    """
    # connect to MySQL database
    conn = pymysql. connect(
        host='rmics.com',
        port=3306,
        user='yui',
        password='123456',
        db='test',
        charset='utf8'
    )

    # Create a cursor object for executing SQL statements
    cursor = conn. cursor()

    # Construct SQL statement
    sql1 = """
    UPDATE crm_invoice a
    JOIN crm_invoice_info b ON a.id = b.invoice_id
    JOIN( SELECT *, COUNT(*) AS cnt FROM crm_invoice_info GROUP BY invoice_id HAVING cnt = 1 )tmp ON tmp.id = b.id
    JOIN order_goods n ON b.rec_id = n.rec_id
    JOIN goods d ON n.goods_id = d.goods_id
    """
    sql = """
    WHERE a.ordersn = %(ordersn)s
    """
    if class_name and tax_rate:
        sql + = "AND EXISTS(SELECT 1 FROM tax_categories WHERE class_name = %(class_name)s AND tax_rate = %(tax_rate)s) "
    elif class_name:
        sql + = "AND d.class_name = %(class_name)s"
    elif tax_rate:
        sql + = "AND EXISTS(SELECT 1 FROM tax_categories WHERE class_name = b.class AND tax_rate = %(tax_rate)s) "

    set_fields = []
    if name:
        set_fields.append("b.NAME = %(name)s")
        set_fields.append("d.invoice_name = %(name)s")
    if units:
        set_fields.append("b.units = %(units)s")
        set_fields.append("d.unit = %(units)s")
    if class_name and tax_rate:
        set_fields.append("b.class = %(class_name)s")
        set_fields.append(
            "b.class_code = (SELECT class_code FROM tax_categories WHERE class_name = %(class_name)s AND tax_rate = %(tax_rate)s LIMIT 1)")
        set_fields.append(
            "b.tax_rate = (SELECT tax_rate FROM tax_categories WHERE class_name = %(class_name)s AND tax_rate = %(tax_rate)s LIMIT 1)")
        set_fields.append("d.class_name = %(class_name)s")
        set_fields.append(
            "d.class_code = (SELECT class_code FROM tax_categories WHERE class_name = %(class_name)s AND tax_rate = %(tax_rate)s LIMIT 1)")

    if not set_fields:
        # If there is no SET to update the field, return directly
        return 0

    info = "SET " + ", ".join(set_fields)
    sql = sql1 + info + sql
    print("executed sql is equal to =========", sql)

    # Execute the SQL statement
    params = {<!-- -->"order_sn": order_sn, "name": name, "units": units, "class_name": class_name, "tax_rate": tax_rate}

    cursor. execute(sql, params)

    # commit changes
    conn.commit()

    # Get the number of updated rows
    rowcount = cursor. rowcount
    print("Number of rows executed successfully rowcount===", rowcount)
    # Close the cursor and database connection
    cursor. close()
    conn. close()

    # return the number of updated rows
    return rowcount


update_invoice_fields("20220110134064", name="", units="", class_name="daily necessities", tax_rate=13)

# update_invoice_fields("20220110134064", name="mask")

Package 1:

import pymysql


class BaseSQL:
    """
    SQL statement builder base class
    """

    def __init__(self, db_config, table, columns):
        """
        Initialize connection database and table information

        Args:
            db_config (dict): database connection information
            table (dict): related table and column names
            columns (list): list of fields to be updated
        """
        self.conn = pymysql.connect(**db_config)
        self. table = table
        self. columns = columns

    def _build_where(self, where=None):
        """
        Construct WHERE conditional statement

        Args:
            where (dict): query condition

        Returns:
            str: WHERE condition statement
            dict: query parameters in the WHERE condition
        """
        if not where:
            return '', {<!-- -->}

        where_conditions = []
        params = {<!-- -->}
        for key, value in where.items():
            if value is None:
                continue
            if isinstance(value, (list, tuple)):
                where_conditions.append(f"{<!-- -->key} IN ({<!-- -->','.join('%s' for _ in value)})" )
                params[key] = value
            else:
                where_conditions.append(f"{<!-- -->key} = %({<!-- -->key})s")
                params[key] = value

        if not where_conditions:
            return '', {<!-- -->}

        where_clause = "WHERE " + " AND ".join(where_conditions)
        return where_clause, params

    def _build_set(self, data):
        """
        Constructs a SET update field statement

        Args:
            data (dict): fields and values to be updated

        Returns:
            str: SET update field statement
            dict: SET to update the value in the field
        """
        set_fields = []
        params = {<!-- -->}
        for field, value in data.items():
            if value is not None:
                set_fields.append(f"{<!-- -->self.columns[field]} = %({<!-- -->field})s")
                params[field] = value

        if not set_fields:
            return '', {<!-- -->}

        set_clause = "SET " + ", ".join(set_fields)
        return set_clause, params

    def execute(self, sql, params=None):
        """
        Execute the SQL statement, return the execution result and the number of affected rows.

        Args:
            sql (str): SQL statement
            params (dict): parameters in the SQL statement

        Returns:
            tuple: execution result and number of affected rows
        """
        try:
            with self.conn.cursor() as cursor:
                row_count = cursor. execute(sql, params)
                result = cursor. fetchall()
            self.conn.commit()
            return result, row_count
        except Exception as e:
            self.conn.rollback()
            raise e

    @staticmethod
    def build_params(data=None, where=None):
        """
        Construct parameters in SQL statements

        Args:
            data (dict): fields and values to be updated
            where (dict): query condition

        Returns:
            dict: parameters in the SQL statement
        """
        params = {<!-- -->}
        if where:
            params. update(where)
        if data:
            params. update(data)
        return params


class InvoiceUpdater(BaseSQL):
    """
    Update invoice related fields
    """

    def __init__(self, db_config, order_sn, name=None, units=None, class_name=None, tax_rate=None):
        """
        Initialize connection database and parameter information

        Args:
            db_config (dict): database connection information
            order_sn (str): order number
            name (str): product name
            units (str): commodity units
            class_name (str): tax class name
            tax_rate (float): tax rate
        """
        super().__init__(db_config, table=TABLES, columns=COLUMNS)
        self. order_sn = order_sn
        self.name = name
        self.units = units
        self. class_name = class_name
        self.tax_rate = tax_rate

    def update(self):
        """
        Update invoice related fields

        Returns:
            int: number of rows updated
        """
        where = {<!-- -->"a.order_sn": self.order_sn}

        if self.class_name and self.tax_rate:
            where["SELECT 1 FROM tax_categories WHERE class_name = %(class_name)s AND tax_rate = %(tax_rate)s"] = None
        elif self. class_name:
            where["d.class_name"] = self.class_name
        elif self.tax_rate:
            where["SELECT 1 FROM tax_categories WHERE class_name = b.class AND tax_rate = %(tax_rate)s"] = None

        data = {<!-- -->}
        if self.name:
            data[COLUMNS['name']] = self.name
            data[COLUMNS['invoice_name']] = self.name
        if self.units:
            data[COLUMNS['units']] = self.units
            data[COLUMNS['unit']] = self.units
        if self.class_name and self.tax_rate:
            data[COLUMNS['class']] = self. class_name
            data[COLUMNS['class_code']] = "SELECT class_code FROM tax_categories WHERE class_name = %(class_name)s AND tax_rate = %(tax_rate)s LIMIT 1"
            data[COLUMNS['tax_rate']] = "SELECT tax_rate FROM tax_categories WHERE class_name = %(class_name)s AND tax_rate = %(tax_rate)s LIMIT 1"
            data[COLUMNS['class_name']] = self. class_name
            data[COLUMNS['class_code']] = "SELECT class_code FROM tax_categories WHERE class_name = %(class_name)s AND tax_rate = %(tax_rate)s LIMIT 1"

        where_clause, where_params = self._build_where(where)
        set_clause, set_params = self._build_set(data)
        params = self.build_params(where=where_params, data=set_params)

        if not set_clause:
            # If there is no SET to update the field, return directly
            return 0

        sql = f"""
        UPDATE {<!-- -->self.table['invoice']} AS a
        JOIN {<!-- -->self.table['invoice_info']} AS b ON a.{<!-- -->COLUMNS['invoice_id']} = b.{<!-- -->COLUMNS['invoice_id']}
        JOIN (
            SELECT *, COUNT(*) AS cnt FROM {<!-- -->self.table['invoice_info']} GROUP BY {<!-- -->COLUMNS['invoice_id']} HAVING cnt = 1
        ) AS tmp ON tmp.{<!-- -->COLUMNS['invoice_id']} = b.{<!-- -->COLUMNS['invoice_id']}
        JOIN {<!-- -->self.table['order_goods']} AS n ON b.{<!-- -->COLUMNS['rec_id']} = n.{<!-- -->COLUMNS['rec_id']}
        JOIN {<!-- -->self.table['goods']} AS d ON n.{<!-- -->COLUMNS['goods_id']} = d.{<!-- -->COLUMNS['goods_id']}
        {<!-- -->set_clause}
        {<!-- -->where_clause}
        """

        result, row_count = self. execute(sql, params)
        return row_count

Package Three:

import pymysql
import logging
from datetime import datetime


# Database connection information
DB_CONFIG = {<!-- -->
    "host": "rmics.com",
    "port": 3306,
    "user": "yui",
    "password": "123456",
    "db": "test",
    "charset": "utf8",
}


# log configuration information
LOG_CONFIG = {<!-- -->
    "filename": "update_invoice_fields.log",
    "level": logging. INFO,
    "format": "[%(asctime)s] [%(levelname)s] %(message)s",
    "datefmt": "%Y-%m-%d %H:%M:%S",
}


# The corresponding relationship between the field name and the column name to be updated
COLUMNS = {<!-- -->
    "name": "b.NAME",
    "units": "b.units",
    "class_name": "b. class",
    "tax_rate": "b.tax_rate",
    "invoice_name": "d.invoice_name",
    "class_code": "d. class_code",
    "class_name2": "d. class_name",
    "unit": "d.unit",
    "tax_rate2": "b.tax_rate",
    "class": "b. class",
    "invoice_id": "id",
    "rec_id": "rec_id",
    "goods_id": "goods_id",
}


# Correspondence between related table names and column names
TABLES = {<!-- -->
    "invoice": "crm_invoice",
    "invoice_info": "crm_invoice_info",
    "order_goods": "military_order_goods",
    "goods": "goods",
}


# Basic service class, encapsulating database and log functions
class BaseService:
    """
    Basic service class, encapsulating database and log functions
    """

    def __init__(self, db_config=DB_CONFIG, logger_name="BaseService"):
        """
        Initialize database connection, logger

        Args:
            db_config (dict): database connection information
            logger_name (str): logger name
        """
        self.conn = pymysql.connect(**db_config)
        self. logger = logging. getLogger(logger_name)
        self.logger.setLevel(LOG_CONFIG["level"])
        formatter = logging.Formatter(LOG_CONFIG["format"], LOG_CONFIG["datefmt"])
        handler = logging. FileHandler(LOG_CONFIG["filename"])
        handler. setFormatter(formatter)
        self. logger. addHandler(handler)

    def _build_where(self, where):
        """
        Construct the WHERE clause

        Args:
            where (dict): dictionary of WHERE conditions

        Returns:
            str: WHERE clause
            dict: WHERE condition parameter
        """
        where_clause = ""
        where_params = {<!-- -->}
        for key, value in where.items():
            if isinstance(value, str):
                # Direct concatenation of strings
                where_clause + = f" AND {<!-- -->key} = %(where_{<!-- -->key})s"
                where_params[f"where_{<!-- -->key}"] = value
            elif isinstance(value, dict):
                # Concatenate subqueries
                sub_clause, sub_params = self._build_where(value)
                where_clause + = f" AND ({<!-- -->sub_clause})"
                where_params. update(sub_params)
            elif isinstance(value, list):
                # Concatenate IN clauses
                where_clause + = f" AND {<!-- -->key} IN %(where_{<!-- -->key})s"
                where_params[f"where_{<!-- -->key}"] = value
            elif value is None:
                # Concatenate IS NULL clauses
                where_clause += f" AND {<!-- -->key} IS NULL"
            else:
                raise ValueError(f"Invalid WHERE condition: {<!-- -->key}={<!-- -->value}")
        if where_clause:
            where_clause = "WHERE " + where_clause[5:]
        return where_clause, where_params

    def _build_set(self, data):
        """
        Construct the SET clause

        Args:
            data (dict): update field dictionary

        Returns:
            str: SET clause
            dict: Update field parameters
        """
        set_clause = ""
        set_params = {<!-- -->}
        for key, value in data.items():
            if isinstance(value, str):
                # Direct concatenation of strings
                set_clause + = f", {<!-- -->key} = %(set_{<!-- -->key})s"
                set_params[f"set_{<!-- -->key}"] = value
            elif isinstance(value, dict):
                # Concatenate subqueries
                for sub_key, sub_value in value.items():
                    set_clause + = f", {<!-- -->key} = ({<!-- -->sub_value})"
                    set_params.update({<!-- -->sub_key: sub_value})
            elif isinstance(value, list):
                # Currently does not support update of list type
                raise ValueError(f"Unsupported data type for SET clause: {<!-- -->key}={<!-- -->value}")
            else:
                # Direct splicing of numeric types
                set_clause + = f", {<!-- -->key} = {<!-- -->value}"
        if set_clause:
            set_clause = "SET " + set_clause[2:]
        return set_clause, set_params

    def execute(self, sql, params=None):
        """
        Execute SQL statement

        Args:
            sql (str): SQL statement
            params (dict): SQL statement parameters

        Returns:
            tuple: query result and number of affected rows
        """
        cursor = self.conn.cursor()
        try:
            cursor. execute(sql, params)
            self.conn.commit()
            return cursor.fetchall(), cursor.rowcount
        except Exception as e:
            self.logger.error(f"Failed to execute SQL query: {<!-- -->sql}, params={<!-- -->params}, error={<!-- -->str (e)}")
            raise ValueError(str(e)) from e
        finally:
            cursor. close()


# Update invoice related fields
class InvoiceUpdater(BaseService):
    """
    Update invoice related fields
    """

    def __init__(self, order_sn=None, name=None, units=None, class_name=None, tax_rate=None, **kwargs):
        """
        Initialize the order number and field information to be updated

        Args:
            order_sn (str): order number
            name (str): invoice name
            units (str): units
            class_name (str): tax class name
            tax_rate (float): tax rate
        """
        super().__init__(db_config=DB_CONFIG, logger_name="InvoiceUpdater")
        self. order_sn = order_sn
        self.name = name
        self.units = units
        self. class_name = class_name
        self.tax_rate = tax_rate

    def execute(self):
        """
        Update invoice related fields

        Returns:
            int: number of rows updated
        """
        try:
            where = {<!-- -->"a.order_sn": self.order_sn}

            if self.class_name and self.tax_rate:
                where[
                    "SELECT 1 FROM tax_categories WHERE class_name = %(class_name)s AND tax_rate = %(tax_rate)s"] = None
            elif self. class_name:
                where["d.class_name"] = self.class_name
            elif self.tax_rate:
                where["SELECT 1 FROM tax_categories WHERE class_name = b.class AND tax_rate = %(tax_rate)s"] = None

            data = {<!-- -->}
            if self.name:
                data["name"] = self.name
                data["invoice_name"] = self.name
            if self.units:
                data["units"] = self.units
                data["unit"] = self.units
            if self.class_name and self.tax_rate:
                data["class"] = self. class_name
                data["class_code"] = "SELECT class_code FROM tax_categories WHERE class_name = %(class_name)s AND tax_rate = %(tax_rate)s LIMIT 1"
                data["tax_rate"] = "SELECT tax_rate FROM tax_categories WHERE class_name = %(class_name)s AND tax_rate = %(tax_rate)s LIMIT 1"
                data["class_name2"] = self. class_name
                data["class_code2"] = "SELECT class_code FROM tax_categories WHERE class_name = %(class_name)s AND tax_rate = %(tax_rate)s LIMIT 1"

            where_clause, where_params = self._build_where(where)
            set_clause, set_params = self._build_set(data)
            params = {<!-- -->**where_params, **set_params}

            if not set_clause:
                # If there is no SET to update the field, return directly
                return 0

            sql = f"""
            UPDATE {<!-- -->TABLES['invoice']} AS a
            JOIN {<!-- -->TABLES['invoice_info']} AS b ON a.{<!-- -->COLUMNS['invoice_id']} = b.{<!-- -- >COLUMNS['invoice_id']}
            JOIN (
                SELECT *, COUNT(*) AS cnt FROM {<!-- -->TABLES['order_goods']} WHERE {<!-- -->COLUMNS['rec_id']} IS NOT NULL GROUP BY {<!-- -->COLUMNS['goods_id']} HAVING cnt = 1
            ) AS c ON b.{<!-- -->COLUMNS['goods_id']} = c.{<!-- -->COLUMNS['goods_id']}
            JOIN {<!-- -->TABLES['goods']} AS d ON c.{<!-- -->COLUMNS['rec_id']} = d.{<!-- -- >COLUMNS['rec_id']}
            {<!-- -->where_clause}
            {<!-- -->set_clause}
            """

            result, count = self. execute(sql, params=params)
            self.logger.info(f"Updated {<!-- -->count} rows for ordersn={<!-- -->self.ordersn}")
            return count
        except Exception as e:
            self.logger.error(f"Failed to update invoice fields for ordersn={<!-- -->self.ordersn}: {<!-- -->str(e)}")
            raise ValueError(str(e)) from e


if __name__ == "__main__":
    updater = InvoiceUpdater(
        order_sn="123456",
        name="Test Invoice",
        units="unit",
        class_name="Office Supplies",
        tax_rate=0.16
    )
    updater. execute()