Java CyclicBarrier
最后修改:2025 年 2 月 15 日
本文介绍如何使用 CyclicBarrier 同步 Java 线程。
CyclicBarrier 是一种同步辅助工具,它允许一组线程相互等待,直到到达一个公共的屏障点。 与一次性使用的 CountDownLatch 不同,CyclicBarrier 在等待线程被释放后可以重复使用。
要使用 CyclicBarrier,我们首先创建一个 CyclicBarrier 对象,指定线程数以及可选的 Runnable 操作,该操作在屏障被触发后执行。 每个线程都会调用屏障上的 await 方法,该方法会阻塞,直到所有线程都到达屏障。 一旦所有线程都到达,屏障就会被触发,并且执行可选的操作。 然后线程被释放,并且屏障可以被重复使用。
CyclicBarrier 示例
以下示例展示了 CyclicBarrier 的用法。
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.stream.Stream;
class Worker implements Runnable {
private final List<String> messages;
private final CyclicBarrier cyclicBarrier;
public Worker(List<String> messages, CyclicBarrier cyclicBarrier) {
this.messages = messages;
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
int r = new Random().nextInt(3000, 8000);
System.out.printf("%s starting, durations %dms %n",
Thread.currentThread().getName(), r);
try {
Thread.sleep(r);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName() + " completed");
messages.add(Thread.currentThread().getName() + " completed");
try {
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
}
}
void main() throws InterruptedException {
List<String> messages = Collections.synchronizedList(new ArrayList<>());
var cyclicBarrier = new CyclicBarrier(5, () ->
messages.add("Barrier tripped, all threads reached the barrier"));
List<Thread> workers = Stream.generate(() ->
new Thread(new Worker(messages, cyclicBarrier)))
.limit(5)
.toList();
workers.forEach(Thread::start);
for (Thread worker : workers) {
worker.join();
}
System.out.println(messages);
}
主线程创建五个新线程;这五个线程使用 CyclicBarrier 进行同步。
int r = new Random().nextInt(3000, 8000);
每个线程运行一个 3000 到 8000 毫秒之间的随机时间。
messages.add(Thread.currentThread().getName() + " completed"); cyclicBarrier.await();
任务完成后,它将消息写入同步列表,并在屏障上调用 await。 该线程会阻塞,直到所有线程都到达屏障。
List<String> messages = Collections.synchronizedList(new ArrayList<>());
var cyclicBarrier = new CyclicBarrier(5, () ->
messages.add("Barrier tripped, all threads reached the barrier"));
我们创建一个同步列表和一个用于五个任务的 CyclicBarrier。 该屏障还有一个可选的操作,该操作在所有线程到达屏障后执行。
List<Thread> workers = Stream.generate(() ->
new Thread(new Worker(messages, cyclicBarrier)))
.limit(5)
.toList();
创建包含五个线程的列表。 每个线程都接收一个同步消息列表和 cyclicBarrier。
workers.forEach(Thread::start);
我们启动所有线程。
for (Thread worker : workers) {
worker.join();
}
主线程使用 join 等待所有工作线程完成。
System.out.println(messages);
最后,我们打印所有消息。
可重用的 CyclicBarrier 示例
以下示例演示了 CyclicBarrier 的可重用性。 该屏障被使用两次,以在两个不同的点同步线程。
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.stream.Stream;
class Worker implements Runnable {
private final List<String> messages;
private final CyclicBarrier cyclicBarrier;
public Worker(List<String> messages, CyclicBarrier cyclicBarrier) {
this.messages = messages;
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
int r = new Random().nextInt(3000, 8000);
System.out.printf("%s starting, durations %dms %n",
Thread.currentThread().getName(), r);
try {
Thread.sleep(r);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName() + " completed phase 1");
messages.add(Thread.currentThread().getName() + " completed phase 1");
try {
cyclicBarrier.await(); // Wait for all threads to complete phase 1
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
// Phase 2
r = new Random().nextInt(3000, 8000);
System.out.printf("%s starting phase 2, durations %dms %n",
Thread.currentThread().getName(), r);
try {
Thread.sleep(r);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName() + " completed phase 2");
messages.add(Thread.currentThread().getName() + " completed phase 2");
try {
cyclicBarrier.await(); // Wait for all threads to complete phase 2
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
}
}
void main() throws InterruptedException {
List<String> messages = Collections.synchronizedList(new ArrayList<>());
var cyclicBarrier = new CyclicBarrier(5, () ->
messages.add("Barrier tripped, all threads reached the barrier"));
List<Thread> workers = Stream.generate(() ->
new Thread(new Worker(messages, cyclicBarrier)))
.limit(5)
.toList();
workers.forEach(Thread::start);
for (Thread worker : workers) {
worker.join();
}
System.out.println(messages);
}
在这个程序中,CyclicBarrier 被使用两次,以在执行的两个不同阶段同步线程。 每个线程完成两个阶段的工作,并且屏障确保所有线程在进行到下一个阶段之前完成每个阶段。
cyclicBarrier.await(); // Wait for all threads to complete phase 1
第一次调用 await 确保所有线程在移动到阶段 2 之前完成阶段 1。
cyclicBarrier.await(); // Wait for all threads to complete phase 2
第二次调用 await 确保所有线程在程序终止之前完成阶段 2。
var cyclicBarrier = new CyclicBarrier(5, () ->
messages.add("Barrier tripped, all threads reached the barrier"));
每次屏障被触发时,都会执行屏障操作,表明所有线程都已到达同步点。
$ java Main.java Thread-0 starting, durations 4500ms Thread-1 starting, durations 3200ms Thread-2 starting, durations 7000ms Thread-3 starting, durations 5000ms Thread-4 starting, durations 6000ms Thread-1 completed phase 1 Thread-0 completed phase 1 Thread-3 completed phase 1 Thread-4 completed phase 1 Thread-2 completed phase 1 Barrier tripped, all threads reached the barrier Thread-0 starting phase 2, durations 4000ms Thread-1 starting phase 2, durations 5500ms Thread-2 starting phase 2, durations 3000ms Thread-3 starting phase 2, durations 2500ms Thread-4 starting phase 2, durations 6000ms Thread-3 completed phase 2 Thread-2 completed phase 2 Thread-0 completed phase 2 Thread-1 completed phase 2 Thread-4 completed phase 2 Barrier tripped, all threads reached the barrier [Thread-1 completed phase 1, Thread-0 completed phase 1, Thread-3 completed phase 1, Thread-4 completed phase 1, Thread-2 completed phase 1, Barrier tripped, all threads reached the barrier, Thread-3 completed phase 2, Thread-2 completed phase 2, Thread-0 completed phase 2, Thread-1 completed phase 2, Thread-4 completed phase 2, Barrier tripped, all threads reached the barrier]
来源
在本文中,我们展示了如何使用 CyclicBarrier 同步 Java 线程。
作者
列出所有Java教程。