ZetCode

Python BoundedBarrier

最后修改:2025 年 2 月 15 日

在本文中,我们将展示如何使用自定义的 BoundedBarrier 来同步 Python 线程。

BoundedBarrier 是一种同步原语,它允许固定数量的线程相互等待,直到到达一个公共的屏障点。与内置的 threading.Barrier 不同,BoundedBarrier 可以通过额外的约束来实现,例如限制可以在屏障处等待的最大线程数。

本教程演示了如何使用 Python 的 threading.Conditionthreading.Lock 创建自定义的 BoundedBarrier

BoundedBarrier 实现

以下示例演示了如何实现自定义的 BoundedBarrier

main.py
import threading

class BoundedBarrier:
    def __init__(self, max_threads):
        self.max_threads = max_threads
        self.count = 0
        self.condition = threading.Condition()

    def wait(self):
        with self.condition:
            self.count += 1
            if self.count == self.max_threads:
                self.condition.notify_all()  # Notify all waiting threads
                self.count = 0  # Reset the counter for reuse
            else:
                self.condition.wait()  # Wait for other threads

def worker(barrier, thread_name):
    print(f"{thread_name} is starting")
    barrier.wait()  # Wait at the barrier
    print(f"{thread_name} has passed the barrier")

def main():
    max_threads = 3
    barrier = BoundedBarrier(max_threads)

    threads = []
    for i in range(max_threads):  # Create 3 threads
        thread = threading.Thread(target=worker, args=(barrier, f"Thread-{i+1}"))
        threads.append(thread)
        thread.start()

    for thread in threads:
        thread.join()  # Wait for all threads to complete

    print("All threads have passed the barrier")

if __name__ == "__main__":
    main()

在此程序中,使用 threading.Condition 实现了一个自定义的 BoundedBarrier。该屏障允许固定数量的线程在继续之前相互等待。

self.condition = threading.Condition()

BoundedBarrier 使用 Condition 对象来管理线程同步。

self.count += 1
if self.count == self.max_threads:
    self.condition.notify_all()  # Notify all waiting threads
    self.count = 0  # Reset the counter for reuse
else:
    self.condition.wait()  # Wait for other threads

每个线程在到达屏障时增加计数器。如果计数器达到最大线程数,则通知所有等待的线程,并重置计数器。否则,线程等待其他线程到达。

barrier = BoundedBarrier(max_threads)

BoundedBarrier 使用可以在屏障处等待的最大线程数进行初始化。

$ python main.py
Thread-1 is starting
Thread-2 is starting
Thread-3 is starting
Thread-1 has passed the barrier
Thread-2 has passed the barrier
Thread-3 has passed the barrier
All threads have passed the barrier

可重用的 BoundedBarrier

以下示例演示了如何为多个同步点重用 BoundedBarrier

main.py
import threading
import time

class BoundedBarrier:
    def __init__(self, max_threads):
        self.max_threads = max_threads
        self.count = 0
        self.condition = threading.Condition()

    def wait(self):
        with self.condition:
            self.count += 1
            if self.count == self.max_threads:
                self.condition.notify_all()  # Notify all waiting threads
                self.count = 0  # Reset the counter for reuse
            else:
                self.condition.wait()  # Wait for other threads

def worker(barrier, thread_name, num_phases):
    for phase in range(num_phases):
        print(f"{thread_name} is working on phase {phase + 1}")
        time.sleep(1)  # Simulate work for the phase
        print(f"{thread_name} has completed phase {phase + 1}")
        barrier.wait()  # Wait at the barrier
        print(f"{thread_name} is moving to the next phase")

def main():
    max_threads = 3
    num_phases = 2  # Number of phases in the task
    barrier = BoundedBarrier(max_threads)

    threads = []
    for i in range(max_threads):  # Create 3 threads
        thread = threading.Thread(target=worker, args=(barrier, f"Thread-{i+1}", num_phases))
        threads.append(thread)
        thread.start()

    for thread in threads:
        thread.join()  # Wait for all threads to complete

    print("All phases completed by all threads")

if __name__ == "__main__":
    main()

在此程序中,BoundedBarrier 被重用于多个同步点。每个线程在两个阶段工作,并且屏障确保所有线程在一个阶段完成之前移动到下一个阶段。

barrier.wait()  # Wait at the barrier

每个线程在完成一个阶段后,都会在屏障上调用 wait 方法。这确保了所有线程在移动到下一个阶段之前完成当前阶段。

