pin_drop当前位置:知识文库 ❯ 图文
进程间通信Queue详解 - Python生产者消费者模式
概述
在多进程编程中,由于每个进程拥有独立的内存空间,进程之间无法直接共享变量。multiprocessing.Queue 是Python提供的进程安全队列,专门用于在多个进程之间传递数据。
Queue底层使用管道和锁机制实现,支持多个生产者和多个消费者同时操作,保证数据的线程安全和进程安全。它是实现生产者-消费者模式的核心工具。
语法
代码示例
from multiprocessing import Process, Queue
# 创建队列
q = Queue(maxsize=0)
# 参数说明:
# maxsize: 队列最大容量,0表示无限制
# 常用方法:
# q.put(item, block=True, timeout=None) — 放入元素
# q.get(block=True, timeout=None) — 取出元素
# q.qsize() — 队列当前大小
# q.empty() — 判断是否为空
# q.full() — 判断是否已满
# q.close() — 关闭队列
# JoinableQueue 额外方法:
# q.task_done() — 标记一个任务完成
# q.join() — 等待所有任务完成基本用法
put 放入数据
代码示例
from multiprocessing import Queue
q = Queue()
# 基本用法
q.put("消息1")
q.put("消息2")
q.put(100) # 可以放入任意可序列化对象
q.put({"key": "value"})
# 阻塞模式(队列满时等待)
q.put("数据", block=True, timeout=5) # 5秒超时
# 非阻塞模式
try:
q.put("数据", block=False)
except:
print("队列已满")get 取出数据
代码示例
from multiprocessing import Queue
q = Queue()
q.put("第一条消息")
q.put("第二条消息")
# 基本用法(阻塞等待)
msg = q.get()
print(msg) # 输出: 第一条消息
# 带超时的 get
try:
msg = q.get(timeout=3)
print(msg)
except:
print("超时,未获取到数据")
# 非阻塞 get
try:
msg = q.get(block=False)
print(msg)
except:
print("队列为空")代码示例
示例1:生产者-消费者模式
代码示例
from multiprocessing import Process, Queue
import time
import random
def producer(queue, items):
"""生产者:生产数据并放入队列"""
for item in items:
print(f"生产: {item}")
queue.put(item)
time.sleep(random.uniform(0.1, 0.5))
queue.put(None) # 发送结束信号
def consumer(queue):
"""消费者:从队列取出数据并处理"""
while True:
item = queue.get()
if item is None: # 收到结束信号
break
print(f"消费: {item}")
time.sleep(random.uniform(0.2, 0.8))
print("消费者结束")
if __name__ == '__main__':
q = Queue()
p = Process(target=producer, args=(q, ['苹果', '香蕉', '橙子', '葡萄']))
c = Process(target=consumer, args=(q,))
c.start()
p.start()
p.join()
c.join()
print("生产者-消费者模式执行完毕")示例2:多生产者-多消费者
代码示例
from multiprocessing import Process, Queue
import time
import random
def producer(queue, name, count):
for i in range(count):
item = f"{name}-产品{i}"
queue.put(item)
print(f"[{name}] 生产了 {item}")
time.sleep(random.uniform(0.1, 0.3))
def consumer(queue, name):
while True:
try:
item = queue.get(timeout=2)
print(f"[{name}] 消费了 {item}")
time.sleep(random.uniform(0.1, 0.5))
except:
print(f"[{name}] 队列为空,退出")
break
if __name__ == '__main__':
q = Queue(maxsize=10)
producers = [
Process(target=producer, args=(q, f'工厂{i}', 5))
for i in range(2)
]
consumers = [
Process(target=consumer, args=(q, f'消费者{i}'))
for i in range(3)
]
for c in consumers:
c.start()
for p in producers:
p.start()
for p in producers:
p.join()
for c in consumers:
c.join()注意事项
注意1:Queue中传递的对象会被pickle序列化和反序列化。无法传递不可序列化的对象(如文件句柄、套接字等)。
注意2:消费者进程需要知道何时停止。通常使用
None作为结束哨兵值,每个消费者都需要收到一个结束信号。
注意3:当设置了
maxsize时,put操作可能阻塞。建议合理设置队列大小,或使用超时机制避免死锁。
小结
-
multiprocessing.Queue:进程安全的FIFO队列,支持多生产者、多消费者
-
put/get:put放入数据,get取出数据,均支持阻塞/非阻塞/超时模式
-
生产者-消费者:经典并发模式,Queue实现解耦和负载均衡
-
JoinableQueue:支持task_done/join,便于追踪任务完成状态
练习题
练习1
编写程序,使用multiprocessing.Queue实现一个简单的日志系统:主进程收集用户输入的消息,通过Queue发送给日志子进程,日志子进程将消息格式化后打印。支持优雅的退出机制。
练习2
编写一个函数,使用JoinableQueue实现一个URL爬虫任务分发器:主进程将待抓取的URL放入队列,3个爬虫子进程并发抓取(模拟延迟),使用task_done标记每个URL处理完成,主进程使用join等待所有URL抓取完毕。
常见问题
Queue和Pipe有什么区别?应该选哪个?
Queue是FIFO队列,支持多生产者多消费者,内置锁保证安全,适合任务分发。Pipe是双向管道,只适合两个进程间通信,性能更高但不支持多对多。多数场景选择Queue更安全方便;如果是严格的一对一通信且追求性能,选择Pipe。
如何避免消费者进程永远阻塞在get()上?
有三种方式:1)使用sentinel值(如None)作为结束信号;2)使用get(timeout=N)设置超时;3)将消费者设为守护进程(daemon=True)。推荐方式1。
Queue能传递哪些类型的数据?
Queue可以传递所有可被pickle序列化的Python对象,包括基本数据类型、列表、字典、自定义类的实例等。不能传递不可序列化的对象如文件句柄、数据库连接、线程锁等。
本文涉及AI创作
内容由AI创作,请仔细甄别