pin_drop当前位置:知识文库 ❯ 图文

Python SQLAlchemy ORM教程 - 模型定义会话管理查询操作

一、SQLAlchemy简介

SQLAlchemy是Python中最流行的ORM框架,提供了强大的对象关系映射能力和灵活的SQL表达式语言。它支持多种数据库后端(SQLite、MySQL、PostgreSQL、Oracle等),并且内置了连接池功能。

安装SQLAlchemy

代码示例

# 安装SQLAlchemy 2.x
pip install sqlalchemy

# 安装MySQL驱动支持
pip install pymysql

# 安装PostgreSQL驱动支持
pip install psycopg2-binary

数据库连接URL格式

SQLAlchemy使用统一的URL格式连接不同数据库:

代码示例

# SQLite
engine = create_engine('sqlite:///example.db')

# MySQL(使用PyMySQL驱动)
engine = create_engine('mysql+pymysql://user:password@localhost:3306/dbname')

# PostgreSQL
engine = create_engine('postgresql+psycopg2://user:password@localhost:5432/dbname')

# 内存SQLite(用于测试)
engine = create_engine('sqlite:///:memory:')

二、模型定义

在SQLAlchemy中,模型通过继承declarative_base()来定义。每个模型类对应数据库中的一张表。

代码示例

from sqlalchemy import Column, Integer, String, Float, DateTime, ForeignKey, create_engine
from sqlalchemy.orm import relationship, declarative_base
from datetime import datetime

Base = declarative_base()

class User(Base):
    """用户模型"""
    __tablename__ = 'users'
    
    id = Column(Integer, primary_key=True, autoincrement=True)
    username = Column(String(50), unique=True, nullable=False, index=True)
    email = Column(String(100), unique=True, nullable=False)
    age = Column(Integer, default=0)
    created_at = Column(DateTime, default=datetime.utcnow)
    
    # 一对多关系:用户可以有多个文章
    articles = relationship('Article', back_populates='author', cascade='all, delete-orphan')
    
    def __repr__(self):
        return f"<User(id={self.id}, username='{self.username}')>"

class Article(Base):
    """文章模型"""
    __tablename__ = 'articles'
    
    id = Column(Integer, primary_key=True, autoincrement=True)
    title = Column(String(200), nullable=False)
    content = Column(String(5000))
    views = Column(Integer, default=0)
    created_at = Column(DateTime, default=datetime.utcnow)
    
    # 外键关联到用户表
    author_id = Column(Integer, ForeignKey('users.id'))
    
    # 反向关系:文章属于一个用户
    author = relationship('User', back_populates='articles')
    
    def __repr__(self):
        return f"<Article(id={self.id}, title='{self.title}')>"

# 创建引擎(以SQLite为例)
engine = create_engine('sqlite:///blog.db', echo=True)

# 创建所有表
Base.metadata.create_all(engine)
print("数据表创建成功!")

上述模型定义中,relationship定义了两个模型之间的关联关系,ForeignKey定义了外键约束。


三、会话管理

会话(Session)是SQLAlchemy与数据库交互的核心。所有数据库操作都通过Session进行,Session负责跟踪对象的状态变化并管理事务。

创建和配置会话

代码示例

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# 创建引擎
engine = create_engine('sqlite:///blog.db')

# 创建会话工厂
Session = sessionmaker(bind=engine)

# 创建会话实例
session = Session()

# 使用完毕后关闭会话
session.close()

使用上下文管理器

推荐使用上下文管理器来自动管理会话的生命周期。

代码示例

from contextlib import contextmanager

@contextmanager
def get_session():
    """会话上下文管理器"""
    session = Session()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()

# 使用示例
with get_session() as session:
    new_user = User(username='张三', email='zhangsan@example.com', age=28)
    session.add(new_user)
    # 退出上下文时自动提交或回滚

添加、更新和删除对象

代码示例

