pin_drop当前位置:知识文库 ❯ 图文
线程池ThreadPoolExecutor详解 - Python并发编程指南
ThreadPoolExecutor概述
ThreadPoolExecutor 是 Python concurrent.futures 模块提供的高级线程池实现。它简化了多线程编程,自动管理线程的创建、分配和回收,让开发者专注于任务本身。
线程池的核心优势:
-
资源复用:线程可重复使用,避免频繁创建销毁的开销
-
并发控制:限制同时运行的线程数量,防止资源耗尽
-
任务管理:提供submit/map接口,支持异步执行和结果获取
-
简化编程:隐藏线程管理细节,代码更简洁易维护
创建线程池
创建线程池有两种方式:
方式一:使用with语句(推荐)
代码示例
from concurrent.futures import ThreadPoolExecutor
# 创建固定大小的线程池
with ThreadPoolExecutor(max_workers=5) as executor:
# 提交任务
future = executor.submit(task_func, arg1, arg2)
# with块结束时自动关闭线程池方式二:手动管理
代码示例
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor(max_workers=5)
try:
future = executor.submit(task_func, arg1, arg2)
finally:
executor.shutdown(wait=True) # 等待所有任务完成ThreadPoolExecutor参数:
-
max_workers:最大工作线程数,默认为CPU核心数*5
-
thread_name_prefix:线程名称前缀,便于调试
submit方法
submit() 方法用于提交单个任务到线程池:
代码示例
from concurrent.futures import ThreadPoolExecutor
import time
def task(name, duration):
print(f"任务 {name} 开始执行")
time.sleep(duration)
print(f"任务 {name} 执行完成")
return f"结果-{name}"
with ThreadPoolExecutor(max_workers=3) as executor:
# 提交多个任务
future1 = executor.submit(task, "A", 2)
future2 = executor.submit(task, "B", 1)
future3 = executor.submit(task, "C", 3)
# 获取结果(阻塞等待)
result1 = future1.result()
result2 = future2.result()
result3 = future3.result()
print(f"所有结果: {result1}, {result2}, {result3}")submit()的特点:
-
异步执行:立即返回Future对象,不阻塞主线程
-
灵活参数:支持位置参数和关键字参数
-
结果获取:通过Future.result()获取返回值
map方法
map() 方法用于批量提交相同函数的多个参数组合:
代码示例
from concurrent.futures import ThreadPoolExecutor
import time
def square(n):
time.sleep(0.5)
return n * n
numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
with ThreadPoolExecutor(max_workers=5) as executor:
# 批量提交任务
results = executor.map(square, numbers)
# results是迭代器,按提交顺序返回结果
for num, result in zip(numbers, results):
print(f"{num}^2 = {result}")map()与submit()的区别:
Future对象
Future对象代表异步执行的结果,提供以下方法:
代码示例
from concurrent.futures import ThreadPoolExecutor
import time
def task(duration):
time.sleep(duration)
return f"完成-{duration}"
with ThreadPoolExecutor(max_workers=3) as executor:
future = executor.submit(task, 2)
# 检查任务状态
print(f"是否运行中: {future.running()}")
print(f"是否完成: {future.done()}")
# 等待结果(可设置超时)
result = future.result(timeout=5)
print(f"结果: {result}")
# 添加回调函数
future2 = executor.submit(task, 1)
future2.add_done_callback(lambda f: print(f"回调: {f.result()}"))Future常用方法:
-
result(timeout=None):获取结果,可设置超时
-
done():检查任务是否完成
-
running():检查任务是否正在运行
-
cancel():尝试取消任务
-
add_done_callback(fn):添加完成回调函数
代码示例
示例1:批量下载文件
代码示例
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import random
def download_file(url):
"""模拟文件下载"""
print(f"开始下载: {url}")
time.sleep(random.uniform(1, 3))
print(f"下载完成: {url}")
return f"下载成功-{url}"
urls = [
"https://example.com/file1.zip",
"https://example.com/file2.zip",
"https://example.com/file3.zip",
"https://example.com/file4.zip",
"https://example.com/file5.zip",
]
# 使用线程池并发下载
with ThreadPoolExecutor(max_workers=3) as executor:
# 提交所有任务
future_to_url = {executor.submit(download_file, url): url for url in urls}
# 使用as_completed按完成顺序处理结果
for future in as_completed(future_to_url):
url = future_to_url[future]
try:
result = future.result()
print(f"{url}: {result}")
except Exception as e:
print(f"{url} 下载失败: {e}")示例2:数据处理管道
代码示例
from concurrent.futures import ThreadPoolExecutor
import time
def fetch_data(id):
"""获取数据"""
time.sleep(0.5)
return {"id": id, "value": id * 10}
def process_data(data):
"""处理数据"""
time.sleep(0.3)
data["processed"] = data["value"] * 2
return data
def save_data(data):
"""保存数据"""
time.sleep(0.2)
print(f"保存数据: {data}")
return True
# 数据处理管道
with ThreadPoolExecutor(max_workers=5) as executor:
# 阶段1: 获取数据
data_futures = [executor.submit(fetch_data, i) for i in range(10)]
# 阶段2: 处理数据
process_futures = []
for future in data_futures:
data = future.result()
process_futures.append(executor.submit(process_data, data))
# 阶段3: 保存数据
save_futures = []
for future in process_futures:
data = future.result()
save_futures.append(executor.submit(save_data, data))
# 等待所有保存完成
results = [f.result() for f in save_futures]
print(f"所有数据保存完成: {sum(results)} 条")示例3:带超时的任务执行
代码示例
from concurrent.futures import ThreadPoolExecutor, TimeoutError
import time
def slow_task(duration):
time.sleep(duration)
return f"任务完成-{duration}"
with ThreadPoolExecutor(max_workers=3) as executor:
futures = [
executor.submit(slow_task, 1),
executor.submit(slow_task, 2),
executor.submit(slow_task, 5), # 这个会超时
]
for i, future in enumerate(futures):
try:
result = future.result(timeout=3)
print(f"任务{i}: {result}")
except TimeoutError:
print(f"任务{i}: 执行超时")
except Exception as e:
print(f"任务{i}: 执行异常 - {e}")注意事项
注意1:合理设置max_workers。IO密集型任务可设置较大值(如CPU核心数*5),CPU密集型任务应设置为CPU核心数。
注意2:使用with语句管理线程池,确保线程池正确关闭。手动管理时必须调用shutdown()。
注意3:result()方法会阻塞等待结果。如需非阻塞获取,使用as_completed()或add_done_callback()。
注意4:异常处理很重要。任务中的异常会在调用result()时抛出,需要用try-except捕获。
注意5:避免在线程池中提交过多任务导致内存占用过高。可以使用队列或分批提交。
小结
-
ThreadPoolExecutor作用:提供高级线程池管理,简化并发编程
-
核心方法:submit()提交单个任务,map()批量提交,Future管理结果
-
典型应用:批量下载、数据处理管道、并发IO操作
-
最佳实践:使用with语句管理,合理设置线程数,正确处理异常
练习题
练习1
实现一个并发URL检查器,使用线程池同时检查多个URL的可访问性,返回每个URL的状态码和响应时间。
练习2
实现一个图片处理管道,使用线程池并发读取、处理和保存图片,支持进度显示和错误处理。
常见问题
ThreadPoolExecutor和直接创建Thread有什么区别?
ThreadPoolExecutor自动管理线程生命周期,复用线程减少开销,提供高级API简化编程。直接创建Thread需要手动管理线程,每次都要创建新线程,代码更复杂。
如何获取任务的执行进度?
可以使用as_completed()按完成顺序处理结果,或使用add_done_callback()添加回调函数。也可以在任务内部更新共享的进度计数器(需加锁保护)。
线程池中的任务可以取消吗?
可以调用Future.cancel()尝试取消。但如果任务已经开始执行,cancel()会返回False且无法取消。只有尚未开始执行的任务才能被取消。
如何设置线程池的最大任务数?
ThreadPoolExecutor本身不限制任务数量,但可以通过信号量(Semaphore)或队列控制并发提交的任务数,防止内存溢出。
本文涉及AI创作
内容由AI创作,请仔细甄别