Queue接口 Queue队列
的特征是FIFO——先进先出
。只有队尾可以进行插入操作,只有队头可以进行删除操作。
Java中的Queue
继承了Collection
接口,并额外实现了以下方法
1 2 3 4 5 6 7 8 public interface Queue <E > extends Collection <E > { boolean add (E e) ; boolean offer (E e) ; E remove () ; E poll () ; E element () ; E peek () ; }
队列插入数据操作(add/offer
)
将新数据插入队尾
add
:如果队列满时,插入队尾数据,就会抛出IllegalStateExecption
offer
:如果队列满时,插入队尾数据,不会抛出异常,会返回false
队列删除数据操作(remove/poll
)
获取队头元素并删除
remove
:当队列为空时,删除元素时,会抛出NoSuchElementException
poll
:当队列为空时,删除元素时,不会抛出异常,只会返回null
队列检查数据操作(element/peek
)
获取队头元素但并不删除
element
:当队列为空时,获取队头元素,会抛出NoSuchElementException
peek
:当队列为空时,获取队头元素,不会抛出异常,只会返回null
抛出异常
特殊值
插入
add()
offer()
返回 false
删除
remove()
poll()
返回null
获取数据(检查)
element()
peek()
返回null
BlockingQueue接口 系统提供的用于多线程环境下存、取共享变量的一种容器。
BlockingQueue阻塞队列
实现了Queue接口
,相比于Queue
提供了额外的功能:
获取队头元素时,如果队列为空,执行线程会处于阻塞状态,直到队列非空——对应删除和检查操作
添加队尾元素时,如果队列已满,执行线程会处于阻塞状态,直到队列不满——对应插入操作
当触发上述两种情况的出现时,按照不同的设置方式,提供了以下几种处理方案:
抛出异常
返回特殊值(返回null
或false
)
阻塞当前线程直到可以执行
阻塞线程设置最大超时时间,若超过该时间,线程就会继续执行,放弃当次操作
阻塞队列插入数据操作(put/offer(time)
)
对应阻塞队列在队列已满插入数据时的阻塞
或者超时处理
put
:如果队列满时,插入队尾数据,会阻塞当前线程直到队列非满
offer(time)
:如果队列满时,插入队尾数据,会阻塞当前线程直到队列非满或者达到了超时时间
,达到超时时间则返回false
阻塞队列删除数据操作(take()/poll(time)
)
对应阻塞队列在队列为空时获取数据时的阻塞
或超时处理
take()
:当队列为空时,删除元素时,会阻塞当前线程直到队列非空
poll(time)
:当队列为空时,删除元素时,会阻塞当前线程直到队列非空或者达到了超时时间
,达到超时时间返回null
阻塞队列检查数据操作
抛出异常
特殊值
阻塞
超时
插入
add()
offer()
返回 false
put()
offer(time)
返回false
删除
remove()
poll()
返回null
take()
poll(time)
返回null
获取数据(检查)
element()
peek()
返回null
/
/
注意点
阻塞队列无法插入null
,否则抛出空指针异常
可以访问阻塞队列中的任意元素,尽量避免使用remove(object)
移除对象
BlockingQueue
实现类在{% post_link Java-线程池%}中的workQueue
设置的就是BlockingQueue
接口的实现类,
例如
ArrayBlockingQueue
:数组构成的有界阻塞队列
LinkedBlockingQueue
:链表构成的有界阻塞队列,如果不设置大小的话,近似无界阻塞队列
SynchronousQueue
:不存储任何元素的阻塞队列
PriorityBlockingQueue
:支持优先级排序的无界阻塞队列
BlockingQueue
原理BlockingQueue
只是一个接口,真正的实现都是在XXBloxckingQueue
中的,想要分析对应的原理就需要从实现类进行分析
ArrayBlockingQueue
由数组实现的有界阻塞队列,大小一旦确定就无法改变队列的长度。
关键成员变量 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 final Object[] items; int takeIndex;int putIndex;int count;final ReentrantLock lock;private final Condition notEmpty;private final Condition notFull;
ArrayBlockingQueue
阻塞功能的实现就是依赖了ReentrantLock
以及Condition
实现了等待机制
。
具体可参考
构造函数 1 2 3 4 5 6 7 8 9 10 11 12 public ArrayBlockingQueue (int capacity) { this (capacity, false ); } public ArrayBlockingQueue (int capacity, boolean fair) { if (capacity <= 0 ) throw new IllegalArgumentException(); this .items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
capacity
:设置阻塞队列的数组容量
fair
:设置线程并发是否公平(默认配置非公平锁
)
当前锁被一个线程持有时,其他线程会被挂起等待锁的释放,等待时加入等待队列。
公平锁
:当锁释放时,等待队列的前端线程会优先获取锁
非公平锁
:当锁释放时,等待队列中的所有线程都会去尝试获取锁
在ArrayBlockingQueue
初始化时,构造ReentrantLock
锁以及两个Condition
对象控制数据插入、删除时的阻塞。
实现方法 offer()
非阻塞添加数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public boolean offer (E e) { Objects.requireNonNull(e); final ReentrantLock lock = this .lock; lock.lock(); try { if (count == items.length) return false ; else { enqueue(e); return true ; } } finally { lock.unlock(); } } private void enqueue (E x) { final Object[] items = this .items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0 ; count++; notEmpty.signal(); }
offer()
添加数据时,将当前线程上锁。
在当前队列已满时,直接返回false
当前队列未满时,调用enqueue()
添加数据,putIndex
设置对应数据且putIndex++
。 然后通知阻塞的消费线程notEmpty
poll()
非阻塞取出数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public E poll () { final ReentrantLock lock = this .lock; lock.lock(); try { return (count == 0 ) ? null : dequeue(); } finally { lock.unlock(); } }private E dequeue () { final Object[] items = this .items; @SuppressWarnings ("unchecked" ) E x = (E) items[takeIndex]; items[takeIndex] = null ; if (++takeIndex == items.length) takeIndex = 0 ; count--; if (itrs != null ) itrs.elementDequeued(); notFull.signal(); return x; }
poll()
取出数据时,将当前线程上锁
当前队列为空的时候,直接返回null
当前队列非空的时候,调用dequeue()
将takeIndex
元素出队,设置takeIndex
处元素为null
且takeIndex--
。然后通知阻塞的生产线程notFull
offer(time)
不超时阻塞添加数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public boolean offer (E e, long timeout, TimeUnit unit) throws InterruptedException { Objects.requireNonNull(e); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { while (count == items.length) { if (nanos <= 0L ) return false ; nanos = notFull.awaitNanos(nanos); } enqueue(e); return true ; } finally { lock.unlock(); } }
offer(time)
添加数据时,将当前线程上锁
在当前队列已满时,阻塞生产线程notFull
,超过time
后,队列还是满的话,直接返回false
当前队列未满时,调用enqueue()
添加数据,putIndex
设置对应数据且putIndex++
。 然后通知阻塞的消费者notEmpty
poll(time)
不超时阻塞取出数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public E poll (long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { while (count == 0 ) { if (nanos <= 0L ) return null ; nanos = notEmpty.awaitNanos(nanos); } return dequeue(); } finally { lock.unlock(); } }
poll(time)
取出数据时,将当前线程上锁
当前队列为空的时候,阻塞消费线程notEmpty
,超过time
后,队列还是空的话,直接返回null
当前队列非空的时候,调用dequeue()
将takeIndex
元素出队,设置takeIndex
处元素为null
且takeIndex--
。然后通知阻塞的生产者notFull
put()
阻塞添加数据1 2 3 4 5 6 7 8 9 10 11 12 public void put (E e) throws InterruptedException { Objects.requireNonNull(e); final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } }
put()
添加数据时
当前队列已满时,阻塞当前线程,等待notFull
通知(队列未满
)
当前队列未满时,调用enqueue()
添加数据,putIndex
设置对应数据且putIndex++
。 然后通知阻塞的消费者notEmpty
take()
阻塞获取数据1 2 3 4 5 6 7 8 9 10 11 public E take () throws InterruptedException { final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { while (count == 0 ) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
take()
获取数据时
当前队列为空时,阻塞当前线程,等待notEmpty
通知(队列新增数据
)
当前队列非空的时候,调用dequeue()
将takeIndex
元素出队,设置takeIndex
处元素为null
且takeIndex--
。然后通知阻塞的生产者notFull
其中Condition
的await/signal
类似于Object
的wait/notify
实现等待与通知的功能。
在分析enqueue()
和dequeue()
时,发现底层数组不会进行扩容,而是在到达边缘时,重置index
为0,重复利用数组。
从上述源码对ArrayBlockingQueue
进行总结:
底层数据结构是一个 数组,生产者和消费者由同一个锁(ReetrantLock
)控制,生产和消费效率低。
LinkedBlockingQueue
由链表实现的阻塞队列,默认最大长度为Integer.MAX
。
关键成员变量 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 private final int capacity;private final AtomicInteger count = new AtomicInteger();transient Node<E> head;private transient Node<E> last;private final ReentrantLock takeLock = new ReentrantLock();private final Condition notEmpty = takeLock.newCondition();private final ReentrantLock putLock = new ReentrantLock();private final Condition notFull = putLock.newCondition();
LinkedBlockingQueue
采用了两把锁putLock、takeLock
,分别进行控制,提高了并发性能。
构造函数 1 2 3 4 5 6 7 8 9 public LinkedBlockingQueue () { this (Integer.MAX_VALUE); }public LinkedBlockingQueue (int capacity) { if (capacity <= 0 ) throw new IllegalArgumentException(); this .capacity = capacity; last = head = new Node<E>(null ); }
capacity
:设置单链表长度上限,若不设置该值,默认为Integer.MAX
构造函数初始化了底层的链表结构。
实现方法 offer()
非阻塞添加数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public boolean offer (E e) { if (e == null ) throw new NullPointerException(); final AtomicInteger count = this .count; if (count.get() == capacity) return false ; int c = -1 ; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this .putLock; putLock.lock(); try { if (count.get() < capacity) { enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } } finally { putLock.unlock(); } if (c == 0 ) signalNotEmpty(); return c >= 0 ; }private void enqueue (Node<E> node) { last = last.next = node; }
offer
添加数据时,当队列已满时,直接返回false
。未满时,插入新数据后,count
自加后唤醒notFull、notEmpty
。
poll()
非阻塞获取数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public E poll () { final AtomicInteger count = this .count; if (count.get() == 0 ) return null ; E x = null ; int c = -1 ; final ReentrantLock takeLock = this .takeLock; takeLock.lock(); try { if (count.get() > 0 ) { x = dequeue(); c = count.getAndDecrement(); if (c > 1 ) notEmpty.signal(); } } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }private E dequeue () { Node<E> h = head; Node<E> first = h.next; h.next = h; head = first; E x = first.item; first.item = null ; return x; }
poll
获取数据时,队列为空时,直接返回null
。队列非空时,获取数据后,数据出队,count
自减后,先后唤醒notEmpty
、notFull
。
put()
阻塞添加数据1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public void put (E e) throws InterruptedException { if (e == null ) throw new NullPointerException(); int c = -1 ; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this .putLock; final AtomicInteger count = this .count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0 ) signalNotEmpty(); }
put()
添加数据时,队列已满时,会进行阻塞等待直到队列非满。
take()
阻塞获取数据1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public E take () throws InterruptedException { E x; int c = -1 ; final AtomicInteger count = this .count; final ReentrantLock takeLock = this .takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0 ) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1 ) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
take()
获取数据时,队列为空时,会进行阻塞等到直到队列非空。
从上述源码对LinkedBlockingQueue
进行总结:
LinkedBlockingQueue
底层数据结构为单链表
,内部持有两个Lock:putLock、takeLock
,相互之间不会干扰执行,提高了并发性能。
与ArrayBlockingQueue
比较
ArrayBlockingQueue
LinkedBlockingQueue
构造方法
必须指定构造大小 指定后无法修改
默认大小为Integer.MAX
可以指定大小
底层数据结构
数组
单链表
锁
出队入队使用同一把锁 数据的删除和添加操作互斥
出队使用takeLock
,入队使用putLock
数据删除、添加操作不干扰,提升并发性能
SynchronousQueue
容量为0,无法储存数据的阻塞队列。提供了公平与非公平锁的设置。
关键成员变量 1 2 3 4 5 6 7 private transient volatile Transferer<E> transferer; abstract static class Transferer <E > { abstract E transfer (E e, boolean timed, long nanos) ; }
构造函数 1 2 3 4 5 6 7 8 public SynchronousQueue () { this (false ); }public SynchronousQueue (boolean fair) { transferer = fair ? new TransferQueue<E>() : new TransferStack<E>(); }
实现方法 数据操作offer()/poll() put()/take()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public boolean offer (E e) { if (e == null ) throw new NullPointerException(); return transferer.transfer(e, true , 0 ) != null ; }public E poll () { return transferer.transfer(null , true , 0 ); }public void put (E e) throws InterruptedException { if (e == null ) throw new NullPointerException(); if (transferer.transfer(e, false , 0 ) == null ) { Thread.interrupted(); throw new InterruptedException(); } }public E take () throws InterruptedException { E e = transferer.transfer(null , false , 0 ); if (e != null ) return e; Thread.interrupted(); throw new InterruptedException(); }
上述的数据操作方法都涉及到了两部分内容:
transferer
:数据调度
transfer
:数据执行
TransferQueue
SynchronousQueue
的公平 实现,内部实现使用队列,可以保证先进先出的特性。
基本实现方法:
当前队列为空的时候或者存在了与即将添加的QNode
操作模式一致(isData一致
)的节点,线程进行同步等待,QNode
添加到队列中。
继续等待与队列头部QNode
操作互补(写操作(isData = true),等待一个读操作(isData = false)
)的QNode
节点
新添加的QNode
节点与队头QNode
操作互补时,尝试通过CAS
更新等待节点的item字段,然后让队头等待节点出列,并返回节点元素和更新队头节点。
如果队列中的节点的waiter
等待线程被取消,节点也会被清理出队列。
isData
:true 表示put
操作,false表示take
操作。
next
:下一个节点
waiter
:当前等待的线程
item
:元素信息
TransferStack
SynchronousQueue
非公平实现,内部实现使用栈 ,实现了先进后出
的特性。
基本实现方法:
当前栈为空或者存在了与即将添加的SNode
模式相同的节点(mode一致
),线程进行同步等待,SNode
添加到栈中
继续等待与栈顶SNode
操作互补(写操作(mode = DATA),读操作(mode=REQUEST)
)的节点
出现与栈顶SNode
操作互补的节点后,新增SNode
节点的mode
会变为FULFILLING
,与栈顶节点匹配,匹配完成后,将俩节点都弹出并返回匹配节点的结果。
如果栈顶元素找到匹配节点,就会继续向下帮助匹配(此时上一个匹配操作还没结束又进入一个新的请求
)
next
:下一个元素
item
:元素信息
waiter
:当前等待的线程
match
:匹配的节点
mode
:DATA(1)
-添加数据、REQUEST(0)
-获取数据、FULFILLING(2)
-互补模式
特点
SynchronousQueue
容量为0,无法进行数据存储
每次写入数据时,写线程都需要等待;直到另一个线程执行读操作,写线程会返回数据。写入元素不能为null
peek()
返回null
;size()
返回0
;无法进行迭代操作
提供了公平
,非公平
两种策略处理,分别是基于Queue-TransferQueue
与Stack-TransferStack
实现。
原理介绍 绝大多数都是利用了Lock锁的多条件(Condition)阻塞控制 。
拿ArrayBlockigQueue
进行简单描述就是:
put
和take
操作都需要先获取锁 ,无法获取的话就要一直自旋拿锁,直到获取锁为止
在拿到锁以后。还需要判断当前队列是否可用(队列非满且非空
),如果队列不可用就会被阻塞,并释放锁
阻塞的线程被唤醒时,依然需要在拿到锁之后才可以继续执行,否则,自旋拿锁,拿到锁继续判断当前队列是否可用(使用while判断 )
使用场景 生产-消费模型 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 public class PCDemo { private int queueSize = 10 ; private ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(queueSize, true ); public static void main (String[] args) { PCDemo blockQueue = new PCDemo(); Producter producter = blockQueue.new Producter(); Customer customer = blockQueue.new Customer(); producter.start(); customer.start(); } class Customer extends Thread { @Override public void run () { while (true ) { try { queue.take(); System.err.println("消费哦,剩余空间为" + queue.size()); Thread.sleep(100 ); } catch (InterruptedException e) { e.printStackTrace(); } } } } class Producter extends Thread { @Override public void run () { while (true ) { try { queue.put(1 ); System.err.println("生产哦,剩余空间为" + (queueSize - queue.size())); Thread.sleep(100 ); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
线程池 {% post_link Java-线程池%}
拓展知识 Guarded Suspension(保护性暂时挂起) 当服务进程准备好时,才提供服务。
本质是一种等待唤醒机制的实现 ,也称为多线程的if
基本实现代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public class GuardedObject <T > { private T obj; private final ReetrantLock lock = new ReentrantLock(); private final Condition done = lock.newCondition(); public T get (Predicate<T> p) { lock.lock(); try { while (!p.test(obj)){ done.await(); } }catch (Exception e){ e.printStacktrace(); }finally { lock.unlock(); } return obj; } public void onChange (T obj) { lock.lock(); try { this .obj = obj; done.signAll(); }finally { lock.unlock(); } } }
参考链接 SynchronousQueue-公平模式
SynchronousQueue