Java源码系列(19) -- LinkedBlockingDeque

November 6, 2018

一、类签名

本类是基于链节点的、可选边界的阻塞双端队列。指定可选的容量避免队列过度扩展。

1
2
3
public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>, java.io.Serializable

如果构造方法的容量参数没有指定,则 Integer.MAX_VALUE 将作为默认容量使用。而队列元素插入时,对应链节点动态创建。

多数操作能在常量时间内完成执行。例外的是 remove(Object)removeFirstOccurrenceremoveLastOccurrencecontainsiterator.remove() 和批量操作等方法,消耗的时间是线性的。源码来自JDK11

LinkedBlockingDeque_UML

二、Node

双向链表的节点类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
static final class Node<E> {
    // item,节点已被移除时为空
    E item;

    // 其一:
    // - 真正的前导节点
    // - 这个节点,表明前导节点是尾节点
    // - null,表明没有前导节点
    Node<E> prev;

    // 其一:
    // - 真正的后继节点
    // - 这个节点,表明后继节点是头节点
    // - null,表明没有后继节点
    Node<E> next;

    // 构造方法
    Node(E x) {
        item = x;
    }
}

三、数据成员

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 指向队列首节点
// Invariant: (first == null && last == null) || (first.prev == null && first.item != null)
transient Node<E> first;

// 指向队列尾节点
// Invariant: (first == null && last == null) || (last.next == null && last.item != null)
transient Node<E> last;

// 双端队列已保存元素数量
private transient int count;

// 双端队列元素最大容量
private final int capacity;

// 保护所有访问操作的主锁
final ReentrantLock lock = new ReentrantLock();

// 等待获取的Condition
private final Condition notEmpty = lock.newCondition();

// 等待存入的Condition
private final Condition notFull = lock.newCondition();

四、构造方法

Integer.MAX_VALUE 构建实例

1
2
3
public LinkedBlockingDeque() {
    this(Integer.MAX_VALUE);
}

用指定容量值构建实例

1
2
3
4
5
public LinkedBlockingDeque(int capacity) {
    // 值小于1时抛出IllegalArgumentException
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
}

默认 Integer.MAX_VALUE 构建实例,且用指定集合的元素作为双端队列的初始元素

1
2
3
4
public LinkedBlockingDeque(Collection<? extends E> c) {
    this(Integer.MAX_VALUE);
    addAll(c);
}

五、成员方法

把指定节点作为第一个元素存入,如果队列已满返回false

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private boolean linkFirst(Node<E> node) {
    // 检查容量是否已满
    if (count >= capacity)
        return false;

    // 获取第一个节点
    Node<E> f = first;

    // 新节点作为原首节点的前导节点
    node.next = f;
    first = node;

    // 处理尾引用
    if (last == null)
        last = node;
    else
        f.prev = node;

    // 元素数量统计递增
    ++count;

    // 通知其他线程取数据
    notEmpty.signal();
    return true;
}

把指定节点作为最后一个元素存入,如果队列已满返回false

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private boolean linkLast(Node<E> node) {
    // 检查容量是否已满
    if (count >= capacity)
        return false;

    // 获取最后一个节点
    Node<E> l = last;

    // 新节点作为原尾节点的后继节点
    node.prev = l;
    last = node;

    // 处理头引用
    if (first == null)
        first = node;
    else
        l.next = node;

    // 元素数量统计递增
    ++count;

    // 通知其他线程取数据
    notEmpty.signal();
    return true;
}

移除并返回第一个元素,如果为空则返回null

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private E unlinkFirst() {
    // 获取首节点
    Node<E> f = first;

    if (f == null)
        return null;

    // 首节点的下一个节点
    Node<E> n = f.next;

    // 获取首节点的item
    E item = f.item;

    // 清空首节点
    f.item = null;

    // 首节点的后继节点引用为自己
    f.next = f;

    // 处理头引用
    first = n;

    if (n == null)
        last = null;
    else
        n.prev = null;

    // 元素数量统计递减
    --count;

    notFull.signal();
    // 返回首节点的内容
    return item;
}

移除并返回最后一个元素,如果为空则返回null

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private E unlinkLast() {
    // 获取尾节点
    Node<E> l = last;
    if (l == null)
        return null;

    // 尾节点的上一个节点
    Node<E> p = l.prev;

    // 获取尾节点的item
    E item = l.item;

    // 清空尾节点
    l.item = null;
    l.prev = l;

    // 处理尾引用
    last = p;

    if (p == null)
        first = null;
    else
        p.next = null;

    // 元素数量统计递减
    --count;

    notFull.signal();
    // 返回尾节点的内容
    return item;
}

