Python BoundedBarrier
最后修改:2025 年 2 月 15 日
在本文中,我们将展示如何使用自定义的 BoundedBarrier
来同步 Python 线程。
BoundedBarrier
是一种同步原语,它允许固定数量的线程相互等待,直到到达一个公共的屏障点。与内置的 threading.Barrier
不同,BoundedBarrier
可以通过额外的约束来实现,例如限制可以在屏障处等待的最大线程数。
本教程演示了如何使用 Python 的 threading.Condition
和 threading.Lock
创建自定义的 BoundedBarrier
。
BoundedBarrier 实现
以下示例演示了如何实现自定义的 BoundedBarrier
。
import threading class BoundedBarrier: def __init__(self, max_threads): self.max_threads = max_threads self.count = 0 self.condition = threading.Condition() def wait(self): with self.condition: self.count += 1 if self.count == self.max_threads: self.condition.notify_all() # Notify all waiting threads self.count = 0 # Reset the counter for reuse else: self.condition.wait() # Wait for other threads def worker(barrier, thread_name): print(f"{thread_name} is starting") barrier.wait() # Wait at the barrier print(f"{thread_name} has passed the barrier") def main(): max_threads = 3 barrier = BoundedBarrier(max_threads) threads = [] for i in range(max_threads): # Create 3 threads thread = threading.Thread(target=worker, args=(barrier, f"Thread-{i+1}")) threads.append(thread) thread.start() for thread in threads: thread.join() # Wait for all threads to complete print("All threads have passed the barrier") if __name__ == "__main__": main()
在此程序中,使用 threading.Condition
实现了一个自定义的 BoundedBarrier
。该屏障允许固定数量的线程在继续之前相互等待。
self.condition = threading.Condition()
BoundedBarrier
使用 Condition
对象来管理线程同步。
self.count += 1 if self.count == self.max_threads: self.condition.notify_all() # Notify all waiting threads self.count = 0 # Reset the counter for reuse else: self.condition.wait() # Wait for other threads
每个线程在到达屏障时增加计数器。如果计数器达到最大线程数,则通知所有等待的线程,并重置计数器。否则,线程等待其他线程到达。
barrier = BoundedBarrier(max_threads)
BoundedBarrier
使用可以在屏障处等待的最大线程数进行初始化。
$ python main.py Thread-1 is starting Thread-2 is starting Thread-3 is starting Thread-1 has passed the barrier Thread-2 has passed the barrier Thread-3 has passed the barrier All threads have passed the barrier
可重用的 BoundedBarrier
以下示例演示了如何为多个同步点重用 BoundedBarrier
。
import threading import time class BoundedBarrier: def __init__(self, max_threads): self.max_threads = max_threads self.count = 0 self.condition = threading.Condition() def wait(self): with self.condition: self.count += 1 if self.count == self.max_threads: self.condition.notify_all() # Notify all waiting threads self.count = 0 # Reset the counter for reuse else: self.condition.wait() # Wait for other threads def worker(barrier, thread_name, num_phases): for phase in range(num_phases): print(f"{thread_name} is working on phase {phase + 1}") time.sleep(1) # Simulate work for the phase print(f"{thread_name} has completed phase {phase + 1}") barrier.wait() # Wait at the barrier print(f"{thread_name} is moving to the next phase") def main(): max_threads = 3 num_phases = 2 # Number of phases in the task barrier = BoundedBarrier(max_threads) threads = [] for i in range(max_threads): # Create 3 threads thread = threading.Thread(target=worker, args=(barrier, f"Thread-{i+1}", num_phases)) threads.append(thread) thread.start() for thread in threads: thread.join() # Wait for all threads to complete print("All phases completed by all threads") if __name__ == "__main__": main()
在此程序中,BoundedBarrier
被重用于多个同步点。每个线程在两个阶段工作,并且屏障确保所有线程在一个阶段完成之前移动到下一个阶段。
barrier.wait() # Wait at the barrier
每个线程在完成一个阶段后,都会在屏障上调用 wait
方法。这确保了所有线程在移动到下一个阶段之前完成当前阶段。
$ python main.py Thread-1 is working on phase 1 Thread-2 is working on phase 1 Thread-3 is working on phase 1 Thread-1 has completed phase 1 Thread-2 has completed phase 1 Thread-3 has completed phase 1 Thread-1 is moving to the next phase Thread-2 is moving to the next phase Thread-3 is moving to the next phase Thread-1 is working on phase 2 Thread-2 is working on phase 2 Thread-3 is working on phase 2 Thread-1 has completed phase 2 Thread-2 has completed phase 2 Thread-3 has completed phase 2 Thread-1 is moving to the next phase Thread-2 is moving to the next phase Thread-3 is moving to the next phase All phases completed by all threads
带超时的多阶段任务示例
以下示例演示了如何使用带有超时的自定义 BoundedBarrier
来同步执行的多个阶段中的线程。如果线程未在指定的超时时间内到达屏障,它将继续执行,而无需等待其他线程。
import threading import time class BoundedBarrier: def __init__(self, max_threads): self.max_threads = max_threads self.count = 0 self.condition = threading.Condition() def wait(self, timeout=None): with self.condition: self.count += 1 if self.count == self.max_threads: self.condition.notify_all() # Notify all waiting threads self.count = 0 # Reset the counter for reuse return True # Barrier tripped else: if timeout is None: self.condition.wait() # Wait indefinitely else: if not self.condition.wait(timeout): # Wait with timeout self.count -= 1 # Decrement count if timeout occurs return False # Barrier not tripped return True # Barrier tripped def worker(barrier, thread_name, num_phases): for phase in range(num_phases): print(f"{thread_name} is working on phase {phase + 1}") time.sleep(1) # Simulate work for the phase print(f"{thread_name} has completed phase {phase + 1}") if not barrier.wait(timeout=2): # Wait at the barrier with a timeout print(f"{thread_name} timed out waiting for the barrier in phase {phase + 1}") continue print(f"{thread_name} is moving to the next phase") def main(): max_threads = 3 num_phases = 2 # Number of phases in the task barrier = BoundedBarrier(max_threads) threads = [] for i in range(max_threads): # Create 3 threads thread = threading.Thread(target=worker, args=(barrier, f"Thread-{i+1}", num_phases)) threads.append(thread) thread.start() for thread in threads: thread.join() # Wait for all threads to complete print("All phases completed by all threads") if __name__ == "__main__": main()
在此程序中,BoundedBarrier
与超时一起使用,以同步多个阶段中的线程。如果线程未在 2 秒内到达屏障,它将继续执行,而无需等待其他线程。
def wait(self, timeout=None): with self.condition: self.count += 1 if self.count == self.max_threads: self.condition.notify_all() # Notify all waiting threads self.count = 0 # Reset the counter for reuse return True # Barrier tripped else: if timeout is None: self.condition.wait() # Wait indefinitely else: if not self.condition.wait(timeout): # Wait with timeout self.count -= 1 # Decrement count if timeout occurs return False # Barrier not tripped return True # Barrier tripped
更新 wait
方法以支持超时。如果发生超时,线程将递减计数器并继续执行,而无需等待其他线程。
if not barrier.wait(timeout=2): # Wait at the barrier with a timeout print(f"{thread_name} timed out waiting for the barrier in phase {phase + 1}") continue
每个线程都调用超时时间为 2 秒的 wait
方法。 如果在此时间内未触发屏障,则线程将继续执行,而无需等待。
$ python main.py Thread-1 is working on phase 1 Thread-2 is working on phase 1 Thread-3 is working on phase 1 Thread-1 has completed phase 1 Thread-2 has completed phase 1 Thread-3 has completed phase 1 Thread-1 is moving to the next phase Thread-2 is moving to the next phase Thread-3 is moving to the next phase Thread-1 is working on phase 2 Thread-2 is working on phase 2 Thread-3 is working on phase 2 Thread-1 has completed phase 2 Thread-2 has completed phase 2 Thread-3 has completed phase 2 Thread-1 is moving to the next phase Thread-2 is moving to the next phase Thread-3 is moving to the next phase All phases completed by all threads
来源
在本文中,我们展示了如何使用自定义的 BoundedBarrier
来同步 Python 线程。
作者
列出所有 Python 教程。