Java中的阻塞队列

Java中的阻塞队列,可以基于BlockingQueue接口自定义实现多种自定义阻塞队列。阻塞队列最经典的应用主要是帮助线程池管理任务,是一个经典的等待/通知(生产者/消费者)模型。这里根据源码主要介绍几种比较经典的实现,并提炼出一些经典范式。






















add remove 抛出异常
put take 等待直到中断
offer poll 直接返回
offer(time) poll(time) 设定等待时间

LinkedBlockingQueue(基于链表的有界阻塞队列)

Executors.newFixedThreadPool(根据需要可重用部分消费者线程数)和Executors.newSingledThreadPool(单个消费者线程)都使用了LinkedBlockingQueue无界队列来管理任务。

put/take(double lock+wait/notify)

通过双锁来增加队列的并发度,通过wait/notify进行等待/通知。因为通常无界队列默认可以容下Integer.MAX_VALUE的任务。

Design Bounded Blocking Queue

设计一个阻塞队列,通过等待通知机制设计一个阻塞队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package P1188;

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
* 设计一个有界阻塞队列
* Design Bounded Blocking Queue
* @autor yeqiaozhu.
* @date 2019-10-08
*/
public class Solution {

//定义一个队列
private Queue<Integer> queue;
//队列的capacity 限制容量
private Integer size;
//定义一个lock
private ReentrantLock reentrantLock=new ReentrantLock();

//定义两个队列
private Condition notEmpty=reentrantLock.newCondition();

private Condition notFull=reentrantLock.newCondition();

public Solution(Integer size) {
this.queue=new LinkedList();
this.size=size;
}

public void add(Integer integer) throws InterruptedException {
reentrantLock.lockInterruptibly();
try {
while (queue.size()==size){
notFull.await();
}
queue.add(integer);
notEmpty.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
reentrantLock.unlock();
}

}

public void remove() throws InterruptedException {
reentrantLock.lockInterruptibly();

try {
while (queue.size()==0){
notEmpty.await();
}
queue.remove();
notFull.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
reentrantLock.unlock();
}
}
}