ZetCode

Java CyclicBarrier

最后修改:2025 年 2 月 15 日

本文介绍如何使用 CyclicBarrier 同步 Java 线程。

CyclicBarrier 是一种同步辅助工具,它允许一组线程相互等待,直到到达一个公共的屏障点。 与一次性使用的 CountDownLatch 不同,CyclicBarrier 在等待线程被释放后可以重复使用。

要使用 CyclicBarrier,我们首先创建一个 CyclicBarrier 对象,指定线程数以及可选的 Runnable 操作,该操作在屏障被触发后执行。 每个线程都会调用屏障上的 await 方法,该方法会阻塞,直到所有线程都到达屏障。 一旦所有线程都到达,屏障就会被触发,并且执行可选的操作。 然后线程被释放,并且屏障可以被重复使用。

CyclicBarrier 示例

以下示例展示了 CyclicBarrier 的用法。

Main.java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.stream.Stream;

class Worker implements Runnable {

    private final List<String> messages;
    private final CyclicBarrier cyclicBarrier;

    public Worker(List<String> messages, CyclicBarrier cyclicBarrier) {
        this.messages = messages;
        this.cyclicBarrier = cyclicBarrier;
    }

    @Override
    public void run() {

        int r = new Random().nextInt(3000, 8000);

        System.out.printf("%s starting, durations %dms %n", 
            Thread.currentThread().getName(), r);

        try {
            Thread.sleep(r);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

        System.out.println(Thread.currentThread().getName() + " completed");
        messages.add(Thread.currentThread().getName() + " completed");

        try {
            cyclicBarrier.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            throw new RuntimeException(e);
        }
    }
}

void main() throws InterruptedException {

    List<String> messages = Collections.synchronizedList(new ArrayList<>());
    var cyclicBarrier = new CyclicBarrier(5, () -> 
        messages.add("Barrier tripped, all threads reached the barrier"));

    List<Thread> workers = Stream.generate(() -> 
        new Thread(new Worker(messages, cyclicBarrier)))
            .limit(5)
            .toList();

    workers.forEach(Thread::start);

    for (Thread worker : workers) {
        worker.join();
    }

    System.out.println(messages);
}

主线程创建五个新线程;这五个线程使用 CyclicBarrier 进行同步。

int r = new Random().nextInt(3000, 8000);

每个线程运行一个 3000 到 8000 毫秒之间的随机时间。

messages.add(Thread.currentThread().getName() + " completed");
cyclicBarrier.await();

任务完成后,它将消息写入同步列表,并在屏障上调用 await。 该线程会阻塞,直到所有线程都到达屏障。

List<String> messages = Collections.synchronizedList(new ArrayList<>());
var cyclicBarrier = new CyclicBarrier(5, () -> 
    messages.add("Barrier tripped, all threads reached the barrier"));

我们创建一个同步列表和一个用于五个任务的 CyclicBarrier。 该屏障还有一个可选的操作,该操作在所有线程到达屏障后执行。

List<Thread> workers = Stream.generate(() -> 
    new Thread(new Worker(messages, cyclicBarrier)))
        .limit(5)
        .toList();

创建包含五个线程的列表。 每个线程都接收一个同步消息列表和 cyclicBarrier

workers.forEach(Thread::start);

我们启动所有线程。

for (Thread worker : workers) {
    worker.join();
}

主线程使用 join 等待所有工作线程完成。

System.out.println(messages);

最后,我们打印所有消息。

可重用的 CyclicBarrier 示例

以下示例演示了 CyclicBarrier 的可重用性。 该屏障被使用两次,以在两个不同的点同步线程。

Main.java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.stream.Stream;

class Worker implements Runnable {

    private final List<String> messages;
    private final CyclicBarrier cyclicBarrier;

    public Worker(List<String> messages, CyclicBarrier cyclicBarrier) {
        this.messages = messages;
        this.cyclicBarrier = cyclicBarrier;
    }

