Python 信号量
最后修改:2025 年 2 月 15 日
在本文中,我们将展示如何使用 threading.Semaphore 同步 Python 线程。
Semaphore 是一种同步工具,它通过一组许可证来控制对共享资源的访问。它对于管理有限资源非常有用,例如数据库连接或线程池,在这些场景下,每次只能有一定数量的线程同时访问该资源。
Semaphore 初始化时会带有一个许可证数量。线程可以使用 acquire 方法获取许可证,并使用 release 方法释放许可证。如果没有可用的许可证,线程将阻塞,直到另一个线程释放许可证。
信号量示例
下面的示例演示了如何使用 threading.Semaphore 控制对共享资源的访问。
import threading
import time
class SharedResource:
def __init__(self, permits):
self.semaphore = threading.Semaphore(permits)
def use_resource(self, thread_name):
with self.semaphore: # Acquire and release a permit automatically
print(f"{thread_name} is using the resource")
time.sleep(2) # Simulate resource usage
print(f"{thread_name} is releasing the resource")
def worker(shared_resource, thread_name):
shared_resource.use_resource(thread_name)
def main():
shared_resource = SharedResource(2) # Allow 2 permits
threads = []
for i in range(5): # Create 5 threads
thread = threading.Thread(target=worker, args=(shared_resource, f"Thread-{i+1}"))
threads.append(thread)
thread.start()
for thread in threads:
thread.join() # Wait for all threads to complete
print("All tasks completed")
if __name__ == "__main__":
main()
在此程序中,使用 Semaphore 来限制对共享资源的访问。由于信号量初始化时带有两个许可证,因此每次只能有两个线程访问该资源。
self.semaphore = threading.Semaphore(permits)
SharedResource 类使用一个具有指定许可证数量的 Semaphore 进行初始化。
with self.semaphore: # Acquire and release a permit automatically
with 语句用于自动获取和释放许可证,确保正确的资源管理。
shared_resource = SharedResource(2) # Allow 2 permits
SharedResource 使用两个许可证进行初始化,这意味着每次只能有两个线程同时访问该资源。
$ python main.py Thread-1 is using the resource Thread-2 is using the resource Thread-1 is releasing the resource Thread-2 is releasing the resource Thread-3 is using the resource Thread-4 is using the resource Thread-3 is releasing the resource Thread-4 is releasing the resource Thread-5 is using the resource Thread-5 is releasing the resource All tasks completed
带超时的信号量
下面的示例演示了如何使用带超时的 Semaphore。如果在指定超时时间内无法获取许可证,线程将继续执行而不访问资源。
import threading
import time
class SharedResource:
def __init__(self, permits):
self.semaphore = threading.Semaphore(permits)
def use_resource(self, thread_name):
if self.semaphore.acquire(timeout=1): # Try to acquire a permit with a timeout
print(f"{thread_name} is using the resource")
time.sleep(2) # Simulate resource usage
print(f"{thread_name} is releasing the resource")
self.semaphore.release() # Release the permit
else:
print(f"{thread_name} could not acquire the resource")
def worker(shared_resource, thread_name):
shared_resource.use_resource(thread_name)
def main():
shared_resource = SharedResource(2) # Allow 2 permits
threads = []
for i in range(5): # Create 5 threads
thread = threading.Thread(target=worker, args=(shared_resource, f"Thread-{i+1}"))
threads.append(thread)
thread.start()
for thread in threads:
thread.join() # Wait for all threads to complete
print("All tasks completed")
if __name__ == "__main__":
main()
在此程序中,Semaphore 是带超时使用的。如果在 1 秒内无法获取许可证,线程将继续执行而不访问资源。
if self.semaphore.acquire(timeout=1): # Try to acquire a permit with a timeout
调用 acquire 方法时设置了 1 秒的超时时间。如果在该时间内未能获取许可证,线程将继续执行而不访问资源。
$ python main.py Thread-1 is using the resource Thread-2 is using the resource Thread-3 could not acquire the resource Thread-4 could not acquire the resource Thread-1 is releasing the resource Thread-2 is releasing the resource Thread-5 is using the resource Thread-5 is releasing the resource All tasks completed
带信号量的线程池示例
下面的示例演示了如何使用 threading.Semaphore 实现线程池。线程池将并发执行的任务数量限制为固定大小。
import threading
import time
class Task:
def __init__(self, task_id, semaphore):
self.task_id = task_id
self.semaphore = semaphore
def run(self):
with self.semaphore: # Acquire and release a permit automatically
print(f"Task {self.task_id} is running on {threading.current_thread().name}")
time.sleep(2) # Simulate task execution
print(f"Task {self.task_id} is completed")
def worker(task):
task.run()
def main():
pool_size = 3 # Maximum number of concurrent tasks
semaphore = threading.Semaphore(pool_size)
tasks = []
for i in range(10): # Create 10 tasks
task = Task(i + 1, semaphore)
thread = threading.Thread(target=worker, args=(task,))
tasks.append(thread)
thread.start()
for thread in tasks:
thread.join() # Wait for all threads to complete
print("All tasks completed")
if __name__ == "__main__":
main()
在此程序中,使用 Semaphore 将并发执行的任务数量限制为固定大小(在本例中为 3)。每个任务在执行前获取一个许可证,并在完成后释放它。
with self.semaphore: # Acquire and release a permit automatically
with 语句用于自动获取和释放许可证,确保正确的资源管理。
pool_size = 3 # Maximum number of concurrent tasks semaphore = threading.Semaphore(pool_size)
Semaphore 使用 3 的池大小进行初始化,这意味着每次只能有 3 个任务并发运行。
$ python main.py Task 1 is running on Thread-1 Task 2 is running on Thread-2 Task 3 is running on Thread-3 Task 1 is completed Task 4 is running on Thread-4 Task 2 is completed Task 5 is running on Thread-5 Task 3 is completed Task 6 is running on Thread-6 Task 4 is completed Task 7 is running on Thread-7 Task 5 is completed Task 8 is running on Thread-8 Task 6 is completed Task 9 is running on Thread-9 Task 7 is completed Task 10 is running on Thread-10 Task 8 is completed Task 9 is completed Task 10 is completed All tasks completed
来源
在本文中,我们展示了如何使用信号量来管理资源,以同步 Python 线程。
作者
列出所有 Python 教程。