Java-BlockingQueue阻塞队列

![BlockingQueue阻塞队列](/images/BlockingQueue 阻塞队列xmind.png)

Queue接口

Queue队列的特征是FIFO——先进先出。只有队尾可以进行插入操作,只有队头可以进行删除操作。

img

Java中的Queue继承了Collection接口,并额外实现了以下方法

1
2
3
4
5
6
7
8
public interface Queue<E> extends Collection<E> {
boolean add(E e);
boolean offer(E e);
E remove();
E poll();
E element();
E peek();
}

队列插入数据操作(add/offer)

将新数据插入队尾

add:如果队列满时,插入队尾数据,就会抛出IllegalStateExecption

offer:如果队列满时,插入队尾数据,不会抛出异常,会返回false

队列删除数据操作(remove/poll)

获取队头元素并删除

remove:当队列为空时,删除元素时,会抛出NoSuchElementException

poll:当队列为空时,删除元素时,不会抛出异常,只会返回null

队列检查数据操作(element/peek)

获取队头元素但并不删除

element:当队列为空时,获取队头元素,会抛出NoSuchElementException

peek:当队列为空时,获取队头元素,不会抛出异常,只会返回null

抛出异常 特殊值
插入 add() offer() 返回 false
删除 remove() poll() 返回null
获取数据(检查) element() peek() 返回null

如果把队列相关方法放在一起看,可以先形成一个统一认知:

  • 异常型:失败时直接抛异常,适合把“当前操作一定要成功”视为前提的场景
  • 特殊值型:失败时返回null/false,适合调用方自己决定后续分支

后面的BlockingQueue其实是在这套方法体系上,继续扩展出了阻塞型超时型两类操作。

BlockingQueue接口

系统提供的用于多线程环境下存、取共享变量的一种容器。

BlockingQueue阻塞队列实现了Queue接口,相比于Queue提供了额外的功能:

  • 获取队头元素时,如果队列为空,执行线程会处于阻塞状态,直到队列非空——对应删除和检查操作
  • 添加队尾元素时,如果队列已满,执行线程会处于阻塞状态,直到队列不满——对应插入操作

当触发上述两种情况的出现时,按照不同的设置方式,提供了以下几种处理方案:

  • 抛出异常
  • 返回特殊值(返回nullfalse)
  • 阻塞当前线程直到可以执行
  • 阻塞线程设置最大超时时间,若超过该时间,线程就会继续执行,放弃当次操作

因此从方法家族的角度看,BlockingQueue其实可以统一理解成四类语义:

  • 异常型add/remove/element
  • 特殊值型offer/poll/peek
  • 阻塞型put/take
  • 超时型offer(time)/poll(time)

不过也要注意:BlockingQueue解决的是生产者-消费者之间的协作问题,并不等于“只要用了阻塞队列就天然高并发”。不同实现类在锁粒度、容量模型和吞吐特征上差异很大。

阻塞队列插入数据操作(put/offer(time))

对应阻塞队列在队列已满插入数据时的阻塞或者超时处理

put:如果队列满时,插入队尾数据,会阻塞当前线程直到队列非满

offer(time):如果队列满时,插入队尾数据,会阻塞当前线程直到队列非满或者达到了超时时间,达到超时时间则返回false

这两个方法都可能在等待过程中被中断,因此都需要正确处理InterruptedException,而不是默认认为“阻塞就一定会一直等到成功”。

阻塞队列删除数据操作(take()/poll(time))

对应阻塞队列在队列为空时获取数据时的阻塞超时处理

take():当队列为空时,删除元素时,会阻塞当前线程直到队列非空

poll(time):当队列为空时,删除元素时,会阻塞当前线程直到队列非空或者达到了超时时间,达到超时时间返回null

同样地,take()poll(time)也都需要面对中断语义:线程如果在等待过程中被中断,当前阻塞操作会提前结束。

阻塞队列检查数据操作

抛出异常 特殊值 阻塞 超时
插入 add() offer() 返回 false put() offer(time)返回false
删除 remove() poll() 返回null take() poll(time)返回null
获取数据(检查) element() peek() 返回null / /

注意点

  1. 阻塞队列无法插入null,否则抛出空指针异常
  2. 可以访问阻塞队列中的任意元素,尽量避免使用remove(object)移除对象
  3. 不要轻易使用size()做并发控制判断,因为在多线程场景下它更适合观察状态,而不是作为强一致业务条件

