pin_drop当前位置:知识文库 ❯ 图文

Python Event事件详解 - 线程间通信与同步控制

概述

事件通知、set/wait、线程间通信。本篇教程将详细介绍线程同步Event的核心概念和Python中的实际应用。


语法

代码示例

# 线程同步Event相关语法示例
import threading

# 创建事件对象
event = threading.Event()

# 设置事件(标志位设为True)
event.set()

# 清除事件(标志位设为False)
event.clear()

# 等待事件(阻塞直到标志位为True)
event.wait()           # 无限等待
event.wait(timeout=5)  # 带超时等待

# 检查事件状态
event.is_set()  # 返回True或False

# 基本使用模式
def worker(event):
    print("等待事件...")
    event.wait()  # 阻塞直到event被set
    print("事件已触发,继续执行")

# 创建线程
t = threading.Thread(target=worker, args=(event,))
t.start()

# 主线程设置事件
event.set()  # 唤醒等待的线程

基本用法

代码示例

# 基本用法示例
import threading
import time

# 创建事件对象
start_event = threading.Event()
stop_event = threading.Event()

def worker(name):
    """工作线程"""
    print(f"[{name}] 启动,等待开始信号...")
    start_event.wait()  # 等待开始信号
    
    print(f"[{name}] 开始工作")
    while not stop_event.is_set():
        print(f"[{name}] 工作中...")
        time.sleep(1)
    
    print(f"[{name}] 收到停止信号,退出")

# 创建多个工作线程
threads = []
for i in range(3):
    t = threading.Thread(target=worker, args=(f"Worker-{i}",))
    threads.append(t)
    t.start()

# 等待2秒后发送开始信号
time.sleep(2)
print("发送开始信号")
start_event.set()

# 工作5秒后发送停止信号
time.sleep(5)
print("发送停止信号")
stop_event.set()

# 等待所有线程结束
for t in threads:
    t.join()

print("所有线程已退出")

代码示例

代码示例

# 详细代码示例:生产者消费者模式使用Event
import threading
import time
import random

class DataBuffer:
    """数据缓冲区"""
    def __init__(self, max_size=10):
        self.buffer = []
        self.max_size = max_size
        self.lock = threading.Lock()
        self.not_empty = threading.Event()  # 缓冲区非空事件
        self.not_full = threading.Event()   # 缓冲区未满事件
        self.not_full.set()  # 初始时缓冲区未满

class Producer:
    """生产者"""
    def __init__(self, buffer, name):
        self.buffer = buffer
        self.name = name
        self.running = True
    
    def produce(self):
        while self.running:
            # 等待缓冲区有空间
            self.buffer.not_full.wait()
            
            with self.buffer.lock:
                if len(self.buffer.buffer) < self.buffer.max_size:
                    data = f"{self.name}-data-{random.randint(1, 100)}"
                    self.buffer.buffer.append(data)
                    print(f"[{self.name}] 生产: {data}, 缓冲区大小: {len(self.buffer.buffer)}")
                    
                    # 设置非空事件,唤醒消费者
                    self.buffer.not_empty.set()
                    
                    # 如果缓冲区满了,清除未满事件
                    if len(self.buffer.buffer) >= self.buffer.max_size:
                        self.buffer.not_full.clear()
            
            time.sleep(random.uniform(0.5, 1.5))

class Consumer:
    """消费者"""
    def __init__(self, buffer, name):
        self.buffer = buffer
        self.name = name
        self.running = True
    
    def consume(self):
        while self.running:
            # 等待缓冲区非空
            self.buffer.not_empty.wait()
            
            with self.buffer.lock:
                if self.buffer.buffer:
                    data = self.buffer.buffer.pop(0)
                    print(f"[{self.name}] 消费: {data}, 缓冲区大小: {len(self.buffer.buffer)}")
                    
                    # 设置未满事件,唤醒生产者
                    self.buffer.not_full.set()
                    
                    # 如果缓冲区空了,清除非空事件
                    if not self.buffer.buffer:
                        self.buffer.not_empty.clear()
            
            time.sleep(random.uniform(0.8, 2.0))

# 创建缓冲区
buffer = DataBuffer(max_size=5)

# 创建生产者和消费者
producers = [Producer(buffer, f"Producer-{i}") for i in range(2)]
consumers = [Consumer(buffer, f"Consumer-{i}") for i in range(3)]

# 启动线程
producer_threads = []
consumer_threads = []

for p in producers:
    t = threading.Thread(target=p.produce)
    t.start()
    producer_threads.append(t)

for c in consumers:
    t = threading.Thread(target=c.consume)
    t.start()
    consumer_threads.append(t)

# 运行10秒
time.sleep(10)

# 停止所有线程
for p in producers:
    p.running = False
for c in consumers:
    c.running = False

# 等待线程结束
for t in producer_threads + consumer_threads:
    t.join()

print("生产消费完成")

小贴士

Event对象是线程间通信的简单机制,适用于一个线程通知其他线程某个事件发生的场景。它比Condition更简单,但功能也相对有限。Event常用于:1)线程启动/停止控制;2)任务完成通知;3)简单的状态同步。


注意事项

注意1:使用线程同步Event时需要注意的关键点。

注意2:常见的陷阱和最佳实践。


小结

  • 核心概念1:线程同步Event的核心概念1

  • 核心概念2:线程同步Event的核心概念2

  • 核心概念3:线程同步Event的核心概念3


练习题

练习1

编写程序,练习线程同步Event的基本用法。

练习2

编写一个函数,在实际场景中应用线程同步Event。

常见问题

Event对象和Lock有什么区别?

Lock用于互斥访问,确保同一时刻只有一个线程访问共享资源;Event用于线程间通信,一个线程设置事件,其他线程等待事件。Lock是排他性的,Event是通知性的。Lock必须成对使用(acquire/release),Event可以单独使用。

Event的wait()方法会一直阻塞吗?

wait()方法默认会一直阻塞直到事件被设置。但可以传入timeout参数指定超时时间,超时后即使事件未被设置也会返回。返回值为事件是否被设置(True/False)。建议在重要场景中使用超时机制避免永久阻塞。

Event可以被重复使用吗?

可以。Event被set()后,所有等待的线程都会被唤醒。之后可以调用clear()清除事件,使其回到未设置状态,然后再次使用。但要注意,如果在clear()之前有新线程调用wait(),它会立即返回,因为事件仍处于设置状态。

标签: Event 事件通知 线程通信 set/wait Python

本文涉及AI创作

内容由AI创作,请仔细甄别

list快速访问

上一篇: Python Semaphore信号量详解 - 多线程限流与资源池管理 下一篇: Python Condition条件变量详解 - 生产者消费者模式实现

poll相关推荐