从队列中移除x节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void unlink(Node<E> x) {
    Node<E> p = x.prev;
    Node<E> n = x.next;
    
    // p为空表示x是队列的首节点
    if (p == null) {
        unlinkFirst();
    } else if (n == null) {
        // n为空表示x是队列的尾节点
        unlinkLast();
    } else {
        // x是队列的中间节点
        p.next = n;
        n.prev = p;
        x.item = null;
        // 不要置空x.next的节点,因为x节点在iterator里可能在使用
        --count;
        notFull.signal();
    }
}

5.3 阻塞双端队列

两个 add 方法是 offer 方法的变体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
public void addFirst(E e) {
    if (!offerFirst(e))
        throw new IllegalStateException("Deque full");
}

public void addLast(E e) {
    if (!offerLast(e))
        throw new IllegalStateException("Deque full");
}

public boolean add(E e) {
    addLast(e);
    return true;
}

public boolean offer(E e) {
    return offerLast(e);
}

public void put(E e) throws InterruptedException {
    putLast(e);
}

public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {
    return offerLast(e, timeout, unit);
}

// 默认移除并返回双端队列的头节点
public E remove() {
    return removeFirst();
}

public E poll() {
    return pollFirst();
}

public E take() throws InterruptedException {
    return takeFirst();
}

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    return pollFirst(timeout, unit);
}

// 获取双端队列的头节点,但是获取之后不会移除该节点
public E element() {
    return getFirst();
}

public E peek() {
    return peekFirst();
}

// 返回双端队列可用容量
// 不要通过观察队列剩余容量来确定元素成功插入,因为其他线程也可能在增加或移除元素
public int remainingCapacity() {
    final ReentrantLock lock = this.lock;
    // 获取锁
    lock.lock();
    try {
        // 计算剩余有效容量
        return capacity - count;
    } finally {
        // 解锁
        lock.unlock();
    }
}

public int drainTo(Collection<? super E> c) {
    return drainTo(c, Integer.MAX_VALUE);
}

public int drainTo(Collection<? super E> c, int maxElements) {
    Objects.requireNonNull(c);
    if (c == this)
        throw new IllegalArgumentException();
    if (maxElements <= 0)
        return 0;
    final ReentrantLock lock = this.lock;
    // 获取锁
    lock.lock();
    try {
        int n = Math.min(maxElements, count);
        for (int i = 0; i < n; i++) {
            c.add(first.item);   // In this order, in case add() throws.
            unlinkFirst();
        }
        return n;
    } finally {
        // 解锁
        lock.unlock();
    }
}

5.4 offer

存入元素为空则抛出NullPointerException

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public boolean offerFirst(E e) {
    // 存入元素不能为空
    if (e == null) throw new NullPointerException();
    // 把e封装为新Node
    Node<E> node = new Node<E>(e);
    final ReentrantLock lock = this.lock;
    // 获取锁
    lock.lock();
    try {
        return linkFirst(node);
    } finally {
        // 解锁
        lock.unlock();
    }
}

public boolean offerLast(E e) {
    // 存入元素不能为空
    if (e == null) throw new NullPointerException();
    // 把e封装为新Node
    Node<E> node = new Node<E>(e);
    final ReentrantLock lock = this.lock;
    // 获取锁
    lock.lock();
    try {
        return linkLast(node);
    } finally {
        // 解锁
        lock.unlock();
    }
}

等待存入元素到队列头时被中断抛出 InterruptedException

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public boolean offerFirst(E e, long timeout, TimeUnit unit)
    throws InterruptedException {
    // 存入元素不能为空
    if (e == null) throw new NullPointerException();
    // 创建新节点
    Node<E> node = new Node<E>(e);
    // 换算为纳秒
    long nanos = unit.toNanos(timeout);
    // 获取锁,且锁可中断
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (!linkFirst(node)) {
            if (nanos <= 0L)
                return false;
            nanos = notFull.awaitNanos(nanos);
        }
        return true;
    } finally {
        // 解锁
        lock.unlock();
    }
}

等待存入元素到队列尾时被中断抛出 InterruptedException

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public boolean offerLast(E e, long timeout, TimeUnit unit)
    throws InterruptedException {
    // 存入元素不能为空
    if (e == null) throw new NullPointerException();
    // 创建新节点
    Node<E> node = new Node<E>(e);
    // 换算为纳秒
    long nanos = unit.toNanos(timeout);
    // 获取锁,且锁可中断
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (!linkLast(node)) {
            if (nanos <= 0L)
                return false;
            nanos = notFull.awaitNanos(nanos);
        }
        return true;
    } finally {
        // 解锁
        lock.unlock();
    }
}

5.5 put

阻塞存入元素,当队列已满没有空间存入新元素,以下两个方法会阻塞等待并通知,等待没有设置超时时间。

