pin_drop当前位置:知识文库 ❯ 图文

Python观察者模式完全指南 - 发布订阅与事件驱动

一、什么是观察者模式

观察者模式(Observer Pattern)是行为型设计模式中最常用的一种。它定义了对象之间的一对多依赖关系,当一个对象(被观察者)的状态发生改变时,所有依赖它的对象(观察者)都会自动收到通知并更新。

观察者模式又被称为发布-订阅模式(Publish-Subscribe Pattern)、模型-视图模式、源-监听器模式等。它是事件驱动系统的核心模式。

核心概念

  • Subject(被观察者):维护观察者列表,状态改变时通知所有观察者

  • Observer(观察者):接收通知并做出响应的对象

  • 解耦:被观察者不需要知道观察者的具体实现

  • 动态订阅:观察者可以随时订阅或取消订阅


二、观察者模式的基础实现

1. 经典实现

代码示例

from abc import ABC, abstractmethod

# 观察者基类
class Observer(ABC):
    @abstractmethod
    def update(self, subject):
        pass

# 被观察者基类
class Subject:
    def __init__(self):
        self._observers = []
        self._state = None
    
    def attach(self, observer: Observer):
        """添加观察者"""
        if observer not in self._observers:
            self._observers.append(observer)
    
    def detach(self, observer: Observer):
        """移除观察者"""
        self._observers.remove(observer)
    
    def notify(self):
        """通知所有观察者"""
        for observer in self._observers:
            observer.update(self)
    
    @property
    def state(self):
        return self._state
    
    @state.setter
    def state(self, value):
        self._state = value
        self.notify()  # 状态改变时自动通知

# 具体观察者
class EmailNotifier(Observer):
    def update(self, subject):
        print(f"[邮件通知] 状态变更为: {subject.state}")

class SMSNotifier(Observer):
    def update(self, subject):
        print(f"[短信通知] 状态变更为: {subject.state}")

class LogRecorder(Observer):
    def update(self, subject):
        print(f"[日志记录] 记录状态变更: {subject.state}")

# 具体被观察者
class OrderStatus(Subject):
    def __init__(self):
        super().__init__()
        self._order_id = None
    
    @property
    def order_id(self):
        return self._order_id
    
    @order_id.setter
    def order_id(self, value):
        self._order_id = value

# 使用
order = OrderStatus()
order.attach(EmailNotifier())
order.attach(SMSNotifier())
order.attach(LogRecorder())

order.order_id = "ORD-20260612-001"
order.state = "已创建"
order.state = "已支付"
order.state = "已发货"

2. 支持条件通知的实现

代码示例

class SmartSubject:
    """支持事件过滤的智能被观察者"""
    def __init__(self):
        self._observers = {}  # {event_type: [observers]}
        self._data = {}
    
    def subscribe(self, event_type, observer, condition=None):
        """订阅特定事件"""
        if event_type not in self._observers:
            self._observers[event_type] = []
        self._observers[event_type].append({
            "observer": observer,
            "condition": condition
        })
    
    def unsubscribe(self, event_type, observer):
        """取消订阅"""
        if event_type in self._observers:
            self._observers[event_type] = [
                o for o in self._observers[event_type] 
                if o["observer"] != observer
            ]
    
    def emit(self, event_type, data):
        """触发事件"""
        self._data[event_type] = data
        if event_type in self._observers:
            for sub in self._observers[event_type]:
                # 如果有条件,检查条件是否满足
                if sub["condition"] and not sub["condition"](data):
                    continue
                sub["observer"](event_type, data)

# 使用
bus = SmartSubject()

def log_handler(event, data):
    print(f"[日志] {event}: {data}")

def alert_handler(event, data):
    if data.get("level", 0) > 80:
        print(f"[告警] {event}: 数值过高! {data}")

# 订阅事件
bus.subscribe("cpu_usage", log_handler)
bus.subscribe("cpu_usage", alert_handler, condition=lambda d: d.get("value", 0) > 80)
bus.subscribe("memory_usage", log_handler)

# 触发事件
bus.emit("cpu_usage", {"value": 45, "level": 45})  # 只触发日志
bus.emit("cpu_usage", {"value": 92, "level": 92})  # 触发日志和告警

