安装扩展类

pip3.7 install pymysql
基于pymysql的封装使用mysql
# -*- coding:utf-8 -*-
import re
import pymysql as mdb


class MysqldbHelper(object):
    """ 操作mysql数据库,基本方法 """

    def __init__(self, host: object = "127.0.0.1", username: object = "root", password: object = "root",
                 port: object = 3306, database: object = "test") -> object:
        self.debug = False
        self.host = host
        self.username = username
        self.password = password
        self.database = database
        self.port = port
        self.con = None
        self.cur = None
        try:
            self.con = mdb.connect(
                host=self.host,
                user=self.username,
                passwd=self.password,
                port=self.port,
                db=self.database
            )
            # 所有的查询,都在连接 con 的一个模块 cursor 上面运行的
            self.cur = self.con.cursor()
        except:
            raise Exception("DataBase connect error,please check the db config.")

    """ 关闭数据库连接 """
    def close(self):
        if not self.con:
            self.con.close()
        else:
            raise Exception("DataBase doesn't connect,close connectiong error;please check the db config.")

    """ 获取数据库的版本号 """
    def getVersion(self):
        self.cur.execute("SELECT VERSION()")
        return self.getOneData()

    def getOneData(self):
        # 取得上个查询的结果,是单个结果
        data = self.cur.fetchone()
        return data

    """ 创建数据库表
        args:
            tablename  :表名字
            attrdict   :属性键值对,{'book_name':'varchar(200) NOT NULL'...}
            constraint :主外键约束,PRIMARY KEY(`id`)
            :param tablename: 
            :param attrdict: 
            :param constraint: 
            :return: 
    """
    def creatTable(self, tablename, attrdict, constraint):
        # 表是否存在
        if self.isExistTable(tablename) == True:
            return True
        sql = ''
        sql_mid = '`id` bigint(11) NOT NULL AUTO_INCREMENT,'
        for attr, value in attrdict.items():
            sql_mid = sql_mid + '`' + attr + '`' + ' ' + value + ','
        sql = sql + 'CREATE TABLE IF NOT EXISTS %s (' % tablename
        sql = sql + sql_mid
        sql = sql + constraint
        sql = sql + ') ENGINE=InnoDB DEFAULT CHARSET=utf8'
        self.deBug('creatTable:' + sql)
        self.execute(sql)

    """ 创建数据库表
        args:
            tablename  :表名字
            key        :属性键
            value      :属性值
    """
    def insert(self, tablename, params):
        keys = []  # 字段名
        placeholders = []  # 字段值的占位符
        args = []  # 字段值

        for key, value in params.items():
            keys.append(key)
            args.append(value)
            placeholders.append('%s')

        # 拼接sql语句
        sql = 'insert into %s' % tablename + '(`' + '`,`'.join(keys) + '`)' + ' values(' + ','.join(placeholders) + ')'

        self.deBug(' sql insert : ' + sql + str(args))
        insertId = self.execute(sql=sql, args=args)
        return insertId

    """ 插入多条数据
        args:
            tablename  :表名字
            attrs        :属性键
            values      :属性值

        example:
            table='test_mysqldb'
            key = ["id" ,"name", "age"]
            value = [[101, "liuqiao", "25"], [102,"liuqiao1", "26"], [103 ,"liuqiao2", "27"], [104 ,"liuqiao3", "28"]]
            mydb.insertMany(table, key, value)
    """
    def insertMany(self, table, attrs, values):

        values_sql = ['%s' for v in attrs]
        attrs_sql = '(' + ','.join(attrs) + ')'
        values_sql = ' values(' + ','.join(values_sql) + ')'
        sql = 'insert into %s' % table
        sql = sql + attrs_sql + values_sql
        self.deBug('insertMany:' + sql)
        try:
            self.deBug(sql)
            for i in range(0, len(values), 20000):
                self.cur.executemany(sql, values[i:i + 20000])
        except mdb.Error as e:
            self.con.rollback()
            error = "insertMany executemany failed! ERROR (%s): %s" % (e.args[0], e.args[1])
            self.deBug(error)

    """ 查询数据
        args:
            tablename  :表名字
            where  :查询条件
            order      :排序条件

        example:
            print mydb.select(table)
            print mydb.select(table, fields=["name"])
            print mydb.select(table, fields=["name", "age"])
            print mydb.select(table, fields=["age", "name"])
    """
    def findOne(self, tablename, where='', order='', fields='*'):
        consql = ' '
        if where != '':
            for k, v in where.items():
                consql = consql + k + '="' + str(v) + '" and'
        consql = consql + ' 1=1 '
        if fields == "*":
            sql = 'select * from %s where ' % tablename
        else:
            if isinstance(fields, list):
                fields = ",".join(fields)
                sql = 'select %s from %s where ' % (fields, tablename)
            else:
                raise Exception("fields input error, please input list fields.")
        sql = sql + consql + order
        self.deBug('select:' + sql)
        return self.executeSql(sql, findOne=True)

    def findAll(self, tablename, where='', order='', fields='*'):
        consql = ' '
        if where != '':
            for k, v in where.items():
                consql = consql + k + '="' + str(v) + '" and'
        consql = consql + ' 1=1 '
        if(isinstance(fields,str)):
            sql = 'select * from %s where ' % tablename
        elif isinstance(fields, list):
            fields = ",".join(fields)
            sql = 'select %s from %s where ' % (fields, tablename)
        else:
            raise Exception("fields input error, please input list fields.")

        # 获取order
        order = order == '' and ' ' or ' order by ' + order

        sql = sql + consql + order
        self.deBug('select:' + sql)
        return self.executeSql(sql)

    """更新数据
        args:
            tablename  :表名字
            attrs_dict  :更新属性键值对字典
            where  :更新条件字典

        example:
            params = {"name" : "caixinglong", "age" : "38"}
            where = {"name" : "liuqiao", "age" : "18"}
            mydb.update(table, params, where)

    """
    def update(self, tablename, attrs_dict, where):
        # 拼接修改属性
        attrs_list = []
        for tmpkey, tmpvalue in attrs_dict.items():
            attrs_list.append("`" + tmpkey + "`" + "=" + "\'" + tmpvalue + "\'")
        attrs_sql = ",".join(attrs_list)

        # 拼接where语句
        consql = ' '
        if where != '':
            for k, v in where.items():
                if isinstance(v, (str, int)):
                    v = "\'" + str(v) + "\'"
                consql = consql + "`" + tablename + "`." + "`" + k + "`" + '=' + v + ' and '
        consql = consql + ' 1=1 '

        # 拼接sql
        sql = "UPDATE %s SET %s where%s" % (tablename, attrs_sql, consql)
        self.deBug(sql)
        return self.execute(sql)

    def delete(self, tablename, where):
        """删除数据

            args:
                tablename  :表名字
                where  :删除条件字典

            example:
                params = {"name" : "caixinglong", "age" : "38"}
                mydb.delete(table, params)

        """
        consql = ' '
        if where != '':
            for k, v in where.items():
                if isinstance(v, str):
                    v = "\'" + v + "\'"
                consql = consql + tablename + "." + k + '=' + v + ' and '
        consql = consql + ' 1=1 '
        sql = "DELETE FROM %s where%s" % (tablename, consql)
        self.deBug(sql)
        try:
            result = {
                'rows': self.cur.execute(sql),
                'data': self.cur.fetchone(),
            }
            return result
        except mdb.Error as e:
            error = "MySQL execute failed! ERROR (%s): %s" % (e.args[0], e.args[1])
            self.deBug(error)

    def dropTable(self, tablename):
        """删除数据库表

            args:
                tablename  :表名字
        """
        sql = "DROP TABLE  %s" % tablename
        self.execute(sql)

    def deleteTable(self, tablename):
        """清空数据库表

            args:
                tablename  :表名字
        """
        sql = "DELETE FROM %s" % tablename
        self.execute(sql)

    """判断数据表是否存在
        args:
            tablename  :表名字

        Return:
            存在返回True,不存在返回False
    """

    def isExistTable(self, tablename):
        sql = "show table status like '%s'" % tablename
        result = self.execute(sql)
        if result:
            print('table : ' + tablename + ' 已存在')
            return True
        else:
            print('table ' + tablename + ' 不存在')
            return False

    """ 执行sql语句,针对读操作返回结果集
        args:
            sql  :sql语句
    """

    def executeSql(self, sql: object = '', findOne: object = False) -> object:
        try:
            result = {
                'rows': self.cur.execute(sql),
                'data': findOne and self.cur.fetchone() or self.cur.fetchall(),
            }
            return result
        except mdb.Error as e:
            error = "MySQL execute failed! ERROR (%s): %s" % (e.args[0], e.args[1])
            self.deBug(error)

    """ 执行数据库sql语句,针对更新,删除,事务等操作失败时回滚
    """

    def execute(self, sql='', args=[]):
        try:
            res = self.cur.execute(sql, args)
            insertId = self.con.insert_id()
            return insertId and insertId or res
        except mdb.Error as e:
            self.rollBack()
            error = 'MySQL execute failed! ERROR (%s): %s' % (e.args[0], e.args[1])
            self.deBug("error:" + error)
            return error

    # 事务提交
    def commit(self):
        return self.con.commit()

    # 事务回滚
    def rollBack(self):
        return self.con.rollback()

    # debug
    def deBug(self, message=''):
        if self.debug != False:
            print(message)