with get_session() as session:
    # === 添加对象 ===
    new_user = User(username='李四', email='lisi@example.com', age=32)
    session.add(new_user)
    
    # 批量添加
    session.add_all([
        User(username='王五', email='wangwu@example.com', age=25),
        User(username='赵六', email='zhaoliu@example.com', age=30)
    ])
    
    # 获取自动生成的ID
    print(f"新用户ID:{new_user.id}")
    
    # === 更新对象 ===
    user = session.query(User).filter_by(username='张三').first()
    if user:
        user.age = 29  # 直接修改属性
        # session.commit() 会在上下文退出时自动调用
    
    # === 删除对象 ===
    user_to_delete = session.query(User).filter_by(username='赵六').first()
    if user_to_delete:
        session.delete(user_to_delete)

四、查询操作

SQLAlchemy提供了丰富的查询API,支持各种复杂的数据库查询操作。

基本查询

代码示例

with get_session() as session:
    # 查询所有用户
    all_users = session.query(User).all()
    
    # 查询第一条记录
    first_user = session.query(User).first()
    
    # 根据主键查询
    user = session.query(User).get(1)  # SQLAlchemy 1.x
    user = session.get(User, 1)        # SQLAlchemy 2.x
    
    # 查询指定列
    usernames = session.query(User.username).all()
    # 返回:[('张三',), ('李四',), ...]
    
    # 去重查询
    unique_ages = session.query(User.age).distinct().all()

条件查询

代码示例

from sqlalchemy import and_, or_, not_

with get_session() as session:
    # 单条件查询
    adults = session.query(User).filter(User.age >= 18).all()
    
    # 使用filter_by(仅支持等于)
    user = session.query(User).filter_by(username='张三').first()
    
    # 多条件查询(AND)
    young_users = session.query(User).filter(
        and_(User.age >= 18, User.age <= 30)
    ).all()
    
    # OR条件
    users = session.query(User).filter(
        or_(User.age < 20, User.age > 40)
    ).all()
    
    # 模糊查询
    users = session.query(User).filter(
        User.username.like('%张%')
    ).all()
    
    # IN查询
    users = session.query(User).filter(
        User.username.in_(['张三', '李四', '王五'])
    ).all()
    
    # NOT IN查询
    users = session.query(User).filter(
        ~User.username.in_(['张三', '李四'])
    ).all()

排序、限制和分页

代码示例

with get_session() as session:
    # 排序
    users_by_age = session.query(User).order_by(User.age).all()
    users_by_age_desc = session.query(User).order_by(User.age.desc()).all()
    
    # 多字段排序
    users_sorted = session.query(User).order_by(
        User.age.desc(), User.username.asc()
    ).all()
    
    # 限制结果数量
    top_5 = session.query(User).limit(5).all()
    
    # 偏移(跳过前N条)
    skip_first_10 = session.query(User).offset(10).all()
    
    # 分页(第2页,每页10条)
    page = 2
    per_page = 10
    paginated_users = session.query(User)\
        .order_by(User.id)\
        .limit(per_page)\
        .offset((page - 1) * per_page)\
        .all()

聚合查询

代码示例

from sqlalchemy import func

with get_session() as session:
    # 计数
    user_count = session.query(func.count(User.id)).scalar()
    print(f"用户总数:{user_count}")
    
    # 平均值
    avg_age = session.query(func.avg(User.age)).scalar()
    print(f"平均年龄:{avg_age:.1f}")
    
    # 最大值/最小值
    max_age = session.query(func.max(User.age)).scalar()
    min_age = session.query(func.min(User.age)).scalar()
    
    # 分组聚合
    age_groups = session.query(
        User.age,
        func.count(User.id).label('count')
    ).group_by(User.age).all()
    
    for age, count in age_groups:
        print(f"年龄{age}岁:{count}人")

关联查询

代码示例

from sqlalchemy.orm import joinedload, selectinload

with get_session() as session:
    # JOIN查询
    results = session.query(User, Article)\
        .join(Article, User.id == Article.author_id)\
        .filter(User.username == '张三')\
        .all()
    
    for user, article in results:
        print(f"{user.username} 写了文章:{article.title}")
    
    # 预加载关联数据(避免N+1问题)
    users_with_articles = session.query(User)\
        .options(joinedload(User.articles))\
        .all()
    
    for user in users_with_articles:
        print(f"{user.username} 有 {len(user.articles)} 篇文章")
        for article in user.articles:
            print(f"  - {article.title}")

五、完整示例代码

代码示例

