pin_drop当前位置:知识文库 ❯ 图文

Python MySQL数据库操作教程 - PyMySQL使用指南

一、PyMySQL简介与安装

PyMySQL是一个纯Python编写的MySQL客户端库,兼容MySQL和MariaDB。与MySQLdb不同,PyMySQL不需要C编译器,安装更加简便,并且支持Python 3。它是Python连接MySQL数据库最常用的第三方库之一。

安装PyMySQL

代码示例

# 使用pip安装PyMySQL
pip install pymysql

# 指定版本安装
pip install pymysql==1.1.0
对比项 PyMySQL mysqlclient mysql-connector-python
安装方式 纯Python 需要C编译器 纯Python
性能 中等 最优 中等
兼容性 MySQL/MariaDB MySQL/MariaDB 仅MySQL
API兼容 MySQLdb兼容 MySQLdb原生 独立API

二、连接MySQL数据库

使用pymysql.connect()方法创建数据库连接。需要提供主机地址、端口、用户名、密码和数据库名等参数。

基本连接方式

代码示例

import pymysql

# 创建数据库连接
connection = pymysql.connect(
    host='localhost',          # 数据库主机地址
    port=3306,                 # MySQL端口号
    user='root',               # 数据库用户名
    password='your_password',  # 数据库密码
    database='test_db',        # 数据库名称
    charset='utf8mb4',         # 字符编码(支持emoji)
    cursorclass=pymysql.cursors.DictCursor  # 返回字典格式结果
)

print("数据库连接成功!")

# 关闭连接
connection.close()

使用上下文管理器

推荐使用上下文管理器来自动管理连接的打开和关闭,确保资源正确释放。

代码示例

import pymysql
from contextlib import contextmanager

@contextmanager
def get_mysql_connection():
    """MySQL数据库连接上下文管理器"""
    connection = pymysql.connect(
        host='localhost',
        port=3306,
        user='root',
        password='your_password',
        database='test_db',
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor
    )
    try:
        yield connection
    finally:
        connection.close()

# 使用示例
with get_mysql_connection() as conn:
    with conn.cursor() as cursor:
        cursor.execute('SELECT VERSION()')
        version = cursor.fetchone()
        print(f"MySQL版本:{version['VERSION()']}")

三、执行SQL语句

通过游标对象(cursor)执行SQL语句。PyMySQL支持参数化查询,有效防止SQL注入攻击。

创建数据表

代码示例

