pin_drop当前位置:知识文库 ❯ 图文

Python进程池Pool完全指南:apply/map/apply_async实战详解 - 小确幸生活

概述

multiprocessing.Pool 是Python提供的进程池实现,用于管理和复用一组预创建的进程。通过进程池,我们可以避免频繁创建和销毁进程带来的系统开销,从而高效地实现并行计算。

进程池的核心价值在于资源复用任务调度。当你有一批相互独立的任务需要并行处理时,进程池会自动将任务分配给空闲的进程,处理完成后回收进程以供后续任务使用。


语法

代码示例

from multiprocessing import Pool, cpu_count

# 创建进程池
pool = Pool(processes=4)

# 参数说明:
# processes: 进程数量,默认为CPU核心数

# 常用方法:
# pool.apply(func, args)           — 同步执行函数
# pool.apply_async(func, args)     — 异步执行函数
# pool.map(func, iterable)         — 批量映射(同步)
# pool.map_async(func, iterable)   — 批量映射(异步)
# pool.close()                     — 关闭进程池,不再接受新任务
# pool.join()                      — 等待所有进程完成
# pool.terminate()                 — 立即终止所有进程

# 推荐使用上下文管理器
with Pool(processes=4) as pool:
    results = pool.map(func, iterable)

基本用法

map 批量映射

代码示例

from multiprocessing import Pool
import time

def square(x):
    """计算平方"""
    time.sleep(0.5)
    return x * x

if __name__ == '__main__':
    numbers = list(range(10))
    
    with Pool(processes=4) as pool:
        # map 自动将任务分配给进程池
        results = pool.map(square, numbers)
        print(f"计算结果: {results}")
        # 输出: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

apply_async 异步执行

代码示例

from multiprocessing import Pool
import time

def task(name, duration):
    print(f"任务 {name} 开始")
    time.sleep(duration)
    print(f"任务 {name} 完成")
    return f"{name} 的结果"

if __name__ == '__main__':
    with Pool(processes=3) as pool:
        # 异步提交多个任务
        result1 = pool.apply_async(task, args=("A", 2))
        result2 = pool.apply_async(task, args=("B", 1))
        result3 = pool.apply_async(task, args=("C", 3))
        
        # 获取结果(会阻塞直到完成)
        print(result1.get())
        print(result2.get())
        print(result3.get())

代码示例

示例1:批量数据处理

代码示例

from multiprocessing import Pool
import time

def process_data(data):
    """处理单个数据项"""
    result = sum(data) * 2
    time.sleep(0.1)  # 模拟处理时间
    return result

if __name__ == '__main__':
    # 准备数据
    datasets = [[i, i+1, i+2] for i in range(20)]
    
    start = time.time()
    
    with Pool(processes=4) as pool:
        results = pool.map(process_data, datasets)
    
    print(f"处理结果: {results[:5]}...")
    print(f"总耗时: {time.time() - start:.2f} 秒")

示例2:带回调的异步处理

代码示例

from multiprocessing import Pool
import time

def compute(n):
    """计算任务"""
    time.sleep(0.5)
    return n * n

def callback(result):
    """回调函数"""
    print(f"任务完成,结果: {result}")

if __name__ == '__main__':
    with Pool(processes=3) as pool:
        for i in range(5):
            pool.apply_async(compute, args=(i,), callback=callback)
        
        # 关闭进程池,等待所有任务完成
        pool.close()
        pool.join()
    
    print("所有任务已完成")
方法 类型 用途
apply 同步 执行单个任务,阻塞等待结果
apply_async 异步 执行单个任务,立即返回AsyncResult
map 同步 批量映射,保持结果顺序
map_async 异步 批量映射,立即返回AsyncResult

注意事项

注意1:使用完毕后必须调用 close()join(),或使用 with 语句自动管理。

注意2:进程数不应超过CPU核心数。过多进程会导致上下文切换开销增加,反而降低性能。

注意3:map() 会保持结果顺序,但会等待所有任务完成。如果需要立即处理完成的任务,使用 apply_async() 配合回调。


小结

  • Pool:进程池,复用一组进程处理多个任务

  • map:批量映射,自动分配任务,保持结果顺序

  • apply_async:异步执行,支持回调函数

  • 资源管理:使用 with 语句或 close()+join() 管理生命周期


练习题

练习1

编写程序,使用进程池并行计算1到100每个数字的平方,比较单线程和多进程(4个进程)的执行时间差异。

练习2

编写一个函数,使用进程池实现批量图片处理(模拟):给定一组图片文件名,使用进程池并发对每个图片进行"压缩"操作(模拟处理延迟),统计总耗时。

常见问题

Pool和Process有什么区别?

Process是单个进程的低级别控制,需要手动管理start/join。Pool是进程池的高级封装,自动管理一组工作进程,提供map/apply_async等方法批量分配任务,使用更简洁。

进程池应该设置多少进程?

对于CPU密集型任务,进程数设置为CPU核心数(使用cpu_count()获取)。对于I/O密集型任务,可以设置为2倍CPU核心数或更多,因为进程在等待I/O时会释放CPU。

map和apply_async应该如何选择?

如果需要保持结果顺序且任务独立,使用map()。如果需要立即处理完成的任务或使用回调,使用apply_async()。map()会等待所有任务完成才返回,apply_async()可以逐个处理结果。

标签: 进程池 Pool 并行计算 多进程 Python教程

本文涉及AI创作

内容由AI创作,请仔细甄别

list快速访问

上一篇: 进程间通信Pipe详解 - Python双向管道send/recv 下一篇: Python GIL全局解释器锁详解:原理影响与突破方案 - 小确幸生活

poll相关推荐