pin_drop当前位置:知识文库 ❯ 图文

线程池ThreadPoolExecutor详解 - Python并发编程指南

ThreadPoolExecutor概述

ThreadPoolExecutor 是 Python concurrent.futures 模块提供的高级线程池实现。它简化了多线程编程,自动管理线程的创建、分配和回收,让开发者专注于任务本身。

线程池的核心优势:

  • 资源复用:线程可重复使用,避免频繁创建销毁的开销

  • 并发控制:限制同时运行的线程数量,防止资源耗尽

  • 任务管理:提供submit/map接口,支持异步执行和结果获取

  • 简化编程:隐藏线程管理细节,代码更简洁易维护


创建线程池

创建线程池有两种方式:

方式一:使用with语句(推荐)

代码示例

from concurrent.futures import ThreadPoolExecutor

# 创建固定大小的线程池
with ThreadPoolExecutor(max_workers=5) as executor:
    # 提交任务
    future = executor.submit(task_func, arg1, arg2)
    # with块结束时自动关闭线程池

方式二:手动管理

代码示例

from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=5)
try:
    future = executor.submit(task_func, arg1, arg2)
finally:
    executor.shutdown(wait=True)  # 等待所有任务完成

ThreadPoolExecutor参数:

  • max_workers:最大工作线程数,默认为CPU核心数*5

  • thread_name_prefix:线程名称前缀,便于调试


submit方法

submit() 方法用于提交单个任务到线程池:

代码示例

from concurrent.futures import ThreadPoolExecutor
import time

def task(name, duration):
    print(f"任务 {name} 开始执行")
    time.sleep(duration)
    print(f"任务 {name} 执行完成")
    return f"结果-{name}"

with ThreadPoolExecutor(max_workers=3) as executor:
    # 提交多个任务
    future1 = executor.submit(task, "A", 2)
    future2 = executor.submit(task, "B", 1)
    future3 = executor.submit(task, "C", 3)
    
    # 获取结果(阻塞等待)
    result1 = future1.result()
    result2 = future2.result()
    result3 = future3.result()
    
    print(f"所有结果: {result1}, {result2}, {result3}")

submit()的特点:

  • 异步执行:立即返回Future对象,不阻塞主线程

  • 灵活参数:支持位置参数和关键字参数

  • 结果获取:通过Future.result()获取返回值


map方法

map() 方法用于批量提交相同函数的多个参数组合:

代码示例

from concurrent.futures import ThreadPoolExecutor
import time

def square(n):
    time.sleep(0.5)
    return n * n

numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

with ThreadPoolExecutor(max_workers=5) as executor:
    # 批量提交任务
    results = executor.map(square, numbers)
    
    # results是迭代器,按提交顺序返回结果
    for num, result in zip(numbers, results):
        print(f"{num}^2 = {result}")

map()与submit()的区别:

特性 submit() map()
使用场景 不同函数、不同参数 相同函数、多个参数
返回值 Future对象 结果迭代器
结果顺序 按Future获取 按提交顺序
灵活性 高(可单独控制) 低(批量处理)

Future对象

Future对象代表异步执行的结果,提供以下方法:

代码示例

from concurrent.futures import ThreadPoolExecutor
import time

def task(duration):
    time.sleep(duration)
    return f"完成-{duration}"

with ThreadPoolExecutor(max_workers=3) as executor:
    future = executor.submit(task, 2)
    
    # 检查任务状态
    print(f"是否运行中: {future.running()}")
    print(f"是否完成: {future.done()}")
    
    # 等待结果(可设置超时)
    result = future.result(timeout=5)
    print(f"结果: {result}")
    
    # 添加回调函数
    future2 = executor.submit(task, 1)
    future2.add_done_callback(lambda f: print(f"回调: {f.result()}"))

Future常用方法:

  • result(timeout=None):获取结果,可设置超时

  • done():检查任务是否完成

  • running():检查任务是否正在运行

  • cancel():尝试取消任务

  • add_done_callback(fn):添加完成回调函数


代码示例

示例1:批量下载文件

代码示例

from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import random

def download_file(url):
    """模拟文件下载"""
    print(f"开始下载: {url}")
    time.sleep(random.uniform(1, 3))
    print(f"下载完成: {url}")
    return f"下载成功-{url}"

urls = [
    "https://example.com/file1.zip",
    "https://example.com/file2.zip",
    "https://example.com/file3.zip",
    "https://example.com/file4.zip",
    "https://example.com/file5.zip",
]

