pin_drop当前位置:知识文库 ❯ 图文
Python asyncio异步编程完全指南:事件循环与协程实战 - 小确幸生活
一、概述
asyncio 是 Python 的异步 I/O 库,提供了编写并发代码的基础设施。它基于事件循环、协程和异步 IO 模型,能够高效地处理大量并发连接。
与多线程不同,asyncio 使用单线程实现并发,通过协程在任务间切换,避免了线程切换的开销和锁的复杂性。特别适合 I/O 密集型应用,如网络爬虫、Web 服务器、数据库查询等场景。
二、语法
核心概念
-
事件循环(Event Loop):asyncio 的核心,负责调度和执行协程任务
-
协程(Coroutine):使用 async def 定义的异步函数
-
任务(Task):对协程的封装,用于并发执行
-
Future:表示异步操作的结果
基本语法
代码示例
import asyncio
# 定义协程函数
async def my_coroutine():
print("开始执行")
await asyncio.sleep(1) # 异步等待
print("执行完成")
return "结果"
# 运行协程
asyncio.run(my_coroutine())常用 API
-
asyncio.run(coro):运行协程的主入口 -
await coro:等待协程完成 -
asyncio.gather(*coros):并发执行多个协程 -
asyncio.create_task(coro):创建任务 -
asyncio.sleep(seconds):异步休眠
三、基本用法
1. 单个协程执行
代码示例
import asyncio
import time
async def say_hello(delay, name):
"""异步函数"""
print(f"[{name}] 开始,等待 {delay} 秒")
await asyncio.sleep(delay) # 非阻塞等待
print(f"[{name}] 你好!")
return f"{name} 完成"
# 运行单个协程
result = asyncio.run(say_hello(1, "Alice"))
print(f"结果: {result}")2. 多个协程并发执行
代码示例
import asyncio
import time
async def task(name, delay):
print(f"[{name}] 开始")
await asyncio.sleep(delay)
print(f"[{name}] 完成")
return f"{name} 的结果"
async def main():
# 方式1: 使用 gather 并发执行
start = time.time()
results = await asyncio.gather(
task("任务1", 2),
task("任务2", 1),
task("任务3", 3)
)
print(f"总耗时: {time.time() - start:.2f} 秒") # 约 3 秒
print(f"所有结果: {results}")
asyncio.run(main())3. 使用 Task 对象
代码示例
import asyncio
async def background_task(name, delay):
print(f"[{name}] 开始")
await asyncio.sleep(delay)
print(f"[{name}] 完成")
return f"{name} 的结果"
async def main():
# 创建任务(立即开始执行)
task1 = asyncio.create_task(background_task("任务A", 2))
task2 = asyncio.create_task(background_task("任务B", 1))
# 可以做其他事情
print("主程序继续执行...")
await asyncio.sleep(0.5)
print("主程序等待任务完成...")
# 等待任务完成
result1 = await task1
result2 = await task2
print(f"结果1: {result1}")
print(f"结果2: {result2}")
asyncio.run(main())提示:create_task() 会立即调度协程执行,而 await 会等待协程完成。使用 Task 可以在等待的同时执行其他操作。
四、代码示例
示例 1:异步网络请求(模拟)
代码示例
import asyncio
import random
async def fetch_data(url, delay):
"""模拟异步网络请求"""
print(f"开始请求: {url}")
await asyncio.sleep(delay) # 模拟网络延迟
data = {"url": url, "status": 200, "data": f"数据_{random.randint(1, 100)}"}
print(f"完成请求: {url}")
return data
async def main():
urls = [
("https://api.example.com/1", 2),
("https://api.example.com/2", 1),
("https://api.example.com/3", 3),
("https://api.example.com/4", 1.5),
]
# 并发请求所有 URL
tasks = [fetch_data(url, delay) for url, delay in urls]
results = await asyncio.gather(*tasks)
print("\n所有结果:")
for result in results:
print(f" {result}")
asyncio.run(main())示例 2:异步生产者-消费者模式
代码示例
import asyncio
import random
async def producer(queue, name, count):
"""生产者:向队列添加数据"""
for i in range(count):
item = f"{name}-产品-{i}"
await queue.put(item)
print(f"[生产者 {name}] 生产: {item}")
await asyncio.sleep(random.uniform(0.1, 0.5))
print(f"[生产者 {name}] 完成生产")
async def consumer(queue, name):
"""消费者:从队列获取数据"""
while True:
item = await queue.get()
if item is None: # 结束信号
break
print(f"[消费者 {name}] 消费: {item}")
await asyncio.sleep(random.uniform(0.2, 0.6))
queue.task_done()
print(f"[消费者 {name}] 完成消费")
async def main():
# 创建队列
queue = asyncio.Queue(maxsize=10)
# 创建生产者和消费者
producers = [
producer(queue, "P1", 5),
producer(queue, "P2", 5),
]
consumers = [
consumer(queue, "C1"),
consumer(queue, "C2"),
]
# 并发执行
await asyncio.gather(*producers)
# 发送结束信号
for _ in consumers:
await queue.put(None)
await asyncio.gather(*consumers)
print("所有任务完成")
asyncio.run(main())示例 3:异步超时控制
代码示例
import asyncio
async def slow_operation():
"""慢速操作"""
await asyncio.sleep(5)
return "完成"
async def main():
try:
# 设置超时时间为 2 秒
result = await asyncio.wait_for(slow_operation(), timeout=2.0)
print(f"操作成功: {result}")
except asyncio.TimeoutError:
print("操作超时!")
asyncio.run(main())五、注意事项
⚠️ 注意 1:避免阻塞调用
在协程中不要使用阻塞调用(如 time.sleep()、requests.get()),这会阻塞整个事件循环。应使用异步版本(asyncio.sleep()、aiohttp 等)。
⚠️ 注意 2:正确使用 await
调用协程函数时必须使用 await,否则只会返回协程对象而不会执行。例如:await my_coroutine() 而不是 my_coroutine()。
⚠️ 注意 3:异常处理
使用 try-except 捕获协程中的异常。使用 asyncio.gather() 时,可以设置 return_exceptions=True 来收集所有异常而不是立即抛出。
⚠️ 注意 4:CPU 密集型任务
asyncio 不适合 CPU 密集型任务,会阻塞事件循环。对于 CPU 密集型任务,应使用 multiprocessing 或将计算放到线程池中执行。
六、小结
-
asyncio 核心:基于事件循环、协程和异步 I/O 模型,实现高效的并发编程
-
协程定义:使用 async def 定义协程函数,使用 await 调用其他协程
-
并发执行:使用 asyncio.gather() 或 create_task() 并发执行多个协程
-
适用场景:高并发 I/O 密集型应用,如网络爬虫、Web 服务器、实时通信等
-
注意事项:避免阻塞调用、正确使用 await、处理异常、不适合 CPU 密集型任务
七、练习题
练习 1
编写一个异步程序,模拟同时下载 5 个文件,每个文件下载时间不同(1-5 秒随机)。使用 asyncio.gather() 并发执行,并统计总耗时。
练习 2
实现一个异步的生产者-消费者系统:2 个生产者每秒生产一个数据,3 个消费者消费数据(消费时间 0.5-1.5 秒随机)。使用 asyncio.Queue 进行通信,运行 10 秒后停止。
常见问题
asyncio 和多线程有什么区别?应该选择哪个?
asyncio 使用单线程和协程实现并发,切换开销小,适合高并发 I/O 密集型场景(如数万个并发连接)。多线程使用多个线程,受 GIL 限制,适合中等并发的 I/O 密集型任务。选择建议:需要极高并发选 asyncio,需要与现有同步代码集成选多线程。
为什么不能在协程中使用 time.sleep()?
time.sleep() 是阻塞调用,会阻塞整个事件循环,导致其他协程无法执行。应该使用 asyncio.sleep(),它是非阻塞的,会让出控制权给事件循环,让其他协程继续执行。
如何处理协程中的异常?
使用 try-except 块捕获协程中的异常。对于 asyncio.gather(),可以设置 return_exceptions=True 参数,这样会返回异常对象而不是抛出。对于单个协程,使用 asyncio.wait_for() 可以捕获超时异常。
asyncio 适合 CPU 密集型任务吗?
不适合。asyncio 是单线程的,CPU 密集型任务会阻塞事件循环,导致其他协程无法执行。对于 CPU 密集型任务,应该使用 multiprocessing 模块,或者使用 loop.run_in_executor() 将计算放到线程池或进程池中执行。
本文涉及AI创作
内容由AI创作,请仔细甄别