BlockingQueue实现类

Java-线程池中的workQueue设置的就是BlockingQueue接口的实现类,

例如

  • ArrayBlockingQueue:数组构成的有界阻塞队列
  • LinkedBlockingQueue:链表构成的有界阻塞队列,如果不设置大小的话,近似无界阻塞队列
  • SynchronousQueue:不存储任何元素的阻塞队列
  • PriorityBlockingQueue:支持优先级排序的无界阻塞队列

BlockingQueue原理

BlockingQueue只是一个接口,真正的实现都是在XXBloxckingQueue中的,想要分析对应的原理就需要从实现类进行分析

ArrayBlockingQueue

由数组实现的有界阻塞队列,大小一旦确定就无法改变队列的长度。

关键成员变量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/** The queued items 维护队列元素的数组*/
final Object[] items;

/** items index for next take, poll, peek or remove 移除数据的数组下标*/
int takeIndex;

/** items index for next put, offer, or add 插入数据的数组下标*/
int putIndex;

/** Number of elements in the queue 数组长度*/
int count;

/** Main lock guarding all access 数据并发控制类*/
final ReentrantLock lock;

/** Condition for waiting takes 控制take操作是否让线程等待*/
private final Condition notEmpty;

/** Condition for waiting puts 控制put操作是否让线程等待*/
private final Condition notFull;

ArrayBlockingQueue阻塞功能的实现就是依赖了ReentrantLock以及Condition实现了等待机制

具体可参考Java-ReentrantLock原理及解析

构造函数
1
2
3
4
5
6
7
8
9
10
11
12
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}

public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}

capacity:设置阻塞队列的数组容量

fair:设置线程并发是否公平(默认配置非公平锁)

当前锁被一个线程持有时,其他线程会被挂起等待锁的释放,等待时加入等待队列。

公平锁:当锁释放时,等待队列的前端线程会优先获取锁

非公平锁:当锁释放时,等待队列中的所有线程都会去尝试获取锁

ArrayBlockingQueue初始化时,构造ReentrantLock锁以及两个Condition对象控制数据插入、删除时的阻塞。

实现方法

offer() 非阻塞添加数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
 public boolean offer(E e) {
Objects.requireNonNull(e);//检查将要添加的数据是否为null
final ReentrantLock lock = this.lock;
lock.lock();//上锁
try {
if (count == items.length)//队列已满
return false;
else {
enqueue(e);//数据加入队列
return true;
}
} finally {
lock.unlock();//解锁
}
}

//数据入队
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;//数组赋值
//如果此时放入的是最后一个下标的数据,重置下标为0,下一次从第一个开始放元素
if (++putIndex == items.length) putIndex = 0;
count++;
notEmpty.signal();//通知 数组非空
}

offer()添加数据时,将当前线程上锁。

  • 在当前队列已满时,直接返回false
  • 当前队列未满时,调用enqueue()添加数据,putIndex设置对应数据且putIndex++。 然后通知阻塞的消费线程notEmpty

poll() 非阻塞取出数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();//当前队列非空 取出数据
} finally {
lock.unlock();
}
}

private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
//如果此时取出的是最后一个下标的数据,重置下标为0,下一次从第一个开始取出元素
if (++takeIndex == items.length) takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();//数据迭代减少,保证遍历线程安全
notFull.signal();//通知 数组不满
return x;
}

poll()取出数据时,将当前线程上锁

  • 当前队列为空的时候,直接返回null
  • 当前队列非空的时候,调用dequeue()takeIndex元素出队,设置takeIndex处元素为nulltakeIndex--。然后通知阻塞的生产线程notFull

offer(time)不超时阻塞添加数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {

Objects.requireNonNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0L)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}

offer(time)添加数据时,将当前线程上锁

  • 在当前队列已满时,阻塞生产线程notFull,超过time后,队列还是满的话,直接返回false
  • 当前队列未满时,调用enqueue()添加数据,putIndex设置对应数据且putIndex++。 然后通知阻塞的消费者notEmpty

poll(time)不超时阻塞取出数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0L)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}

poll(time)取出数据时,将当前线程上锁

  • 当前队列为空的时候,阻塞消费线程notEmpty,超过time后,队列还是空的话,直接返回null
  • 当前队列非空的时候,调用dequeue()takeIndex元素出队,设置takeIndex处元素为nulltakeIndex--。然后通知阻塞的生产者notFull