# 使用线程池并发下载
with ThreadPoolExecutor(max_workers=3) as executor:
    # 提交所有任务
    future_to_url = {executor.submit(download_file, url): url for url in urls}
    
    # 使用as_completed按完成顺序处理结果
    for future in as_completed(future_to_url):
        url = future_to_url[future]
        try:
            result = future.result()
            print(f"{url}: {result}")
        except Exception as e:
            print(f"{url} 下载失败: {e}")

示例2:数据处理管道

代码示例

from concurrent.futures import ThreadPoolExecutor
import time

def fetch_data(id):
    """获取数据"""
    time.sleep(0.5)
    return {"id": id, "value": id * 10}

def process_data(data):
    """处理数据"""
    time.sleep(0.3)
    data["processed"] = data["value"] * 2
    return data

def save_data(data):
    """保存数据"""
    time.sleep(0.2)
    print(f"保存数据: {data}")
    return True

# 数据处理管道
with ThreadPoolExecutor(max_workers=5) as executor:
    # 阶段1: 获取数据
    data_futures = [executor.submit(fetch_data, i) for i in range(10)]
    
    # 阶段2: 处理数据
    process_futures = []
    for future in data_futures:
        data = future.result()
        process_futures.append(executor.submit(process_data, data))
    
    # 阶段3: 保存数据
    save_futures = []
    for future in process_futures:
        data = future.result()
        save_futures.append(executor.submit(save_data, data))
    
    # 等待所有保存完成
    results = [f.result() for f in save_futures]
    print(f"所有数据保存完成: {sum(results)} 条")

示例3:带超时的任务执行

代码示例

from concurrent.futures import ThreadPoolExecutor, TimeoutError
import time

def slow_task(duration):
    time.sleep(duration)
    return f"任务完成-{duration}"

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [
        executor.submit(slow_task, 1),
        executor.submit(slow_task, 2),
        executor.submit(slow_task, 5),  # 这个会超时
    ]
    
    for i, future in enumerate(futures):
        try:
            result = future.result(timeout=3)
            print(f"任务{i}: {result}")
        except TimeoutError:
            print(f"任务{i}: 执行超时")
        except Exception as e:
            print(f"任务{i}: 执行异常 - {e}")

注意事项

注意1:合理设置max_workers。IO密集型任务可设置较大值(如CPU核心数*5),CPU密集型任务应设置为CPU核心数。

注意2:使用with语句管理线程池,确保线程池正确关闭。手动管理时必须调用shutdown()。

注意3:result()方法会阻塞等待结果。如需非阻塞获取,使用as_completed()或add_done_callback()。

注意4:异常处理很重要。任务中的异常会在调用result()时抛出,需要用try-except捕获。

注意5:避免在线程池中提交过多任务导致内存占用过高。可以使用队列或分批提交。


小结

  • ThreadPoolExecutor作用:提供高级线程池管理,简化并发编程

  • 核心方法:submit()提交单个任务,map()批量提交,Future管理结果

  • 典型应用:批量下载、数据处理管道、并发IO操作

  • 最佳实践:使用with语句管理,合理设置线程数,正确处理异常


练习题

练习1

实现一个并发URL检查器,使用线程池同时检查多个URL的可访问性,返回每个URL的状态码和响应时间。

练习2

实现一个图片处理管道,使用线程池并发读取、处理和保存图片,支持进度显示和错误处理。

常见问题

ThreadPoolExecutor和直接创建Thread有什么区别?

ThreadPoolExecutor自动管理线程生命周期,复用线程减少开销,提供高级API简化编程。直接创建Thread需要手动管理线程,每次都要创建新线程,代码更复杂。

如何获取任务的执行进度?

可以使用as_completed()按完成顺序处理结果,或使用add_done_callback()添加回调函数。也可以在任务内部更新共享的进度计数器(需加锁保护)。

线程池中的任务可以取消吗?

可以调用Future.cancel()尝试取消。但如果任务已经开始执行,cancel()会返回False且无法取消。只有尚未开始执行的任务才能被取消。

如何设置线程池的最大任务数?

ThreadPoolExecutor本身不限制任务数量,但可以通过信号量(Semaphore)或队列控制并发提交的任务数,防止内存溢出。

标签: ThreadPoolExecutor 线程池 submit/map Future对象 并发编程 异步执行

本文涉及AI创作

内容由AI创作,请仔细甄别

list快速访问

上一篇: Python Condition条件变量详解 - 生产者消费者模式实现 下一篇: multiprocessing模块详解 - Python多进程编程指南

poll相关推荐