作用

实现屏障等待功能,支持多个线程一起完成任务的目标,与CountDownLatch相比较,CyclicBarrier侧重的是一个线程没完成任务,则多个线程必须等待。

API

  • await:等待
  • await(long timeout, TimeUnit unit):在规定时间内还没达到parties数量,则抛出超时异常
  • getNumberWaiting: 获取已到屏障掉的线程数
  • getParties:获取parties数
  • isBroken: 判断屏障是否处于损坏状态
  • reset:重置屏障,即将原来的parties重置为初始值

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class CyclicBarrierDemo1 extends Thread {

private CyclicBarrier cyclicBarrier;

public CyclicBarrierDemo1(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}

@Override
public void run() {
try {
Thread.sleep(500);
System.out.println(Thread.currentThread().getName() + "完成了");
// 全部完成才会是否
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}

public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, new Runnable() {
public void run() {
// cyclicBarrier全部执行了将会回调run方法
System.out.println("全部已完成");
}
});

for (int i = 0; i < 3; i++) {
new CyclicBarrierDemo1(cyclicBarrier).start();
}
}
}

当线程数等于CyclicBarrier的parties数时,这是正常的,大于也是正常的,但是线程数如果小于parties了,那么将会一直阻塞。

特性

  • 屏障重置性:parties可以重置归为0
  • 破坏模型:当其中一个线程中断或提前离开了屏障点,其他线程也会离开屏障点

与CountDownLatch比较

  • CountDownLatch的计数不能重置,CyclicBarrier可以
  • CountDownLatch计数为0释放等待的线程,CyclicBarrier达到指定值,释放等待的线程
  • CountDownLatch不可以重复利用,CyclicBarrier可以

源码分析

成员变量

  • 内部类,该类表示代
    1
    2
    3
    private static class Generation {
    boolean broken = false;
    }
  • lock:表示获取当锁
  • parties:指定parties数量
  • generation:当前代
  • count:仍在等待的数量

await方法

1
2
3
4
5
6
7
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}

它调用了dowait方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 表示当前“代”
final Generation g = generation;
// 判断屏障是否损坏
if (g.broken)
throw new BrokenBarrierException();

if (Thread.interrupted()) {
// 进行破坏屏障
breakBarrier();
throw new InterruptedException();
}

int index = --count;
if (index == 0) { // tripped
// 表示所有线程都达到屏障了
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
// 那么将会回调执行CyclicBarries的run方法
command.run();
// 可以认为执行栅栏任务成功了
ranAction = true;
// 唤醒所有线程
nextGeneration();
return 0;
} finally {
if (!ranAction)
// 执行失败,则破坏屏障
breakBarrier();
}
}

// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
// timed这个值是由构造方法决定的,如果是使用有超时设置的构造方法,那么该值为true
if (!timed)
trip.await();
else if (nanos > 0L)
// 否则等待指定的时长
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
// 当前代,且屏障没有被破坏
breakBarrier();
throw ie;
} else {

Thread.currentThread().interrupt();
}
}

if (g.broken)
throw new BrokenBarrierException();

if (g != generation)
// 判断当前CyclicBarrier是不是这代的,不是则直接返回下标
return index;

if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}