Python BoundedBarrier
最后修改:2025 年 2 月 15 日
在本文中,我们将展示如何使用自定义的 BoundedBarrier 来同步 Python 线程。
BoundedBarrier 是一种同步原语,它允许固定数量的线程相互等待,直到到达一个公共的屏障点。与内置的 threading.Barrier 不同,BoundedBarrier 可以通过额外的约束来实现,例如限制可以在屏障处等待的最大线程数。
本教程演示了如何使用 Python 的 threading.Condition 和 threading.Lock 创建自定义的 BoundedBarrier。
BoundedBarrier 实现
以下示例演示了如何实现自定义的 BoundedBarrier。
import threading
class BoundedBarrier:
def __init__(self, max_threads):
self.max_threads = max_threads
self.count = 0
self.condition = threading.Condition()
def wait(self):
with self.condition:
self.count += 1
if self.count == self.max_threads:
self.condition.notify_all() # Notify all waiting threads
self.count = 0 # Reset the counter for reuse
else:
self.condition.wait() # Wait for other threads
def worker(barrier, thread_name):
print(f"{thread_name} is starting")
barrier.wait() # Wait at the barrier
print(f"{thread_name} has passed the barrier")
def main():
max_threads = 3
barrier = BoundedBarrier(max_threads)
threads = []
for i in range(max_threads): # Create 3 threads
thread = threading.Thread(target=worker, args=(barrier, f"Thread-{i+1}"))
threads.append(thread)
thread.start()
for thread in threads:
thread.join() # Wait for all threads to complete
print("All threads have passed the barrier")
if __name__ == "__main__":
main()
在此程序中,使用 threading.Condition 实现了一个自定义的 BoundedBarrier。该屏障允许固定数量的线程在继续之前相互等待。
self.condition = threading.Condition()
BoundedBarrier 使用 Condition 对象来管理线程同步。
self.count += 1
if self.count == self.max_threads:
self.condition.notify_all() # Notify all waiting threads
self.count = 0 # Reset the counter for reuse
else:
self.condition.wait() # Wait for other threads
每个线程在到达屏障时增加计数器。如果计数器达到最大线程数,则通知所有等待的线程,并重置计数器。否则,线程等待其他线程到达。
barrier = BoundedBarrier(max_threads)
BoundedBarrier 使用可以在屏障处等待的最大线程数进行初始化。
$ python main.py Thread-1 is starting Thread-2 is starting Thread-3 is starting Thread-1 has passed the barrier Thread-2 has passed the barrier Thread-3 has passed the barrier All threads have passed the barrier
可重用的 BoundedBarrier
以下示例演示了如何为多个同步点重用 BoundedBarrier。
import threading
import time
class BoundedBarrier:
def __init__(self, max_threads):
self.max_threads = max_threads
self.count = 0
self.condition = threading.Condition()
def wait(self):
with self.condition:
self.count += 1
if self.count == self.max_threads:
self.condition.notify_all() # Notify all waiting threads
self.count = 0 # Reset the counter for reuse
else:
self.condition.wait() # Wait for other threads
def worker(barrier, thread_name, num_phases):
for phase in range(num_phases):
print(f"{thread_name} is working on phase {phase + 1}")
time.sleep(1) # Simulate work for the phase
print(f"{thread_name} has completed phase {phase + 1}")
barrier.wait() # Wait at the barrier
print(f"{thread_name} is moving to the next phase")
def main():
max_threads = 3
num_phases = 2 # Number of phases in the task
barrier = BoundedBarrier(max_threads)
threads = []
for i in range(max_threads): # Create 3 threads
thread = threading.Thread(target=worker, args=(barrier, f"Thread-{i+1}", num_phases))
threads.append(thread)
thread.start()
for thread in threads:
thread.join() # Wait for all threads to complete
print("All phases completed by all threads")
if __name__ == "__main__":
main()
在此程序中,BoundedBarrier 被重用于多个同步点。每个线程在两个阶段工作,并且屏障确保所有线程在一个阶段完成之前移动到下一个阶段。
barrier.wait() # Wait at the barrier
每个线程在完成一个阶段后,都会在屏障上调用 wait 方法。这确保了所有线程在移动到下一个阶段之前完成当前阶段。
$ python main.py Thread-1 is working on phase 1 Thread-2 is working on phase 1 Thread-3 is working on phase 1 Thread-1 has completed phase 1 Thread-2 has completed phase 1 Thread-3 has completed phase 1 Thread-1 is moving to the next phase Thread-2 is moving to the next phase Thread-3 is moving to the next phase Thread-1 is working on phase 2 Thread-2 is working on phase 2 Thread-3 is working on phase 2 Thread-1 has completed phase 2 Thread-2 has completed phase 2 Thread-3 has completed phase 2 Thread-1 is moving to the next phase Thread-2 is moving to the next phase Thread-3 is moving to the next phase All phases completed by all threads
带超时的多阶段任务示例
以下示例演示了如何使用带有超时的自定义 BoundedBarrier 来同步执行的多个阶段中的线程。如果线程未在指定的超时时间内到达屏障,它将继续执行,而无需等待其他线程。
import threading
import time
class BoundedBarrier:
def __init__(self, max_threads):
self.max_threads = max_threads
self.count = 0
self.condition = threading.Condition()
def wait(self, timeout=None):
with self.condition:
self.count += 1
if self.count == self.max_threads:
self.condition.notify_all() # Notify all waiting threads
self.count = 0 # Reset the counter for reuse
return True # Barrier tripped
else:
if timeout is None:
self.condition.wait() # Wait indefinitely
else:
if not self.condition.wait(timeout): # Wait with timeout
self.count -= 1 # Decrement count if timeout occurs
return False # Barrier not tripped
return True # Barrier tripped
def worker(barrier, thread_name, num_phases):
for phase in range(num_phases):
print(f"{thread_name} is working on phase {phase + 1}")
time.sleep(1) # Simulate work for the phase
print(f"{thread_name} has completed phase {phase + 1}")
if not barrier.wait(timeout=2): # Wait at the barrier with a timeout
print(f"{thread_name} timed out waiting for the barrier in phase {phase + 1}")
continue
print(f"{thread_name} is moving to the next phase")
def main():
max_threads = 3
num_phases = 2 # Number of phases in the task
barrier = BoundedBarrier(max_threads)
threads = []
for i in range(max_threads): # Create 3 threads
thread = threading.Thread(target=worker, args=(barrier, f"Thread-{i+1}", num_phases))
threads.append(thread)
thread.start()
for thread in threads:
thread.join() # Wait for all threads to complete
print("All phases completed by all threads")
if __name__ == "__main__":
main()
在此程序中,BoundedBarrier 与超时一起使用,以同步多个阶段中的线程。如果线程未在 2 秒内到达屏障,它将继续执行,而无需等待其他线程。
def wait(self, timeout=None):
with self.condition:
self.count += 1
if self.count == self.max_threads:
self.condition.notify_all() # Notify all waiting threads
self.count = 0 # Reset the counter for reuse
return True # Barrier tripped
else:
if timeout is None:
self.condition.wait() # Wait indefinitely
else:
if not self.condition.wait(timeout): # Wait with timeout
self.count -= 1 # Decrement count if timeout occurs
return False # Barrier not tripped
return True # Barrier tripped
更新 wait 方法以支持超时。如果发生超时,线程将递减计数器并继续执行,而无需等待其他线程。
if not barrier.wait(timeout=2): # Wait at the barrier with a timeout
print(f"{thread_name} timed out waiting for the barrier in phase {phase + 1}")
continue
每个线程都调用超时时间为 2 秒的 wait 方法。 如果在此时间内未触发屏障,则线程将继续执行,而无需等待。
$ python main.py Thread-1 is working on phase 1 Thread-2 is working on phase 1 Thread-3 is working on phase 1 Thread-1 has completed phase 1 Thread-2 has completed phase 1 Thread-3 has completed phase 1 Thread-1 is moving to the next phase Thread-2 is moving to the next phase Thread-3 is moving to the next phase Thread-1 is working on phase 2 Thread-2 is working on phase 2 Thread-3 is working on phase 2 Thread-1 has completed phase 2 Thread-2 has completed phase 2 Thread-3 has completed phase 2 Thread-1 is moving to the next phase Thread-2 is moving to the next phase Thread-3 is moving to the next phase All phases completed by all threads
来源
在本文中,我们展示了如何使用自定义的 BoundedBarrier 来同步 Python 线程。
作者
列出所有 Python 教程。