pin_drop当前位置:知识文库 ❯ 图文

Python asyncio异步编程完全指南:事件循环与协程实战 - 小确幸生活

一、概述

asyncio 是 Python 的异步 I/O 库,提供了编写并发代码的基础设施。它基于事件循环协程异步 IO 模型,能够高效地处理大量并发连接。

与多线程不同,asyncio 使用单线程实现并发,通过协程在任务间切换,避免了线程切换的开销和锁的复杂性。特别适合 I/O 密集型应用,如网络爬虫、Web 服务器、数据库查询等场景。

对比项 多线程 asyncio
并发模型 抢占式多任务 协作式多任务
线程数量 多个线程 单线程
切换开销 较大(需要锁) 极小(协程切换)
适用场景 I/O 密集型 高并发 I/O 密集型
编程复杂度 中等(需要处理锁) 较高(需要理解异步)

二、语法

核心概念

  • 事件循环(Event Loop):asyncio 的核心,负责调度和执行协程任务

  • 协程(Coroutine):使用 async def 定义的异步函数

  • 任务(Task):对协程的封装,用于并发执行

  • Future:表示异步操作的结果

基本语法

代码示例

import asyncio

# 定义协程函数
async def my_coroutine():
    print("开始执行")
    await asyncio.sleep(1)  # 异步等待
    print("执行完成")
    return "结果"

# 运行协程
asyncio.run(my_coroutine())

常用 API

  • asyncio.run(coro):运行协程的主入口

  • await coro:等待协程完成

  • asyncio.gather(*coros):并发执行多个协程

  • asyncio.create_task(coro):创建任务

  • asyncio.sleep(seconds):异步休眠


三、基本用法

1. 单个协程执行

代码示例

import asyncio
import time

async def say_hello(delay, name):
    """异步函数"""
    print(f"[{name}] 开始,等待 {delay} 秒")
    await asyncio.sleep(delay)  # 非阻塞等待
    print(f"[{name}] 你好!")
    return f"{name} 完成"

# 运行单个协程
result = asyncio.run(say_hello(1, "Alice"))
print(f"结果: {result}")

2. 多个协程并发执行

代码示例

import asyncio
import time

async def task(name, delay):
    print(f"[{name}] 开始")
    await asyncio.sleep(delay)
    print(f"[{name}] 完成")
    return f"{name} 的结果"

async def main():
    # 方式1: 使用 gather 并发执行
    start = time.time()
    results = await asyncio.gather(
        task("任务1", 2),
        task("任务2", 1),
        task("任务3", 3)
    )
    print(f"总耗时: {time.time() - start:.2f} 秒")  # 约 3 秒
    print(f"所有结果: {results}")

asyncio.run(main())

3. 使用 Task 对象

代码示例

import asyncio

async def background_task(name, delay):
    print(f"[{name}] 开始")
    await asyncio.sleep(delay)
    print(f"[{name}] 完成")
    return f"{name} 的结果"

async def main():
    # 创建任务(立即开始执行)
    task1 = asyncio.create_task(background_task("任务A", 2))
    task2 = asyncio.create_task(background_task("任务B", 1))
    
    # 可以做其他事情
    print("主程序继续执行...")
    await asyncio.sleep(0.5)
    print("主程序等待任务完成...")
    
    # 等待任务完成
    result1 = await task1
    result2 = await task2
    
    print(f"结果1: {result1}")
    print(f"结果2: {result2}")

asyncio.run(main())

提示:create_task() 会立即调度协程执行,而 await 会等待协程完成。使用 Task 可以在等待的同时执行其他操作。


四、代码示例

示例 1:异步网络请求(模拟)

代码示例

import asyncio
import random

async def fetch_data(url, delay):
    """模拟异步网络请求"""
    print(f"开始请求: {url}")
    await asyncio.sleep(delay)  # 模拟网络延迟
    data = {"url": url, "status": 200, "data": f"数据_{random.randint(1, 100)}"}
    print(f"完成请求: {url}")
    return data

async def main():
    urls = [
        ("https://api.example.com/1", 2),
        ("https://api.example.com/2", 1),
        ("https://api.example.com/3", 3),
        ("https://api.example.com/4", 1.5),
    ]
    
    # 并发请求所有 URL
    tasks = [fetch_data(url, delay) for url, delay in urls]
    results = await asyncio.gather(*tasks)
    
    print("\n所有结果:")
    for result in results:
        print(f"  {result}")

asyncio.run(main())

示例 2:异步生产者-消费者模式

代码示例

import asyncio
import random

async def producer(queue, name, count):
    """生产者:向队列添加数据"""
    for i in range(count):
        item = f"{name}-产品-{i}"
        await queue.put(item)
        print(f"[生产者 {name}] 生产: {item}")
        await asyncio.sleep(random.uniform(0.1, 0.5))
    print(f"[生产者 {name}] 完成生产")

