并发队列原理之ConcurrentLinkedQueue原理解析

概述

ConcurrentLinkedQueue是无界非阻塞队列,底层有单向链表实现,通过CAS来保证线程的安全

使用Node内部类作为链表的节点,Node的item域存放节点的值,next表示下一个节点,类图如下:

原理分析

offer方法:在队列尾部添加一个元素,返回为true,不能添加null元素

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public boolean offer(E e) {
// 检查元素是否为空
checkNotNull(e);
// 插入尾部的元素
final Node<E> newNode = new Node<E>(e);
// 从尾节点开始循环
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
// p is last node
// 通过CAS将队列原最后一个元素的next指向新节点
if (p.casNext(null, newNode)) {
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next
}
else if (p == q)
// 由于自引用问题需要重新找新的head
p = (t != (t = tail)) ? t : head;
else
// 寻找尾节点
p = (p != t && t != (t = tail)) ? t : q;
}
}

add方法:实际调用的内部offer方法

1
2
3
public boolean add(E e) {
return offer(e);
}

poll方法:移除队列头部一个元素,并返回该值,队列为空则返回null

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
// 当前节点有值则设置为null
if (item != null && p.casItem(item, null)) {
// Successful CAS is the linearization point
// for item to be removed from this queue.
if (p != h) // hop two nodes at a time
// 从链表中移除
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
// 当前队列为空则返回null
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
// 当前节点被自引用了,重新寻找队列头节点
else if (p == q)
// 发现头结点被修改,通过goto预缴跳出外层循环重新获取头结点
continue restartFromHead;
else
p = q;
}
}
}

peek方法:获取头节点元素但是不移除

具体逻辑跟poll差不多,区别在于不需要移除元素

remove方法:移除指定元素,存在多个则移除第一个

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public boolean remove(Object o) {
if (o != null) {
Node<E> next, pred = null;
for (Node<E> p = first(); p != null; pred = p, p = next) {
boolean removed = false;
E item = p.item;
// 使用CAS设置为null
if (item != null) {
if (!o.equals(item)) {
next = succ(p);
continue;
}
removed = p.casItem(item, null);
}
// 获取next节点
next = succ(p);
// 前驱节点和next节点都不为空,则链接前驱节点到next节点
if (pred != null && next != null) // unlink
pred.casNext(p, next);
if (removed)
return true;
}
}
return false;
}