一、类签名
ArrayBlockingQueue 为有界数组。队列元素顺序为先进先出。队头指针元素在队列内保存时间最长,队尾指针元素在队列内保存时间最短。新元素插入到队尾,而遍历操作则从队头获取元素。
1
2
| public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable
|
这是传统的有界缓冲,元素保存在固定大小的数组中,由生产者插入元素、消费者取出元素。一旦创建成功,数组容量不能再修改。向已满的队列存入元素会导致操作阻塞,同样,从空队列中取出元素也会引起阻塞。
本队列支持可选的公平策略,以便安排正在等待的生产者和消费者。该公平特性默认是不保证的。然而,创建实例时把 fairness 设为 true ,能保证线程按照FIFO的方式获取。公平特性会降低吞吐量,但也能降低可变性并避免线程饥饿。
源码版本:JDK11
二、成员变量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| // 保存元素的数组
final Object[] items;
// 下次可执行take、poll、peek或remove操作的位置索引
int takeIndex;
// 下次可执行put、offer或add操作的位置索引
int putIndex;
// 队列已保存元素数量
int count;
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
|
三、构造方法
构造方法内初始化保存元素的环形数组,同时也初始化同步锁。形参 fair 控制队列是否支持公平锁,一般建议使用非公平锁。因为公平锁为了平衡读写线程的优先级,需要牺牲吞吐量。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
| // 支持自定义公平锁
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
// 使用指定集合初始化队列,支持自定义公平锁
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
// 先调用上述构造方法进行初始化
this(capacity, fair);
// 获取锁,处理集合c的元素
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Object[] items = this.items;
int i = 0;
try {
// 把指定集合的元素放入队列
for (E e : c)
items[i++] = Objects.requireNonNull(e);
} catch (ArrayIndexOutOfBoundsException ex) {
// 如果c的元素数量超过capacity,抛出ArrayIndexOutOfBoundsException
throw new IllegalArgumentException();
}
// 更新已保存元素数量
count = i;
// 更新存入指针的索引
putIndex = (i == capacity) ? 0 : i;
} finally {
// 操作完成,解锁
lock.unlock();
}
}
|
四、实例方法
4.1 增
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
| // 在指定索引插入元素,方法在调用前需要先获取锁
private void enqueue(E e) {
final Object[] items = this.items;
// 索引插入元素
items[putIndex] = e;
// 计算下一个插入元素的有效索引
if (++putIndex == items.length) putIndex = 0;
// 已保存元素递增
count++;
notEmpty.signal();
}
// 如果没有超过队列容量,则指定元素直接插入到队列的尾部
// 此方法插入失败直接抛出异常,所以建议使用方法offer(E e)
public boolean add(E e) {
return super.add(e);
}
// 如果没有超过队列容量,则指定元素直接插入到队列的尾部
public boolean offer(E e) {
// 插入元素不能为空
Objects.requireNonNull(e);
final ReentrantLock lock = this.lock;
// 先获取锁
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
// 插入指定元素到队列的尾部,如果队列空间已满则阻塞等待
public void put(E e) throws InterruptedException {
Objects.requireNonNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 循环一直等待队列出现空间
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
// 插入指定元素到队列的尾部,如果队列空间已满则阻塞等待,可自定义超时时间
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
Objects.requireNonNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 队列没有空间则继续等待
while (count == items.length) {
if (nanos <= 0L)
// 元素没有插入,返回false
return false;
// 计算剩余超时时间
nanos = notFull.awaitNanos(nanos);
}
// 队列出现空余且操作没有超时,元素存入队列
enqueue(e);
// 成功插入元素,返回true
return true;
} finally {
lock.unlock();
}
}
|
4.2 取
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
| // 从takeIndex索引获取元素,由take()方法调用
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
// 获取索引的元素,即头引用的元素
E e = (E) items[takeIndex];
items[takeIndex] = null;
// 索引后移
if (++takeIndex == items.length) takeIndex = 0;
// 已保存元素数量递减
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return e;
}
// 从队列取元素
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 循环一直等待队列非空
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
// 从队列取元素
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
// 从队列取元素,如果队列空间已空则阻塞等待,可自定义超时时间
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0L)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return itemAt(takeIndex);
} finally {
lock.unlock();
}
}
|
4.3 查
因为是个环形数组,所以查找时需要注意开始索引和结束索引
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
| public boolean contains(Object o) {
if (o == null) return false;
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 快速失败,如果队列为空,不进行任何查找操作
if (count > 0) {
final Object[] items = this.items;
for (int i = takeIndex, end = putIndex,
to = (i < end) ? end : items.length;
; i = 0, to = end) {
// 先遍历唤醒数组的后段,元素没有命中,再遍历数组前段
for (; i < to; i++)
if (o.equals(items[i]))
return true;
// 全部元素遍历完成,没有找到目标元素
if (to == end) break;
}
}
// 队列为空,或队列不存在目标元素
return false;
} finally {
lock.unlock();
}
}
|
4.4 清除
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
| // 根据索引移出元素
void removeAt(final int removeIndex) {
final Object[] items = this.items;
if (removeIndex == takeIndex) {
// 移除的元素是队列头元素,该元素置空,指针后移即可
items[takeIndex] = null;
if (++takeIndex == items.length) takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
} else {
// 移除的元素在环形队列内部
for (int i = removeIndex, putIndex = this.putIndex;;) {
int pred = i;
if (++i == items.length) i = 0;
// 移除中间的元素
if (i == putIndex) {
items[pred] = null;
this.putIndex = pred;
break;
}
// 后续元素需逐一向前移动一个位置
items[pred] = items[i];
}
count--;
if (itrs != null)
itrs.removedAt(removeIndex);
}
notFull.signal();
}
// 从队列中移除指定元素
public boolean remove(Object o) {
if (o == null) return false;
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count > 0) {
final Object[] items = this.items;
for (int i = takeIndex, end = putIndex,
to = (i < end) ? end : items.length;
; i = 0, to = end) {
for (; i < to; i++)
// 要先遍历队列才能找到指定元素
if (o.equals(items[i])) {
// 元素命中则移除命中目标
removeAt(i);
// 移除成功返回true
return true;
}
// 队列已经完全遍历完毕,没有发现指定元素,退出遍历
if (to == end) break;
}
}
// 没有找到指定元素
return false;
} finally {
lock.unlock();
}
}
public void clear() {
final ReentrantLock lock = this.lock;
// 获取锁,开始清除所有元素
lock.lock();
try {
int k;
if ((k = count) > 0) {
circularClear(items, takeIndex, putIndex);
// 重置takeIndex和putIndex
takeIndex = putIndex;
// 已保存元素数量置0
count = 0;
// 清除迭代器
if (itrs != null)
itrs.queueIsEmpty();
// 通知所有等待的Waiters
for (; k > 0 && lock.hasWaiters(notFull); k--)
notFull.signal();
}
} finally {
lock.unlock();
}
}
// 清除环形队列内所有元素
private static void circularClear(Object[] items, int i, int end) {
for (int to = (i < end) ? end : items.length;
; i = 0, to = end) {
for (; i < to; i++) items[i] = null;
if (to == end) break;
}
}
|
4.5 取余计算
由于这个是环形队列,所以需要处理队列尾部和队列头部索引的关系
1
2
3
4
5
6
7
8
9
10
11
| // 处理队列尾部递增的情况
static final int inc(int i, int modulus) {
if (++i >= modulus) i = 0;
return i;
}
// 处理队列头部递减的情况
static final int dec(int i, int modulus) {
if (--i < 0) i = modulus - 1;
return i;
}
|