"""
SQLAlchemy完整示例 - 博客管理系统
"""
from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, ForeignKey, func
from sqlalchemy.orm import relationship, declarative_base, sessionmaker, joinedload
from contextlib import contextmanager
from datetime import datetime

# 基础配置
Base = declarative_base()

class Category(Base):
    """分类模型"""
    __tablename__ = 'categories'
    
    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(50), unique=True, nullable=False)
    description = Column(String(200))
    
    # 一对多:一个分类下有多篇文章
    articles = relationship('Article', back_populates='category')
    
    def __repr__(self):
        return f"<Category(name='{self.name}')>"

class User(Base):
    """用户模型"""
    __tablename__ = 'users'
    
    id = Column(Integer, primary_key=True, autoincrement=True)
    username = Column(String(50), unique=True, nullable=False)
    email = Column(String(100), unique=True, nullable=False)
    
    articles = relationship('Article', back_populates='author')
    
    def __repr__(self):
        return f"<User(username='{self.username}')>"

class Article(Base):
    """文章模型"""
    __tablename__ = 'articles'
    
    id = Column(Integer, primary_key=True, autoincrement=True)
    title = Column(String(200), nullable=False)
    content = Column(String(5000))
    views = Column(Integer, default=0)
    created_at = Column(DateTime, default=datetime.utcnow)
    
    author_id = Column(Integer, ForeignKey('users.id'))
    category_id = Column(Integer, ForeignKey('categories.id'))
    
    author = relationship('User', back_populates='articles')
    category = relationship('Category', back_populates='articles')
    
    def __repr__(self):
        return f"<Article(title='{self.title}')>"

# 数据库配置
DATABASE_URL = 'sqlite:///blog_demo.db'
engine = create_engine(DATABASE_URL, echo=False)
Session = sessionmaker(bind=engine)

@contextmanager
def get_session():
    """会话上下文管理器"""
    session = Session()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()

def init_db():
    """初始化数据库"""
    Base.metadata.create_all(engine)
    print("数据库初始化完成!")

def add_user(username, email):
    """添加用户"""
    with get_session() as session:
        user = User(username=username, email=email)
        session.add(user)
        session.flush()
        print(f"用户 {username} 添加成功,ID:{user.id}")
        return user.id

def add_article(title, content, username, category_name):
    """添加文章"""
    with get_session() as session:
        author = session.query(User).filter_by(username=username).first()
        category = session.query(Category).filter_by(name=category_name).first()
        
        if not author:
            print(f"用户 {username} 不存在")
            return
        if not category:
            print(f"分类 {category_name} 不存在")
            return
        
        article = Article(
            title=title,
            content=content,
            author=author,
            category=category
        )
        session.add(article)
        session.flush()
        print(f"文章《{title}》发布成功!")

def get_user_articles(username):
    """获取用户的所有文章(预加载避免N+1)"""
    with get_session() as session:
        user = session.query(User)\
            .options(joinedload(User.articles))\
            .filter_by(username=username)\
            .first()
        
        if user:
            print(f"\n{user.username} 的文章:")
            for article in user.articles:
                print(f"  《{article.title}》 - {article.category.name if article.category else '无分类'}")
        else:
            print(f"用户 {username} 不存在")

def get_statistics():
    """获取博客统计信息"""
    with get_session() as session:
        # 总用户数
        user_count = session.query(func.count(User.id)).scalar()
        
        # 总文章数
        article_count = session.query(func.count(Article.id)).scalar()
        
        # 总浏览量
        total_views = session.query(func.sum(Article.views)).scalar() or 0
        
        # 各分类文章数
        category_stats = session.query(
            Category.name,
            func.count(Article.id).label('article_count')
        ).outerjoin(Article).group_by(Category.id).all()
        
        print("\n=== 博客统计 ===")
        print(f"用户总数:{user_count}")
        print(f"文章总数:{article_count}")
        print(f"总浏览量:{total_views}")
        print("\n各分类文章数:")
        for name, count in category_stats:
            print(f"  {name}:{count}篇")