元素存到队列头部

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void putFirst(E e) throws InterruptedException {
    // 存入元素不能为空
    if (e == null) throw new NullPointerException();
    // 把e封装为新Node
    Node<E> node = new Node<E>(e);
    final ReentrantLock lock = this.lock;
    // 获取锁
    lock.lock();
    try {
        while (!linkFirst(node))
            notFull.await();
    } finally {
        // 解锁
        lock.unlock();
    }
}

元素存到队列尾部

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void putLast(E e) throws InterruptedException {
    // 存入元素不能为空
    if (e == null) throw new NullPointerException();
    // 把e封装为新Node
    Node<E> node = new Node<E>(e);
    final ReentrantLock lock = this.lock;
    // 获取锁
    lock.lock();
    try {
        while (!linkLast(node))
            notFull.await();
    } finally {
        // 解锁
        lock.unlock();
    }
}

5.6 remove

若双端队列为空,则抛出NoSuchElementException

1
2
3
4
5
6
7
8
9
10
11
public E removeFirst() {
    E x = pollFirst();
    if (x == null) throw new NoSuchElementException();
    return x;
}

public E removeLast() {
    E x = pollLast();
    if (x == null) throw new NoSuchElementException();
    return x;
}

5.7 poll

先获取线程锁,然后调用对应方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public E pollFirst() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return unlinkFirst();
    } finally {
        lock.unlock();
    }
}

public E pollLast() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return unlinkLast();
    } finally {
        lock.unlock();
    }
}

// 带超时的poll
public E pollFirst(long timeout, TimeUnit unit)
    throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        E x;
        while ( (x = unlinkFirst()) == null) {
            if (nanos <= 0L)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        return x;
    } finally {
        lock.unlock();
    }
}

// 带超时的poll
public E pollLast(long timeout, TimeUnit unit)
    throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        E x;
        while ( (x = unlinkLast()) == null) {
            if (nanos <= 0L)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        return x;
    } finally {
        lock.unlock();
    }
}

5.8 take

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public E takeFirst() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E x;
        while ( (x = unlinkFirst()) == null)
            notEmpty.await();
        return x;
    } finally {
        lock.unlock();
    }
}

public E takeLast() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E x;
        while ( (x = unlinkLast()) == null)
            notEmpty.await();
        return x;
    } finally {
        lock.unlock();
    }
}

5.9 get

若队列为空则抛出 NoSuchElementException

1
2
3
4
5
public E getFirst() {
    E x = peekFirst();
    if (x == null) throw new NoSuchElementException();
    return x;
}

若队列为空则抛出 NoSuchElementException

1
2
3
4
5
public E getLast() {
    E x = peekLast();
    if (x == null) throw new NoSuchElementException();
    return x;
}

5.10 peek

获取双端链表的头节点或尾节点,访问后不移出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public E peekFirst() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return (first == null) ? null : first.item;
    } finally {
        lock.unlock();
    }
}

public E peekLast() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return (last == null) ? null : last.item;
    } finally {
        lock.unlock();
    }
}

5.11 removeOccurrence

移出第一个命中的指定元素。如果队列存在多个相同元素,每次调用方法仅移除一个元素。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public boolean removeFirstOccurrence(Object o) {
    if (o == null) return false;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 顺序查找
        for (Node<E> p = first; p != null; p = p.next) {
            if (o.equals(p.item)) {
                unlink(p);
                return true;
            }
        }
        return false;
    } finally {
        lock.unlock();
    }
}

移出最后一个命中的指定元素。如果队列存在多个相同元素,每次调用方法仅移除一个元素。实现方式为倒序查找。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public boolean removeLastOccurrence(Object o) {
    if (o == null) return false;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 倒叙查找
        for (Node<E> p = last; p != null; p = p.prev) {
            if (o.equals(p.item)) {
                unlink(p);
                return true;
            }
        }
        return false;
    } finally {
        lock.unlock();
    }
}

5.12 栈方法

压栈,即向双端队列头部添加元素

1
2
3
public void push(E e) {
    addFirst(e);
}

弹栈,即从双端队列头部移除节点

1
2
3
public E pop() {
    return removeFirst();
}

5.13 Collection方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
// 移除双端队列第一次遇到的指定节点,若节点不存在返回false
public boolean remove(Object o) {
    return removeFirstOccurrence(o);
}

// 返回双端队列已保存节点数量
public int size() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return count;
    } finally {
        lock.unlock();
    }
}

// 若双端队列包含指定元素返回true,即可队列保存多个相同元素,查找时命中了其中一个
public boolean contains(Object o) {
    if (o == null) return false;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 从队列头部元素开始遍历匹配目标元素
        for (Node<E> p = first; p != null; p = p.next)
            if (o.equals(p.item))
                return true;
        return false;
    } finally {
        lock.unlock();
    }
}