$ python main.py
Thread-1 is working on phase 1
Thread-2 is working on phase 1
Thread-3 is working on phase 1
Thread-1 has completed phase 1
Thread-2 has completed phase 1
Thread-3 has completed phase 1
Thread-1 is moving to the next phase
Thread-2 is moving to the next phase
Thread-3 is moving to the next phase
Thread-1 is working on phase 2
Thread-2 is working on phase 2
Thread-3 is working on phase 2
Thread-1 has completed phase 2
Thread-2 has completed phase 2
Thread-3 has completed phase 2
Thread-1 is moving to the next phase
Thread-2 is moving to the next phase
Thread-3 is moving to the next phase
All phases completed by all threads

带超时的多阶段任务示例

以下示例演示了如何使用带有超时的自定义 BoundedBarrier 来同步执行的多个阶段中的线程。如果线程未在指定的超时时间内到达屏障,它将继续执行,而无需等待其他线程。

main.py
import threading
import time

class BoundedBarrier:
    def __init__(self, max_threads):
        self.max_threads = max_threads
        self.count = 0
        self.condition = threading.Condition()

    def wait(self, timeout=None):
        with self.condition:
            self.count += 1
            if self.count == self.max_threads:
                self.condition.notify_all()  # Notify all waiting threads
                self.count = 0  # Reset the counter for reuse
                return True  # Barrier tripped
            else:
                if timeout is None:
                    self.condition.wait()  # Wait indefinitely
                else:
                    if not self.condition.wait(timeout):  # Wait with timeout
                        self.count -= 1  # Decrement count if timeout occurs
                        return False  # Barrier not tripped
                return True  # Barrier tripped

def worker(barrier, thread_name, num_phases):
    for phase in range(num_phases):
        print(f"{thread_name} is working on phase {phase + 1}")
        time.sleep(1)  # Simulate work for the phase
        print(f"{thread_name} has completed phase {phase + 1}")
        if not barrier.wait(timeout=2):  # Wait at the barrier with a timeout
            print(f"{thread_name} timed out waiting for the barrier in phase {phase + 1}")
            continue
        print(f"{thread_name} is moving to the next phase")

def main():
    max_threads = 3
    num_phases = 2  # Number of phases in the task
    barrier = BoundedBarrier(max_threads)

    threads = []
    for i in range(max_threads):  # Create 3 threads
        thread = threading.Thread(target=worker, args=(barrier, f"Thread-{i+1}", num_phases))
        threads.append(thread)
        thread.start()

    for thread in threads:
        thread.join()  # Wait for all threads to complete

    print("All phases completed by all threads")

if __name__ == "__main__":
    main()

在此程序中,BoundedBarrier 与超时一起使用,以同步多个阶段中的线程。如果线程未在 2 秒内到达屏障,它将继续执行,而无需等待其他线程。

def wait(self, timeout=None):
    with self.condition:
        self.count += 1
        if self.count == self.max_threads:
            self.condition.notify_all()  # Notify all waiting threads
            self.count = 0  # Reset the counter for reuse
            return True  # Barrier tripped
        else:
            if timeout is None:
                self.condition.wait()  # Wait indefinitely
            else:
                if not self.condition.wait(timeout):  # Wait with timeout
                    self.count -= 1  # Decrement count if timeout occurs
                    return False  # Barrier not tripped
            return True  # Barrier tripped

更新 wait 方法以支持超时。如果发生超时,线程将递减计数器并继续执行,而无需等待其他线程。

if not barrier.wait(timeout=2):  # Wait at the barrier with a timeout
    print(f"{thread_name} timed out waiting for the barrier in phase {phase + 1}")
    continue

每个线程都调用超时时间为 2 秒的 wait 方法。 如果在此时间内未触发屏障,则线程将继续执行,而无需等待。

$ python main.py
Thread-1 is working on phase 1
Thread-2 is working on phase 1
Thread-3 is working on phase 1
Thread-1 has completed phase 1
Thread-2 has completed phase 1
Thread-3 has completed phase 1
Thread-1 is moving to the next phase
Thread-2 is moving to the next phase
Thread-3 is moving to the next phase
Thread-1 is working on phase 2
Thread-2 is working on phase 2
Thread-3 is working on phase 2
Thread-1 has completed phase 2
Thread-2 has completed phase 2
Thread-3 has completed phase 2
Thread-1 is moving to the next phase
Thread-2 is moving to the next phase
Thread-3 is moving to the next phase
All phases completed by all threads

来源

Python threading - 文档

在本文中,我们展示了如何使用自定义的 BoundedBarrier 来同步 Python 线程。

作者

我的名字是 Jan Bodnar,我是一位充满热情的程序员,拥有丰富的编程经验。 自 2007 年以来,我一直撰写编程文章。 迄今为止,我已撰写了 1,400 多篇文章和 8 本电子书。 我拥有超过十年的编程教学经验。

列出所有 Python 教程