三、发布-订阅模式实现

发布-订阅模式是观察者模式的一种变体,它引入了一个中间的事件总线(Event Bus),使发布者和订阅者完全解耦。

1. 事件总线实现

代码示例

import threading
from collections import defaultdict

class EventBus:
    """线程安全的事件总线"""
    def __init__(self):
        self._subscribers = defaultdict(list)
        self._lock = threading.Lock()
    
    def subscribe(self, event, callback):
        """订阅事件"""
        with self._lock:
            self._subscribers[event].append(callback)
    
    def unsubscribe(self, event, callback):
        """取消订阅"""
        with self._lock:
            if callback in self._subscribers[event]:
                self._subscribers[event].remove(callback)
    
    def publish(self, event, **kwargs):
        """发布事件"""
        with self._lock:
            callbacks = self._subscribers.get(event, []).copy()
        
        for callback in callbacks:
            try:
                callback(**kwargs)
            except Exception as e:
                print(f"事件处理异常 [{event}]: {e}")
    
    def once(self, event, callback):
        """只订阅一次"""
        def wrapper(**kwargs):
            callback(**kwargs)
            self.unsubscribe(event, wrapper)
        self.subscribe(event, wrapper)

# 使用
bus = EventBus()

def on_order_created(order_id, amount):
    print(f"订单创建通知: 订单号={order_id}, 金额={amount}")

def on_payment_received(order_id, amount):
    print(f"收款通知: 订单号={order_id}, 金额={amount}")

def send_welcome_email(**kwargs):
    print(f"发送欢迎邮件给用户: {kwargs}")

# 订阅
bus.subscribe("order.created", on_order_created)
bus.subscribe("payment.received", on_payment_received)
bus.once("system.init", send_welcome_email)  # 只触发一次

# 发布
bus.publish("order.created", order_id="ORD001", amount=199.0)
bus.publish("payment.received", order_id="ORD001", amount=199.0)
bus.publish("system.init", user="新用户")  # 第二次发布不会再触发
bus.publish("system.init", user="另一个用户")

2. 支持异步的事件总线

代码示例

import asyncio
from collections import defaultdict

class AsyncEventBus:
    """异步事件总线"""
    def __init__(self):
        self._subscribers = defaultdict(list)
    
    def subscribe(self, event, callback):
        """订阅事件"""
        self._subscribers[event].append(callback)
    
    async def publish(self, event, **kwargs):
        """异步发布事件"""
        tasks = []
        for callback in self._subscribers.get(event, []):
            if asyncio.iscoroutinefunction(callback):
                tasks.append(callback(**kwargs))
            else:
                # 同步回调在线程池中执行
                tasks.append(asyncio.to_thread(callback, **kwargs))
        
        if tasks:
            results = await asyncio.gather(*tasks, return_exceptions=True)
            for r in results:
                if isinstance(r, Exception):
                    print(f"异步事件处理异常: {r}")

# 异步使用示例
async def main():
    bus = AsyncEventBus()
    
    async def async_handler(event_name, **data):
        await asyncio.sleep(0.1)
        print(f"异步处理: {event_name} -> {data}")
    
    def sync_handler(event_name, **data):
        print(f"同步处理: {event_name} -> {data}")
    
    bus.subscribe("data.process", async_handler)
    bus.subscribe("data.process", sync_handler)
    
    await bus.publish("data.process", event_name="data.process", value=42)

asyncio.run(main())
对比项 观察者模式 发布-订阅模式
耦合度 低(被观察者知道观察者) 极低(通过事件总线解耦)
通信方式 直接通知 通过事件中介
扩展性 中等
适用场景 对象间直接关联 大型系统、微服务

四、Python内置的观察者支持

Python标准库提供了对观察者模式的支持,我们可以直接使用或参考其实现。

1. 使用property实现属性监听

代码示例

class ObservableProperty:
    """使用property描述符实现属性监听"""
    def __init__(self, initial_value=None):
        self.value = initial_value
        self._callbacks = []
    
    def on_change(self, callback):
        self._callbacks.append(callback)
        return callback
    
    def __get__(self, obj, objtype=None):
        return self.value
    
    def __set__(self, obj, value):
        old_value = self.value
        self.value = value
        if old_value != value:
            for callback in self._callbacks:
                callback(obj, old_value, value)