使用事例

if __name__ == "__main__":
    mydb = mysql.MysqldbHelper(host="127.0.0.1", username="root", password="root", database='test')

    # 获取mysql版本号
    #print( mydb.getVersion())

    # 开启debug模式
    # mydb.debug = True

    table='test_mysqldb'
    # 创建mysql表
    # attrs={'name':'varchar(200) DEFAULT NULL','age':'int(11) DEFAULT NULL'}
    # constraint='PRIMARY KEY(`id`)'
    # print( mydb.creatTable(table, attrs, constraint))

    # 添加一条数据
    # params = {"name" : "ceshi", "age" : "1"}
    # mydb.insert('test_mysqldb', params)

    # 查询数据
    # print( mydb.select(table))
    # print( mydb.select(table, fields=["name", "age"]))
    # print( mydb.select(table, fields=["age", "name"]))

    # 批量添加
    # key = ["id" ,"name", "age"]
    # value = [[101, "liuqiao", "25"], [102,"liuqiao1", "26"], [103 ,"liuqiao2", "27"], [104 ,"liuqiao3", "28"]]
    # mydb.insertMany(table, key, value)

    # 删除
    # mydb.delete(table, params)
    # cond_dict = {"name" : "liuqiao", "age" : "18"}

    # 修改
    # mydb.update(table, params, cond_dict)
    # mydb.deleteTable(table)
    # mydb.dropTable(table)
    
    #不管使用什么操作一定要记得commit,否则数据库不会发生变化
    mydb.commit()
Last modification:December 6, 2019
如果觉得我的文章对你有用,请随意赞赏