安装扩展类
pip3.7 install pymysql
基于pymysql的封装使用mysql
# -*- coding:utf-8 -*-
import re
import pymysql as mdb
class MysqldbHelper(object):
""" 操作mysql数据库,基本方法 """
def __init__(self, host: object = "127.0.0.1", username: object = "root", password: object = "root",
port: object = 3306, database: object = "test") -> object:
self.debug = False
self.host = host
self.username = username
self.password = password
self.database = database
self.port = port
self.con = None
self.cur = None
try:
self.con = mdb.connect(
host=self.host,
user=self.username,
passwd=self.password,
port=self.port,
db=self.database
)
# 所有的查询,都在连接 con 的一个模块 cursor 上面运行的
self.cur = self.con.cursor()
except:
raise Exception("DataBase connect error,please check the db config.")
""" 关闭数据库连接 """
def close(self):
if not self.con:
self.con.close()
else:
raise Exception("DataBase doesn't connect,close connectiong error;please check the db config.")
""" 获取数据库的版本号 """
def getVersion(self):
self.cur.execute("SELECT VERSION()")
return self.getOneData()
def getOneData(self):
# 取得上个查询的结果,是单个结果
data = self.cur.fetchone()
return data
""" 创建数据库表
args:
tablename :表名字
attrdict :属性键值对,{'book_name':'varchar(200) NOT NULL'...}
constraint :主外键约束,PRIMARY KEY(`id`)
:param tablename:
:param attrdict:
:param constraint:
:return:
"""
def creatTable(self, tablename, attrdict, constraint):
# 表是否存在
if self.isExistTable(tablename) == True:
return True
sql = ''
sql_mid = '`id` bigint(11) NOT NULL AUTO_INCREMENT,'
for attr, value in attrdict.items():
sql_mid = sql_mid + '`' + attr + '`' + ' ' + value + ','
sql = sql + 'CREATE TABLE IF NOT EXISTS %s (' % tablename
sql = sql + sql_mid
sql = sql + constraint
sql = sql + ') ENGINE=InnoDB DEFAULT CHARSET=utf8'
self.deBug('creatTable:' + sql)
self.execute(sql)
""" 创建数据库表
args:
tablename :表名字
key :属性键
value :属性值
"""
def insert(self, tablename, params):
keys = [] # 字段名
placeholders = [] # 字段值的占位符
args = [] # 字段值
for key, value in params.items():
keys.append(key)
args.append(value)
placeholders.append('%s')
# 拼接sql语句
sql = 'insert into %s' % tablename + '(`' + '`,`'.join(keys) + '`)' + ' values(' + ','.join(placeholders) + ')'
self.deBug(' sql insert : ' + sql + str(args))
insertId = self.execute(sql=sql, args=args)
return insertId
""" 插入多条数据
args:
tablename :表名字
attrs :属性键
values :属性值
example:
table='test_mysqldb'
key = ["id" ,"name", "age"]
value = [[101, "liuqiao", "25"], [102,"liuqiao1", "26"], [103 ,"liuqiao2", "27"], [104 ,"liuqiao3", "28"]]
mydb.insertMany(table, key, value)
"""
def insertMany(self, table, attrs, values):
values_sql = ['%s' for v in attrs]
attrs_sql = '(' + ','.join(attrs) + ')'
values_sql = ' values(' + ','.join(values_sql) + ')'
sql = 'insert into %s' % table
sql = sql + attrs_sql + values_sql
self.deBug('insertMany:' + sql)
try:
self.deBug(sql)
for i in range(0, len(values), 20000):
self.cur.executemany(sql, values[i:i + 20000])
except mdb.Error as e:
self.con.rollback()
error = "insertMany executemany failed! ERROR (%s): %s" % (e.args[0], e.args[1])
self.deBug(error)
""" 查询数据
args:
tablename :表名字
where :查询条件
order :排序条件
example:
print mydb.select(table)
print mydb.select(table, fields=["name"])
print mydb.select(table, fields=["name", "age"])
print mydb.select(table, fields=["age", "name"])
"""
def findOne(self, tablename, where='', order='', fields='*'):
consql = ' '
if where != '':
for k, v in where.items():
consql = consql + k + '="' + str(v) + '" and'
consql = consql + ' 1=1 '
if fields == "*":
sql = 'select * from %s where ' % tablename
else:
if isinstance(fields, list):
fields = ",".join(fields)
sql = 'select %s from %s where ' % (fields, tablename)
else:
raise Exception("fields input error, please input list fields.")
sql = sql + consql + order
self.deBug('select:' + sql)
return self.executeSql(sql, findOne=True)
def findAll(self, tablename, where='', order='', fields='*'):
consql = ' '
if where != '':
for k, v in where.items():
consql = consql + k + '="' + str(v) + '" and'
consql = consql + ' 1=1 '
if(isinstance(fields,str)):
sql = 'select * from %s where ' % tablename
elif isinstance(fields, list):
fields = ",".join(fields)
sql = 'select %s from %s where ' % (fields, tablename)
else:
raise Exception("fields input error, please input list fields.")
# 获取order
order = order == '' and ' ' or ' order by ' + order
sql = sql + consql + order
self.deBug('select:' + sql)
return self.executeSql(sql)
"""更新数据
args:
tablename :表名字
attrs_dict :更新属性键值对字典
where :更新条件字典
example:
params = {"name" : "caixinglong", "age" : "38"}
where = {"name" : "liuqiao", "age" : "18"}
mydb.update(table, params, where)
"""
def update(self, tablename, attrs_dict, where):
# 拼接修改属性
attrs_list = []
for tmpkey, tmpvalue in attrs_dict.items():
attrs_list.append("`" + tmpkey + "`" + "=" + "\'" + tmpvalue + "\'")
attrs_sql = ",".join(attrs_list)
# 拼接where语句
consql = ' '
if where != '':
for k, v in where.items():
if isinstance(v, (str, int)):
v = "\'" + str(v) + "\'"
consql = consql + "`" + tablename + "`." + "`" + k + "`" + '=' + v + ' and '
consql = consql + ' 1=1 '
# 拼接sql
sql = "UPDATE %s SET %s where%s" % (tablename, attrs_sql, consql)
self.deBug(sql)
return self.execute(sql)
def delete(self, tablename, where):
"""删除数据
args:
tablename :表名字
where :删除条件字典
example:
params = {"name" : "caixinglong", "age" : "38"}
mydb.delete(table, params)
"""
consql = ' '
if where != '':
for k, v in where.items():
if isinstance(v, str):
v = "\'" + v + "\'"
consql = consql + tablename + "." + k + '=' + v + ' and '
consql = consql + ' 1=1 '
sql = "DELETE FROM %s where%s" % (tablename, consql)
self.deBug(sql)
try:
result = {
'rows': self.cur.execute(sql),
'data': self.cur.fetchone(),
}
return result
except mdb.Error as e:
error = "MySQL execute failed! ERROR (%s): %s" % (e.args[0], e.args[1])
self.deBug(error)
def dropTable(self, tablename):
"""删除数据库表
args:
tablename :表名字
"""
sql = "DROP TABLE %s" % tablename
self.execute(sql)
def deleteTable(self, tablename):
"""清空数据库表
args:
tablename :表名字
"""
sql = "DELETE FROM %s" % tablename
self.execute(sql)
"""判断数据表是否存在
args:
tablename :表名字
Return:
存在返回True,不存在返回False
"""
def isExistTable(self, tablename):
sql = "show table status like '%s'" % tablename
result = self.execute(sql)
if result:
print('table : ' + tablename + ' 已存在')
return True
else:
print('table ' + tablename + ' 不存在')
return False
""" 执行sql语句,针对读操作返回结果集
args:
sql :sql语句
"""
def executeSql(self, sql: object = '', findOne: object = False) -> object:
try:
result = {
'rows': self.cur.execute(sql),
'data': findOne and self.cur.fetchone() or self.cur.fetchall(),
}
return result
except mdb.Error as e:
error = "MySQL execute failed! ERROR (%s): %s" % (e.args[0], e.args[1])
self.deBug(error)
""" 执行数据库sql语句,针对更新,删除,事务等操作失败时回滚
"""
def execute(self, sql='', args=[]):
try:
res = self.cur.execute(sql, args)
insertId = self.con.insert_id()
return insertId and insertId or res
except mdb.Error as e:
self.rollBack()
error = 'MySQL execute failed! ERROR (%s): %s' % (e.args[0], e.args[1])
self.deBug("error:" + error)
return error
# 事务提交
def commit(self):
return self.con.commit()
# 事务回滚
def rollBack(self):
return self.con.rollback()
# debug
def deBug(self, message=''):
if self.debug != False:
print(message)
使用事例
if __name__ == "__main__":
mydb = mysql.MysqldbHelper(host="127.0.0.1", username="root", password="root", database='test')
# 获取mysql版本号
#print( mydb.getVersion())
# 开启debug模式
# mydb.debug = True
table='test_mysqldb'
# 创建mysql表
# attrs={'name':'varchar(200) DEFAULT NULL','age':'int(11) DEFAULT NULL'}
# constraint='PRIMARY KEY(`id`)'
# print( mydb.creatTable(table, attrs, constraint))
# 添加一条数据
# params = {"name" : "ceshi", "age" : "1"}
# mydb.insert('test_mysqldb', params)
# 查询数据
# print( mydb.select(table))
# print( mydb.select(table, fields=["name", "age"]))
# print( mydb.select(table, fields=["age", "name"]))
# 批量添加
# key = ["id" ,"name", "age"]
# value = [[101, "liuqiao", "25"], [102,"liuqiao1", "26"], [103 ,"liuqiao2", "27"], [104 ,"liuqiao3", "28"]]
# mydb.insertMany(table, key, value)
# 删除
# mydb.delete(table, params)
# cond_dict = {"name" : "liuqiao", "age" : "18"}
# 修改
# mydb.update(table, params, cond_dict)
# mydb.deleteTable(table)
# mydb.dropTable(table)
#不管使用什么操作一定要记得commit,否则数据库不会发生变化
mydb.commit()