ZetCode

Python 屏障

最后修改日期:2024 年 7 月 15 日

在本文中,我们将展示如何使用 threading.Barrier 同步 Python 线程。

Barrier 是一种同步原语,它允许固定数量的线程等待彼此到达一个公共的屏障点。 它对于协调多个线程以分阶段执行任务非常有用。

Barrier 初始化时需要一个计数,表示在所有线程可以继续之前必须到达屏障的线程数。 每个线程都调用屏障上的 wait 方法,该方法会阻塞,直到所有线程都到达屏障。 一旦所有线程都到达,它们将被释放,并且可以重复使用该屏障。

屏障示例

以下示例演示如何使用 threading.Barrier 同步多个线程。

main.py
import threading
import time

def worker(barrier, thread_name):
    print(f"{thread_name} is starting")
    time.sleep(2)  # Simulate some work
    print(f"{thread_name} is waiting at the barrier")
    barrier.wait()  # Wait for all threads to reach the barrier
    print(f"{thread_name} is continuing after the barrier")

def main():
    num_threads = 3
    barrier = threading.Barrier(num_threads)

    threads = []
    for i in range(num_threads):  # Create 3 threads
        thread = threading.Thread(target=worker, args=(barrier, f"Thread-{i+1}"))
        threads.append(thread)
        thread.start()

    for thread in threads:
        thread.join()  # Wait for all threads to complete

    print("All threads completed")

if __name__ == "__main__":
    main()

在此程序中,使用 Barrier 来同步三个线程。 每个线程执行一些工作,在屏障处等待,然后在所有线程都到达屏障后继续。

barrier = threading.Barrier(num_threads)

Barrier 使用必须到达屏障的线程数进行初始化,之后所有线程才能继续。

barrier.wait()  # Wait for all threads to reach the barrier

每个线程都调用屏障上的 wait 方法,该方法会阻塞,直到所有线程都到达屏障。

for thread in threads:
    thread.join()  # Wait for all threads to complete

主线程使用 join 方法等待所有工作线程完成。

$ python main.py
Thread-1 is starting
Thread-2 is starting
Thread-3 is starting
Thread-1 is waiting at the barrier
Thread-2 is waiting at the barrier
Thread-3 is waiting at the barrier
Thread-1 is continuing after the barrier
Thread-2 is continuing after the barrier
Thread-3 is continuing after the barrier
All threads completed

带有 Action 的屏障

以下示例演示如何使用带有 action 的 Barrier。 一旦所有线程都到达屏障,就会执行该 action。

main.py
import threading
import time

def worker(barrier, thread_name):
    print(f"{thread_name} is starting")
    time.sleep(2)  # Simulate some work
    print(f"{thread_name} is waiting at the barrier")
    barrier.wait()  # Wait for all threads to reach the barrier
    print(f"{thread_name} is continuing after the barrier")

def barrier_action():
    print("All threads have reached the barrier")

def main():
    num_threads = 3
    barrier = threading.Barrier(num_threads, action=barrier_action)

    threads = []
    for i in range(num_threads):  # Create 3 threads
        thread = threading.Thread(target=worker, args=(barrier, f"Thread-{i+1}"))
        threads.append(thread)
        thread.start()

    for thread in threads:
        thread.join()  # Wait for all threads to complete

    print("All threads completed")

if __name__ == "__main__":
    main()

在此程序中,Barrier 使用一个 action 进行初始化,该 action 一旦所有线程都到达屏障,就会执行。

barrier = threading.Barrier(num_threads, action=barrier_action)

Barrier 使用一个 action 进行初始化,该 action 一旦所有线程都到达屏障,就会执行。

def barrier_action():
    print("All threads have reached the barrier")

该 action 是一个函数,一旦所有线程都到达屏障,就会执行该函数。

$ python main.py
Thread-1 is starting
Thread-2 is starting
Thread-3 is starting
Thread-1 is waiting at the barrier
Thread-2 is waiting at the barrier
Thread-3 is waiting at the barrier
All threads have reached the barrier
Thread-1 is continuing after the barrier
Thread-2 is continuing after the barrier
Thread-3 is continuing after the barrier
All threads completed

使用屏障的多阶段任务示例

以下示例演示如何使用 threading.Barrier 在多个执行阶段同步线程。 每个阶段代表一个不同的工作阶段,并且屏障确保所有线程在进入下一个阶段之前完成一个阶段。

main.py
import threading
import time

def worker(barrier, thread_name, num_phases):
    for phase in range(num_phases):
        print(f"{thread_name} is working on phase {phase + 1}")
        time.sleep(1)  # Simulate work for the phase
        print(f"{thread_name} has completed phase {phase + 1}")
        barrier.wait()  # Wait for all threads to complete the phase
        print(f"{thread_name} is moving to the next phase")

def main():
    num_threads = 3
    num_phases = 3  # Number of phases in the task
    barrier = threading.Barrier(num_threads)

    threads = []
    for i in range(num_threads):  # Create 3 threads
        thread = threading.Thread(target=worker, args=(barrier, f"Thread-{i+1}", num_phases))
        threads.append(thread)
        thread.start()

    for thread in threads:
        thread.join()  # Wait for all threads to complete

    print("All phases completed by all threads")

if __name__ == "__main__":
    main()

在此程序中,使用 Barrier 在三个执行阶段同步三个线程。 每个线程在一个阶段工作,等待所有线程完成该阶段,然后进入下一个阶段。

barrier = threading.Barrier(num_threads)

Barrier 使用必须到达屏障的线程数进行初始化,之后所有线程才能进入下一个阶段。

barrier.wait()  # Wait for all threads to complete the phase

每个线程在完成一个阶段后都调用屏障上的 wait 方法。 这确保所有线程在进入下一个阶段之前完成当前阶段。

for thread in threads:
    thread.join()  # Wait for all threads to complete

主线程使用 join 方法等待所有工作线程完成。

$ python main.py
Thread-1 is working on phase 1
Thread-2 is working on phase 1
Thread-3 is working on phase 1
Thread-1 has completed phase 1
Thread-2 has completed phase 1
Thread-3 has completed phase 1
Thread-1 is moving to the next phase
Thread-2 is moving to the next phase
Thread-3 is moving to the next phase
Thread-1 is working on phase 2
Thread-2 is working on phase 2
Thread-3 is working on phase 2
Thread-1 has completed phase 2
Thread-2 has completed phase 2
Thread-3 has completed phase 2
Thread-1 is moving to the next phase
Thread-2 is moving to the next phase
Thread-3 is moving to the next phase
Thread-1 is working on phase 3
Thread-2 is working on phase 3
Thread-3 is working on phase 3
Thread-1 has completed phase 3
Thread-2 has completed phase 3
Thread-3 has completed phase 3
Thread-1 is moving to the next phase
Thread-2 is moving to the next phase
Thread-3 is moving to the next phase
All phases completed by all threads

来源

Python 屏障 - 文档

在本文中,我们展示了如何使用 Barrier 同步 Python 线程以进行线程协调。

作者

我叫 Jan Bodnar,是一位充满激情的程序员,拥有丰富的编程经验。 自 2007 年以来,我一直撰写编程文章。 迄今为止,我已经撰写了 1,400 多篇文章和 8 本电子书。 我拥有超过十年的编程教学经验。

列出所有 Python 教程