ReentrantReadWriteLock原理分析

概念

ReentrantReadWriteLock采用读写分离的策略,允许多个线程同时获取读锁,写锁是独占式锁,读写锁之间为互斥。

ReentrantReadWriteLock.png
ReentrantReadWriteLock.png

其内部有内部类:WriteLock和ReadLock,分别表示写锁和读锁的操作。ReentrantReadWriteLock中将状态值state分为了两部分,第一部分高16位表示的是读状态(获取到读锁的次数),低16位表示是写状态(获取到写锁的次数)。

其内部类Sync的成员变量如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 共享锁偏移量
static final int SHARED_SHIFT = 16;
// 共享锁(读锁)状态单位值65536
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
// 排它锁(写锁)掩码,二进制,15个1
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
// 返回读锁线程数
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
// 返回写锁可重入个数
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
// 第一个获取到锁的线程
private transient Thread firstReader = null;
// 第一个获取到读锁的线程获取读锁的可重入次数
private transient int firstReaderHoldCount;
// 最后一个获取读锁的线程获取读锁的可重入次数
private transient HoldCounter cachedHoldCounter;

写锁的获取和释放

lock

ReentrantReadWriteLock的写锁是独占锁也是可重入锁,如果写锁和读锁没有被获取,那么当前线程可以获取到写锁,否则线程被挂起。

1
2
3
public void lock() {
sync.acquire(1);
}

ReentrantReadWriteLock的lock方法同样是java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire实现的。里面使用到tryAcquire方法是Sync重写的

1
2
3
4
5
6
public final void acquire(int arg) {
// 该tryAcquire方法同样是掉用其的Sync内部类实现
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
// 获取可重入次数
int w = exclusiveCount(c);
if (c != 0) {
// 不为0则写锁或读锁已经被其他线程获取了
// (Note: if c != 0 and w == 0 then shared count != 0)
// c!=0且w为0说明低16位为0,高16位不为0,这就代表了读锁已经被获取了,w不为0,代表写锁被获取了
if (w == 0 || current != getExclusiveOwnerThread())
// 读锁别获取了或者当前线程不是写锁持有者,则返回false
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
// 设置可重入次数
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}

lockInterruptibly

对中断进行响应的获取锁的方法

tryLock

尝试获取写,成功返回true

1
2
3
public boolean tryLock( ) {
return sync.tryWriteLock();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
final boolean tryWriteLock() {
Thread current = Thread.currentThread();
int c = getState();
if (c != 0) {
int w = exclusiveCount(c);
// c!=0且w为0说明低16位为0,高16位不为0,这就代表了读锁已经被获取了,w不为0,代表写锁被获取了
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
}
if (!compareAndSetState(c, c + 1))
return false;
setExclusiveOwnerThread(current);
return true;
}

tryLock(long timeout, TimeUnit unit)

增加超时等待获取锁,能够对中断进行响应

unlock

尝试释放锁,对状态值减一,减一后结果为0则会释放锁

1
2
3
public void unlock() {
sync.release(1);
}
1
2
3
4
5
6
7
8
9
10
11
public final boolean release(int arg) {
// 调用子类的tryRelease方法
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 唤醒后继节点
unparkSuccessor(h);
return true;
}
return false;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
protected final boolean tryRelease(int releases) {
// 判断释放锁的是否是锁的持有线程
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 状态值减一
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
// 如果未0则成功释放锁
setExclusiveOwnerThread(null);
// 否则只是进行简单的减一
setState(nextc);
return free;
}

读锁的获取和释放

lock

如果写锁没有被获取,则读锁可以被获取,获取后状态值的高16位的值加一。

1
2
3
public void lock() {
sync.acquireShared(1);
}
1
2
3
4
5
6
public final void acquireShared(int arg) {
// 调用的是具体子类的方法
if (tryAcquireShared(arg) < 0)
// 调用的是AQS的方法
doAcquireShared(arg);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
// 判断写锁是否被占用,如果获取到写锁的不是当前线程则返回-1
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 获取读锁持有的线程数
int r = sharedCount(c);
// 尝试获取锁
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
// r等于0表示读锁是第一次被获取,则设置当前线程为第一个获取到锁的线程
firstReader = current;
// 设置读锁的可重入次数为1
firstReaderHoldCount = 1;
} else if (firstReader == current) {
// 如果当前线程是第一个获取到读锁的线程,则重入次数加一
firstReaderHoldCount++;
} else {
// 记录其他线程获取读锁的可重入数或记录最后一个获取读锁的线程
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}

如果当前线程已经获取到写锁,而后先要获取读锁是允许的。readerShouldBlock用来判断队列中的第一个元素是否正尝试或获取写锁,不是则判断获取读锁的可重入次数是否达到了最大值。

tryLock

尝试获取锁

tryReleaseShared

释放锁,跟写锁的过程查不到,多了对firstReader、firstReaderHoldCount和cachedHoldCounter的额外处理,并通过自旋释放锁。