pin_drop当前位置:知识文库 ❯ 图文
Python进程池Pool完全指南:apply/map/apply_async实战详解 - 小确幸生活
概述
multiprocessing.Pool 是Python提供的进程池实现,用于管理和复用一组预创建的进程。通过进程池,我们可以避免频繁创建和销毁进程带来的系统开销,从而高效地实现并行计算。
进程池的核心价值在于资源复用和任务调度。当你有一批相互独立的任务需要并行处理时,进程池会自动将任务分配给空闲的进程,处理完成后回收进程以供后续任务使用。
语法
代码示例
from multiprocessing import Pool, cpu_count
# 创建进程池
pool = Pool(processes=4)
# 参数说明:
# processes: 进程数量,默认为CPU核心数
# 常用方法:
# pool.apply(func, args) — 同步执行函数
# pool.apply_async(func, args) — 异步执行函数
# pool.map(func, iterable) — 批量映射(同步)
# pool.map_async(func, iterable) — 批量映射(异步)
# pool.close() — 关闭进程池,不再接受新任务
# pool.join() — 等待所有进程完成
# pool.terminate() — 立即终止所有进程
# 推荐使用上下文管理器
with Pool(processes=4) as pool:
results = pool.map(func, iterable)基本用法
map 批量映射
代码示例
from multiprocessing import Pool
import time
def square(x):
"""计算平方"""
time.sleep(0.5)
return x * x
if __name__ == '__main__':
numbers = list(range(10))
with Pool(processes=4) as pool:
# map 自动将任务分配给进程池
results = pool.map(square, numbers)
print(f"计算结果: {results}")
# 输出: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]apply_async 异步执行
代码示例
from multiprocessing import Pool
import time
def task(name, duration):
print(f"任务 {name} 开始")
time.sleep(duration)
print(f"任务 {name} 完成")
return f"{name} 的结果"
if __name__ == '__main__':
with Pool(processes=3) as pool:
# 异步提交多个任务
result1 = pool.apply_async(task, args=("A", 2))
result2 = pool.apply_async(task, args=("B", 1))
result3 = pool.apply_async(task, args=("C", 3))
# 获取结果(会阻塞直到完成)
print(result1.get())
print(result2.get())
print(result3.get())代码示例
示例1:批量数据处理
代码示例
from multiprocessing import Pool
import time
def process_data(data):
"""处理单个数据项"""
result = sum(data) * 2
time.sleep(0.1) # 模拟处理时间
return result
if __name__ == '__main__':
# 准备数据
datasets = [[i, i+1, i+2] for i in range(20)]
start = time.time()
with Pool(processes=4) as pool:
results = pool.map(process_data, datasets)
print(f"处理结果: {results[:5]}...")
print(f"总耗时: {time.time() - start:.2f} 秒")示例2:带回调的异步处理
代码示例
from multiprocessing import Pool
import time
def compute(n):
"""计算任务"""
time.sleep(0.5)
return n * n
def callback(result):
"""回调函数"""
print(f"任务完成,结果: {result}")
if __name__ == '__main__':
with Pool(processes=3) as pool:
for i in range(5):
pool.apply_async(compute, args=(i,), callback=callback)
# 关闭进程池,等待所有任务完成
pool.close()
pool.join()
print("所有任务已完成")注意事项
注意1:使用完毕后必须调用
close()和join(),或使用with语句自动管理。
注意2:进程数不应超过CPU核心数。过多进程会导致上下文切换开销增加,反而降低性能。
注意3:map() 会保持结果顺序,但会等待所有任务完成。如果需要立即处理完成的任务,使用 apply_async() 配合回调。
小结
-
Pool:进程池,复用一组进程处理多个任务
-
map:批量映射,自动分配任务,保持结果顺序
-
apply_async:异步执行,支持回调函数
-
资源管理:使用 with 语句或 close()+join() 管理生命周期
练习题
练习1
编写程序,使用进程池并行计算1到100每个数字的平方,比较单线程和多进程(4个进程)的执行时间差异。
练习2
编写一个函数,使用进程池实现批量图片处理(模拟):给定一组图片文件名,使用进程池并发对每个图片进行"压缩"操作(模拟处理延迟),统计总耗时。
常见问题
Pool和Process有什么区别?
Process是单个进程的低级别控制,需要手动管理start/join。Pool是进程池的高级封装,自动管理一组工作进程,提供map/apply_async等方法批量分配任务,使用更简洁。
进程池应该设置多少进程?
对于CPU密集型任务,进程数设置为CPU核心数(使用cpu_count()获取)。对于I/O密集型任务,可以设置为2倍CPU核心数或更多,因为进程在等待I/O时会释放CPU。
map和apply_async应该如何选择?
如果需要保持结果顺序且任务独立,使用map()。如果需要立即处理完成的任务或使用回调,使用apply_async()。map()会等待所有任务完成才返回,apply_async()可以逐个处理结果。
本文涉及AI创作
内容由AI创作,请仔细甄别