put()阻塞添加数据
1
2
3
4
5
6
7
8
9
10
11
12
public void put(E e) throws InterruptedException {
Objects.requireNonNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//生产者线程上锁
try {
while (count == items.length)
notFull.await();//等待消费者线程通知
enqueue(e);
} finally {
lock.unlock();
}
}

put()添加数据时

  • 当前队列已满时,阻塞当前线程,等待notFull通知(队列未满)
  • 当前队列未满时,调用enqueue()添加数据,putIndex设置对应数据且putIndex++。 然后通知阻塞的消费者notEmpty
take()阻塞获取数据
1
2
3
4
5
6
7
8
9
10
11
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//消费者线程上锁
try {
while (count == 0)
notEmpty.await();//等待生产者线程通知
return dequeue();
} finally {
lock.unlock();
}
}

take()获取数据时

  • 当前队列为空时,阻塞当前线程,等待notEmpty通知(队列新增数据)
  • 当前队列非空的时候,调用dequeue()takeIndex元素出队,设置takeIndex处元素为nulltakeIndex--。然后通知阻塞的生产者notFull

其中Conditionawait/signal类似于Objectwait/notify实现等待与通知的功能。

在分析enqueue()dequeue()时,发现底层数组不会进行扩容,而是在到达边缘时,重置index为0,重复利用数组。

ArrayBlockingQueue循环数组

从上述源码对ArrayBlockingQueue进行总结:

底层数据结构是一个 数组,生产者和消费者由同一个锁(ReetrantLock)控制,生产和消费效率低。

如果再概括得更完整一点,ArrayBlockingQueue有三个非常鲜明的特征:

  • 有界:容量固定,天然具备背压能力
  • 单锁:生产和消费会共享同一把锁,竞争更集中
  • 循环数组:内存结构紧凑,不需要为每个元素额外创建链表节点

因此它更适合:希望明确限制队列容量、控制内存上界,并接受生产消费存在更多互斥的场景。

LinkedBlockingQueue

由链表实现的阻塞队列,默认最大长度为Integer.MAX

关键成员变量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/** The capacity bound, or Integer.MAX_VALUE if none 链表最大长度*/
private final int capacity;

/** Current number of elements 当前元素个数*/
private final AtomicInteger count = new AtomicInteger();

/**
* Head of linked list.
* Invariant: head.item == null
* 链表头节点
*/
transient Node<E> head;

/**
* Tail of linked list.
* Invariant: last.next == null
* 链表尾节点
*/
private transient Node<E> last;

/** Lock held by take, poll, etc 控制消费并发*/
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes 控制take线程等待 非空条件*/
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc 控制生产并发*/
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts 控制put线程等待 非满条件*/
private final Condition notFull = putLock.newCondition();

LinkedBlockingQueue采用了两把锁putLock、takeLock,分别进行控制,提高了并发性能。

构造函数
1
2
3
4
5
6
7
8
9
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}

public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}

capacity:设置单链表长度上限,若不设置该值,默认为Integer.MAX

构造函数初始化了底层的链表结构。

实现方法

offer()非阻塞添加数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity) //达到上限直接返回false
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();//线程上锁
try {
if (count.get() < capacity) {
enqueue(node);//插入链表
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();//唤醒 等待的入队线程
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();//唤醒等待的 出队线程
return c >= 0;
}

private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;//赋值操作
}

offer添加数据时,当队列已满时,直接返回false。未满时,插入新数据后,count自加后唤醒notFull、notEmpty

poll()非阻塞获取数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;//队列为空返回null
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = dequeue();//出队
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();//通知非空线程
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();//通知非满线程
return x;
}

private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}

poll获取数据时,队列为空时,直接返回null。队列非空时,获取数据后,数据出队,count自减后,先后唤醒notEmptynotFull

put()阻塞添加数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await();//队列已满时,等待非满通知
}
enqueue(node);//插入新数据
c = count.getAndIncrement();//数据自增
if (c + 1 < capacity)
notFull.signal();//通知非满线程
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();//通知非空线程
}

put()添加数据时,队列已满时,会进行阻塞等待直到队列非满。

take()阻塞获取数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();//队列为空时,等待非空通知
}
x = dequeue();//出队
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();//通知非空线程
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();//通知非满线程
return x;
}

take()获取数据时,队列为空时,会进行阻塞等到直到队列非空。

img

从上述源码对LinkedBlockingQueue进行总结:

LinkedBlockingQueue底层数据结构为单链表,内部持有两个Lock:putLock、takeLock,相互之间不会干扰执行,提高了并发性能。

如果从使用角度总结,LinkedBlockingQueue还有几个关键特征:

  • 默认容量近似无界,如果不显式指定大小,任务和数据可能持续堆积
  • 由于生产和消费拆成两把锁,吞吐通常会优于单锁队列
  • 链表节点需要额外对象开销,内存占用通常会比数组队列更高
ArrayBlockingQueue比较
ArrayBlockingQueue LinkedBlockingQueue
构造方法 必须指定构造大小
指定后无法修改
默认大小为Integer.MAX
可以指定大小
底层数据结构 数组 单链表
出队入队使用同一把锁
数据的删除和添加操作互斥
出队使用takeLock,入队使用putLock
数据删除、添加操作不干扰,提升并发性能

如果从实际选型角度来记会更直接:

  • 想要明确容量上限、天然背压、稳定内存占用,优先考虑ArrayBlockingQueue
  • 想要更灵活的容量和更高的并发吞吐,可以考虑LinkedBlockingQueue
  • 但线程池里如果直接使用默认无界LinkedBlockingQueue,要特别小心任务无限堆积导致内存风险

SynchronousQueue

容量为0,无法储存数据的阻塞队列。提供了公平与非公平锁的设置。

这里最容易误解的一点是:SynchronousQueue并不是“容量为1”,而是真正意义上的容量为0。它不会把元素放进内部容器里等待后续消费,而是要求每一次生产都要和一次消费直接配对完成,属于一种**直接移交(handoff)**机制。

关键成员变量
1
2
3
4
5
6
7
//针对不同操作定义的统一接口
private transient volatile Transferer<E> transferer;

abstract static class Transferer<E> {
//e为空则表示 需要获取数据;e不为空表示 需要添加数据
abstract E transfer(E e, boolean timed, long nanos);
}
构造函数
1
2
3
4
5
6
7
8
public SynchronousQueue() {
this(false);//默认非公平构造
}

public SynchronousQueue(boolean fair) {
//公平与非公平对应两种实现形式
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
实现方法

数据操作offer()/poll() put()/take()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
return transferer.transfer(e, true, 0) != null;
}

public E poll() {
return transferer.transfer(null, true, 0);
}

public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
if (transferer.transfer(e, false, 0) == null) {
Thread.interrupted();
throw new InterruptedException();
}
}

