pin_drop当前位置:知识文库 ❯ 图文
Python生成器高级应用 - 管道模式、协程与send方法详解
一、生成器与send方法
Python生成器不仅可以产出值(yield),还可以接收值。这就是send()方法的作用。它允许调用方在生成器暂停时向其发送数据,实现双向通信。
代码示例
def echo_generator():
"""最简单的接收值的生成器"""
while True:
value = yield # yield接收send发送的值
print(f"收到: {value}")
# 使用
gen = echo_generator()
next(gen) # 启动生成器(必须先执行到第一个yield)
gen.send("Hello") # 收到: Hello
gen.send("World") # 收到: World
gen.send(42) # 收到: 42
send()方法的关键点:首次调用生成器时必须使用next()或send(None)来启动生成器,之后才能发送实际值。
yield表达式返回值
代码示例
def counter_with_input():
"""生成器同时产出值和接收控制信号"""
count = 0
while True:
increment = yield count # 产出count,接收increment
if increment is not None:
count += increment
else:
count += 1
gen = counter_with_input()
print(next(gen)) # 0 (启动)
print(gen.send(5)) # 5 (count += 5)
print(gen.send(3)) # 8 (count += 3)
print(next(gen)) # 9 (increment为None,count += 1)二、协程基础
当生成器被用来接收数据而不是产生数据时,它就变成了一种协程(Coroutine)。协程是可以暂停和恢复执行的函数,可以在执行过程中与调用方交换数据。
Python的协程体系包含以下关键方法:
-
send(value):向生成器发送值,并恢复执行
-
throw(type, value, traceback):在生成器暂停处抛出异常
-
close():关闭生成器,触发GeneratorExit异常
使用装饰器简化协程启动
代码示例
def coroutine(func):
"""自动启动协程的装饰器"""
def wrapper(*args, **kwargs):
gen = func(*args, **kwargs)
next(gen) # 自动启动
return gen
return wrapper
@coroutine
def grep(pattern):
"""简单的grep协程"""
print(f"搜索模式: {pattern}")
while True:
line = yield # 接收输入
if pattern in line:
print(f"匹配: {line}")
# 使用时无需手动next()
g = grep("python")
g.send("I love python programming") # 匹配: I love python programming
g.send("Java is great too") # 无输出
g.send("python is easy") # 匹配: python is easy
g.close() # 关闭协程三、管道模式
管道模式(Pipeline Pattern)是生成器最经典的高级应用之一。它将数据处理拆分为多个阶段,每个阶段是一个生成器或协程,前一阶段的输出作为后一阶段的输入,形成一条数据处理流水线。
管道模式的核心组件
-
生产者(Producer):生成数据的源头,通常是一个生成器函数
-
过滤器(Filter):处理并过滤数据的中间环节
-
消费者(Consumer):接收最终结果并执行操作的协程
四、代码示例与实战
示例1:数据处理管道
代码示例
def producer(n):
"""生产者:生成1到n的整数"""
for i in range(1, n + 1):
yield i
def squarer(source):
"""过滤器:对每个元素求平方"""
for item in source:
yield item ** 2
def filter_positive(source):
"""过滤器:只保留大于10的数"""
for item in source:
if item > 10:
yield item
def printer():
"""消费者:打印结果"""
while True:
item = yield
print(f"结果: {item}")
# 构建管道
data = producer(10)
squared = squarer(data)
filtered = filter_positive(squared)
# 消费管道输出
for result in filtered:
print(f"结果: {result}")
# 结果: 16
# 结果: 25
# 结果: 36
# 结果: 49
# 结果: 64
# 结果: 81
# 结果: 100示例2:日志分析管道
代码示例
import re
@coroutine
def log_parser():
"""日志解析协程"""
pattern = r"(\d{4}-\d{2}-\d{2}) \[(\w+)\] (.+)"
while True:
line = yield
match = re.match(pattern, line)
if match:
date, level, message = match.groups()
print(f"日期: {date} | 级别: {level} | 内容: {message}")
@coroutine
def log_filter(target, level):
"""日志过滤协程,只处理指定级别"""
while True:
line = yield
if f"[{level}]" in line:
target.send(line)
# 使用
parser = log_parser()
error_parser = log_filter(parser, "ERROR")
error_parser.send("2024-01-15 [INFO] Server started")
error_parser.send("2024-01-15 [ERROR] Connection timeout")
# 日期: 2024-01-15 | 级别: ERROR | 内容: Connection timeout
error_parser.send("2024-01-15 [ERROR] Disk full")
# 日期: 2024-01-15 | 级别: ERROR | 内容: Disk full示例3:使用throw处理异常
代码示例
def resilient_processor():
"""能处理异常的生成器"""
while True:
try:
value = yield
print(f"处理: {value}")
except ValueError as e:
print(f"值错误: {e},继续处理...")
except TypeError as e:
print(f"类型错误: {e},终止处理")
break # 遇到TypeError则退出
gen = resilient_processor()
next(gen)
gen.send("hello") # 处理: hello
gen.throw(ValueError, "无效的值") # 值错误: 无效的值,继续处理...
gen.send("world") # 处理: world
gen.throw(TypeError, "类型不匹配") # 类型错误: 类型不匹配,终止处理示例4:使用close优雅关闭
代码示例
def resource_manager():
"""管理资源的生成器"""
try:
print("打开资源...")
while True:
data = yield
print(f"使用资源处理: {data}")
except GeneratorExit:
print("关闭资源...清理完成!")
gen = resource_manager()
next(gen) # 打开资源...
gen.send("数据1") # 使用资源处理: 数据1
gen.send("数据2") # 使用资源处理: 数据2
gen.close() # 关闭资源...清理完成!五、注意事项与最佳实践
注意1:首次启动协程时必须调用next()或send(None)。如果直接send非None值,会抛出TypeError异常。使用装饰器可以避免忘记启动的问题。
注意2:协程使用完毕后应该调用close()关闭,确保GeneratorExit异常被触发,释放可能的资源。如果不关闭,协程在垃圾回收时会自动关闭,但资源释放的时机不可控。
注意3:虽然生成器协程很有趣,但在现代Python开发中,建议使用asyncio和async/await来编写异步代码。生成器协程更多用于理解Python协程的底层原理。
小贴士
管道模式非常适合流式数据处理场景,例如:日志分析、数据清洗、ETL流程等。每个管道阶段都可以独立测试和复用,组合灵活。配合contextlib.contextmanager可以实现更优雅的资源管理。
六、练习题
练习1
编写一个协程averager(),它持续接收数值并实时计算并返回当前的平均值。每次send一个数后,yield返回更新后的平均值。
练习2
构建一个数据处理管道:生产者生成1到100的随机数 -> 过滤器过滤掉小于50的数 -> 转换器将剩余数乘以2 -> 消费者计算总和。使用生成器和协程实现完整的管道。
常见问题
生成器协程和asyncio的异步协程有什么区别?
生成器协程基于yield和send,是Python 3.4之前协程的实现方式。asyncio的异步协程基于async/await(Python 3.5+),由事件循环驱动,支持真正的并发I/O操作。现代Python项目应优先使用asyncio。
为什么要先调用next()才能send值?
因为协程创建后处于初始状态,还没有执行到第一个yield语句。调用next()让协程运行到第一个yield处暂停,此时才能接收send传入的值。这是Python协程的生命周期设计。
管道模式和普通函数调用链有什么不同?
管道模式是惰性求值的,数据按需流动,不需要将所有中间结果存储在内存中。而普通函数调用链通常是先计算完整结果再传递给下一个函数。管道模式适合大数据流式处理。
协程中的GeneratorExit异常是什么?
当调用生成器的close()方法时,Python会在生成器暂停处注入GeneratorExit异常。协程可以捕获这个异常来执行清理操作(如关闭文件、释放锁等)。如果不捕获,生成器正常退出。
本文涉及AI创作
内容由AI创作,请仔细甄别