Java CyclicBarrier
最后修改:2025 年 2 月 15 日
本文介绍如何使用 CyclicBarrier 同步 Java 线程。
CyclicBarrier
是一种同步辅助工具,它允许一组线程相互等待,直到到达一个公共的屏障点。 与一次性使用的 CountDownLatch
不同,CyclicBarrier
在等待线程被释放后可以重复使用。
要使用 CyclicBarrier
,我们首先创建一个 CyclicBarrier
对象,指定线程数以及可选的 Runnable
操作,该操作在屏障被触发后执行。 每个线程都会调用屏障上的 await
方法,该方法会阻塞,直到所有线程都到达屏障。 一旦所有线程都到达,屏障就会被触发,并且执行可选的操作。 然后线程被释放,并且屏障可以被重复使用。
CyclicBarrier 示例
以下示例展示了 CyclicBarrier
的用法。
import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Random; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.stream.Stream; class Worker implements Runnable { private final List<String> messages; private final CyclicBarrier cyclicBarrier; public Worker(List<String> messages, CyclicBarrier cyclicBarrier) { this.messages = messages; this.cyclicBarrier = cyclicBarrier; } @Override public void run() { int r = new Random().nextInt(3000, 8000); System.out.printf("%s starting, durations %dms %n", Thread.currentThread().getName(), r); try { Thread.sleep(r); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println(Thread.currentThread().getName() + " completed"); messages.add(Thread.currentThread().getName() + " completed"); try { cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) { throw new RuntimeException(e); } } } void main() throws InterruptedException { List<String> messages = Collections.synchronizedList(new ArrayList<>()); var cyclicBarrier = new CyclicBarrier(5, () -> messages.add("Barrier tripped, all threads reached the barrier")); List<Thread> workers = Stream.generate(() -> new Thread(new Worker(messages, cyclicBarrier))) .limit(5) .toList(); workers.forEach(Thread::start); for (Thread worker : workers) { worker.join(); } System.out.println(messages); }
主线程创建五个新线程;这五个线程使用 CyclicBarrier
进行同步。
int r = new Random().nextInt(3000, 8000);
每个线程运行一个 3000 到 8000 毫秒之间的随机时间。
messages.add(Thread.currentThread().getName() + " completed"); cyclicBarrier.await();
任务完成后,它将消息写入同步列表,并在屏障上调用 await
。 该线程会阻塞,直到所有线程都到达屏障。
List<String> messages = Collections.synchronizedList(new ArrayList<>()); var cyclicBarrier = new CyclicBarrier(5, () -> messages.add("Barrier tripped, all threads reached the barrier"));
我们创建一个同步列表和一个用于五个任务的 CyclicBarrier
。 该屏障还有一个可选的操作,该操作在所有线程到达屏障后执行。
List<Thread> workers = Stream.generate(() -> new Thread(new Worker(messages, cyclicBarrier))) .limit(5) .toList();
创建包含五个线程的列表。 每个线程都接收一个同步消息列表和 cyclicBarrier
。
workers.forEach(Thread::start);
我们启动所有线程。
for (Thread worker : workers) { worker.join(); }
主线程使用 join
等待所有工作线程完成。
System.out.println(messages);
最后,我们打印所有消息。
可重用的 CyclicBarrier 示例
以下示例演示了 CyclicBarrier
的可重用性。 该屏障被使用两次,以在两个不同的点同步线程。
import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Random; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.stream.Stream; class Worker implements Runnable { private final List<String> messages; private final CyclicBarrier cyclicBarrier; public Worker(List<String> messages, CyclicBarrier cyclicBarrier) { this.messages = messages; this.cyclicBarrier = cyclicBarrier; } @Override public void run() { int r = new Random().nextInt(3000, 8000); System.out.printf("%s starting, durations %dms %n", Thread.currentThread().getName(), r); try { Thread.sleep(r); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println(Thread.currentThread().getName() + " completed phase 1"); messages.add(Thread.currentThread().getName() + " completed phase 1"); try { cyclicBarrier.await(); // Wait for all threads to complete phase 1 } catch (InterruptedException | BrokenBarrierException e) { throw new RuntimeException(e); } // Phase 2 r = new Random().nextInt(3000, 8000); System.out.printf("%s starting phase 2, durations %dms %n", Thread.currentThread().getName(), r); try { Thread.sleep(r); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println(Thread.currentThread().getName() + " completed phase 2"); messages.add(Thread.currentThread().getName() + " completed phase 2"); try { cyclicBarrier.await(); // Wait for all threads to complete phase 2 } catch (InterruptedException | BrokenBarrierException e) { throw new RuntimeException(e); } } } void main() throws InterruptedException { List<String> messages = Collections.synchronizedList(new ArrayList<>()); var cyclicBarrier = new CyclicBarrier(5, () -> messages.add("Barrier tripped, all threads reached the barrier")); List<Thread> workers = Stream.generate(() -> new Thread(new Worker(messages, cyclicBarrier))) .limit(5) .toList(); workers.forEach(Thread::start); for (Thread worker : workers) { worker.join(); } System.out.println(messages); }
在这个程序中,CyclicBarrier
被使用两次,以在执行的两个不同阶段同步线程。 每个线程完成两个阶段的工作,并且屏障确保所有线程在进行到下一个阶段之前完成每个阶段。
cyclicBarrier.await(); // Wait for all threads to complete phase 1
第一次调用 await
确保所有线程在移动到阶段 2 之前完成阶段 1。
cyclicBarrier.await(); // Wait for all threads to complete phase 2
第二次调用 await
确保所有线程在程序终止之前完成阶段 2。
var cyclicBarrier = new CyclicBarrier(5, () -> messages.add("Barrier tripped, all threads reached the barrier"));
每次屏障被触发时,都会执行屏障操作,表明所有线程都已到达同步点。
$ java Main.java Thread-0 starting, durations 4500ms Thread-1 starting, durations 3200ms Thread-2 starting, durations 7000ms Thread-3 starting, durations 5000ms Thread-4 starting, durations 6000ms Thread-1 completed phase 1 Thread-0 completed phase 1 Thread-3 completed phase 1 Thread-4 completed phase 1 Thread-2 completed phase 1 Barrier tripped, all threads reached the barrier Thread-0 starting phase 2, durations 4000ms Thread-1 starting phase 2, durations 5500ms Thread-2 starting phase 2, durations 3000ms Thread-3 starting phase 2, durations 2500ms Thread-4 starting phase 2, durations 6000ms Thread-3 completed phase 2 Thread-2 completed phase 2 Thread-0 completed phase 2 Thread-1 completed phase 2 Thread-4 completed phase 2 Barrier tripped, all threads reached the barrier [Thread-1 completed phase 1, Thread-0 completed phase 1, Thread-3 completed phase 1, Thread-4 completed phase 1, Thread-2 completed phase 1, Barrier tripped, all threads reached the barrier, Thread-3 completed phase 2, Thread-2 completed phase 2, Thread-0 completed phase 2, Thread-1 completed phase 2, Thread-4 completed phase 2, Barrier tripped, all threads reached the barrier]
来源
在本文中,我们展示了如何使用 CyclicBarrier 同步 Java 线程。
作者
列出所有Java教程。