pin_drop当前位置:知识文库 ❯ 图文

Python Semaphore信号量详解 - 多线程限流与资源池管理

Semaphore概述

Semaphore(信号量)是Python threading模块提供的一种高级同步原语。与Lock只能让一个线程访问资源不同,Semaphore允许指定数量的线程同时访问共享资源,常用于限流和资源池管理。

Semaphore的核心特性:

  • 计数器机制:内部维护一个计数器,表示可用资源数量

  • acquire()操作:计数器减1,若计数器为0则阻塞等待

  • release()操作:计数器加1,唤醒等待的线程

  • 并发控制:限制同时访问资源的线程数量


Semaphore基本语法

创建和使用Semaphore的基本语法:

代码示例

import threading

# 创建信号量,指定允许同时访问的线程数
semaphore = threading.Semaphore(3)  # 允许3个线程同时访问

# 基本使用方式
semaphore.acquire()  # 获取信号量,计数器-1
try:
    # 访问共享资源
    pass
finally:
    semaphore.release()  # 释放信号量,计数器+1

# 推荐使用with语句
with semaphore:
    # 访问共享资源(自动管理)
    pass

Semaphore的主要方法:

  • acquire(blocking=True, timeout=None):获取信号量,计数器减1

  • release():释放信号量,计数器加1


工作原理详解

Semaphore的工作流程:

  • 初始化:创建时指定计数器初始值N

  • 获取信号量:调用acquire(),若计数器>0则减1并继续;若计数器=0则阻塞等待

  • 释放信号量:调用release(),计数器加1,唤醒一个等待的线程

  • 公平调度:按FIFO顺序唤醒等待的线程

代码示例

import threading
import time

# 演示Semaphore计数器变化
sem = threading.Semaphore(2)
print(f"初始状态")

# 线程1获取
sem.acquire()
print("线程1获取信号量")

# 线程2获取
sem.acquire()
print("线程2获取信号量")

# 线程3尝试获取(会阻塞)
print("线程3尝试获取(将阻塞)...")

def worker3():
    sem.acquire()
    print("线程3获取成功!")
    sem.release()

t = threading.Thread(target=worker3)
t.start()

time.sleep(0.5)
print("释放一个信号量")
sem.release()  # 线程3会被唤醒

t.join()
sem.release()
print("完成")

代码示例

示例1:限制并发下载数

代码示例

import threading
import time
import random

# 模拟最多3个并发下载
download_semaphore = threading.Semaphore(3)

def download_file(file_id):
    with download_semaphore:
        print(f"[开始] 下载文件 {file_id},当前线程: {threading.current_thread().name}")
        # 模拟下载耗时
        time.sleep(random.uniform(0.5, 2))
        print(f"[完成] 文件 {file_id} 下载完成")

# 创建10个下载任务
files = [f"file_{i}.zip" for i in range(10)]
threads = []

for file_id in files:
    t = threading.Thread(target=download_file, args=(file_id,))
    threads.append(t)
    t.start()

# 等待所有下载完成
for t in threads:
    t.join()

print("所有文件下载完成")

示例2:数据库连接池

代码示例

import threading
import time
import random

class ConnectionPool:
    def __init__(self, pool_size=3):
        self.pool_size = pool_size
        self.semaphore = threading.Semaphore(pool_size)
        self.connections = [f"conn_{i}" for i in range(pool_size)]
        self.lock = threading.Lock()
        self.available = self.connections.copy()
    
    def get_connection(self):
        self.semaphore.acquire()
        with self.lock:
            conn = self.available.pop()
            print(f"获取连接: {conn}, 剩余可用: {len(self.available)}")
            return conn
    
    def release_connection(self, conn):
        with self.lock:
            self.available.append(conn)
            print(f"释放连接: {conn}, 剩余可用: {len(self.available)}")
        self.semaphore.release()

# 使用连接池
pool = ConnectionPool(3)

def db_task(task_id):
    conn = pool.get_connection()
    try:
        print(f"任务{task_id}使用{conn}执行查询")
        time.sleep(random.uniform(0.5, 1.5))
    finally:
        pool.release_connection(conn)

# 创建多个任务
threads = [threading.Thread(target=db_task, args=(i,)) for i in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print("所有数据库任务完成")

示例3:API请求限流

代码示例

import threading
import time
import random

# 限制同时最多5个API请求
api_semaphore = threading.Semaphore(5)

class APIClient:
    def __init__(self):
        self.request_count = 0
        self.lock = threading.Lock()
    
    def call_api(self, endpoint):
        with api_semaphore:
            with self.lock:
                self.request_count += 1
                count = self.request_count
            
            print(f"[请求{count}] 调用API: {endpoint}")
            # 模拟API调用
            time.sleep(random.uniform(0.3, 1))
            print(f"[响应{count}] API返回结果")

client = APIClient()

def request_worker(request_id):
    client.call_api(f"/api/data/{request_id}")

# 创建20个请求
threads = [threading.Thread(target=request_worker, args=(i,)) for i in range(20)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"总请求数: {client.request_count}")

注意事项

注意1:acquire()和release()必须成对使用。推荐使用with语句避免忘记释放。

注意2:Semaphore允许同一线程多次acquire(),但每次acquire()都需要对应的release()。

注意3:BoundedSemaphore会检查release()次数不能超过初始值,防止计数器无限增长。

注意4:合理设置信号量值。值太小会导致并发度低,值太大会失去限流效果。


小结

  • Semaphore作用:控制同时访问资源的线程数量,实现限流和资源池

  • 计数器机制:acquire()减1,release()加1,计数器为0时阻塞

  • 典型应用:并发下载限制、数据库连接池、API请求限流

  • 最佳实践:使用with语句管理,配合BoundedSemaphore防止计数器溢出


练习题

练习1

实现一个停车场管理系统,使用Semaphore限制同时停放的车辆数(如10个车位),模拟车辆进入和离开的过程。

练习2

实现一个线程安全的日志写入系统,使用Semaphore限制同时写入日志的线程数,避免磁盘IO过载。

常见问题

Semaphore和Lock有什么区别?

Lock只允许一个线程访问资源(互斥),Semaphore允许多个线程同时访问(计数)。Lock是Semaphore的特例(计数器为1)。

BoundedSemaphore和普通Semaphore有什么区别?

BoundedSemaphore会检查release()次数不能超过初始值,防止计数器无限增长导致错误。普通Semaphore没有这个限制。推荐使用BoundedSemaphore更安全。

Semaphore可以用于生产消费者模型吗?

可以。使用两个Semaphore:一个表示空槽位数量,一个表示产品数量。生产者acquire空槽位信号量,消费者acquire产品信号量,实现经典的生产者消费者问题。

如何选择合适的信号量值?

根据资源特性和系统负载决定。数据库连接池通常设置为连接池大小,API限流根据服务端承受能力设置,IO密集型任务可设置较大值,CPU密集型任务设置为CPU核心数。

标签: Semaphore 信号量 并发控制 限流 资源池 线程同步

本文涉及AI创作

内容由AI创作,请仔细甄别

list快速访问

上一篇: Python RLock可重入锁详解 - 递归加锁与嵌套调用 下一篇: Python Event事件详解 - 线程间通信与同步控制

poll相关推荐