pin_drop当前位置:知识文库 ❯ 图文

Python生成器高级应用 - 管道模式、协程与send方法详解

一、生成器与send方法

Python生成器不仅可以产出值(yield),还可以接收值。这就是send()方法的作用。它允许调用方在生成器暂停时向其发送数据,实现双向通信。

代码示例

def echo_generator():
    """最简单的接收值的生成器"""
    while True:
        value = yield  # yield接收send发送的值
        print(f"收到: {value}")

# 使用
gen = echo_generator()
next(gen)           # 启动生成器(必须先执行到第一个yield)
gen.send("Hello")   # 收到: Hello
gen.send("World")   # 收到: World
gen.send(42)        # 收到: 42

send()方法的关键点:首次调用生成器时必须使用next()send(None)来启动生成器,之后才能发送实际值。

yield表达式返回值

代码示例

def counter_with_input():
    """生成器同时产出值和接收控制信号"""
    count = 0
    while True:
        increment = yield count  # 产出count,接收increment
        if increment is not None:
            count += increment
        else:
            count += 1

gen = counter_with_input()
print(next(gen))       # 0 (启动)
print(gen.send(5))     # 5 (count += 5)
print(gen.send(3))     # 8 (count += 3)
print(next(gen))       # 9 (increment为None,count += 1)

二、协程基础

当生成器被用来接收数据而不是产生数据时,它就变成了一种协程(Coroutine)。协程是可以暂停和恢复执行的函数,可以在执行过程中与调用方交换数据。

Python的协程体系包含以下关键方法:

  • send(value):向生成器发送值,并恢复执行

  • throw(type, value, traceback):在生成器暂停处抛出异常

  • close():关闭生成器,触发GeneratorExit异常

使用装饰器简化协程启动

代码示例

def coroutine(func):
    """自动启动协程的装饰器"""
    def wrapper(*args, **kwargs):
        gen = func(*args, **kwargs)
        next(gen)  # 自动启动
        return gen
    return wrapper

@coroutine
def grep(pattern):
    """简单的grep协程"""
    print(f"搜索模式: {pattern}")
    while True:
        line = yield  # 接收输入
        if pattern in line:
            print(f"匹配: {line}")

# 使用时无需手动next()
g = grep("python")
g.send("I love python programming")  # 匹配: I love python programming
g.send("Java is great too")          # 无输出
g.send("python is easy")             # 匹配: python is easy
g.close()  # 关闭协程

三、管道模式

管道模式(Pipeline Pattern)是生成器最经典的高级应用之一。它将数据处理拆分为多个阶段,每个阶段是一个生成器或协程,前一阶段的输出作为后一阶段的输入,形成一条数据处理流水线。

管道模式的核心组件

  • 生产者(Producer):生成数据的源头,通常是一个生成器函数

  • 过滤器(Filter):处理并过滤数据的中间环节

  • 消费者(Consumer):接收最终结果并执行操作的协程


四、代码示例与实战

示例1:数据处理管道

代码示例

def producer(n):
    """生产者:生成1到n的整数"""
    for i in range(1, n + 1):
        yield i

def squarer(source):
    """过滤器:对每个元素求平方"""
    for item in source:
        yield item ** 2

def filter_positive(source):
    """过滤器:只保留大于10的数"""
    for item in source:
        if item > 10:
            yield item

def printer():
    """消费者:打印结果"""
    while True:
        item = yield
        print(f"结果: {item}")

# 构建管道
data = producer(10)
squared = squarer(data)
filtered = filter_positive(squared)

# 消费管道输出
for result in filtered:
    print(f"结果: {result}")
# 结果: 16
# 结果: 25
# 结果: 36
# 结果: 49
# 结果: 64
# 结果: 81
# 结果: 100

示例2:日志分析管道

代码示例

import re

@coroutine
def log_parser():
    """日志解析协程"""
    pattern = r"(\d{4}-\d{2}-\d{2}) \[(\w+)\] (.+)"
    while True:
        line = yield
        match = re.match(pattern, line)
        if match:
            date, level, message = match.groups()
            print(f"日期: {date} | 级别: {level} | 内容: {message}")

@coroutine
def log_filter(target, level):
    """日志过滤协程,只处理指定级别"""
    while True:
        line = yield
        if f"[{level}]" in line:
            target.send(line)