class User:
    name = ObservableProperty("Anonymous")
    age = ObservableProperty(0)
    
    def __init__(self):
        pass

@User.name.on_change
def name_changed(user, old, new):
    print(f"用户名从 '{old}' 改为 '{new}'")

@User.age.on_change
def age_changed(user, old, new):
    print(f"年龄从 {old} 变为 {new}")

# 使用
user = User()
user.name = "张三"
user.age = 25
user.age = 26

2. 使用signal库(第三方)

代码示例

# pip install blinker
from blinker import signal

# 定义信号
user_created = signal('user-created')
user_deleted = signal('user-deleted')
order_placed = signal('order-placed')

# 订阅信号
@user_created.connect
def send_welcome_email(sender, **kwargs):
    print(f"发送欢迎邮件给: {kwargs.get('username')}")

@user_created.connect
def init_user_profile(sender, **kwargs):
    print(f"初始化用户资料: {kwargs.get('username')}")

@order_placed.connect
def process_payment(sender, **kwargs):
    print(f"处理订单支付: {kwargs.get('order_id')}")

# 发送信号
user_created.send("system", username="新用户123")
order_placed.send("system", order_id="ORD001", amount=99.9)

五、事件驱动架构中的应用

1. 使用类装饰器实现事件注册

代码示例

from collections import defaultdict
import inspect

class EventEmitter:
    """基于装饰器的事件发射器"""
    _handlers = defaultdict(list)
    
    @classmethod
    def on(cls, event_name):
        """事件注册装饰器"""
        def decorator(func):
            cls._handlers[event_name].append(func)
            return func
        return decorator
    
    @classmethod
    def emit(cls, event_name, **kwargs):
        """触发事件"""
        for handler in cls._handlers.get(event_name, []):
            try:
                handler(**kwargs)
            except Exception as e:
                print(f"事件处理失败 [{event_name}]: {e}")
    
    @classmethod
    def event_names(cls):
        """获取所有已注册的事件"""
        return list(cls._handlers.keys())

# 使用装饰器注册事件处理
@EventEmitter.on("app.start")
def on_app_start(**kwargs):
    print(f"应用启动: {kwargs.get('version')}")

@EventEmitter.on("app.start")
def load_plugins(**kwargs):
    print(f"加载插件...")

@EventEmitter.on("user.login")
def on_login(username, **kwargs):
    print(f"用户登录: {username}")

@EventEmitter.on("user.login")
def record_login_log(username, **kwargs):
    print(f"记录登录日志: {username}")

# 触发事件
EventEmitter.emit("app.start", version="1.0.0")
EventEmitter.emit("user.login", username="admin")

六、实际应用场景

1. 股票价格监控系统

代码示例

class Stock:
    """股票 - 被观察者"""
    def __init__(self, symbol, price):
        self.symbol = symbol
        self._price = price
        self._observers = []
    
    @property
    def price(self):
        return self._price
    
    @price.setter
    def price(self, new_price):
        old_price = self._price
        self._price = new_price
        self._notify(old_price, new_price)
    
    def add_observer(self, observer):
        self._observers.append(observer)
    
    def remove_observer(self, observer):
        self._observers.remove(observer)
    
    def _notify(self, old_price, new_price):
        for observer in self._observers:
            observer.update(self.symbol, old_price, new_price)

class PriceAlert:
    """价格提醒 - 观察者"""
    def __init__(self, threshold=0.05):
        self.threshold = threshold  # 5%的变动触发提醒
    
    def update(self, symbol, old_price, new_price):
        if old_price == 0:
            return
        change = abs(new_price - old_price) / old_price
        if change >= self.threshold:
            direction = "上涨" if new_price > old_price else "下跌"
            print(f"⚠️ 价格提醒: {symbol} {direction} {change*100:.1f}%, "
                  f"价格: {old_price:.2f} → {new_price:.2f}")

