pin_drop当前位置:知识文库 ❯ 图文
Python MySQL数据库操作教程 - PyMySQL使用指南
一、PyMySQL简介与安装
PyMySQL是一个纯Python编写的MySQL客户端库,兼容MySQL和MariaDB。与MySQLdb不同,PyMySQL不需要C编译器,安装更加简便,并且支持Python 3。它是Python连接MySQL数据库最常用的第三方库之一。
安装PyMySQL
代码示例
# 使用pip安装PyMySQL
pip install pymysql
# 指定版本安装
pip install pymysql==1.1.0二、连接MySQL数据库
使用pymysql.connect()方法创建数据库连接。需要提供主机地址、端口、用户名、密码和数据库名等参数。
基本连接方式
代码示例
import pymysql
# 创建数据库连接
connection = pymysql.connect(
host='localhost', # 数据库主机地址
port=3306, # MySQL端口号
user='root', # 数据库用户名
password='your_password', # 数据库密码
database='test_db', # 数据库名称
charset='utf8mb4', # 字符编码(支持emoji)
cursorclass=pymysql.cursors.DictCursor # 返回字典格式结果
)
print("数据库连接成功!")
# 关闭连接
connection.close()使用上下文管理器
推荐使用上下文管理器来自动管理连接的打开和关闭,确保资源正确释放。
代码示例
import pymysql
from contextlib import contextmanager
@contextmanager
def get_mysql_connection():
"""MySQL数据库连接上下文管理器"""
connection = pymysql.connect(
host='localhost',
port=3306,
user='root',
password='your_password',
database='test_db',
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor
)
try:
yield connection
finally:
connection.close()
# 使用示例
with get_mysql_connection() as conn:
with conn.cursor() as cursor:
cursor.execute('SELECT VERSION()')
version = cursor.fetchone()
print(f"MySQL版本:{version['VERSION()']}")三、执行SQL语句
通过游标对象(cursor)执行SQL语句。PyMySQL支持参数化查询,有效防止SQL注入攻击。
创建数据表
代码示例
CREATE TABLE IF NOT EXISTS employees (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(100) NOT NULL,
email VARCHAR(100) UNIQUE NOT NULL,
department VARCHAR(50),
salary DECIMAL(10, 2),
hire_date DATE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;代码示例
import pymysql
with get_mysql_connection() as conn:
with conn.cursor() as cursor:
# 创建员工表
create_table_sql = '''
CREATE TABLE IF NOT EXISTS employees (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(100) NOT NULL,
email VARCHAR(100) UNIQUE NOT NULL,
department VARCHAR(50),
salary DECIMAL(10, 2),
hire_date DATE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
'''
cursor.execute(create_table_sql)
print("数据表创建成功!")插入数据
代码示例
import pymysql
with get_mysql_connection() as conn:
with conn.cursor() as cursor:
# 插入单条记录
insert_sql = '''
INSERT INTO employees (name, email, department, salary, hire_date)
VALUES (%s, %s, %s, %s, %s)
'''
cursor.execute(insert_sql, ('张三', 'zhangsan@company.com', '技术部', 15000.00, '2024-01-15'))
# 插入多条记录
employees = [
('李四', 'lisi@company.com', '技术部', 18000.00, '2023-06-20'),
('王五', 'wangwu@company.com', '市场部', 12000.00, '2024-03-01'),
('赵六', 'zhaoliu@company.com', '人事部', 11000.00, '2023-09-10'),
('孙七', 'sunqi@company.com', '技术部', 20000.00, '2022-11-05')
]
cursor.executemany(insert_sql, employees)
# 提交事务
conn.commit()
print(f"成功插入 {cursor.rowcount} 条记录,最后插入ID:{cursor.lastrowid}")查询数据
代码示例
import pymysql
with get_mysql_connection() as conn:
with conn.cursor() as cursor:
# 查询所有员工
cursor.execute('SELECT * FROM employees ORDER BY id')
employees = cursor.fetchall()
print("所有员工信息:")
for emp in employees:
print(f" {emp['name']} | {emp['department']} | ¥{emp['salary']}")
# 条件查询 - 查询技术部员工
cursor.execute(
'SELECT * FROM employees WHERE department = %s AND salary > %s',
('技术部', 16000)
)
tech_employees = cursor.fetchall()
print(f"\n技术部薪资超过16000的员工(共{len(tech_employees)}人):")
for emp in tech_employees:
print(f" {emp['name']} - ¥{emp['salary']}")
# 聚合查询
cursor.execute('''
SELECT department, COUNT(*) as count, AVG(salary) as avg_salary
FROM employees
GROUP BY department
''')
dept_stats = cursor.fetchall()
print("\n各部门统计:")
for dept in dept_stats:
print(f" {dept['department']}:{dept['count']}人,平均薪资¥{dept['avg_salary']:.2f}")更新和删除数据
代码示例
import pymysql
with get_mysql_connection() as conn:
with conn.cursor() as cursor:
# 更新员工薪资
cursor.execute(
'UPDATE employees SET salary = salary * 1.1 WHERE department = %s',
('技术部',)
)
print(f"更新了 {cursor.rowcount} 条记录")
# 删除指定员工
cursor.execute(
'DELETE FROM employees WHERE email = %s',
('zhaoliu@company.com',)
)
print(f"删除了 {cursor.rowcount} 条记录")
conn.commit()四、事务处理
PyMySQL默认关闭自动提交模式,所有操作都在事务中执行。调用conn.commit()提交事务,conn.rollback()回滚事务。
代码示例
import pymysql
def transfer_salary(from_emp_id, to_emp_id, amount):
"""薪资调整事务:从一个员工扣减,加到另一个员工"""
with get_mysql_connection() as conn:
with conn.cursor() as cursor:
try:
# 检查转出员工薪资是否足够
cursor.execute(
'SELECT salary FROM employees WHERE id = %s',
(from_emp_id,)
)
from_emp = cursor.fetchone()
if from_emp['salary'] < amount:
raise ValueError("转出员工薪资不足")
# 执行转账操作
cursor.execute(
'UPDATE employees SET salary = salary - %s WHERE id = %s',
(amount, from_emp_id)
)
cursor.execute(
'UPDATE employees SET salary = salary + %s WHERE id = %s',
(amount, to_emp_id)
)
# 记录操作日志
cursor.execute(
'INSERT INTO salary_log (from_id, to_id, amount, log_time) VALUES (%s, %s, %s, NOW())',
(from_emp_id, to_emp_id, amount)
)
# 提交事务
conn.commit()
print(f"薪资调整成功:从ID {from_emp_id} 转移 ¥{amount} 到 ID {to_emp_id}")
except Exception as e:
# 任何错误都回滚
conn.rollback()
print(f"薪资调整失败,事务已回滚:{e}")小贴士
可以开启自动提交模式:connection.autocommit(True)。开启后每条SQL语句自动提交,但失去事务控制能力。生产环境建议保持手动提交。
五、完整示例代码
以下是一个完整的MySQL员工管理系统示例。
代码示例
"""
MySQL员工管理系统 - PyMySQL完整示例
"""
import pymysql
from contextlib import contextmanager
from datetime import datetime
# 数据库配置
DB_CONFIG = {
'host': 'localhost',
'port': 3306,
'user': 'root',
'password': 'your_password',
'database': 'company_db',
'charset': 'utf8mb4',
'cursorclass': pymysql.cursors.DictCursor
}
@contextmanager
def get_db_connection():
"""数据库连接上下文管理器"""
conn = pymysql.connect(**DB_CONFIG)
try:
yield conn
finally:
conn.close()
def init_db():
"""初始化数据库"""
with get_db_connection() as conn:
with conn.cursor() as cursor:
cursor.execute('''
CREATE TABLE IF NOT EXISTS employees (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(100) NOT NULL,
email VARCHAR(100) UNIQUE NOT NULL,
department VARCHAR(50),
salary DECIMAL(10, 2),
hire_date DATE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
''')
conn.commit()
print("数据库初始化完成!")
def add_employee(name, email, department, salary, hire_date=None):
"""添加员工"""
if hire_date is None:
hire_date = datetime.now().strftime('%Y-%m-%d')
with get_db_connection() as conn:
with conn.cursor() as cursor:
try:
cursor.execute(
'INSERT INTO employees (name, email, department, salary, hire_date) VALUES (%s, %s, %s, %s, %s)',
(name, email, department, salary, hire_date)
)
conn.commit()
print(f"员工 {name} 添加成功!")
except pymysql.err.IntegrityError:
print(f"邮箱 {email} 已存在,添加失败!")
def get_employees(department=None, min_salary=None):
"""查询员工列表"""
with get_db_connection() as conn:
with conn.cursor() as cursor:
sql = 'SELECT * FROM employees WHERE 1=1'
params = []
if department:
sql += ' AND department = %s'
params.append(department)
if min_salary:
sql += ' AND salary >= %s'
params.append(min_salary)
sql += ' ORDER BY salary DESC'
cursor.execute(sql, params)
return cursor.fetchall()
def update_salary(emp_id, new_salary):
"""更新员工薪资"""
with get_db_connection() as conn:
with conn.cursor() as cursor:
cursor.execute(
'UPDATE employees SET salary = %s WHERE id = %s',
(new_salary, emp_id)
)
conn.commit()
print(f"员工ID {emp_id} 薪资已更新为 ¥{new_salary}")
def delete_employee(emp_id):
"""删除员工"""
with get_db_connection() as conn:
with conn.cursor() as cursor:
cursor.execute('DELETE FROM employees WHERE id = %s', (emp_id,))
conn.commit()
print(f"员工ID {emp_id} 已删除")
def print_employees(employees):
"""打印员工列表"""
print(f"\n共 {len(employees)} 名员工:")
print("-" * 70)
for emp in employees:
print(f" ID:{emp['id']} | {emp['name']} | {emp['department']} | ¥{emp['salary']}")
print("-" * 70)
if __name__ == '__main__':
# 初始化
init_db()
# 添加员工
add_employee('张三', 'zhangsan@company.com', '技术部', 15000.00, '2024-01-15')
add_employee('李四', 'lisi@company.com', '技术部', 18000.00, '2023-06-20')
add_employee('王五', 'wangwu@company.com', '市场部', 12000.00, '2024-03-01')
# 查询所有员工
all_emps = get_employees()
print_employees(all_emps)
# 查询技术部员工
tech_emps = get_employees(department='技术部')
print("\n技术部员工:")
print_employees(tech_emps)
# 查询高薪员工
high_salary = get_employees(min_salary=15000)
print("\n薪资15000以上的员工:")
print_employees(high_salary)六、注意事项
⚠️ 注意1:使用%s作为占位符
PyMySQL使用
%s作为参数占位符(注意不是?),不要使用Python字符串格式化拼接SQL,以防SQL注入。
⚠️ 注意2:字符编码设置
连接时务必指定
charset='utf8mb4',这是MySQL中真正的UTF-8编码,支持emoji和所有Unicode字符。不要使用utf8(MySQL中是utf8mb3的别名)。
⚠️ 注意3:密码安全
不要将数据库密码硬编码在代码中。建议使用环境变量、配置文件或密钥管理服务存储数据库凭据。
⚠️ 注意4:游标类型选择
默认游标返回元组,使用
cursorclass=pymysql.cursors.DictCursor可以返回字典,通过列名访问更方便。SSCursor适用于大数据量查询,避免一次性加载所有结果到内存。
常见问题
PyMySQL的占位符为什么是%s而不是?
PyMySQL兼容MySQLdb的API规范,MySQLdb使用%s作为占位符。这是为了与Python的字符串格式化符号一致。注意这里的%s不是字符串格式化,而是参数化查询占位符,PyMySQL会自动处理类型转换和转义。
如何处理大量数据查询避免内存溢出?
使用SSCursor(Server Side Cursor):cursor = conn.cursor(pymysql.cursors.SSCursor)。SSCursor逐行从服务器读取数据,而不是一次性加载全部结果到内存。但注意使用SSCursor时连接不能被其他操作使用。
PyMySQL如何获取自增ID?
插入数据后通过cursor.lastrowid获取最后插入的自增ID。注意在executemany批量插入时,lastrowid只返回第一条插入记录的ID。
PyMySQL与异步框架如何配合使用?
PyMySQL是同步库,不支持asyncio。在异步框架(如FastAPI、aiohttp)中,建议使用aiomysql或asyncmy等异步MySQL库。也可以使用线程池包装PyMySQL调用。
七、练习题
练习1:博客文章管理系统
编写一个程序,使用PyMySQL创建博客文章管理系统。要求:创建articles表(包含id、标题、内容、作者、分类、发布时间),实现文章的发布、编辑、删除、按分类查询、关键词搜索等功能,并统计各分类的文章数量。
练习2:订单处理系统
编写一个函数,实现基于PyMySQL的订单处理系统。要求:包含用户表、商品表、订单表三个关联表,实现下单功能(使用事务确保库存扣减和订单创建的一致性)、订单查询、销售统计等功能。
小结
-
PyMySQL:纯Python实现的MySQL客户端库,pip install pymysql即可安装
-
参数化查询:使用%s占位符,不要拼接SQL字符串,防止SQL注入攻击
-
事务管理:PyMySQL默认关闭自动提交,通过conn.commit()提交,conn.rollback()回滚
-
字符编码:始终使用utf8mb4编码,确保完整支持Unicode字符
本文涉及AI创作
内容由AI创作,请仔细甄别