// 把指定集合的元素全部添加到本双端队列尾部中,元素插入的顺序由c的迭代器决定
public boolean addAll(Collection<? extends E> c) {
    // 当插入集合和本实例是同一双端队列,抛出IllegalArgumentException
    if (c == this)
        throw new IllegalArgumentException();

    // 把c所有元素迁移到私有节点链
    Node<E> beg = null, end = null;
    int n = 0;

    // 把c所有元素封装为Node,并连接到beg
    for (E e : c) {
        Objects.requireNonNull(e);
        n++;
        Node<E> newNode = new Node<E>(e);
        if (beg == null)
            beg = end = newNode;
        else {
            // end的后继节点是newNode
            end.next = newNode;
            // newNode的前导节点是end
            newNode.prev = end;
            // 尾引用从end移到newNode
            end = newNode;
        }
    }
    
    // 集合c没有元素,或里面的元素全为null
    if (beg == null)
        return false;

    // 把元素原子性地插入到队列尾部中
    final ReentrantLock lock = this.lock;
    // 获取锁
    lock.lock();
    try {
        if (count + n <= capacity) {
            beg.prev = last;
            // 原队列为null
            if (first == null)
                // beg作为头结点
                first = beg;
            else
                // 元素插入到队列尾部
                last.next = beg;
            // 更新尾引用
            last = end;
            // 更新队列元素总数量
            count += n;
            // 通知其他线程获取元素
            notEmpty.signalAll();
            return true;
        }
    } finally {
        lock.unlock();
    }

    // 当元素插入容量溢出导致IllegalStateException时,回到旧的非原子性的实现方法
    return super.addAll(c);
}

5.14 toArray

返回包含双端队列所有元素的数组,元素顺序和双端队列元素顺序一致。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
s@SuppressWarnings("unchecked")
public Object[] toArray() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 创建新数组,大小为count
        Object[] a = new Object[count];
        int k = 0;
        for (Node<E> p = first; p != null; p = p.next)
            // 把队列的元素逐个放入数组中
            a[k++] = p.item;
        // 返回数组
        return a;
    } finally {
        // 解锁
        lock.unlock();
    }
}

传入目标数组,并把双端队列所有元素放入该数组,元素顺序和双端队列元素顺序一致。返回数组的类型和传入数组类型相同。

如果传入数组大小不足容纳所有元素,方法会创建新数组,容量和所放入元素数量一致,放入所有元素后返回新数组。如果传入数组空间足够存入所有元素,该数组的下一个空间会被置为 null

本方法可以实现队列转数组的功能:String[] y = x.toArray(new String[0]);。且值得注意的是,传入 toArray(new Object[0]) 和 传入 toArray() 效果完全相同。

传入 数组元素的运行类型,不是 本队列元素的运行类型 的子类时,抛出 ArrayStoreException

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@SuppressWarnings("unchecked")
public <T> T[] toArray(T[] a) {
    final ReentrantLock lock = this.lock;
    // 上锁
    lock.lock();
    try {
        // 双端队列元素数量比指定数组空间大,则需要创建新数组并赋值给a
        if (a.length < count)
            // 新数组类型为a.getClass().getComponentType(),大小为count
            a = (T[])java.lang.reflect.Array.newInstance
                (a.getClass().getComponentType(), count);
                
        // 初始数组索引k
        int k = 0;
        for (Node<E> p = first; p != null; p = p.next)
            // 把队列的元素逐个放入数组中
            a[k++] = (T)p.item;
        if (a.length > k)
            // 置空索引k的值
            a[k] = null;
        // 返回数组
        return a;
    } finally {
        // 解锁
        lock.unlock();
    }
}

5.15 clear

原子性地从双端队列移除所有元素

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public void clear() {
    final ReentrantLock lock = this.lock;
    // 获取锁
    lock.lock();
    try {
        for (Node<E> f = first; f != null; ) {
            // 清空节点负载
            f.item = null;
            // 解除节点f的链接
            Node<E> n = f.next;
            // 清空前导引用
            f.prev = null;
            // 清空后继引用
            f.next = null;
            f = n;
        }
        first = last = null; // 头尾指针置空
        count = 0; // 总元素数置0
        notFull.signalAll();
    } finally {
        // 解锁
        lock.unlock();
    }
}

5.16 succ

不完全上锁的情况下进行元素遍历,此遍历需要应付以下两个问题:

  • 已出队节点 (p.next == p)
  • 多个内部的可能已移除的节点 (p.item == null)
1
2
3
4
5
Node<E> succ(Node<E> p) {
    if (p == (p = p.next))
        p = first;
    return p;
}

5.17 checkInvariants

1
2
3
4
5
6
7
8
9
void checkInvariants() {
    // assert lock.isHeldByCurrentThread();
    // Nodes may get self-linked or lose their item, but only
    // after being unlinked and becoming unreachable from first.
    for (Node<E> p = first; p != null; p = p.next) {
        // assert p.next != p;
        // assert p.item != null;
    }
}