pin_drop当前位置:知识文库 ❯ 图文

Python Condition条件变量详解 - 生产者消费者模式实现

概述

条件变量、wait/notify、生产者消费者。本篇教程将详细介绍线程同步Condition的核心概念和Python中的实际应用。


语法

代码示例

# 线程同步Condition相关语法示例
import threading

# 创建条件变量
condition = threading.Condition()

# 或者基于已有锁创建
lock = threading.Lock()
condition = threading.Condition(lock)

# 基本方法
condition.acquire()    # 获取锁
condition.release()    # 释放锁
condition.wait(timeout=None)  # 等待条件满足
condition.notify(n=1)  # 唤醒n个等待的线程
condition.notify_all() # 唤醒所有等待的线程

# 使用with语句(推荐)
with condition:
    # 等待条件
    while not 条件满足:
        condition.wait()
    # 执行操作
    
    # 通知其他线程
    condition.notify()
    # 或
    condition.notify_all()

基本用法

代码示例

# 基本用法示例
import threading
import time

# 共享资源
data = []
MAX_SIZE = 5
condition = threading.Condition()

def producer():
    """生产者线程"""
    for i in range(10):
        with condition:
            # 等待缓冲区有空间
            while len(data) >= MAX_SIZE:
                print("生产者:缓冲区已满,等待...")
                condition.wait()
            
            # 生产数据
            data.append(i)
            print(f"生产者:生产数据 {i},当前大小: {len(data)}")
            
            # 通知消费者
            condition.notify()
        
        time.sleep(0.5)

def consumer():
    """消费者线程"""
    for _ in range(10):
        with condition:
            # 等待缓冲区有数据
            while len(data) == 0:
                print("消费者:缓冲区为空,等待...")
                condition.wait()
            
            # 消费数据
            item = data.pop(0)
            print(f"消费者:消费数据 {item},当前大小: {len(data)}")
            
            # 通知生产者
            condition.notify()
        
        time.sleep(0.8)

# 创建线程
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)

# 启动线程
producer_thread.start()
consumer_thread.start()

# 等待线程结束
producer_thread.join()
consumer_thread.join()

print(f"最终数据: {data}")

代码示例

代码示例

# 详细代码示例:多生产者多消费者模型
import threading
import time
import random

class BoundedBuffer:
    """有界缓冲区 - 使用Condition实现"""
    def __init__(self, size=10):
        self.buffer = []
        self.size = size
        self.condition = threading.Condition()
    
    def produce(self, item):
        """生产物品"""
        with self.condition:
            # 等待缓冲区有空间
            while len(self.buffer) >= self.size:
                print(f"缓冲区已满,生产者等待...")
                self.condition.wait()
            
            # 添加物品
            self.buffer.append(item)
            print(f"生产: {item},缓冲区大小: {len(self.buffer)}")
            
            # 唤醒一个消费者
            self.condition.notify()
    
    def consume(self):
        """消费物品"""
        with self.condition:
            # 等待缓冲区有数据
            while len(self.buffer) == 0:
                print(f"缓冲区为空,消费者等待...")
                self.condition.wait()
            
            # 取出物品
            item = self.buffer.pop(0)
            print(f"消费: {item},缓冲区大小: {len(self.buffer)}")
            
            # 唤醒一个生产者
            self.condition.notify()
            
            return item

def producer_worker(buffer, name, count):
    """生产者工作线程"""
    for i in range(count):
        item = f"{name}-item-{i}"
        buffer.produce(item)
        time.sleep(random.uniform(0.1, 0.5))

def consumer_worker(buffer, name, count):
    """消费者工作线程"""
    for i in range(count):
        item = buffer.consume()
        time.sleep(random.uniform(0.2, 0.6))

# 创建缓冲区
buffer = BoundedBuffer(size=5)

# 创建生产者和消费者
producers = [
    threading.Thread(target=producer_worker, args=(buffer, "P1", 8)),
    threading.Thread(target=producer_worker, args=(buffer, "P2", 7)),
]

consumers = [
    threading.Thread(target=consumer_worker, args=(buffer, "C1", 6)),
    threading.Thread(target=consumer_worker, args=(buffer, "C2", 9)),
]

# 启动所有线程
for p in producers:
    p.start()
for c in consumers:
    c.start()

# 等待所有线程完成
for p in producers:
    p.join()
for c in consumers:
    c.join()

print(f"生产消费完成,最终缓冲区大小: {len(buffer.buffer)}")
对比项 Condition Event
功能 条件等待与通知 事件信号
锁机制 内置锁,自动管理 无内置锁
唤醒方式 notify()或notify_all() set()
适用场景 生产者消费者、复杂同步 简单事件通知

注意事项

注意1:使用线程同步Condition时需要注意的关键点。

注意2:常见的陷阱和最佳实践。


小结

  • 核心概念1:线程同步Condition的核心概念1

  • 核心概念2:线程同步Condition的核心概念2

  • 核心概念3:线程同步Condition的核心概念3


练习题

练习1

编写程序,练习线程同步Condition的基本用法。

练习2

编写一个函数,在实际场景中应用线程同步Condition。

常见问题

Condition和Lock有什么区别?

Lock只提供基本的互斥访问,而Condition在Lock的基础上增加了等待/通知机制。Condition允许线程在特定条件不满足时等待,并在条件满足时被其他线程唤醒。Condition内部包含一个锁,使用with语句时会自动管理锁的获取和释放。

为什么wait()要在while循环中使用?

使用while循环检查条件是为了防止虚假唤醒(spurious wakeup)。线程可能被意外唤醒但条件仍未满足,或者多个线程被notify()唤醒但只有一个能获取资源。while循环确保每次唤醒后都重新检查条件,只有条件真正满足时才继续执行。

notify()和notify_all()有什么区别?

notify()只唤醒一个等待的线程,而notify_all()唤醒所有等待的线程。如果只有一个消费者或生产者,使用notify()更高效;如果多个线程可能等待同一条件,使用notify_all()更安全。但notify_all()可能导致不必要的上下文切换。

Condition可以用于实现生产者消费者模式吗?

是的,Condition是实现生产者消费者模式的理想选择。生产者可以在缓冲区满时wait(),消费者消费后notify()唤醒生产者;消费者可以在缓冲区空时wait(),生产者生产后notify()唤醒消费者。这种方式比轮询更高效,避免了CPU资源浪费。

标签: Condition 条件变量 wait/notify 生产者消费者 Python

本文涉及AI创作

内容由AI创作,请仔细甄别

list快速访问

上一篇: Python Event事件详解 - 线程间通信与同步控制 下一篇: 线程池ThreadPoolExecutor详解 - Python并发编程指南

poll相关推荐