概念与作用

移相器,可用于线程进行分组同步控制,解决分阶段共同完成任务的需求

常用API

  • arriveAndAwaitAdvance:到达屏障处等待,等待其他线程达到屏障处,满足条件后继续向下一个屏障执行
  • arriveAndDeregister:动态撤销线程在phaser的注册,表示不参与后面阶段工作
  • getPhase:获取已经达到第几个屏障
  • onAdvance:通过新屏障时被调用,如果通过第一个屏障时返回了true,那么后面的phaser不进行工作,取消屏障
    1
    2
    3
    4
    5
    6
    7
     Phaser phaser = new Phaser(2) {
    protected boolean onAdvance(int phase, int registerdParties) {
    System.out.println("被调用了");
    // true不进行等待
    return true;
    }
    };
  • getRegisteredParties:获取注册的parties数目
  • register: 动态增加parties值,每次加一
  • bulkRegister:动态增加parties值,可以指定每次增加的数目
  • getArrivedParties:获取已使用的parties的数目
  • getUnarrivedParties:获取未使用的parties的数目
  • arrive:是parties加一,且不再屏障出等待
  • awaitAdvance(int phase):如果参数和getPhase方法返回值一样,则在屏障出等待,否则继续执行。
  • awaitAdvanceInterruptibly(int phase):表示可中断
  • awaitAdvanceInterruptibly(int phase,long timeout, TimeUnit unit):指定栏数等待最大的单位时间,如在指定时间内栏数没有变化,则继续向下运行
  • forceTermination:是凭证处于终结状态,即屏障失效
  • isTerminated:判断phaser是否失效

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
public class PhaserDemo1 {

public static Phaser phaser;

public static void testA() {
System.out.println(Thread.currentThread().getName() + " One Begin" + System.currentTimeMillis());
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName() + " One End" + System.currentTimeMillis());
phaser.getArrivedParties();
System.out.println(Thread.currentThread().getName() + " Two Begin" + System.currentTimeMillis());
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName() + " Two End" + System.currentTimeMillis());
}

public static void testB() {
System.out.println(Thread.currentThread().getName() + " One Begin" + System.currentTimeMillis());
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName() + " One End" + System.currentTimeMillis());


System.out.println(Thread.currentThread().getName() + " Two Begin" + System.currentTimeMillis());
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName() + " Two End" + System.currentTimeMillis());
}

static class ThreadA extends Thread {
private Phaser phaser;

public ThreadA(Phaser phaser) {
this.phaser = phaser;
}

@Override
public void run() {
PhaserDemo1.testA();
}
}

static class ThreadB extends Thread {
private Phaser phaser;

public ThreadB(Phaser phaser) {
this.phaser = phaser;
}

@Override
public void run() {
PhaserDemo1.testA();
}
}

static class ThreadC extends Thread {
private Phaser phaser;
public ThreadC(Phaser phaser) {
this.phaser = phaser;
}

@Override
public void run() {
PhaserDemo1.testB();
}
}

public static void main(String[] args) {
Phaser phaser = new Phaser(3);
PhaserDemo1.phaser=phaser;
new ThreadA(phaser).start();
new ThreadB(phaser).start();
new ThreadC(phaser).start();
}


}

需要注意的是如果parties数大于线程数,将会一直阻塞