Python 信号量
最后修改:2025 年 2 月 15 日
在本文中,我们将展示如何使用 threading.Semaphore
同步 Python 线程。
Semaphore
是一种同步工具,它通过一组许可证来控制对共享资源的访问。它对于管理有限资源非常有用,例如数据库连接或线程池,在这些场景下,每次只能有一定数量的线程同时访问该资源。
Semaphore
初始化时会带有一个许可证数量。线程可以使用 acquire
方法获取许可证,并使用 release
方法释放许可证。如果没有可用的许可证,线程将阻塞,直到另一个线程释放许可证。
信号量示例
下面的示例演示了如何使用 threading.Semaphore
控制对共享资源的访问。
import threading import time class SharedResource: def __init__(self, permits): self.semaphore = threading.Semaphore(permits) def use_resource(self, thread_name): with self.semaphore: # Acquire and release a permit automatically print(f"{thread_name} is using the resource") time.sleep(2) # Simulate resource usage print(f"{thread_name} is releasing the resource") def worker(shared_resource, thread_name): shared_resource.use_resource(thread_name) def main(): shared_resource = SharedResource(2) # Allow 2 permits threads = [] for i in range(5): # Create 5 threads thread = threading.Thread(target=worker, args=(shared_resource, f"Thread-{i+1}")) threads.append(thread) thread.start() for thread in threads: thread.join() # Wait for all threads to complete print("All tasks completed") if __name__ == "__main__": main()
在此程序中,使用 Semaphore
来限制对共享资源的访问。由于信号量初始化时带有两个许可证,因此每次只能有两个线程访问该资源。
self.semaphore = threading.Semaphore(permits)
SharedResource
类使用一个具有指定许可证数量的 Semaphore
进行初始化。
with self.semaphore: # Acquire and release a permit automatically
with
语句用于自动获取和释放许可证,确保正确的资源管理。
shared_resource = SharedResource(2) # Allow 2 permits
SharedResource
使用两个许可证进行初始化,这意味着每次只能有两个线程同时访问该资源。
$ python main.py Thread-1 is using the resource Thread-2 is using the resource Thread-1 is releasing the resource Thread-2 is releasing the resource Thread-3 is using the resource Thread-4 is using the resource Thread-3 is releasing the resource Thread-4 is releasing the resource Thread-5 is using the resource Thread-5 is releasing the resource All tasks completed
带超时的信号量
下面的示例演示了如何使用带超时的 Semaphore
。如果在指定超时时间内无法获取许可证,线程将继续执行而不访问资源。
import threading import time class SharedResource: def __init__(self, permits): self.semaphore = threading.Semaphore(permits) def use_resource(self, thread_name): if self.semaphore.acquire(timeout=1): # Try to acquire a permit with a timeout print(f"{thread_name} is using the resource") time.sleep(2) # Simulate resource usage print(f"{thread_name} is releasing the resource") self.semaphore.release() # Release the permit else: print(f"{thread_name} could not acquire the resource") def worker(shared_resource, thread_name): shared_resource.use_resource(thread_name) def main(): shared_resource = SharedResource(2) # Allow 2 permits threads = [] for i in range(5): # Create 5 threads thread = threading.Thread(target=worker, args=(shared_resource, f"Thread-{i+1}")) threads.append(thread) thread.start() for thread in threads: thread.join() # Wait for all threads to complete print("All tasks completed") if __name__ == "__main__": main()
在此程序中,Semaphore
是带超时使用的。如果在 1 秒内无法获取许可证,线程将继续执行而不访问资源。
if self.semaphore.acquire(timeout=1): # Try to acquire a permit with a timeout
调用 acquire
方法时设置了 1 秒的超时时间。如果在该时间内未能获取许可证,线程将继续执行而不访问资源。
$ python main.py Thread-1 is using the resource Thread-2 is using the resource Thread-3 could not acquire the resource Thread-4 could not acquire the resource Thread-1 is releasing the resource Thread-2 is releasing the resource Thread-5 is using the resource Thread-5 is releasing the resource All tasks completed
带信号量的线程池示例
下面的示例演示了如何使用 threading.Semaphore
实现线程池。线程池将并发执行的任务数量限制为固定大小。
import threading import time class Task: def __init__(self, task_id, semaphore): self.task_id = task_id self.semaphore = semaphore def run(self): with self.semaphore: # Acquire and release a permit automatically print(f"Task {self.task_id} is running on {threading.current_thread().name}") time.sleep(2) # Simulate task execution print(f"Task {self.task_id} is completed") def worker(task): task.run() def main(): pool_size = 3 # Maximum number of concurrent tasks semaphore = threading.Semaphore(pool_size) tasks = [] for i in range(10): # Create 10 tasks task = Task(i + 1, semaphore) thread = threading.Thread(target=worker, args=(task,)) tasks.append(thread) thread.start() for thread in tasks: thread.join() # Wait for all threads to complete print("All tasks completed") if __name__ == "__main__": main()
在此程序中,使用 Semaphore
将并发执行的任务数量限制为固定大小(在本例中为 3)。每个任务在执行前获取一个许可证,并在完成后释放它。
with self.semaphore: # Acquire and release a permit automatically
with
语句用于自动获取和释放许可证,确保正确的资源管理。
pool_size = 3 # Maximum number of concurrent tasks semaphore = threading.Semaphore(pool_size)
Semaphore
使用 3 的池大小进行初始化,这意味着每次只能有 3 个任务并发运行。
$ python main.py Task 1 is running on Thread-1 Task 2 is running on Thread-2 Task 3 is running on Thread-3 Task 1 is completed Task 4 is running on Thread-4 Task 2 is completed Task 5 is running on Thread-5 Task 3 is completed Task 6 is running on Thread-6 Task 4 is completed Task 7 is running on Thread-7 Task 5 is completed Task 8 is running on Thread-8 Task 6 is completed Task 9 is running on Thread-9 Task 7 is completed Task 10 is running on Thread-10 Task 8 is completed Task 9 is completed Task 10 is completed All tasks completed
来源
在本文中,我们展示了如何使用信号量来管理资源,以同步 Python 线程。
作者
列出所有 Python 教程。