pin_drop当前位置:知识文库 ❯ 图文
Python Semaphore信号量详解 - 多线程限流与资源池管理
Semaphore概述
Semaphore(信号量)是Python threading模块提供的一种高级同步原语。与Lock只能让一个线程访问资源不同,Semaphore允许指定数量的线程同时访问共享资源,常用于限流和资源池管理。
Semaphore的核心特性:
-
计数器机制:内部维护一个计数器,表示可用资源数量
-
acquire()操作:计数器减1,若计数器为0则阻塞等待
-
release()操作:计数器加1,唤醒等待的线程
-
并发控制:限制同时访问资源的线程数量
Semaphore基本语法
创建和使用Semaphore的基本语法:
代码示例
import threading
# 创建信号量,指定允许同时访问的线程数
semaphore = threading.Semaphore(3) # 允许3个线程同时访问
# 基本使用方式
semaphore.acquire() # 获取信号量,计数器-1
try:
# 访问共享资源
pass
finally:
semaphore.release() # 释放信号量,计数器+1
# 推荐使用with语句
with semaphore:
# 访问共享资源(自动管理)
passSemaphore的主要方法:
-
acquire(blocking=True, timeout=None):获取信号量,计数器减1
-
release():释放信号量,计数器加1
工作原理详解
Semaphore的工作流程:
-
初始化:创建时指定计数器初始值N
-
获取信号量:调用acquire(),若计数器>0则减1并继续;若计数器=0则阻塞等待
-
释放信号量:调用release(),计数器加1,唤醒一个等待的线程
-
公平调度:按FIFO顺序唤醒等待的线程
代码示例
import threading
import time
# 演示Semaphore计数器变化
sem = threading.Semaphore(2)
print(f"初始状态")
# 线程1获取
sem.acquire()
print("线程1获取信号量")
# 线程2获取
sem.acquire()
print("线程2获取信号量")
# 线程3尝试获取(会阻塞)
print("线程3尝试获取(将阻塞)...")
def worker3():
sem.acquire()
print("线程3获取成功!")
sem.release()
t = threading.Thread(target=worker3)
t.start()
time.sleep(0.5)
print("释放一个信号量")
sem.release() # 线程3会被唤醒
t.join()
sem.release()
print("完成")代码示例
示例1:限制并发下载数
代码示例
import threading
import time
import random
# 模拟最多3个并发下载
download_semaphore = threading.Semaphore(3)
def download_file(file_id):
with download_semaphore:
print(f"[开始] 下载文件 {file_id},当前线程: {threading.current_thread().name}")
# 模拟下载耗时
time.sleep(random.uniform(0.5, 2))
print(f"[完成] 文件 {file_id} 下载完成")
# 创建10个下载任务
files = [f"file_{i}.zip" for i in range(10)]
threads = []
for file_id in files:
t = threading.Thread(target=download_file, args=(file_id,))
threads.append(t)
t.start()
# 等待所有下载完成
for t in threads:
t.join()
print("所有文件下载完成")示例2:数据库连接池
代码示例
import threading
import time
import random
class ConnectionPool:
def __init__(self, pool_size=3):
self.pool_size = pool_size
self.semaphore = threading.Semaphore(pool_size)
self.connections = [f"conn_{i}" for i in range(pool_size)]
self.lock = threading.Lock()
self.available = self.connections.copy()
def get_connection(self):
self.semaphore.acquire()
with self.lock:
conn = self.available.pop()
print(f"获取连接: {conn}, 剩余可用: {len(self.available)}")
return conn
def release_connection(self, conn):
with self.lock:
self.available.append(conn)
print(f"释放连接: {conn}, 剩余可用: {len(self.available)}")
self.semaphore.release()
# 使用连接池
pool = ConnectionPool(3)
def db_task(task_id):
conn = pool.get_connection()
try:
print(f"任务{task_id}使用{conn}执行查询")
time.sleep(random.uniform(0.5, 1.5))
finally:
pool.release_connection(conn)
# 创建多个任务
threads = [threading.Thread(target=db_task, args=(i,)) for i in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()
print("所有数据库任务完成")示例3:API请求限流
代码示例
import threading
import time
import random
# 限制同时最多5个API请求
api_semaphore = threading.Semaphore(5)
class APIClient:
def __init__(self):
self.request_count = 0
self.lock = threading.Lock()
def call_api(self, endpoint):
with api_semaphore:
with self.lock:
self.request_count += 1
count = self.request_count
print(f"[请求{count}] 调用API: {endpoint}")
# 模拟API调用
time.sleep(random.uniform(0.3, 1))
print(f"[响应{count}] API返回结果")
client = APIClient()
def request_worker(request_id):
client.call_api(f"/api/data/{request_id}")
# 创建20个请求
threads = [threading.Thread(target=request_worker, args=(i,)) for i in range(20)]
for t in threads:
t.start()
for t in threads:
t.join()
print(f"总请求数: {client.request_count}")注意事项
注意1:acquire()和release()必须成对使用。推荐使用with语句避免忘记释放。
注意2:Semaphore允许同一线程多次acquire(),但每次acquire()都需要对应的release()。
注意3:BoundedSemaphore会检查release()次数不能超过初始值,防止计数器无限增长。
注意4:合理设置信号量值。值太小会导致并发度低,值太大会失去限流效果。
小结
-
Semaphore作用:控制同时访问资源的线程数量,实现限流和资源池
-
计数器机制:acquire()减1,release()加1,计数器为0时阻塞
-
典型应用:并发下载限制、数据库连接池、API请求限流
-
最佳实践:使用with语句管理,配合BoundedSemaphore防止计数器溢出
练习题
练习1
实现一个停车场管理系统,使用Semaphore限制同时停放的车辆数(如10个车位),模拟车辆进入和离开的过程。
练习2
实现一个线程安全的日志写入系统,使用Semaphore限制同时写入日志的线程数,避免磁盘IO过载。
常见问题
Semaphore和Lock有什么区别?
Lock只允许一个线程访问资源(互斥),Semaphore允许多个线程同时访问(计数)。Lock是Semaphore的特例(计数器为1)。
BoundedSemaphore和普通Semaphore有什么区别?
BoundedSemaphore会检查release()次数不能超过初始值,防止计数器无限增长导致错误。普通Semaphore没有这个限制。推荐使用BoundedSemaphore更安全。
Semaphore可以用于生产消费者模型吗?
可以。使用两个Semaphore:一个表示空槽位数量,一个表示产品数量。生产者acquire空槽位信号量,消费者acquire产品信号量,实现经典的生产者消费者问题。
如何选择合适的信号量值?
根据资源特性和系统负载决定。数据库连接池通常设置为连接池大小,API限流根据服务端承受能力设置,IO密集型任务可设置较大值,CPU密集型任务设置为CPU核心数。
本文涉及AI创作
内容由AI创作,请仔细甄别