阻塞队列

ArrayBlockingQueue

通过全局锁的方式,同时只能有一个线程进行存放元素到队列或一个线程从队列中获取元素

  • offer方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    public boolean offer(E e) {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
    if (count == items.length)
    return false;
    else {
    enqueue(e);
    return true;
    }
    } finally {
    lock.unlock();
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12

    该方法首先获取锁,通过其构造方法可以知道,这个锁是非公平锁:

    ​```java
    public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
    throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull = lock.newCondition();
    }

    note: 非公平锁:直接尝试占有锁;公平锁,先判断当前线程是否是第一个线程,是则获取锁,否则添加到等待队列上。

    然后调用enqueue方法将元素添加到队列上;

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
    putIndex = 0;
    count++;
    notEmpty.signal();
    }

    添加完后,通过 notEmpty.signal() 知非空对队列。

    notEmpty和notFull是Condition类型的示例.

PriorityBlockingQueue 无界优先级队列

内部使用到比较器,用来比较元素大小,由于这是无界队列所以这里没有notFul。默认按元素升序。元素需要实现Comparable接口。

1
DEFAULT_INITIAL_CAPACITY = 11 //队列默认大小

为什么是11?

  • offer 方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    public boolean offer(E e) {
    if (e == null)
    throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    lock.lock();
    int n, cap;
    Object[] array;
    // 元素个数大于了队列容量,则使用tryGrow进行扩容
    while ((n = size) >= (cap = (array = queue).length))
    tryGrow(array, cap);
    try {
    Comparator<? super E> cmp = comparator;
    if (cmp == null)
    siftUpComparable(n, e, array);
    else
    siftUpUsingComparator(n, e, array, cmp);
    size = n + 1;
    notEmpty.signal();
    } finally {
    lock.unlock();
    }
    return true;
    }

    这里可以看到扩容时调用了tryGrow方法:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    private void tryGrow(Object[] array, int oldCap) {
    lock.unlock(); // must release and then re-acquire main lock
    Object[] newArray = null;
    if (allocationSpinLock == 0 &&
    UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
    0, 1)) {
    try {
    // 如果oldCap大于等于64,那么队列新的容量则扩容50%
    int newCap = oldCap + ((oldCap < 64) ?
    (oldCap + 2) : // grow faster if small
    (oldCap >> 1));
    if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
    int minCap = oldCap + 1;
    if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
    throw new OutOfMemoryError();
    newCap = MAX_ARRAY_SIZE;
    }
    if (newCap > oldCap && queue == array)
    newArray = new Object[newCap];
    } finally {
    allocationSpinLock = 0;
    }
    }
    if (newArray == null) // back off if another thread is allocating
    Thread.yield();
    lock.lock();
    if (newArray != null && queue == array) {
    queue = newArray;
    System.arraycopy(array, 0, newArray, 0, oldCap);
    }
    }

    note

    • 为什么是64作为阈值?

    • 为什么要提前释放锁

    • 扩容的时候可以进行入队列操作,所以是用到CAS,只允许一个线程进行扩容,如果扩容失败了,则通过Thread.yield()让出CPU,让改线程重新获取锁。

      1
      2
      if (newArray == null) // back off if another thread is allocating
      Thread.yield();

      复制数组是在获取锁后面才执行的,是为了保证复制的数组是最新的。

      排序方法:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      private static <T> void siftUpComparable(int k, T x, Object[] array) {
      Comparable<? super T> key = (Comparable<? super T>) x;
      while (k > 0) {
      // 确定父节点元素位置
      int parent = (k - 1) >>> 1;
      //获取父节点的值
      Object e = array[parent];
      // 如果插入的值比父节点大,那么完成堆的建立
      if (key.compareTo((T) e) >= 0)
      break;
      array[k] = e;
      k = parent;
      }
      array[k] = key;
      }

    例子:假设有三个节点0、1、2,值也是0、1、2。1、2分别是0节点的左右子节点,那么如果插入的k=3,这时先找出他要插入位置的父节点,通过(k-1)>>>1可以得出,parent=1,也就是1节点,这个时候将这个插入的值跟其父节点的值进行比较,如果发现是大于父节点的值,那么完成堆的构建,跳出循环。否则与父节点进行交换,

  • poll方法

    该方法通过dequeue方法进行出队列,获取元素。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    private E dequeue() {
    int n = size - 1;
    if (n < 0)
    return null;
    else {
    Object[] array = queue;
    E result = (E) array[0];
    // 获取队列尾部元素,然后将原来的位置的指向设置为空
    E x = (E) array[n];
    array[n] = null;
    Comparator<? super E> cmp = comparator;
    if (cmp == null)
    // 把元素插入到下标为0的位置上,然后调整成最小堆
    siftDownComparable(0, x, array, n);
    else
    siftDownUsingComparator(0, x, array, n, cmp);
    size = n;
    return result;
    }
    }

    note:最小堆构建过程

  • take操作

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    try {
    while ( (result = dequeue()) == null)
    notEmpty.await();
    } finally {
    lock.unlock();
    }
    return result;
    }

    队列没有元素则一直阻塞

SychronousQueue 同步队列

内部没有容器,只能存放一个元素,只要元素被消费了,才能继续take存放,否则一直阻塞。SychronousQueue 使用的CAS来实现线程的安全访问。

公平模式下的实现:队尾匹配,队头出队。底层实现:使用TransferQueue内部队列。

非公平模式下的实现:后入栈、先匹配。 底层使用TransferStack栈。