class DataLogger:
    """数据记录器 - 观察者"""
    def __init__(self):
        self.history = []
    
    def update(self, symbol, old_price, new_price):
        self.history.append({
            "symbol": symbol,
            "old": old_price,
            "new": new_price
        })
        print(f"[记录] {symbol}: {old_price:.2f} → {new_price:.2f}")

# 使用
apple = Stock("AAPL", 150.0)
alert = PriceAlert(threshold=0.03)  # 3%阈值
logger = DataLogger()

apple.add_observer(alert)
apple.add_observer(logger)

# 价格变动
apple.price = 152.0
apple.price = 158.0  # 超过3%,触发提醒
apple.price = 145.0  # 大幅下跌,触发提醒

2. MVC模式中的数据绑定

代码示例

class Model:
    """MVC中的模型"""
    def __init__(self):
        self._data = {}
        self._views = []
    
    def set(self, key, value):
        old = self._data.get(key)
        self._data[key] = value
        if old != value:
            self._notify(key, value)
    
    def get(self, key):
        return self._data.get(key)
    
    def bind_view(self, view):
        self._views.append(view)
    
    def _notify(self, key, value):
        for view in self._views:
            view.update(key, value)

class TextView:
    """文本视图"""
    def update(self, key, value):
        if key == "username":
            print(f"视图更新 - 用户名显示: {value}")
        elif key == "status":
            print(f"视图更新 - 状态显示: {value}")

class ChartView:
    """图表视图"""
    def update(self, key, value):
        if key == "score":
            print(f"视图更新 - 图表数据刷新: {value}")

# 使用
model = Model()
model.bind_view(TextView())
model.bind_view(ChartView())

model.set("username", "张三")
model.set("score", 95)
model.set("status", "在线")

小贴士

在大型系统中,观察者模式的回调函数可能会抛出异常导致后续观察者无法收到通知。建议在通知逻辑中捕获异常,或使用异步事件总线隔离各个观察者的执行环境。


七、小结与练习题

核心要点总结

  • 核心思想:一对多依赖,状态改变时自动通知所有观察者

  • 发布-订阅:通过事件总线实现发布者和订阅者的完全解耦

  • Python特性:利用property、装饰器和信号库简化观察者实现

  • 注意事项:注意回调异常处理、内存泄漏(遗忘取消订阅)、循环通知等问题

练习题

练习1

实现一个天气预报系统:气象站(被观察者)在温度、湿度、气压变化时通知多个显示设备(观察者),包括当前天气显示、统计数据显示和预报显示。

练习2

设计一个购物车事件系统,当用户添加商品、修改数量、结算时触发不同事件,实现库存扣减、积分计算、优惠券核销等观察者逻辑。在实际场景中应用观察者模式。

常见问题

观察者模式和发布-订阅模式有什么区别?

主要区别在于解耦程度。观察者模式中,被观察者直接维护观察者列表并通知它们;而发布-订阅模式通过一个中间的事件总线来解耦发布者和订阅者。发布-订阅模式的耦合度更低,更适合大型分布式系统。

如何避免观察者导致的内存泄漏?

如果观察者不再需要接收通知,应该调用unsubscribe/removeObserver取消订阅。可以使用弱引用(Python的weakref模块)来存储观察者,这样当观察者被销毁时会自动从列表中移除。也可以使用上下文管理器确保在作用域结束时自动取消订阅。

观察者的回调是同步还是异步执行的?

默认情况下是同步顺序执行的。如果某个观察者的回调耗时较长,会阻塞其他观察者的通知。在需要高性能或隔离性的场景中,可以使用异步事件总线、线程池或消息队列(如RabbitMQ、Kafka)来实现异步通知。

如何处理观察者之间的循环依赖问题?

当观察者A的回调触发了被观察者状态改变,可能导致无限循环。解决方法:1)在通知期间禁止状态修改;2)使用标记位防止重入;3)将状态变更加入队列延迟执行;4)重新设计避免循环依赖。

标签: Python 观察者模式 发布订阅 事件驱动 解耦设计 事件总线

本文涉及AI创作

内容由AI创作,请仔细甄别

list快速访问

上一篇: Python工厂模式完全指南 - 三种工厂实现详解 下一篇: Python策略模式完全指南 - 消除条件分支的最佳实践

poll相关推荐