pin_drop当前位置:知识文库 ❯ 图文

进程间通信Queue详解 - Python生产者消费者模式

概述

在多进程编程中,由于每个进程拥有独立的内存空间,进程之间无法直接共享变量。multiprocessing.Queue 是Python提供的进程安全队列,专门用于在多个进程之间传递数据。

Queue底层使用管道和锁机制实现,支持多个生产者和多个消费者同时操作,保证数据的线程安全和进程安全。它是实现生产者-消费者模式的核心工具。


语法

代码示例

from multiprocessing import Process, Queue

# 创建队列
q = Queue(maxsize=0)

# 参数说明:
# maxsize: 队列最大容量,0表示无限制

# 常用方法:
# q.put(item, block=True, timeout=None)  — 放入元素
# q.get(block=True, timeout=None)        — 取出元素
# q.qsize()                              — 队列当前大小
# q.empty()                              — 判断是否为空
# q.full()                               — 判断是否已满
# q.close()                              — 关闭队列

# JoinableQueue 额外方法:
# q.task_done()                          — 标记一个任务完成
# q.join()                               — 等待所有任务完成

基本用法

put 放入数据

代码示例

from multiprocessing import Queue

q = Queue()

# 基本用法
q.put("消息1")
q.put("消息2")
q.put(100)               # 可以放入任意可序列化对象
q.put({"key": "value"})

# 阻塞模式(队列满时等待)
q.put("数据", block=True, timeout=5)  # 5秒超时

# 非阻塞模式
try:
    q.put("数据", block=False)
except:
    print("队列已满")

get 取出数据

代码示例

from multiprocessing import Queue

q = Queue()
q.put("第一条消息")
q.put("第二条消息")

# 基本用法(阻塞等待)
msg = q.get()
print(msg)  # 输出: 第一条消息

# 带超时的 get
try:
    msg = q.get(timeout=3)
    print(msg)
except:
    print("超时,未获取到数据")

# 非阻塞 get
try:
    msg = q.get(block=False)
    print(msg)
except:
    print("队列为空")

代码示例

示例1:生产者-消费者模式

代码示例

from multiprocessing import Process, Queue
import time
import random

def producer(queue, items):
    """生产者:生产数据并放入队列"""
    for item in items:
        print(f"生产: {item}")
        queue.put(item)
        time.sleep(random.uniform(0.1, 0.5))
    queue.put(None)  # 发送结束信号

def consumer(queue):
    """消费者:从队列取出数据并处理"""
    while True:
        item = queue.get()
        if item is None:  # 收到结束信号
            break
        print(f"消费: {item}")
        time.sleep(random.uniform(0.2, 0.8))
    print("消费者结束")

if __name__ == '__main__':
    q = Queue()
    
    p = Process(target=producer, args=(q, ['苹果', '香蕉', '橙子', '葡萄']))
    c = Process(target=consumer, args=(q,))
    
    c.start()
    p.start()
    
    p.join()
    c.join()
    
    print("生产者-消费者模式执行完毕")

示例2:多生产者-多消费者

代码示例

from multiprocessing import Process, Queue
import time
import random

def producer(queue, name, count):
    for i in range(count):
        item = f"{name}-产品{i}"
        queue.put(item)
        print(f"[{name}] 生产了 {item}")
        time.sleep(random.uniform(0.1, 0.3))

def consumer(queue, name):
    while True:
        try:
            item = queue.get(timeout=2)
            print(f"[{name}] 消费了 {item}")
            time.sleep(random.uniform(0.1, 0.5))
        except:
            print(f"[{name}] 队列为空,退出")
            break

if __name__ == '__main__':
    q = Queue(maxsize=10)
    
    producers = [
        Process(target=producer, args=(q, f'工厂{i}', 5))
        for i in range(2)
    ]
    consumers = [
        Process(target=consumer, args=(q, f'消费者{i}'))
        for i in range(3)
    ]
    
    for c in consumers:
        c.start()
    for p in producers:
        p.start()
    
    for p in producers:
        p.join()
    for c in consumers:
        c.join()
对比项 Queue Pipe
通信方向 单向(FIFO) 双向
多进程支持 支持多生产者、多消费者 适合两个进程间通信
数据安全 进程安全(内置锁) 非完全安全
适用场景 任务分发、消息队列 一对一双向通信
性能 稍慢(额外锁开销) 更快(轻量级)

注意事项

注意1:Queue中传递的对象会被pickle序列化和反序列化。无法传递不可序列化的对象(如文件句柄、套接字等)。

注意2:消费者进程需要知道何时停止。通常使用 None 作为结束哨兵值,每个消费者都需要收到一个结束信号。

注意3:当设置了 maxsize 时,put操作可能阻塞。建议合理设置队列大小,或使用超时机制避免死锁。


小结

  • multiprocessing.Queue:进程安全的FIFO队列,支持多生产者、多消费者

  • put/get:put放入数据,get取出数据,均支持阻塞/非阻塞/超时模式

  • 生产者-消费者:经典并发模式,Queue实现解耦和负载均衡

  • JoinableQueue:支持task_done/join,便于追踪任务完成状态


练习题

练习1

编写程序,使用multiprocessing.Queue实现一个简单的日志系统:主进程收集用户输入的消息,通过Queue发送给日志子进程,日志子进程将消息格式化后打印。支持优雅的退出机制。

练习2

编写一个函数,使用JoinableQueue实现一个URL爬虫任务分发器:主进程将待抓取的URL放入队列,3个爬虫子进程并发抓取(模拟延迟),使用task_done标记每个URL处理完成,主进程使用join等待所有URL抓取完毕。

常见问题

Queue和Pipe有什么区别?应该选哪个?

Queue是FIFO队列,支持多生产者多消费者,内置锁保证安全,适合任务分发。Pipe是双向管道,只适合两个进程间通信,性能更高但不支持多对多。多数场景选择Queue更安全方便;如果是严格的一对一通信且追求性能,选择Pipe。

如何避免消费者进程永远阻塞在get()上?

有三种方式:1)使用sentinel值(如None)作为结束信号;2)使用get(timeout=N)设置超时;3)将消费者设为守护进程(daemon=True)。推荐方式1。

Queue能传递哪些类型的数据?

Queue可以传递所有可被pickle序列化的Python对象,包括基本数据类型、列表、字典、自定义类的实例等。不能传递不可序列化的对象如文件句柄、数据库连接、线程锁等。

标签: 进程通信 Queue 生产者消费者 多进程 JoinableQueue Python教程

本文涉及AI创作

内容由AI创作,请仔细甄别

list快速访问

上一篇: multiprocessing模块详解 - Python多进程编程指南 下一篇: 进程间通信Pipe详解 - Python双向管道send/recv

poll相关推荐