CREATE TABLE IF NOT EXISTS employees (
    id INT AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    email VARCHAR(100) UNIQUE NOT NULL,
    department VARCHAR(50),
    salary DECIMAL(10, 2),
    hire_date DATE,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

代码示例

import pymysql

with get_mysql_connection() as conn:
    with conn.cursor() as cursor:
        # 创建员工表
        create_table_sql = '''
        CREATE TABLE IF NOT EXISTS employees (
            id INT AUTO_INCREMENT PRIMARY KEY,
            name VARCHAR(100) NOT NULL,
            email VARCHAR(100) UNIQUE NOT NULL,
            department VARCHAR(50),
            salary DECIMAL(10, 2),
            hire_date DATE,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
        '''
        cursor.execute(create_table_sql)
        print("数据表创建成功!")

插入数据

代码示例

import pymysql

with get_mysql_connection() as conn:
    with conn.cursor() as cursor:
        # 插入单条记录
        insert_sql = '''
        INSERT INTO employees (name, email, department, salary, hire_date)
        VALUES (%s, %s, %s, %s, %s)
        '''
        cursor.execute(insert_sql, ('张三', 'zhangsan@company.com', '技术部', 15000.00, '2024-01-15'))
        
        # 插入多条记录
        employees = [
            ('李四', 'lisi@company.com', '技术部', 18000.00, '2023-06-20'),
            ('王五', 'wangwu@company.com', '市场部', 12000.00, '2024-03-01'),
            ('赵六', 'zhaoliu@company.com', '人事部', 11000.00, '2023-09-10'),
            ('孙七', 'sunqi@company.com', '技术部', 20000.00, '2022-11-05')
        ]
        cursor.executemany(insert_sql, employees)
        
        # 提交事务
        conn.commit()
        print(f"成功插入 {cursor.rowcount} 条记录,最后插入ID:{cursor.lastrowid}")

查询数据

代码示例

import pymysql

with get_mysql_connection() as conn:
    with conn.cursor() as cursor:
        # 查询所有员工
        cursor.execute('SELECT * FROM employees ORDER BY id')
        employees = cursor.fetchall()
        print("所有员工信息:")
        for emp in employees:
            print(f"  {emp['name']} | {emp['department']} | ¥{emp['salary']}")
        
        # 条件查询 - 查询技术部员工
        cursor.execute(
            'SELECT * FROM employees WHERE department = %s AND salary > %s',
            ('技术部', 16000)
        )
        tech_employees = cursor.fetchall()
        print(f"\n技术部薪资超过16000的员工(共{len(tech_employees)}人):")
        for emp in tech_employees:
            print(f"  {emp['name']} - ¥{emp['salary']}")
        
        # 聚合查询
        cursor.execute('''
            SELECT department, COUNT(*) as count, AVG(salary) as avg_salary
            FROM employees
            GROUP BY department
        ''')
        dept_stats = cursor.fetchall()
        print("\n各部门统计:")
        for dept in dept_stats:
            print(f"  {dept['department']}:{dept['count']}人,平均薪资¥{dept['avg_salary']:.2f}")

更新和删除数据

代码示例

import pymysql

with get_mysql_connection() as conn:
    with conn.cursor() as cursor:
        # 更新员工薪资
        cursor.execute(
            'UPDATE employees SET salary = salary * 1.1 WHERE department = %s',
            ('技术部',)
        )
        print(f"更新了 {cursor.rowcount} 条记录")
        
        # 删除指定员工
        cursor.execute(
            'DELETE FROM employees WHERE email = %s',
            ('zhaoliu@company.com',)
        )
        print(f"删除了 {cursor.rowcount} 条记录")
        
        conn.commit()

四、事务处理

PyMySQL默认关闭自动提交模式,所有操作都在事务中执行。调用conn.commit()提交事务,conn.rollback()回滚事务。

代码示例

import pymysql

def transfer_salary(from_emp_id, to_emp_id, amount):
    """薪资调整事务:从一个员工扣减,加到另一个员工"""
    with get_mysql_connection() as conn:
        with conn.cursor() as cursor:
            try:
                # 检查转出员工薪资是否足够
                cursor.execute(
                    'SELECT salary FROM employees WHERE id = %s',
                    (from_emp_id,)
                )
                from_emp = cursor.fetchone()
                if from_emp['salary'] < amount:
                    raise ValueError("转出员工薪资不足")
                
                # 执行转账操作
                cursor.execute(
                    'UPDATE employees SET salary = salary - %s WHERE id = %s',
                    (amount, from_emp_id)
                )
                cursor.execute(
                    'UPDATE employees SET salary = salary + %s WHERE id = %s',
                    (amount, to_emp_id)
                )
                
                # 记录操作日志
                cursor.execute(
                    'INSERT INTO salary_log (from_id, to_id, amount, log_time) VALUES (%s, %s, %s, NOW())',
                    (from_emp_id, to_emp_id, amount)
                )
                
                # 提交事务
                conn.commit()
                print(f"薪资调整成功:从ID {from_emp_id} 转移 ¥{amount} 到 ID {to_emp_id}")
                
            except Exception as e:
                # 任何错误都回滚
                conn.rollback()
                print(f"薪资调整失败,事务已回滚:{e}")

小贴士

可以开启自动提交模式:connection.autocommit(True)。开启后每条SQL语句自动提交,但失去事务控制能力。生产环境建议保持手动提交。


五、完整示例代码

以下是一个完整的MySQL员工管理系统示例。

代码示例

"""
MySQL员工管理系统 - PyMySQL完整示例
"""
import pymysql
from contextlib import contextmanager
from datetime import datetime

# 数据库配置
DB_CONFIG = {
    'host': 'localhost',
    'port': 3306,
    'user': 'root',
    'password': 'your_password',
    'database': 'company_db',
    'charset': 'utf8mb4',
    'cursorclass': pymysql.cursors.DictCursor
}

@contextmanager
def get_db_connection():
    """数据库连接上下文管理器"""
    conn = pymysql.connect(**DB_CONFIG)
    try:
        yield conn
    finally:
        conn.close()

def init_db():
    """初始化数据库"""
    with get_db_connection() as conn:
        with conn.cursor() as cursor:
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS employees (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    name VARCHAR(100) NOT NULL,
                    email VARCHAR(100) UNIQUE NOT NULL,
                    department VARCHAR(50),
                    salary DECIMAL(10, 2),
                    hire_date DATE,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
            ''')
            conn.commit()
            print("数据库初始化完成!")

def add_employee(name, email, department, salary, hire_date=None):
    """添加员工"""
    if hire_date is None:
        hire_date = datetime.now().strftime('%Y-%m-%d')
    
    with get_db_connection() as conn:
        with conn.cursor() as cursor:
            try:
                cursor.execute(
                    'INSERT INTO employees (name, email, department, salary, hire_date) VALUES (%s, %s, %s, %s, %s)',
                    (name, email, department, salary, hire_date)
                )
                conn.commit()
                print(f"员工 {name} 添加成功!")
            except pymysql.err.IntegrityError:
                print(f"邮箱 {email} 已存在,添加失败!")

def get_employees(department=None, min_salary=None):
    """查询员工列表"""
    with get_db_connection() as conn:
        with conn.cursor() as cursor:
            sql = 'SELECT * FROM employees WHERE 1=1'
            params = []
            
            if department:
                sql += ' AND department = %s'
                params.append(department)
            if min_salary:
                sql += ' AND salary >= %s'
                params.append(min_salary)
            
            sql += ' ORDER BY salary DESC'
            cursor.execute(sql, params)
            return cursor.fetchall()

def update_salary(emp_id, new_salary):
    """更新员工薪资"""
    with get_db_connection() as conn:
        with conn.cursor() as cursor:
            cursor.execute(
                'UPDATE employees SET salary = %s WHERE id = %s',
                (new_salary, emp_id)
            )
            conn.commit()
            print(f"员工ID {emp_id} 薪资已更新为 ¥{new_salary}")

def delete_employee(emp_id):
    """删除员工"""
    with get_db_connection() as conn:
        with conn.cursor() as cursor:
            cursor.execute('DELETE FROM employees WHERE id = %s', (emp_id,))
            conn.commit()
            print(f"员工ID {emp_id} 已删除")

def print_employees(employees):
    """打印员工列表"""
    print(f"\n共 {len(employees)} 名员工:")
    print("-" * 70)
    for emp in employees:
        print(f"  ID:{emp['id']} | {emp['name']} | {emp['department']} | ¥{emp['salary']}")
    print("-" * 70)

if __name__ == '__main__':
    # 初始化
    init_db()
    
    # 添加员工
    add_employee('张三', 'zhangsan@company.com', '技术部', 15000.00, '2024-01-15')
    add_employee('李四', 'lisi@company.com', '技术部', 18000.00, '2023-06-20')
    add_employee('王五', 'wangwu@company.com', '市场部', 12000.00, '2024-03-01')
    
    # 查询所有员工
    all_emps = get_employees()
    print_employees(all_emps)
    
    # 查询技术部员工
    tech_emps = get_employees(department='技术部')
    print("\n技术部员工:")
    print_employees(tech_emps)
    
    # 查询高薪员工
    high_salary = get_employees(min_salary=15000)
    print("\n薪资15000以上的员工:")
    print_employees(high_salary)

六、注意事项

⚠️ 注意1:使用%s作为占位符

PyMySQL使用%s作为参数占位符(注意不是?),不要使用Python字符串格式化拼接SQL,以防SQL注入。

⚠️ 注意2:字符编码设置

连接时务必指定charset='utf8mb4',这是MySQL中真正的UTF-8编码,支持emoji和所有Unicode字符。不要使用utf8(MySQL中是utf8mb3的别名)。

⚠️ 注意3:密码安全

不要将数据库密码硬编码在代码中。建议使用环境变量、配置文件或密钥管理服务存储数据库凭据。

⚠️ 注意4:游标类型选择

默认游标返回元组,使用cursorclass=pymysql.cursors.DictCursor可以返回字典,通过列名访问更方便。SSCursor适用于大数据量查询,避免一次性加载所有结果到内存。

常见问题

PyMySQL的占位符为什么是%s而不是?

PyMySQL兼容MySQLdb的API规范,MySQLdb使用%s作为占位符。这是为了与Python的字符串格式化符号一致。注意这里的%s不是字符串格式化,而是参数化查询占位符,PyMySQL会自动处理类型转换和转义。

如何处理大量数据查询避免内存溢出?

使用SSCursor(Server Side Cursor):cursor = conn.cursor(pymysql.cursors.SSCursor)。SSCursor逐行从服务器读取数据,而不是一次性加载全部结果到内存。但注意使用SSCursor时连接不能被其他操作使用。

PyMySQL如何获取自增ID?

插入数据后通过cursor.lastrowid获取最后插入的自增ID。注意在executemany批量插入时,lastrowid只返回第一条插入记录的ID。

PyMySQL与异步框架如何配合使用?

PyMySQL是同步库,不支持asyncio。在异步框架(如FastAPI、aiohttp)中,建议使用aiomysqlasyncmy等异步MySQL库。也可以使用线程池包装PyMySQL调用。


七、练习题

练习1:博客文章管理系统

编写一个程序,使用PyMySQL创建博客文章管理系统。要求:创建articles表(包含id、标题、内容、作者、分类、发布时间),实现文章的发布、编辑、删除、按分类查询、关键词搜索等功能,并统计各分类的文章数量。

练习2:订单处理系统

编写一个函数,实现基于PyMySQL的订单处理系统。要求:包含用户表、商品表、订单表三个关联表,实现下单功能(使用事务确保库存扣减和订单创建的一致性)、订单查询、销售统计等功能。


小结

  • PyMySQL:纯Python实现的MySQL客户端库,pip install pymysql即可安装

  • 参数化查询:使用%s占位符,不要拼接SQL字符串,防止SQL注入攻击

  • 事务管理:PyMySQL默认关闭自动提交,通过conn.commit()提交,conn.rollback()回滚

  • 字符编码:始终使用utf8mb4编码,确保完整支持Unicode字符

标签: Python MySQL PyMySQL 数据库 事务 SQL

本文涉及AI创作

内容由AI创作,请仔细甄别

list快速访问

上一篇: Python SQLite数据库操作教程 - sqlite3模块使用指南 下一篇: Python数据库连接池教程 - DBUtils连接池配置与性能优化

poll相关推荐