if __name__ == '__main__':
    # 初始化
    init_db()
    
    # 添加分类
    with get_session() as session:
        for name, desc in [
            ('Python', 'Python编程相关文章'),
            ('Web开发', 'Web开发技术文章'),
            ('数据库', '数据库相关知识')
        ]:
            session.add(Category(name=name, description=desc))
    
    # 添加用户
    add_user('张三', 'zhangsan@example.com')
    add_user('李四', 'lisi@example.com')
    
    # 添加文章
    add_article('Python入门教程', '本文介绍Python基础知识...', '张三', 'Python')
    add_article('Flask Web开发', '使用Flask构建Web应用...', '张三', 'Web开发')
    add_article('SQLAlchemy使用指南', '学习SQLAlchemy的用法...', '李四', '数据库')
    
    # 查询用户文章
    get_user_articles('张三')
    
    # 查看统计
    get_statistics()

六、注意事项

⚠️ 注意1:SQLAlchemy 2.0迁移

SQLAlchemy 2.0引入了新的查询语法:session.execute(select(User))替代session.query(User)。新项目建议使用2.0风格语法,但1.x风格仍被支持。

⚠️ 注意2:懒加载陷阱

SQLAlchemy默认使用懒加载(lazy loading),访问关联对象时才执行查询。在循环中访问关联属性会导致N+1查询问题。使用joinedloadselectinload预加载。

⚠️ 注意3:会话线程安全

Session不是线程安全的,不应在多线程间共享。每个线程应该创建自己的Session实例。在Web应用中,推荐每个请求创建一个Session,请求结束后关闭。

⚠️ 注意4:数据库迁移

SQLAlchemy的create_all()只会创建新表,不会修改已有表结构。生产环境建议使用Alembic进行数据库版本管理和迁移。

小贴士

开启SQL日志可以帮助调试:在create_engine()中设置echo=True,所有生成的SQL语句都会被打印到控制台,方便排查问题。

常见问题

session.query()和session.execute(select())有什么区别?

session.query()是SQLAlchemy 1.x的传统查询方式,session.execute(select())是2.0推荐的新语法。2.0语法更加统一,select()可以构建更复杂的查询。两者功能基本等价,新项目建议使用2.0风格。

如何执行原生SQL?

使用session.execute(text("SELECT * FROM users"))可以执行原生SQL。需要导入from sqlalchemy import text。原生SQL适合ORM难以表达的复杂查询。

SQLAlchemy如何批量插入大量数据?

使用session.add_all([...])可以批量添加。对于超大数据量,可以使用session.execute(insert(Model), data_list)进行批量插入,性能更好。也可以使用fast_executemany=True选项(MySQL/SQL Server)。

SQLAlchemy支持异步操作吗?

SQLAlchemy 1.4+支持异步操作,使用create_async_engine()AsyncSession。异步操作需要安装对应的异步驱动,如aiomysql(MySQL)或asyncpg(PostgreSQL)。


七、练习题

练习1:在线书店管理系统

使用SQLAlchemy创建一个在线书店管理系统。要求:定义Book、Author、Publisher三个模型,Author和Book是多对多关系(一本书可以有多个作者),Publisher和Book是一对多关系。实现书籍的增删改查、按作者搜索、按出版社筛选、价格区间查询、销量排行榜等功能。

练习2:数据统计与报表

基于博客管理系统,编写查询函数实现以下统计报表功能:1)每月文章发布数量统计;2)各分类文章数量和平均浏览量;3)活跃用户排名(按文章数量和总浏览量综合计算);4)热门文章TOP10。要求使用聚合函数、分组、排序等SQL高级特性,并尽量使用预加载避免N+1查询。


小结

  • 模型定义:继承declarative_base(),使用Column定义字段,relationship定义关联关系

  • 会话管理:Session是核心工作空间,负责跟踪对象状态、执行事务,使用上下文管理器自动管理

  • 查询操作:支持filter、filter_by条件查询,order_by排序,limit/offset分页,func聚合函数,joinedload预加载

  • 最佳实践:使用上下文管理会话、预加载避免N+1、注意2.0语法迁移、生产环境用Alembic做迁移

标签: Python SQLAlchemy ORM 数据库 会话管理 模型定义 查询操作

本文涉及AI创作

内容由AI创作,请仔细甄别

list快速访问

上一篇: Python ORM对象关系映射教程 - 优缺点对比与框架推荐 下一篇: Socket编程基础 - Python网络编程入门教程

poll相关推荐