async def consumer(queue, name):
    """消费者:从队列获取数据"""
    while True:
        item = await queue.get()
        if item is None:  # 结束信号
            break
        print(f"[消费者 {name}] 消费: {item}")
        await asyncio.sleep(random.uniform(0.2, 0.6))
        queue.task_done()
    print(f"[消费者 {name}] 完成消费")

async def main():
    # 创建队列
    queue = asyncio.Queue(maxsize=10)
    
    # 创建生产者和消费者
    producers = [
        producer(queue, "P1", 5),
        producer(queue, "P2", 5),
    ]
    consumers = [
        consumer(queue, "C1"),
        consumer(queue, "C2"),
    ]
    
    # 并发执行
    await asyncio.gather(*producers)
    
    # 发送结束信号
    for _ in consumers:
        await queue.put(None)
    
    await asyncio.gather(*consumers)
    print("所有任务完成")

asyncio.run(main())

示例 3:异步超时控制

代码示例

import asyncio

async def slow_operation():
    """慢速操作"""
    await asyncio.sleep(5)
    return "完成"

async def main():
    try:
        # 设置超时时间为 2 秒
        result = await asyncio.wait_for(slow_operation(), timeout=2.0)
        print(f"操作成功: {result}")
    except asyncio.TimeoutError:
        print("操作超时!")

asyncio.run(main())

五、注意事项

⚠️ 注意 1:避免阻塞调用

在协程中不要使用阻塞调用(如 time.sleep()、requests.get()),这会阻塞整个事件循环。应使用异步版本(asyncio.sleep()、aiohttp 等)。

⚠️ 注意 2:正确使用 await

调用协程函数时必须使用 await,否则只会返回协程对象而不会执行。例如:await my_coroutine() 而不是 my_coroutine()。

⚠️ 注意 3:异常处理

使用 try-except 捕获协程中的异常。使用 asyncio.gather() 时,可以设置 return_exceptions=True 来收集所有异常而不是立即抛出。

⚠️ 注意 4:CPU 密集型任务

asyncio 不适合 CPU 密集型任务,会阻塞事件循环。对于 CPU 密集型任务,应使用 multiprocessing 或将计算放到线程池中执行。


六、小结

  • asyncio 核心:基于事件循环、协程和异步 I/O 模型,实现高效的并发编程

  • 协程定义:使用 async def 定义协程函数,使用 await 调用其他协程

  • 并发执行:使用 asyncio.gather() 或 create_task() 并发执行多个协程

  • 适用场景:高并发 I/O 密集型应用,如网络爬虫、Web 服务器、实时通信等

  • 注意事项:避免阻塞调用、正确使用 await、处理异常、不适合 CPU 密集型任务


七、练习题

练习 1

编写一个异步程序,模拟同时下载 5 个文件,每个文件下载时间不同(1-5 秒随机)。使用 asyncio.gather() 并发执行,并统计总耗时。

练习 2

实现一个异步的生产者-消费者系统:2 个生产者每秒生产一个数据,3 个消费者消费数据(消费时间 0.5-1.5 秒随机)。使用 asyncio.Queue 进行通信,运行 10 秒后停止。

常见问题

asyncio 和多线程有什么区别?应该选择哪个?

asyncio 使用单线程和协程实现并发,切换开销小,适合高并发 I/O 密集型场景(如数万个并发连接)。多线程使用多个线程,受 GIL 限制,适合中等并发的 I/O 密集型任务。选择建议:需要极高并发选 asyncio,需要与现有同步代码集成选多线程。

为什么不能在协程中使用 time.sleep()?

time.sleep() 是阻塞调用,会阻塞整个事件循环,导致其他协程无法执行。应该使用 asyncio.sleep(),它是非阻塞的,会让出控制权给事件循环,让其他协程继续执行。

如何处理协程中的异常?

使用 try-except 块捕获协程中的异常。对于 asyncio.gather(),可以设置 return_exceptions=True 参数,这样会返回异常对象而不是抛出。对于单个协程,使用 asyncio.wait_for() 可以捕获超时异常。

asyncio 适合 CPU 密集型任务吗?

不适合。asyncio 是单线程的,CPU 密集型任务会阻塞事件循环,导致其他协程无法执行。对于 CPU 密集型任务,应该使用 multiprocessing 模块,或者使用 loop.run_in_executor() 将计算放到线程池或进程池中执行。

标签: asyncio 异步编程 协程 事件循环 并发编程

本文涉及AI创作

内容由AI创作,请仔细甄别

list快速访问

上一篇: Python GIL全局解释器锁详解:原理影响与突破方案 - 小确幸生活 下一篇: Python async/await语法完全指南:异步函数定义与调用 - 小确幸生活

poll相关推荐