    @Override
    public void run() {

        int r = new Random().nextInt(3000, 8000);

        System.out.printf("%s starting, durations %dms %n", 
            Thread.currentThread().getName(), r);

        try {
            Thread.sleep(r);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

        System.out.println(Thread.currentThread().getName() + " completed phase 1");
        messages.add(Thread.currentThread().getName() + " completed phase 1");

        try {
            cyclicBarrier.await(); // Wait for all threads to complete phase 1
        } catch (InterruptedException | BrokenBarrierException e) {
            throw new RuntimeException(e);
        }

        // Phase 2
        r = new Random().nextInt(3000, 8000);
        System.out.printf("%s starting phase 2, durations %dms %n", 
            Thread.currentThread().getName(), r);

        try {
            Thread.sleep(r);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

        System.out.println(Thread.currentThread().getName() + " completed phase 2");
        messages.add(Thread.currentThread().getName() + " completed phase 2");

        try {
            cyclicBarrier.await(); // Wait for all threads to complete phase 2
        } catch (InterruptedException | BrokenBarrierException e) {
            throw new RuntimeException(e);
        }
    }
}

void main() throws InterruptedException {

    List<String> messages = Collections.synchronizedList(new ArrayList<>());
    var cyclicBarrier = new CyclicBarrier(5, () -> 
        messages.add("Barrier tripped, all threads reached the barrier"));

    List<Thread> workers = Stream.generate(() -> 
        new Thread(new Worker(messages, cyclicBarrier)))
            .limit(5)
            .toList();

    workers.forEach(Thread::start);

    for (Thread worker : workers) {
        worker.join();
    }

    System.out.println(messages);
}

在这个程序中,CyclicBarrier 被使用两次,以在执行的两个不同阶段同步线程。 每个线程完成两个阶段的工作,并且屏障确保所有线程在进行到下一个阶段之前完成每个阶段。

cyclicBarrier.await(); // Wait for all threads to complete phase 1

第一次调用 await 确保所有线程在移动到阶段 2 之前完成阶段 1。

cyclicBarrier.await(); // Wait for all threads to complete phase 2

第二次调用 await 确保所有线程在程序终止之前完成阶段 2。

var cyclicBarrier = new CyclicBarrier(5, () -> 
    messages.add("Barrier tripped, all threads reached the barrier"));

每次屏障被触发时,都会执行屏障操作,表明所有线程都已到达同步点。

$ java Main.java
Thread-0 starting, durations 4500ms 
Thread-1 starting, durations 3200ms 
Thread-2 starting, durations 7000ms 
Thread-3 starting, durations 5000ms 
Thread-4 starting, durations 6000ms 
Thread-1 completed phase 1
Thread-0 completed phase 1
Thread-3 completed phase 1
Thread-4 completed phase 1
Thread-2 completed phase 1
Barrier tripped, all threads reached the barrier
Thread-0 starting phase 2, durations 4000ms 
Thread-1 starting phase 2, durations 5500ms 
Thread-2 starting phase 2, durations 3000ms 
Thread-3 starting phase 2, durations 2500ms 
Thread-4 starting phase 2, durations 6000ms 
Thread-3 completed phase 2
Thread-2 completed phase 2
Thread-0 completed phase 2
Thread-1 completed phase 2
Thread-4 completed phase 2
Barrier tripped, all threads reached the barrier
[Thread-1 completed phase 1, Thread-0 completed phase 1, Thread-3 completed phase 1, Thread-4 completed phase 1, Thread-2 completed phase 1, Barrier tripped, all threads reached the barrier, Thread-3 completed phase 2, Thread-2 completed phase 2, Thread-0 completed phase 2, Thread-1 completed phase 2, Thread-4 completed phase 2, Barrier tripped, all threads reached the barrier]

来源

Java CyclicBarrier - 参考

在本文中,我们展示了如何使用 CyclicBarrier 同步 Java 线程。

作者

我叫 Jan Bodnar,是一位充满激情的程序员,拥有丰富的编程经验。 我自 2007 年以来一直在撰写编程文章。到目前为止,我已经撰写了超过 1,400 篇文章和 8 本电子书。 我拥有超过十年的编程教学经验。

列出所有Java教程