pin_drop当前位置:知识文库 ❯ 图文
Python Condition条件变量详解 - 生产者消费者模式实现
概述
条件变量、wait/notify、生产者消费者。本篇教程将详细介绍线程同步Condition的核心概念和Python中的实际应用。
语法
代码示例
# 线程同步Condition相关语法示例
import threading
# 创建条件变量
condition = threading.Condition()
# 或者基于已有锁创建
lock = threading.Lock()
condition = threading.Condition(lock)
# 基本方法
condition.acquire() # 获取锁
condition.release() # 释放锁
condition.wait(timeout=None) # 等待条件满足
condition.notify(n=1) # 唤醒n个等待的线程
condition.notify_all() # 唤醒所有等待的线程
# 使用with语句(推荐)
with condition:
# 等待条件
while not 条件满足:
condition.wait()
# 执行操作
# 通知其他线程
condition.notify()
# 或
condition.notify_all()基本用法
代码示例
# 基本用法示例
import threading
import time
# 共享资源
data = []
MAX_SIZE = 5
condition = threading.Condition()
def producer():
"""生产者线程"""
for i in range(10):
with condition:
# 等待缓冲区有空间
while len(data) >= MAX_SIZE:
print("生产者:缓冲区已满,等待...")
condition.wait()
# 生产数据
data.append(i)
print(f"生产者:生产数据 {i},当前大小: {len(data)}")
# 通知消费者
condition.notify()
time.sleep(0.5)
def consumer():
"""消费者线程"""
for _ in range(10):
with condition:
# 等待缓冲区有数据
while len(data) == 0:
print("消费者:缓冲区为空,等待...")
condition.wait()
# 消费数据
item = data.pop(0)
print(f"消费者:消费数据 {item},当前大小: {len(data)}")
# 通知生产者
condition.notify()
time.sleep(0.8)
# 创建线程
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)
# 启动线程
producer_thread.start()
consumer_thread.start()
# 等待线程结束
producer_thread.join()
consumer_thread.join()
print(f"最终数据: {data}")代码示例
代码示例
# 详细代码示例:多生产者多消费者模型
import threading
import time
import random
class BoundedBuffer:
"""有界缓冲区 - 使用Condition实现"""
def __init__(self, size=10):
self.buffer = []
self.size = size
self.condition = threading.Condition()
def produce(self, item):
"""生产物品"""
with self.condition:
# 等待缓冲区有空间
while len(self.buffer) >= self.size:
print(f"缓冲区已满,生产者等待...")
self.condition.wait()
# 添加物品
self.buffer.append(item)
print(f"生产: {item},缓冲区大小: {len(self.buffer)}")
# 唤醒一个消费者
self.condition.notify()
def consume(self):
"""消费物品"""
with self.condition:
# 等待缓冲区有数据
while len(self.buffer) == 0:
print(f"缓冲区为空,消费者等待...")
self.condition.wait()
# 取出物品
item = self.buffer.pop(0)
print(f"消费: {item},缓冲区大小: {len(self.buffer)}")
# 唤醒一个生产者
self.condition.notify()
return item
def producer_worker(buffer, name, count):
"""生产者工作线程"""
for i in range(count):
item = f"{name}-item-{i}"
buffer.produce(item)
time.sleep(random.uniform(0.1, 0.5))
def consumer_worker(buffer, name, count):
"""消费者工作线程"""
for i in range(count):
item = buffer.consume()
time.sleep(random.uniform(0.2, 0.6))
# 创建缓冲区
buffer = BoundedBuffer(size=5)
# 创建生产者和消费者
producers = [
threading.Thread(target=producer_worker, args=(buffer, "P1", 8)),
threading.Thread(target=producer_worker, args=(buffer, "P2", 7)),
]
consumers = [
threading.Thread(target=consumer_worker, args=(buffer, "C1", 6)),
threading.Thread(target=consumer_worker, args=(buffer, "C2", 9)),
]
# 启动所有线程
for p in producers:
p.start()
for c in consumers:
c.start()
# 等待所有线程完成
for p in producers:
p.join()
for c in consumers:
c.join()
print(f"生产消费完成,最终缓冲区大小: {len(buffer.buffer)}")注意事项
注意1:使用线程同步Condition时需要注意的关键点。
注意2:常见的陷阱和最佳实践。
小结
-
核心概念1:线程同步Condition的核心概念1
-
核心概念2:线程同步Condition的核心概念2
-
核心概念3:线程同步Condition的核心概念3
练习题
练习1
编写程序,练习线程同步Condition的基本用法。
练习2
编写一个函数,在实际场景中应用线程同步Condition。
常见问题
Condition和Lock有什么区别?
Lock只提供基本的互斥访问,而Condition在Lock的基础上增加了等待/通知机制。Condition允许线程在特定条件不满足时等待,并在条件满足时被其他线程唤醒。Condition内部包含一个锁,使用with语句时会自动管理锁的获取和释放。
为什么wait()要在while循环中使用?
使用while循环检查条件是为了防止虚假唤醒(spurious wakeup)。线程可能被意外唤醒但条件仍未满足,或者多个线程被notify()唤醒但只有一个能获取资源。while循环确保每次唤醒后都重新检查条件,只有条件真正满足时才继续执行。
notify()和notify_all()有什么区别?
notify()只唤醒一个等待的线程,而notify_all()唤醒所有等待的线程。如果只有一个消费者或生产者,使用notify()更高效;如果多个线程可能等待同一条件,使用notify_all()更安全。但notify_all()可能导致不必要的上下文切换。
Condition可以用于实现生产者消费者模式吗?
是的,Condition是实现生产者消费者模式的理想选择。生产者可以在缓冲区满时wait(),消费者消费后notify()唤醒生产者;消费者可以在缓冲区空时wait(),生产者生产后notify()唤醒消费者。这种方式比轮询更高效,避免了CPU资源浪费。
本文涉及AI创作
内容由AI创作,请仔细甄别