# 使用
parser = log_parser()
error_parser = log_filter(parser, "ERROR")

error_parser.send("2024-01-15 [INFO] Server started")
error_parser.send("2024-01-15 [ERROR] Connection timeout")
# 日期: 2024-01-15 | 级别: ERROR | 内容: Connection timeout
error_parser.send("2024-01-15 [ERROR] Disk full")
# 日期: 2024-01-15 | 级别: ERROR | 内容: Disk full

示例3:使用throw处理异常

代码示例

def resilient_processor():
    """能处理异常的生成器"""
    while True:
        try:
            value = yield
            print(f"处理: {value}")
        except ValueError as e:
            print(f"值错误: {e},继续处理...")
        except TypeError as e:
            print(f"类型错误: {e},终止处理")
            break  # 遇到TypeError则退出

gen = resilient_processor()
next(gen)

gen.send("hello")        # 处理: hello
gen.throw(ValueError, "无效的值")  # 值错误: 无效的值,继续处理...
gen.send("world")        # 处理: world
gen.throw(TypeError, "类型不匹配")  # 类型错误: 类型不匹配,终止处理

示例4:使用close优雅关闭

代码示例

def resource_manager():
    """管理资源的生成器"""
    try:
        print("打开资源...")
        while True:
            data = yield
            print(f"使用资源处理: {data}")
    except GeneratorExit:
        print("关闭资源...清理完成!")

gen = resource_manager()
next(gen)  # 打开资源...
gen.send("数据1")  # 使用资源处理: 数据1
gen.send("数据2")  # 使用资源处理: 数据2
gen.close()  # 关闭资源...清理完成!

五、注意事项与最佳实践

注意1:首次启动协程时必须调用next()或send(None)。如果直接send非None值,会抛出TypeError异常。使用装饰器可以避免忘记启动的问题。

注意2:协程使用完毕后应该调用close()关闭,确保GeneratorExit异常被触发,释放可能的资源。如果不关闭,协程在垃圾回收时会自动关闭,但资源释放的时机不可控。

注意3:虽然生成器协程很有趣,但在现代Python开发中,建议使用asyncio和async/await来编写异步代码。生成器协程更多用于理解Python协程的底层原理。

小贴士

管道模式非常适合流式数据处理场景,例如:日志分析、数据清洗、ETL流程等。每个管道阶段都可以独立测试和复用,组合灵活。配合contextlib.contextmanager可以实现更优雅的资源管理。


六、练习题

练习1

编写一个协程averager(),它持续接收数值并实时计算并返回当前的平均值。每次send一个数后,yield返回更新后的平均值。

练习2

构建一个数据处理管道:生产者生成1到100的随机数 -> 过滤器过滤掉小于50的数 -> 转换器将剩余数乘以2 -> 消费者计算总和。使用生成器和协程实现完整的管道。

常见问题

生成器协程和asyncio的异步协程有什么区别?

生成器协程基于yield和send,是Python 3.4之前协程的实现方式。asyncio的异步协程基于async/await(Python 3.5+),由事件循环驱动,支持真正的并发I/O操作。现代Python项目应优先使用asyncio。

为什么要先调用next()才能send值?

因为协程创建后处于初始状态,还没有执行到第一个yield语句。调用next()让协程运行到第一个yield处暂停,此时才能接收send传入的值。这是Python协程的生命周期设计。

管道模式和普通函数调用链有什么不同?

管道模式是惰性求值的,数据按需流动,不需要将所有中间结果存储在内存中。而普通函数调用链通常是先计算完整结果再传递给下一个函数。管道模式适合大数据流式处理。

协程中的GeneratorExit异常是什么?

当调用生成器的close()方法时,Python会在生成器暂停处注入GeneratorExit异常。协程可以捕获这个异常来执行清理操作(如关闭文件、释放锁等)。如果不捕获,生成器正常退出。

标签: send方法 协程 管道模式 GeneratorExit Python高级

本文涉及AI创作

内容由AI创作,请仔细甄别

list快速访问

上一篇: Python yield from完全指南 - 委托生成器与嵌套生成器 下一篇: 正则表达式基础语法详解 - Python re模块入门指南

poll相关推荐