ZetCode

Python 信号量

最后修改:2025 年 2 月 15 日

在本文中,我们将展示如何使用 threading.Semaphore 同步 Python 线程。

Semaphore 是一种同步工具,它通过一组许可证来控制对共享资源的访问。它对于管理有限资源非常有用,例如数据库连接或线程池,在这些场景下,每次只能有一定数量的线程同时访问该资源。

Semaphore 初始化时会带有一个许可证数量。线程可以使用 acquire 方法获取许可证,并使用 release 方法释放许可证。如果没有可用的许可证,线程将阻塞,直到另一个线程释放许可证。

信号量示例

下面的示例演示了如何使用 threading.Semaphore 控制对共享资源的访问。

main.py
import threading
import time

class SharedResource:
    def __init__(self, permits):
        self.semaphore = threading.Semaphore(permits)

    def use_resource(self, thread_name):
        with self.semaphore:  # Acquire and release a permit automatically
            print(f"{thread_name} is using the resource")
            time.sleep(2)  # Simulate resource usage
            print(f"{thread_name} is releasing the resource")

def worker(shared_resource, thread_name):
    shared_resource.use_resource(thread_name)

def main():
    shared_resource = SharedResource(2)  # Allow 2 permits

    threads = []
    for i in range(5):  # Create 5 threads
        thread = threading.Thread(target=worker, args=(shared_resource, f"Thread-{i+1}"))
        threads.append(thread)
        thread.start()

    for thread in threads:
        thread.join()  # Wait for all threads to complete

    print("All tasks completed")

if __name__ == "__main__":
    main()

在此程序中,使用 Semaphore 来限制对共享资源的访问。由于信号量初始化时带有两个许可证,因此每次只能有两个线程访问该资源。

self.semaphore = threading.Semaphore(permits)

SharedResource 类使用一个具有指定许可证数量的 Semaphore 进行初始化。

with self.semaphore:  # Acquire and release a permit automatically

with 语句用于自动获取和释放许可证,确保正确的资源管理。

shared_resource = SharedResource(2)  # Allow 2 permits

SharedResource 使用两个许可证进行初始化,这意味着每次只能有两个线程同时访问该资源。

$ python main.py
Thread-1 is using the resource
Thread-2 is using the resource
Thread-1 is releasing the resource
Thread-2 is releasing the resource
Thread-3 is using the resource
Thread-4 is using the resource
Thread-3 is releasing the resource
Thread-4 is releasing the resource
Thread-5 is using the resource
Thread-5 is releasing the resource
All tasks completed

带超时的信号量

下面的示例演示了如何使用带超时的 Semaphore。如果在指定超时时间内无法获取许可证,线程将继续执行而不访问资源。

main.py
import threading
import time

class SharedResource:
    def __init__(self, permits):
        self.semaphore = threading.Semaphore(permits)

    def use_resource(self, thread_name):
        if self.semaphore.acquire(timeout=1):  # Try to acquire a permit with a timeout
            print(f"{thread_name} is using the resource")
            time.sleep(2)  # Simulate resource usage
            print(f"{thread_name} is releasing the resource")
            self.semaphore.release()  # Release the permit
        else:
            print(f"{thread_name} could not acquire the resource")

def worker(shared_resource, thread_name):
    shared_resource.use_resource(thread_name)

def main():
    shared_resource = SharedResource(2)  # Allow 2 permits

    threads = []
    for i in range(5):  # Create 5 threads
        thread = threading.Thread(target=worker, args=(shared_resource, f"Thread-{i+1}"))
        threads.append(thread)
        thread.start()

    for thread in threads:
        thread.join()  # Wait for all threads to complete

    print("All tasks completed")

if __name__ == "__main__":
    main()

在此程序中,Semaphore 是带超时使用的。如果在 1 秒内无法获取许可证,线程将继续执行而不访问资源。

if self.semaphore.acquire(timeout=1):  # Try to acquire a permit with a timeout

调用 acquire 方法时设置了 1 秒的超时时间。如果在该时间内未能获取许可证,线程将继续执行而不访问资源。

$ python main.py
Thread-1 is using the resource
Thread-2 is using the resource
Thread-3 could not acquire the resource
Thread-4 could not acquire the resource
Thread-1 is releasing the resource
Thread-2 is releasing the resource
Thread-5 is using the resource
Thread-5 is releasing the resource
All tasks completed

带信号量的线程池示例

下面的示例演示了如何使用 threading.Semaphore 实现线程池。线程池将并发执行的任务数量限制为固定大小。

main.py
import threading
import time

class Task:
    def __init__(self, task_id, semaphore):
        self.task_id = task_id
        self.semaphore = semaphore

    def run(self):
        with self.semaphore:  # Acquire and release a permit automatically
            print(f"Task {self.task_id} is running on {threading.current_thread().name}")
            time.sleep(2)  # Simulate task execution
            print(f"Task {self.task_id} is completed")

def worker(task):
    task.run()

def main():
    pool_size = 3  # Maximum number of concurrent tasks
    semaphore = threading.Semaphore(pool_size)

    tasks = []
    for i in range(10):  # Create 10 tasks
        task = Task(i + 1, semaphore)
        thread = threading.Thread(target=worker, args=(task,))
        tasks.append(thread)
        thread.start()

    for thread in tasks:
        thread.join()  # Wait for all threads to complete

    print("All tasks completed")

if __name__ == "__main__":
    main()

在此程序中,使用 Semaphore 将并发执行的任务数量限制为固定大小(在本例中为 3)。每个任务在执行前获取一个许可证,并在完成后释放它。

with self.semaphore:  # Acquire and release a permit automatically

with 语句用于自动获取和释放许可证,确保正确的资源管理。

pool_size = 3  # Maximum number of concurrent tasks
semaphore = threading.Semaphore(pool_size)

Semaphore 使用 3 的池大小进行初始化,这意味着每次只能有 3 个任务并发运行。

$ python main.py
Task 1 is running on Thread-1
Task 2 is running on Thread-2
Task 3 is running on Thread-3
Task 1 is completed
Task 4 is running on Thread-4
Task 2 is completed
Task 5 is running on Thread-5
Task 3 is completed
Task 6 is running on Thread-6
Task 4 is completed
Task 7 is running on Thread-7
Task 5 is completed
Task 8 is running on Thread-8
Task 6 is completed
Task 9 is running on Thread-9
Task 7 is completed
Task 10 is running on Thread-10
Task 8 is completed
Task 9 is completed
Task 10 is completed
All tasks completed

来源

Python 信号量 - 文档

在本文中,我们展示了如何使用信号量来管理资源,以同步 Python 线程。

作者

我的名字是 Jan Bodnar,我是一名充满激情的程序员,拥有丰富的编程经验。我从 2007 年开始撰写编程文章。迄今为止,我已撰写了 1,400 多篇文章和 8 本电子书。我在编程教学方面拥有十多年的经验。

列出所有 Python 教程