public E take() throws InterruptedException {
E e = transferer.transfer(null, false, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}

上述的数据操作方法都涉及到了两部分内容:

  • transferer:数据调度
  • transfer:数据执行
TransferQueue

SynchronousQueue公平实现,内部实现使用队列,可以保证先进先出的特性。

基本实现方法:

当前队列为空的时候或者存在了与即将添加的QNode操作模式一致(isData一致)的节点,线程进行同步等待,QNode添加到队列中。

继续等待与队列头部QNode操作互补(写操作(isData = true),等待一个读操作(isData = false))的QNode节点

新添加的QNode节点与队头QNode操作互补时,尝试通过CAS更新等待节点的item字段,然后让队头等待节点出列,并返回节点元素和更新队头节点。

如果队列中的节点的waiter等待线程被取消,节点也会被清理出队列。

TransferQueue公平队列

isData:true 表示put操作,false表示take操作。

next:下一个节点

waiter:当前等待的线程

item:元素信息

TransferStack

SynchronousQueue非公平实现,内部实现使用,实现了先进后出的特性。

基本实现方法:

当前栈为空或者存在了与即将添加的SNode模式相同的节点(mode一致),线程进行同步等待,SNode添加到栈中

继续等待与栈顶SNode操作互补(写操作(mode = DATA),读操作(mode=REQUEST))的节点

出现与栈顶SNode操作互补的节点后,新增SNode节点的mode会变为FULFILLING,与栈顶节点匹配,匹配完成后,将俩节点都弹出并返回匹配节点的结果。

如果栈顶元素找到匹配节点,就会继续向下帮助匹配(此时上一个匹配操作还没结束又进入一个新的请求)

TransferStack

next:下一个元素

item:元素信息

waiter:当前等待的线程

match:匹配的节点

modeDATA(1)-添加数据、REQUEST(0)-获取数据、FULFILLING(2)-互补模式

特点
  • SynchronousQueue容量为0,无法进行数据存储
  • 每次写入数据时,写线程都需要等待;直到另一个线程执行读操作,写线程会返回数据。写入元素不能为null
  • peek()返回nullsize()返回0;无法进行迭代操作
  • 提供了公平非公平两种策略处理,分别是基于Queue-TransferQueueStack-TransferStack实现。

也正因为它不缓存元素,SynchronousQueue特别适合那种“任务不要排队,来了就直接找线程接手,否则就促使线程池扩容或触发拒绝策略”的场景。

原理介绍

绝大多数都是利用了Lock锁的多条件(Condition)阻塞控制

ArrayBlockingQueue进行简单描述就是:

  1. puttake操作都需要先获取锁,无法获取的话就要一直自旋拿锁,直到获取锁为止
  2. 在拿到锁以后。还需要判断当前队列是否可用(队列非满且非空),如果队列不可用就会被阻塞,并释放锁
  3. 阻塞的线程被唤醒时,依然需要在拿到锁之后才可以继续执行,否则,自旋拿锁,拿到锁继续判断当前队列是否可用(使用while判断)

使用场景

生产-消费模型
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class PCDemo {
private int queueSize = 10;
private ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(queueSize, true);

public static void main(String[] args) {

PCDemo blockQueue = new PCDemo();
Producter producter = blockQueue.new Producter();
Customer customer = blockQueue.new Customer();

producter.start();
customer.start();
}

class Customer extends Thread {
@Override
public void run() {
while (true) {
try {
queue.take();
System.err.println("消费哦,剩余空间为" + queue.size());
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

class Producter extends Thread {
@Override
public void run() {
while (true) {
try {
queue.put(1);
System.err.println("生产哦,剩余空间为" + (queueSize - queue.size()));
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
线程池
Java-线程池

BlockingQueue在线程池中的作用,不只是“把任务存起来”这么简单,它会直接影响线程池的行为策略:

  • 使用LinkedBlockingQueue时,任务更容易先排队,线程数往往不容易继续扩到maximumPoolSize
  • 使用ArrayBlockingQueue时,队列容量有限,任务堆积到一定程度后更容易促使线程池扩容
  • 使用SynchronousQueue时,由于队列本身不缓存任务,任务提交更像直接交给工作线程处理,因此更容易触发线程数扩张

也就是说,线程池中的workQueue本质上决定的是:任务优先排队、优先扩容,还是优先直接移交。

拓展知识

常见误用与风险

  • 误把无界LinkedBlockingQueue当成“安全默认值”,结果在高峰时持续堆积任务
  • 在不合适的线程中直接调用put()/take(),导致主线程、核心业务线程被阻塞
  • 忽略InterruptedException,导致线程无法优雅退出
  • 依赖size()来做严格并发判断,结果和真实队列状态产生竞态
  • 使用remove(object)这类按值删除操作,导致额外遍历成本和并发语义复杂化

所以从工程实践上看,BlockingQueue更适合承担削峰、缓冲、解耦、等待协作这些职责,而不是作为“随便塞任务进去总没问题”的万能容器。

与其他队列的区别

  • BlockingQueue:强调阻塞协作,适合生产者-消费者模型
  • ConcurrentLinkedQueue:强调非阻塞并发访问,不提供put/take这类等待能力
  • Deque:强调双端操作语义,是否阻塞要看具体实现

因此选型时最好先问自己:当前需求到底是要阻塞等待要高并发无锁访问,还是要双端操作能力,再决定是不是应该上BlockingQueue

Guarded Suspension(保护性暂时挂起)

当服务进程准备好时,才提供服务。

img

本质是一种等待唤醒机制的实现,也称为多线程的if

基本实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class GuardedObject<T>{
private T obj;
private final ReetrantLock lock = new ReentrantLock();
private final Condition done = lock.newCondition();

public T get(Predicate<T> p){
lock.lock();
try{
while(!p.test(obj)){
done.await(); //等待事件执行
}
}catch(Exception e){
e.printStacktrace();
}finally{
lock.unlock();
}
return obj;
}

public void onChange(T obj){
lock.lock();
try{
this.obj = obj;
done.signAll();//数据发生变化,进行通知
}finally{
lock.unlock();
}
}
}

参考链接

SynchronousQueue-公平模式

SynchronousQueue


Java-BlockingQueue阻塞队列
https://leo-wxy.github.io/2018/12/24/Java-BockingQueue阻塞队列/
作者
Leo-Wxy
发布于
2018年12月24日
许可协议