Java-AbstractQueuedSynchronizer简介
AQS简介
AQS全称为AbstractQueuedSynchronizer
,意为抽象队列同步器
。
Abstract
:抽象类,只实现主要逻辑,其他交由子类实现Queued
:FIFO
队列存储数据Synchronizer
:同步
在Lock
中,是非常重要的核心组件。AQS
是用来构建锁和同步器的框架,使用AQS
可以简单且高效构建同步器。我们常见的ReentrantLock、CountdownLatch
都是基于AQS
构建的。
AQS
主要做了三件事情:
- 同步状态的管理
- 线程的阻塞和唤醒
- 同步队列的维护
AQS同步方式
从使用层面来讲,AQS同步方式分为以下两种:
独占模式(Exclusive)
资源是独占的,一次只能有一个线程获取。
例如ReentrantLock
共享模式(Share)
资源是共享的,可以被多个线程同时获取并访问,还可以指定允许访问的资源个数。
例如CountdownLatch
、Semaphore
混合模式(mixed)
将两种模式混合在一起进行使用,可以在特定条件下进行独占
或共享
资源。
例如ReentrantReadWhiteLock
AQS的数据结构
AQS依赖内部的一个FIFO双端队列
实现同步状态(state
)的管理,并且使用了head
和tail
分别表示队列的首尾节点。
队列中储存的是Node
节点,其中包含了当前线程以及等待状态信息。
state
表示资源当前状态。
1 |
|
同时定义了了几个关于state
的方法,提供给子类覆盖实现自身逻辑。
例如:
ReentrantLock
:表示的资源为独占锁
,state=0
表示没持有锁,state=1
表示锁被占用,state>1
表示了锁重入次数。
CountdownLatch
:表示的资源为计数
,state=0
表示计数器归零,可以被其他线程访问资源,state>0
表示所有线程访问资源时都需要阻塞。
1 |
|
Node
AQS内部等待队列的节点
1 |
|
prev
:当前节点的上一个节点
next
:当前节点的下一个节点
thread
:当前节点持有的线程
waitStatus
:当前节点的状态
nextWaiter
:下一个处于CONDITION
状态的节点
Node
是一个变体CLH
的节点,CLH
应用了自旋锁,节点保存了当前阻塞线程的信息。如果他的前驱节点释放了,就需要通过修改waitStatus
字段出队前驱节点,让当前节点尝试获取锁。若有新的等待线程要入队,就会加入到队列的尾部。
其中waitStatus
有以下几种状态:
waitStatus | 值 | 描述 |
---|---|---|
SIGNAL | -1 | 表示当前节点的后续节点被阻塞或即将被阻塞,当前节点释放或取消后需要唤醒后续节点。一般是后续节点来设置前驱节点的。 |
CANCELLED | 1 | 表示当前节点超时或被中断,需要移出等待队列 |
CONDITION | -2 | 表示当前节点在Condition 队列中,阻塞等待某个条件唤醒 |
PROPAGATE | -3 | 适用于共享模式(连续的操作节点依次进入临界区),用于将唤醒后续线程传递下去,为了完善和增强锁的唤醒机制。 |
INIT | 0 | 节点初始创建处于该状态 |
通过
Node
可以实现两个队列
- 通过
prev
、next
实现双向队列- 通过
nextWaiter
实现Condition
的单向等待队列
ConditionObject
用于实现
Condition
功能的内部类,直接作用于线程,对线程进行调度
1 |
|
由Node
组成的单向队列。
AQS原理解析
子类实现方法
AQS
的设计是基于模板方法模式(定义基本功能后,将一些实现延迟到子类)的,所以其中一些方法必须交由子类去实现。
isHeldExclusively()-是否独占资源
该线程是否正在独占资源。只有需要用到Condition
才需要去实现该方法
tryAcquire()-获取独占资源
独占方式获取资源,成功获取返回true
,失败返回false
tryRelease()-释放独占资源
独占方式释放资源,成功释放返回true
,失败返回false
tryAcquireShared()-获取共享资源
共享方式获取资源
- 返回
负数
,表示资源获取失败 - 返回
0
,表示获取成功,但没有多余资源可获取 - 返回
>0
,表示获取成功,且有剩余资源
tryReleaseShared()-释放共享资源
共享方式释放资源
- 释放资源后,允许唤醒后续等待节点,返回
true
- 释放资源后,没有后续等待节点,返回
false
子类主要实现上述几个方法,主要逻辑还是在AQS
内部进行实现。
获取资源-独占模式
获取资源的入口是acquire(int arg)
。arg
是要获取资源的个数
独占模式
:arg = 1共享模式
:arg >= 0
1 |
|
tryAcquire(int arg)
子类实现的模板方法,在介绍ReentrantLock
时会分析内部实现
addWaiter(Node.EXCLUSIVE)
只有在tryAcquire()
获取资源失败时,才会执行到该方法,将当前线程初始化为一个Node
节点,加入到等待队列
中。其中Node.EXCLUSIVE
表示当前锁是独占的。
1 |
|
这一步的操作是为了,在等待队列的尾部插入新Node
节点,但是可能存在多个线程同时争夺资源的情况,因为在插入节点时需要做线程安全操作,这里就是通过CAS
保证线程操作的安全性。
- 执行
tryAcquire()
失败后,将当前线程初始化为一个Node
节点,加入到AQS
等待队列中-调用addWaiter()
- 第一次加入等待队列,此时尚未初始化完成,
head
,tail
都为null
- 就需要在执行
enq()
将等待队列初始化,并插入Node
节点,头节点为空线程 - 后续再有新的申请进来后,
Node
节点直接插入到等待队列的尾部
为什么头节点为空线程?
此处的头节点
head
起到了一个哨兵的作用,免去后续查找过程中的越界判断
。
acquireQueued(node,arg)
经过addWaiter()
之后,线程加入到等待队列中,但是线程还没有被挂起等待,而acquireQueued()
去执行线程挂起的相关操作。
1 |
|
若前一个节点是head
,那么再次调用tryAcquire()
去竞争锁;竞争失败了,就执行shouldParkAfterFailedAcquire()
判断是否将自己的线程挂起
1 |
|
线程能否挂起的判断条件:
前一个节点的
waitStatus
必须是SIGNAL(-1)
,因为后面unlock()
会去唤醒waitStatus
为SIGNAL
的线程去争夺锁。
若shouldParkAfterFailedAcquire()
判断需要将当前线程挂起,则继续执行parkAndCheckInterrupt()
挂起当前线程。
1 |
|
parkAndCheckInterrupt()
内部调用到了LockSupport.park()
,该方法主要用于中断一个线程。
LockSupport
是Java 6
后引入的一个类,提供了基本的线程同步原语
。内部实际调用了
Unsafe
的函数。主要提供了两个方法:
park()
:阻塞当前线程unpark(thread)
:使thread
停止阻塞
在后续新增的节点进入AQS等待队列
后,是通过LockSupport.park()
使线程进入阻塞状态。
LockSupport.park()
遇到以下情况时,会立即中断阻塞状态
- 其他线程调用了
unpark()
停止了当前线程的阻塞状态 - 其他线程中断了当前线程
1 |
|
结合以上代码的运行结果可知以下几点:
- 当一个线程
park()
时,其他线程中断该线程时,线程会立即恢复,且中断标记为true
还不会抛出InterruptedException
异常 - 当一个线程的中断标记为
true
时,调用park()
无法挂起线程
所以这就是为什么parkAndCheckInterrupt()
返回了Thread.interrupted()
去重置中断标记。
interrupt()
:打一个中断标记,但不会中断当前线程
isInterrupted()
:返回当前线程的中断标记,如果执行过interrupt()
则返回true
,表示当前线程被中断过
interrupted()
:返回当前线程的中断标记,如果执行过interrupt()
则返回true
,表示当前线程被中断过。但是多执行了一步复位操作,后续调用isInterrupted()
返回false
。
若不执行线程复位
操作,后续对当前线程执行LockSupport.park()
时,挂起操作无法生效,就会导致发生死循环,耗尽资源。
简单文字概述
AQS-获取资源过程
- 尝试获取资源——
tryAcquire()
- 获取资源失败,请求入队列——
addWaiter(Node.EXCLUSIVE)
- 根据传入的模式(
EXCUSIVE
)创造节点(Node
)- 判断尾节点(
tail
)是否存在,不存在使用enq(node)
初始化节点head、tail
;存在tail
,请求节点插入尾部- 使用
CAS自旋
插入请求到尾端,插入失败的话,调用enq(node)
自旋插入直到成功- 请求入队列后,需要不断去获取资源——
acquireQueued(node)
- 不断获取当前节点的上一个节点是否为
head
,若是,则表示当前节点为请求节点
- 若是
请求节点
,不断的调用tryAcquire()
获取资源,获取成功执行setHead()
- 若当前非
head
后的第一个请求节点
或者tryAcquire()
请求资源失败,需要通过shouldParkAfterFailedAcquire()
判断当前节点是否需要阻塞(判断前一个节点waitStatus == NODE.SIGNAL
)- 若需要阻塞则执行
parkAndCheckInterrupt()
实质执行LockSupport.park()
cancelAcquire()
acquireQueued()
执行到finally
时就会执行该方法
1 |
|
//TODO 补齐流程分析
释放资源-独占模式
释放资源的入口是release(int arg)
,arg
为释放资源的个数
1 |
|
tryRelease()
子类实现的模板方法,在介绍ReentrantLock
时会分析内部实现
unparkSuccessor()
tryRelease()
解锁成功后,执行该方法
1 |
|
如果不存在后续节点或后续节点被取消,就会从AQS等待队列
的末尾从后往前遍历,就是为了避免找不到节点的情况,有可能在构造节点时,尚未构造next
的值,导致无法继续向后遍历,但是向前的话一开始节点构造时就会设置prev
节点数据。
找到了需要被唤醒的节点(waitStatus == SIGNAL(-1)
)后,执行LockSupport.unpark()
唤醒节点对应线程。
acquireQueued()
上面的方法执行到LockSupport.unpark()
后,就会唤醒对应的线程
1 |
|
此时parkAndCheckInterrupt()
会继续执行,代码执行回到acquireQueued()
的for循环中
此时资源已被释放,后续线程执行tryAcquire()
就会获取资源成功,向下执行到setHead()
并跳出了当前的循环
1 |
|
setHead()
重置了一下head
节点的属性,将当前节点置为了head
节点,原先的就移出队列,等待回收。
return interrupted
继续回到上层方法acquire()
中,中断掉当前线程,release()
执行完毕。
简单文字描述AQS-资源释放过程
- 通过
tryRelease()
释放资源,返回true
表示资源已经被释放了,通知其他节点可以获取资源- 释放成功后,执行
unparkSuccessor()
取消其他线程的阻塞状态- 通过
从后往前遍历(入队列采用尾插法)
直到找到一个有效节点(waitStatus<=0)
,在执行LockSupport.unpark()
取消对应节点thread
的阻塞状态
获取资源-共享模式
获取共享资源的入口是acquireShared()/acquireSharedInterruptibly()
1 |
|
其中acquireShared()
和acquireSharedInterruptibly()
的区别在于后者可以响应中断,请求线程被中断时,就会抛出异常结束请求。
tryAcquireShared()
子类实现的模板方法,在介绍CountdownLatch
时会分析内部实现
doAcquireShared()
只有在tryAcquireShared()
返回值小于0(获取共享资源失败
)时执行,tryAcquireShared()
有三种返回结果:
小于0
:获取共享资源失败等于0
:获取共享资源成功,但后续节点无法获取共享资源大于0
:获取共享资源成功,后续节点也可能继续获取共享资源。需要检查后续节点请求的可用性
1 |
|
获取共享资源失败后,先调用addWaiter(Node.SHARED)
添加共享节点到等待队列,在循环中不断判断preNode == head
,如果符合继续尝试获取共享资源,若获取成功,执行setHeadAndPropagate()
去设置头节点并唤醒后续节点;获取失败,则当前线程判断是否需要挂起(preNode.waitStatus == Node.SIGNAL(-1)
),需要挂起执行LockSupport.park()
。
setHeadAndPropagate()
获取到共享资源后调用该方法,主要的作用是设置当前节点为头节点,同时唤醒后续节点
1 |
|
propagate > 0
是tryAcquireShared()
的返回值,>0
表示后续节点可以继续获取资源
waitStatus < 0
此时存在两种情况
waitStatus == SIGNAL(-1)
下一个节点可以被唤醒waitStatus == PROPAGATE(-3)
继续传播状态
doReleaseShared()
获取共享资源后且tryAcquireShared()> 0
表示后续节点也可以获取资源,并且waitStatus < 0 即 -1
可以唤醒后续等待的线程
1 |
|
在等待队列存在后续线程的情况下,继续唤醒后续线程(unparkSuccessor()
)。或者由于多个线程同时释放,导致head.waitStatus==0
,需要设置waitStatus
为PROPAGATE
将唤醒状态继续向下传递,保证后续其他线程执行setHeadAndPropagate()
时可以继续释放等待线程。
简单文字描述AQS-获取共享资源
- 通过
tryAcquireShared()
尝试获取资源- 若
tryAcquireShared()
返回值<0
表示获取资源失败,向下继续调用doAcquireShared()
- 请求入队列执行
addWaiter(Node.SHARED)
,操作步骤同AQS获取资源过程
- 请求入队列后,需要不断去获取资源
- 不断获取当前节点的上一个节点是否为
head
,若是,则表示当前节点为请求节点
- 若是
请求节点
,不断调用tryAcquireShared()
继续获取共享资源
- 获取成功,执行
setHeadAndPropagate()
去设置头节点,并且唤醒后续节点——doReleaseShared()
- 获取失败,执行
LockSupport.unpark()
挂起当前线程
释放资源-共享模式
释放共享资源的入口是releaseShared()
1 |
|
tryReleaseShared()
子类实现的模板方法,在介绍CountdownLatch
时会分析内部实现
doReleaseShared()
Condition
Java-AQS-Condition原理及解析总结
AQS到底是什么?
AQS
内部维护一个CLH队列(FIFO)
来管理锁,将当前线程(thread)以及等待状态信息(waitStatus)
封装成一个Node节点
添加到等待队列
中。提供了
tryAcquire(),tryRelease(),tryAcquireShared(),tryReleaseShare()
等模板方法交由子类实现,去控制资源的获取与释放
。AQS
默认实现子类获取/释放资源后的操作,包括Node节点的出入队列
。AQS获取资源失败的操作
线程尝试获取锁失败后,,将
当前线程(thread)以及等待状态信息(waitStatus)
封装成一个Node节点
添加到等待队列
中。接着会不断循环尝试获取锁(前置节点为head
),如果不是进入阻塞状态,直至被唤醒。AQS等待队列数据结构
CLH队列
:- CLH锁是一个自旋锁,可以保证无饥饿性,提供
FIFO
的公平性。基于链表实现。 - 不断轮询
前置节点
的状态,如果前置节点被释放就结束自旋。
- CLH锁是一个自旋锁,可以保证无饥饿性,提供
AQS等待队列插入节点顺序
尾插法
addWaiter(node)
就是插入节点的主方法1
2
3
4
5
6
7
8
9
10
11
12
13
14
15> private Node addWaiter(Node mode) {
> Node node = new Node(Thread.currentThread(), mode);
> // Try the fast path of enq; backup to full enq on failure
> Node pred = tail;
> if (pred != null) {
> node.prev = pred;//node.prev = tail
> if (compareAndSetTail(pred, node)) { //tail = node 大致如此
> pred.next = node;
> return node;
> }
> }
> enq(node);
> return node;
> }
>先执行的是
node.prev = pred(实际为tail)
,然后再是CAS操作,这是由于CAS在执行过程中可能存在一瞬间的需要替换的值为null,会使得一瞬间的队列数据不一致。
参考链接
本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!