pin_drop当前位置:知识文库 ❯ 图文
Python SQLAlchemy ORM教程 - 模型定义会话管理查询操作
一、SQLAlchemy简介
SQLAlchemy是Python中最流行的ORM框架,提供了强大的对象关系映射能力和灵活的SQL表达式语言。它支持多种数据库后端(SQLite、MySQL、PostgreSQL、Oracle等),并且内置了连接池功能。
安装SQLAlchemy
代码示例
# 安装SQLAlchemy 2.x
pip install sqlalchemy
# 安装MySQL驱动支持
pip install pymysql
# 安装PostgreSQL驱动支持
pip install psycopg2-binary数据库连接URL格式
SQLAlchemy使用统一的URL格式连接不同数据库:
代码示例
# SQLite
engine = create_engine('sqlite:///example.db')
# MySQL(使用PyMySQL驱动)
engine = create_engine('mysql+pymysql://user:password@localhost:3306/dbname')
# PostgreSQL
engine = create_engine('postgresql+psycopg2://user:password@localhost:5432/dbname')
# 内存SQLite(用于测试)
engine = create_engine('sqlite:///:memory:')二、模型定义
在SQLAlchemy中,模型通过继承declarative_base()来定义。每个模型类对应数据库中的一张表。
代码示例
from sqlalchemy import Column, Integer, String, Float, DateTime, ForeignKey, create_engine
from sqlalchemy.orm import relationship, declarative_base
from datetime import datetime
Base = declarative_base()
class User(Base):
"""用户模型"""
__tablename__ = 'users'
id = Column(Integer, primary_key=True, autoincrement=True)
username = Column(String(50), unique=True, nullable=False, index=True)
email = Column(String(100), unique=True, nullable=False)
age = Column(Integer, default=0)
created_at = Column(DateTime, default=datetime.utcnow)
# 一对多关系:用户可以有多个文章
articles = relationship('Article', back_populates='author', cascade='all, delete-orphan')
def __repr__(self):
return f"<User(id={self.id}, username='{self.username}')>"
class Article(Base):
"""文章模型"""
__tablename__ = 'articles'
id = Column(Integer, primary_key=True, autoincrement=True)
title = Column(String(200), nullable=False)
content = Column(String(5000))
views = Column(Integer, default=0)
created_at = Column(DateTime, default=datetime.utcnow)
# 外键关联到用户表
author_id = Column(Integer, ForeignKey('users.id'))
# 反向关系:文章属于一个用户
author = relationship('User', back_populates='articles')
def __repr__(self):
return f"<Article(id={self.id}, title='{self.title}')>"
# 创建引擎(以SQLite为例)
engine = create_engine('sqlite:///blog.db', echo=True)
# 创建所有表
Base.metadata.create_all(engine)
print("数据表创建成功!")
上述模型定义中,relationship定义了两个模型之间的关联关系,ForeignKey定义了外键约束。
三、会话管理
会话(Session)是SQLAlchemy与数据库交互的核心。所有数据库操作都通过Session进行,Session负责跟踪对象的状态变化并管理事务。
创建和配置会话
代码示例
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
# 创建引擎
engine = create_engine('sqlite:///blog.db')
# 创建会话工厂
Session = sessionmaker(bind=engine)
# 创建会话实例
session = Session()
# 使用完毕后关闭会话
session.close()使用上下文管理器
推荐使用上下文管理器来自动管理会话的生命周期。
代码示例
from contextlib import contextmanager
@contextmanager
def get_session():
"""会话上下文管理器"""
session = Session()
try:
yield session
session.commit()
except Exception:
session.rollback()
raise
finally:
session.close()
# 使用示例
with get_session() as session:
new_user = User(username='张三', email='zhangsan@example.com', age=28)
session.add(new_user)
# 退出上下文时自动提交或回滚添加、更新和删除对象
代码示例
with get_session() as session:
# === 添加对象 ===
new_user = User(username='李四', email='lisi@example.com', age=32)
session.add(new_user)
# 批量添加
session.add_all([
User(username='王五', email='wangwu@example.com', age=25),
User(username='赵六', email='zhaoliu@example.com', age=30)
])
# 获取自动生成的ID
print(f"新用户ID:{new_user.id}")
# === 更新对象 ===
user = session.query(User).filter_by(username='张三').first()
if user:
user.age = 29 # 直接修改属性
# session.commit() 会在上下文退出时自动调用
# === 删除对象 ===
user_to_delete = session.query(User).filter_by(username='赵六').first()
if user_to_delete:
session.delete(user_to_delete)四、查询操作
SQLAlchemy提供了丰富的查询API,支持各种复杂的数据库查询操作。
基本查询
代码示例
with get_session() as session:
# 查询所有用户
all_users = session.query(User).all()
# 查询第一条记录
first_user = session.query(User).first()
# 根据主键查询
user = session.query(User).get(1) # SQLAlchemy 1.x
user = session.get(User, 1) # SQLAlchemy 2.x
# 查询指定列
usernames = session.query(User.username).all()
# 返回:[('张三',), ('李四',), ...]
# 去重查询
unique_ages = session.query(User.age).distinct().all()条件查询
代码示例
from sqlalchemy import and_, or_, not_
with get_session() as session:
# 单条件查询
adults = session.query(User).filter(User.age >= 18).all()
# 使用filter_by(仅支持等于)
user = session.query(User).filter_by(username='张三').first()
# 多条件查询(AND)
young_users = session.query(User).filter(
and_(User.age >= 18, User.age <= 30)
).all()
# OR条件
users = session.query(User).filter(
or_(User.age < 20, User.age > 40)
).all()
# 模糊查询
users = session.query(User).filter(
User.username.like('%张%')
).all()
# IN查询
users = session.query(User).filter(
User.username.in_(['张三', '李四', '王五'])
).all()
# NOT IN查询
users = session.query(User).filter(
~User.username.in_(['张三', '李四'])
).all()排序、限制和分页
代码示例
with get_session() as session:
# 排序
users_by_age = session.query(User).order_by(User.age).all()
users_by_age_desc = session.query(User).order_by(User.age.desc()).all()
# 多字段排序
users_sorted = session.query(User).order_by(
User.age.desc(), User.username.asc()
).all()
# 限制结果数量
top_5 = session.query(User).limit(5).all()
# 偏移(跳过前N条)
skip_first_10 = session.query(User).offset(10).all()
# 分页(第2页,每页10条)
page = 2
per_page = 10
paginated_users = session.query(User)\
.order_by(User.id)\
.limit(per_page)\
.offset((page - 1) * per_page)\
.all()聚合查询
代码示例
from sqlalchemy import func
with get_session() as session:
# 计数
user_count = session.query(func.count(User.id)).scalar()
print(f"用户总数:{user_count}")
# 平均值
avg_age = session.query(func.avg(User.age)).scalar()
print(f"平均年龄:{avg_age:.1f}")
# 最大值/最小值
max_age = session.query(func.max(User.age)).scalar()
min_age = session.query(func.min(User.age)).scalar()
# 分组聚合
age_groups = session.query(
User.age,
func.count(User.id).label('count')
).group_by(User.age).all()
for age, count in age_groups:
print(f"年龄{age}岁:{count}人")关联查询
代码示例
from sqlalchemy.orm import joinedload, selectinload
with get_session() as session:
# JOIN查询
results = session.query(User, Article)\
.join(Article, User.id == Article.author_id)\
.filter(User.username == '张三')\
.all()
for user, article in results:
print(f"{user.username} 写了文章:{article.title}")
# 预加载关联数据(避免N+1问题)
users_with_articles = session.query(User)\
.options(joinedload(User.articles))\
.all()
for user in users_with_articles:
print(f"{user.username} 有 {len(user.articles)} 篇文章")
for article in user.articles:
print(f" - {article.title}")五、完整示例代码
代码示例
"""
SQLAlchemy完整示例 - 博客管理系统
"""
from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, ForeignKey, func
from sqlalchemy.orm import relationship, declarative_base, sessionmaker, joinedload
from contextlib import contextmanager
from datetime import datetime
# 基础配置
Base = declarative_base()
class Category(Base):
"""分类模型"""
__tablename__ = 'categories'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(50), unique=True, nullable=False)
description = Column(String(200))
# 一对多:一个分类下有多篇文章
articles = relationship('Article', back_populates='category')
def __repr__(self):
return f"<Category(name='{self.name}')>"
class User(Base):
"""用户模型"""
__tablename__ = 'users'
id = Column(Integer, primary_key=True, autoincrement=True)
username = Column(String(50), unique=True, nullable=False)
email = Column(String(100), unique=True, nullable=False)
articles = relationship('Article', back_populates='author')
def __repr__(self):
return f"<User(username='{self.username}')>"
class Article(Base):
"""文章模型"""
__tablename__ = 'articles'
id = Column(Integer, primary_key=True, autoincrement=True)
title = Column(String(200), nullable=False)
content = Column(String(5000))
views = Column(Integer, default=0)
created_at = Column(DateTime, default=datetime.utcnow)
author_id = Column(Integer, ForeignKey('users.id'))
category_id = Column(Integer, ForeignKey('categories.id'))
author = relationship('User', back_populates='articles')
category = relationship('Category', back_populates='articles')
def __repr__(self):
return f"<Article(title='{self.title}')>"
# 数据库配置
DATABASE_URL = 'sqlite:///blog_demo.db'
engine = create_engine(DATABASE_URL, echo=False)
Session = sessionmaker(bind=engine)
@contextmanager
def get_session():
"""会话上下文管理器"""
session = Session()
try:
yield session
session.commit()
except Exception:
session.rollback()
raise
finally:
session.close()
def init_db():
"""初始化数据库"""
Base.metadata.create_all(engine)
print("数据库初始化完成!")
def add_user(username, email):
"""添加用户"""
with get_session() as session:
user = User(username=username, email=email)
session.add(user)
session.flush()
print(f"用户 {username} 添加成功,ID:{user.id}")
return user.id
def add_article(title, content, username, category_name):
"""添加文章"""
with get_session() as session:
author = session.query(User).filter_by(username=username).first()
category = session.query(Category).filter_by(name=category_name).first()
if not author:
print(f"用户 {username} 不存在")
return
if not category:
print(f"分类 {category_name} 不存在")
return
article = Article(
title=title,
content=content,
author=author,
category=category
)
session.add(article)
session.flush()
print(f"文章《{title}》发布成功!")
def get_user_articles(username):
"""获取用户的所有文章(预加载避免N+1)"""
with get_session() as session:
user = session.query(User)\
.options(joinedload(User.articles))\
.filter_by(username=username)\
.first()
if user:
print(f"\n{user.username} 的文章:")
for article in user.articles:
print(f" 《{article.title}》 - {article.category.name if article.category else '无分类'}")
else:
print(f"用户 {username} 不存在")
def get_statistics():
"""获取博客统计信息"""
with get_session() as session:
# 总用户数
user_count = session.query(func.count(User.id)).scalar()
# 总文章数
article_count = session.query(func.count(Article.id)).scalar()
# 总浏览量
total_views = session.query(func.sum(Article.views)).scalar() or 0
# 各分类文章数
category_stats = session.query(
Category.name,
func.count(Article.id).label('article_count')
).outerjoin(Article).group_by(Category.id).all()
print("\n=== 博客统计 ===")
print(f"用户总数:{user_count}")
print(f"文章总数:{article_count}")
print(f"总浏览量:{total_views}")
print("\n各分类文章数:")
for name, count in category_stats:
print(f" {name}:{count}篇")
if __name__ == '__main__':
# 初始化
init_db()
# 添加分类
with get_session() as session:
for name, desc in [
('Python', 'Python编程相关文章'),
('Web开发', 'Web开发技术文章'),
('数据库', '数据库相关知识')
]:
session.add(Category(name=name, description=desc))
# 添加用户
add_user('张三', 'zhangsan@example.com')
add_user('李四', 'lisi@example.com')
# 添加文章
add_article('Python入门教程', '本文介绍Python基础知识...', '张三', 'Python')
add_article('Flask Web开发', '使用Flask构建Web应用...', '张三', 'Web开发')
add_article('SQLAlchemy使用指南', '学习SQLAlchemy的用法...', '李四', '数据库')
# 查询用户文章
get_user_articles('张三')
# 查看统计
get_statistics()六、注意事项
⚠️ 注意1:SQLAlchemy 2.0迁移
SQLAlchemy 2.0引入了新的查询语法:
session.execute(select(User))替代session.query(User)。新项目建议使用2.0风格语法,但1.x风格仍被支持。
⚠️ 注意2:懒加载陷阱
SQLAlchemy默认使用懒加载(lazy loading),访问关联对象时才执行查询。在循环中访问关联属性会导致N+1查询问题。使用
joinedload或selectinload预加载。
⚠️ 注意3:会话线程安全
Session不是线程安全的,不应在多线程间共享。每个线程应该创建自己的Session实例。在Web应用中,推荐每个请求创建一个Session,请求结束后关闭。
⚠️ 注意4:数据库迁移
SQLAlchemy的
create_all()只会创建新表,不会修改已有表结构。生产环境建议使用Alembic进行数据库版本管理和迁移。
小贴士
开启SQL日志可以帮助调试:在create_engine()中设置echo=True,所有生成的SQL语句都会被打印到控制台,方便排查问题。
常见问题
session.query()和session.execute(select())有什么区别?
session.query()是SQLAlchemy 1.x的传统查询方式,session.execute(select())是2.0推荐的新语法。2.0语法更加统一,select()可以构建更复杂的查询。两者功能基本等价,新项目建议使用2.0风格。
如何执行原生SQL?
使用session.execute(text("SELECT * FROM users"))可以执行原生SQL。需要导入from sqlalchemy import text。原生SQL适合ORM难以表达的复杂查询。
SQLAlchemy如何批量插入大量数据?
使用session.add_all([...])可以批量添加。对于超大数据量,可以使用session.execute(insert(Model), data_list)进行批量插入,性能更好。也可以使用fast_executemany=True选项(MySQL/SQL Server)。
SQLAlchemy支持异步操作吗?
SQLAlchemy 1.4+支持异步操作,使用create_async_engine()和AsyncSession。异步操作需要安装对应的异步驱动,如aiomysql(MySQL)或asyncpg(PostgreSQL)。
七、练习题
练习1:在线书店管理系统
使用SQLAlchemy创建一个在线书店管理系统。要求:定义Book、Author、Publisher三个模型,Author和Book是多对多关系(一本书可以有多个作者),Publisher和Book是一对多关系。实现书籍的增删改查、按作者搜索、按出版社筛选、价格区间查询、销量排行榜等功能。
练习2:数据统计与报表
基于博客管理系统,编写查询函数实现以下统计报表功能:1)每月文章发布数量统计;2)各分类文章数量和平均浏览量;3)活跃用户排名(按文章数量和总浏览量综合计算);4)热门文章TOP10。要求使用聚合函数、分组、排序等SQL高级特性,并尽量使用预加载避免N+1查询。
小结
-
模型定义:继承declarative_base(),使用Column定义字段,relationship定义关联关系
-
会话管理:Session是核心工作空间,负责跟踪对象状态、执行事务,使用上下文管理器自动管理
-
查询操作:支持filter、filter_by条件查询,order_by排序,limit/offset分页,func聚合函数,joinedload预加载
-
最佳实践:使用上下文管理会话、预加载避免N+1、注意2.0语法迁移、生产环境用Alembic做迁移
本文涉及AI创作
内容由AI创作,请仔细甄别