一、介绍
AQS 是AbstractQueuedSynchronizer
(抽象队列同步器)的简称,AQS 对应的类在 java.util.concurrent.locks
包下的三个抽象类:
AbstractOwnableSynchronizer
:下面2个的公共父类AbstractQueuedLongSynchronizer
AbstractQueuedSynchronizer
:通常 AQS 指的是这个
AQS
是用来实现锁或其他同步器组件的公共基础部分的抽象实现,是重量级基础框架及整个 JUC 的基石,主要用于解决锁分配给谁的问题,整体就是一个抽象的 FIFO 队列来完成资源获取线程的排队工作,并通过,一个 int 类型的变量表示锁的状态(state)。
AQS
为实现阻塞锁和相关同步器提供一个框架,它是依赖于先进先出的等待,依靠单原子 int 值来表示状态,通过占用和释放方法,改变状态值。
- 锁:面向锁的使用者(定义了程序员和锁交互的使用层API,隐藏了实现细节,你调用即可)
- 同步器:面向锁的实现者(比如Java并发大神Douglee,提出统一规范并简化了锁的实现,屏蔽了同步状态管理、阻塞线程排队和通知、唤醒机制等。)
加锁会导致阻塞、有阻塞就需要排队,实现排队必然需要队列
如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中,这个队列就是AQS的抽象表现。它将请求共享资源的线程封装成队列的结点(Node),通过CAS、自旋以及 LockSuport.park()
的方式,维护state变量的状态,使并发达到同步的效果
以下是用到 AQS 的案例
二、详细说明
AbstractQueuedSynchronizer
的官方介绍
它维护了一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)。这里volatile是核心关键词,具体volatile的语义,在此不述。state的访问方式有三种:
- getState
- setState
- compareAndSetState
AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。
不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:
isHeldExclusively
:该线程是否正在独占资源。只有用到condition才需要去实现它。tryAcquire(int)
:独占方式。尝试获取资源,成功则返回true,失败则返回false。tryRelease(int)
:独占方式。尝试释放资源,成功则返回true,失败则返回false。tryAcquireShared(int)
:共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。tryReleaseShared(int)
:共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
以ReentrantLock为例,state初始化为0,表示未锁定状态。当state值为正数时,代表重入次数。A线程lock时,会调用tryAcquire独占该锁并将state+1。此后,其他线程再tryAcquire时就会失败,直到A线程unlock到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。
再以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown一次,state会CAS减1。等到所有子线程都执行完后(即state=0),会unpark主调用线程,然后主调用线程就会从await函数返回,继续后余动作。
一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。
子类应该被定义为用于实现其外围类的同步属性的非公共内部helper类。类AbstractQueuedSynchronizer不实现任何同步接口。相反,它定义了诸如acquireinterruptible这样的方法,具体的锁和相关的同步器可以适当地调用这些方法来实现它们的公共方法。
该类支持默认独占模式和共享模式。当以独占模式获取时,其他线程试图获取是无法成功的。多个线程获取的共享模式可能(但不一定)成功。这个类不“理解”这些区别,除了在机械意义上,当一个共享模式获取成功时,下一个等待线程(如果存在)也必须确定它是否也可以获取。在不同模式下等待的线程共享相同的FIFO队列。通常,实现子类只支持其中一种模式,但这两种模式都可以发挥作用,例如在ReadWriteLock中。只支持独占模式或仅支持共享模式的子类不需要定义支持未使用模式的方法。
这个类定义了一个嵌套的AbstractQueuedSynchronizer。支持独占模式的子类可以使用ConditionObject类作为条件实现,对于这种模式,方法isheldexexclusive报告是否针对当前线程独占保持同步,用当前getState值调用的方法release完全释放这个对象,并获得这个保存的状态值,最终将这个对象恢复到它之前获得的状态。否则,不会有AbstractQueuedSynchronizer方法创建这样的条件,因此如果不能满足此约束,就不要使用它。AbstractQueuedSynchronizer的行为。条件对象当然依赖于它的同步器实现的语义。
这个类提供了内部队列的检查、检测和监视方法,以及条件对象的类似方法。可以根据需要使用AbstractQueuedSynchronizer导出它们的同步机制到类中。
这个类的序列化只存储底层的原子整数维护状态,因此反序列化的对象具有空的线程队列。需要序列化的典型子类将定义一个readObject方法,在反序列化时将其恢复到已知的初始状态。
使用
要使用这个类作为同步器的基础,可以使用getState、setState和/或compareAndSetState检查和/或修改同步状态,重新定义以下方法:
tryAcquire
tryRelease
tryAcquireShared
tryReleaseShared
isHeldExclusively
这些方法在默认情况下都会抛出UnsupportedOperationException。这些方法的实现必须是内部线程安全的,并且通常应该很短而不是阻塞。定义这些方法是使用该类的唯一受支持的方法。所有其他方法都被声明为final,因为它们不能被独立地改变。
你可能还会发现从AbstractOwnableSynchronizer继承的方法对于跟踪拥有一个独占同步器的线程很有用。我们鼓励您使用它们——这使得监控和诊断工具能够帮助用户确定哪些线程持有锁。
尽管这个类基于内部FIFO队列,但它不会自动执行FIFO获取策略。独占同步的核心形式为:
Acquire(获取):
while (!tryAcquire(arg)) {
enqueue thread if it is not already queued;
possibly block current thread;
}
Release(释放):
if (tryRelease(arg))
unblock the first queued thread;
(共享模式类似,但可能涉及级联信号。)
因为获取中的检入是在排队之前调用的,所以一个新获取的线程可能会在其他被阻塞和排队的线程之前插入。但是,如果需要,您可以定义tryAcquire和/或tryacquiresred,通过内部调用一个或多个检查方法来禁用barging,从而提供一个公平的FIFO获取顺序。特别是,大多数公平同步器可以定义tryAcquire在hasqueuedcursors(一个专门为公平同步器使用而设计的方法)返回true时返回false。其他变化也是可能的。
对于默认的barging(也称为贪婪、放弃和车队回避)策略,吞吐量和可伸缩性通常是最高的。虽然不能保证这是公平的或不挨饿的,但允许较早队列的线程在较晚队列的线程之前重新竞争,并且每次重新竞争都有公平的机会成功地战胜传入的线程。此外,虽然acquire在通常意义上不会“旋转”,但在阻塞之前,它们可能会执行多次tryAcquire调用,其间穿插着其他计算。当独占同步只是短暂地进行时,这就提供了自旋的大部分好处,而当它没有进行时,则没有大部分的缺点。如果需要,您可以通过前面的调用来获得具有“快速路径”检查的方法来增强这一点,可能会预先检查hasContended和/或hasQueuedThreads,以便仅在同步器可能不存在争用的情况下才这样做。
这个类为同步提供了一个有效的、可伸缩的基础,部分是通过将其使用范围专门用于可以依赖int状态、获取和释放参数的同步器,以及一个内部FIFO等待队列。当这还不够时,您可以使用原子类、您自己的定制java.util.Queue类和LockSupport阻塞支持从较低级别构建同步器。
用法示例
下面是一个不可重入的互斥锁类,它使用值0表示解锁状态,1表示锁定状态。虽然不可重入锁并不严格要求记录当前所有者线程,但这个类这样做是为了更容易监控使用情况。它还支持条件并公开一些检测方法:
class Mutex implements Lock, java.io.Serializable {
// Our internal helper class
private static class Sync extends AbstractQueuedSynchronizer {
// Acquires the lock if state is zero
public boolean tryAcquire(int acquires) {
assert acquires == 1; // Otherwise unused
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// Releases the lock by setting state to zero
protected boolean tryRelease(int releases) {
assert releases == 1; // Otherwise unused
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// Reports whether in locked state
public boolean isLocked() {
return getState() != 0;
}
public boolean isHeldExclusively() {
// a data race, but safe due to out-of-thin-air guarantees
return getExclusiveOwnerThread() == Thread.currentThread();
}
// Provides a Condition
public Condition newCondition() {
return new ConditionObject();
}
// Deserializes properly
private void readObject(ObjectInputStream s)
throws IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
// The sync object does all the hard work. We just forward to it.
private final Sync sync = new Sync();
public void lock() { sync.acquire(1); }
public boolean tryLock() { return sync.tryAcquire(1); }
public void unlock() { sync.release(1); }
public Condition newCondition() { return sync.newCondition(); }
public boolean isLocked() { return sync.isLocked(); }
public boolean isHeldByCurrentThread() {
return sync.isHeldExclusively();
}
public boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}
三、AQS
源码
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
private static final long serialVersionUID = 7373984972572414691L;
/**
* 创建一个厨师同步状态为 0 的实例
*/
protected AbstractQueuedSynchronizer() { }
/**
* 等待队列的头节点, 懒加载。除了初始化,它只通过方法setHead修改。
* 注意:如果head存在,它的waitStatus不为CANCELLED(取消)
*/
private transient volatile Node head;
/**
* 等待队列的尾部,惰性初始化。修改仅通过方法enq添加新的等待节点。
*/
private transient volatile Node tail;
/**
* 同步状态
*/
private volatile int state;
/**
* CAS
*/
protected final boolean compareAndSetState(int expect, int update){...}
/**
* 将节点插入队列
*/
private Node enq(Node node){...}
/**
* 内部类 - 节点
*/
static final class Node{...}
/**
* 独占模式下持有锁的线程
*/
private transient Thread exclusiveOwnerThread;
...
}
四、Node 内部类
static final class Node {
/** 标记,表示节点在共享模式下等待 */
static final Node SHARED = new Node();
/** 标记,表示节点在独占模式下等待 */
static final Node EXCLUSIVE = null;
/** waitStatus 定义的常量值,表示线程已取消 */
static final int CANCELLED = 1;
/** waitStatus 定义的常量值,表示后继线程需要唤醒 */
static final int SIGNAL = -1;
/** waitStatus 定义的常量值,表示线程在等待condition唤醒 */
static final int CONDITION = -2;
/** waitStatus 定义的常量值,共享式同步状态获取将会无条件的传播下去 */
static final int PROPAGATE = -3;
/** 初始为 0 ,状态是上面几种 */
volatile int waitStatus;
/** 上一个节点 */
volatile Node prev;
/** 下一个节点 */
volatile Node next;
/** 当前节点对应的线程 */
volatile Thread thread;
/**
* Link to next node waiting on condition, or the special
* value SHARED. Because condition queues are accessed only
* when holding in exclusive mode, we just need a simple
* linked queue to hold nodes while they are waiting on
* conditions. They are then transferred to the queue to
* re-acquire. And because conditions can only be exclusive,
* we save a field by using special value to indicate shared
* mode.
*/
Node nextWaiter;
...
}
1、waitStatus
这些值按数字排列以简化使用。非负值意味着节点不需要发出信号(有效状态)。所以,大多数代码不需要检查特定的值,只需要检查符号。
对于正常同步节点,该字段初始化为0,对于条件节点,该字段初始化为CONDITION。它使用CAS修改(或者在可能的情况下,使用无条件的volatile写入)。
- SIGNAL
SIGNAL: The successor of this node is (or will soon be) blocked (via park), so the current node must unpark its successor when it releases or cancels. To avoid races, acquire methods must first indicate they need a signal, then retry the atomic acquire, and then, on failure, block.
该节点的后继节点被(或即将被)阻塞(通过park),因此当前节点释放或取消时必须解除后继节点。为了避免竞争,获取方法必须首先表明它们需要一个信号,然后重试原子获取,然后在失败时阻塞。
- CANCELLED
CANCELLED: This node is cancelled due to timeout or interrupt.Nodes never leave this state. In particular,a thread with cancelled node never again blocks.
该节点因超时或中断被取消。节点永远不会离开这个状态。特别是,具有取消节点的线程不会再次阻塞。
- CONDITION
This node is currently on a condition queue.It will not be used as a sync queue node until transferred, at which time the status will be set to 0. (Use of this value here has nothing to do with the other uses of the field, but simplifies mechanics.)
此节点当前处于条件队列中。在传输之前,它不会被用作同步队列节点,此时状态将被设置为0。(在此使用该值与该领域的其他用途无关,但可以简化机制。)
- PROPAGATE
A releaseShared should be propagated to other nodes. This is set (for head node only) in doReleaseShared to ensure propagation continues, even if other operations have since intervened.
应该将 releasshared
传播到其他节点。这是在 doreleasshared
中设置的(仅针对头节点),以确保传播继续,即使其他操作已经介入。
五、ReentrantLock源码
0、要点
ReentrantLock是 AQS 独占模式的实现之一
state初始值为0,表示未锁定状态。当state值为正数时,代表重入次数。
acquire(int)
和release(int)
方法的入参均为 1在调用
acquire(int)
时,state+1,重入次数加一;release(int)
时,state-1
1、构造方法
public class ReentrantLock implements Lock, java.io.Serializable {
// 提供所有执行机制的同步器
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {...}
// 非公平锁的同步器
static final class NonfairSync extends Sync{...}
// 公平锁的同步器
static final class FairSync extends Sync {...}
/** 构造方法,创建非公平锁 */
public ReentrantLock() {
sync = new NonfairSync();
}
/** 构造方法,参数:true,公平锁;false,非公平锁 */
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
}
构造方法通过 private final Sync sync;
抽象类,根据传入的参数的不同创建不同的 Sync 实现类并赋值给 sync
字段。
2、加锁acquire(int)
- 公平锁:线程在获取锁时,如果这个锁的等待队列中已经有线程在等待了,那么当前线程就会进入等待队列
- 非公平锁:不管是否有等待队列,如果可以获取到锁,则立即占用锁对象。也就是说队列的第一个排队线程苏醒后,不一定能获取到锁,他还是需要竞争锁,可以理解为后来的线程(非在队列中的线程)可能插队。
非公平锁
当调用 lock.lock();
方法时,实际上是调用 Sync 对应实现类中的 lock()
方法
/**
* 用于非公平锁的同步对象
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* 尝试获取锁,失败立即返回
*/
final void lock() {
// 通过 CAS 将 state 的状态值改为 1,已被占用状态
if (compareAndSetState(0, 1))
// 如果 state 设值成功,则将当前线程设置为独占模式同步的所有者。
setExclusiveOwnerThread(Thread.currentThread());
else
// CAS 失败,立即返回,内部其实是调用下面的 tryAcquire(1)
acquire(1);
}
/**
* 该方法具体实现取决于实现类
* 参数arg:这个值被传递给tryRelease,但是没有被解释,可以表示任何你喜欢的东西。
*/
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
abstract static class Sync extends AbstractQueuedSynchronizer {
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
//获取状态值,默认为0,未加锁
int c = getState();
if (c == 0) {
//当未加锁时,通过cas设置状态值
if (compareAndSetState(0, acquires)) {
//如果设置值成功,则将独占模式的拥有者设置为当前线程
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
//如果当前状态不为0,即已加锁,并且锁的持有者是当前线程
//则状态值加一,即重入次数加一
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
公平锁
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
// 内部其实是调用下面的 tryAcquire(1)
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 如果 AQS state为0,即处于为锁定状态
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
可以看出公平锁和非公平锁的 lock()
方法唯一的区别就在于公平锁在获取同步状态时,多了一个限制条件,hasQueuedPredecessors()
,该方法是公平锁加锁时判断等待队列中是否存在有效节点的方法。
acquire(int)
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
此方法是独占模式下线程获取共享资源的顶层入口。如果获取到资源,线程直接返回,否则进入等待队列,直到获取到资源为止,且整个过程忽略中断的影响。这也正是lock的语义,当然不仅仅只限于lock。获取到资源后,线程就可以去执行其临界区代码了。
函数流程如下:
tryAcquire
尝试直接去获取资源,如果成功则直接返回;addWaiter
将该线程加入等待队列的尾部,并标记为独占模式;acquireQueued
使线程在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。- 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断
selfInterrupt
,将中断补上。
(1)tryAcquire(int)
此方法尝试去获取独占资源。如果获取成功,则直接返回true,否则直接返回false。
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
上面方法是定义在 AbstractQueuedSynchronizer
中,具体的实现在需要查看其实现类-具体看上面的 5-2 加锁
(2)addWaiter(Node)
此方法用于将当前线程加入到等待队列的队尾,并返回当前线程所在的结点。
private Node addWaiter(Node mode) {
//以给定模式构造结点。mode有两种:EXCLUSIVE(独占)和SHARED(共享)
Node node = new Node(Thread.currentThread(), mode);
//获取等待队列最后一个节点,如果最后一个节点为空,则队列为空,直接入队
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//上一步失败则通过enq入队。
enq(node);
return node;
}
Node结点是对每一个访问同步代码的线程的封装,其包含了需要同步的线程本身以及线程的状态,如是否被阻塞,是否等待唤醒,是否已经被取消等。变量waitStatus则表示当前被封装成Node结点的等待状态,共有4种取值CANCELLED、SIGNAL、CONDITION、PROPAGATE。
- CANCELLED:值为1,在同步队列中等待的线程等待超时或被中断,需要从同步队列中取消该Node的结点,其结点的waitStatus为CANCELLED,即结束状态,进入该状态后的结点将不会再变化。
- SIGNAL:值为-1,被标识为该等待唤醒状态的后继结点,当其前继结点的线程释放了同步锁或被取消,将会通知该后继结点的线程执行。说白了,就是处于唤醒状态,只要前继结点释放锁,就会通知标识为SIGNAL状态的后继结点的线程执行。
- CONDITION:值为-2,与Condition相关,该标识的结点处于等待队列中,结点的线程等待在Condition上,当其他线程调用了Condition的signal方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
- PROPAGATE:值为-3,与共享模式相关,在共享模式中,该状态标识结点的线程处于可运行状态。
- 0状态:值为0,代表初始化状态。
AQS在判断状态时,通过用waitStatus>0表示取消状态,而waitStatus<0表示有效状态。
enq(Node)
此方法用于将node加入队尾。
private Node enq(final Node node) {
//CAS自旋,直到成功加入队尾
for (;;) {
Node t = tail;
if (t == null) {
// 队列为空,创建一个空的标志结点作为head结点,并将tail也指向它。
if (compareAndSetHead(new Node()))
tail = head;
} else {
//正常流程,放入队尾
//双向链表的操作,设置前后节点
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
队列为空时,添加的空节点称为哨兵节点/虚拟节点,作用是占位。
(3)acquireQueued(Node, int)
通过 tryAcquire
和 addWaiter
,该线程获取资源失败,已经被放入等待队列尾部了。
线程下一步就是,进入等待状态休息,直到其他线程彻底释放资源后唤醒自己,自己再拿到资源,然后就可以去干自己想干的事了。
final boolean acquireQueued(final Node node, int arg) {
//标记是否成功拿到资源
boolean failed = true;
try {
//标记等待过程中是否被中断过
boolean interrupted = false;
// 自旋
for (;;) {
//拿到前一个节点
final Node p = node.predecessor();
//如果前一个节点是head,即该结点已成老二,那么便有资格去尝试获取资源
//(可能是老大释放完资源唤醒自己的,当然也可能被interrupt了)。
if (p == head && tryAcquire(arg)) {
//拿到资源后,将head指向该结点。
//所以head所指的标杆结点,就是当前获取到资源的那个结点或null。
setHead(node);
// setHead中node.prev已置为null,此处再将head.next置为null,
//就是为了方便GC回收以前的head结点。也就意味着之前拿完资源的结点出队了!
p.next = null; // help GC
failed = false;
//返回等待过程中是否被中断过
return interrupted;
}
//如果自己可以休息了,就进入waiting状态,直到被unpark
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()){
//如果等待过程中被中断过,哪怕只有那么一次,就将interrupted标记为true
interrupted = true;
}
}
} finally {
if (failed)
cancelAcquire(node);
}
}
总结下该函数的具体流程:
- 结点进入队尾后,检查状态,找到安全休息点;
- 调用park进入waiting状态,等待unpark或interrupt唤醒自己;
- 被唤醒后,看自己是不是有资格能拿到号。如果拿到,head指向当前结点,并返回从入队到拿到号的整个过程中是否被中断过;如果没拿到,继续流程1。
shouldParkAfterFailedAcquire(Node, Node)
此方法主要用于检查状态,看看自己是否真的可以去休息了,万一队列前边的线程都放弃了只是瞎站着。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//拿到前驱的状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
//如果已经告诉前驱拿完号后通知自己一下,那就可以安心休息了
return true;
if (ws > 0) {
/*
* 如果前驱放弃了,那就一直往前找,直到找到最近一个正常等待的状态,并排在它的后边。
* 注意:那些放弃的结点,由于被自己“加塞”到它们前边,它们相当于形成一个无引用链,稍后就会被(GC回收)!
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//如果前驱正常,那就把前驱的状态设置成SIGNAL,告诉它拿完号后通知自己一下。有可能失败,前驱可能刚刚释放完呢!
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
整个流程中,如果前驱结点的状态不是SIGNAL,那么自己就不能安心去休息,需要去找个安心的休息点,同时可以再尝试下看有没有机会轮到自己拿号。
parkAndCheckInterrupt
如果线程找好安全休息点后,那就可以安心中断了。此方法就是让线程去中断,真正进入等待状态。
private final boolean parkAndCheckInterrupt() {
//调用park使线程进入waiting状态
LockSupport.park(this);
//如果被唤醒,查看自己是不是被中断的。
return Thread.interrupted();
}
park会让当前线程进入waiting状态。在此状态下,有两种途径可以唤醒该线程:1)被unpark;2)被interrupt。需要注意的是,Thread.interrupted
会清除当前线程的中断标记位。
3、解锁release(int)
此方法是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。这也正是unlock的语义,当然不仅仅只限于unlock。下面是release的源码:
public class ReentrantLock implements Lock, java.io.Serializable {
...
public void unlock() {
sync.release(1);
}
...
}
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
//...
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//唤醒等待队列里的下一个线程
unparkSuccessor(h);
return true;
}
return false;
}
/**
* 该方法具体实现取决于实现类
* 参数arg:这个值被传递给tryRelease,但是没有被解释,可以表示任何你喜欢的东西。
*/
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
}
逻辑并不复杂。它调用tryRelease来释放资源。有一点需要注意的是,它是根据tryRelease的返回值来判断该线程是否已经完成释放掉资源了!所以自定义同步器在设计tryRelease的时候要明确这一点!!
(1)tryRelease(int)
此方法尝试去释放指定量的资源。
public class ReentrantLock implements Lock, java.io.Serializable {
...
abstract static class Sync extends AbstractQueuedSynchronizer {
...
protected final boolean tryRelease(int releases) {
//getState()=0时,即为未加锁
int c = getState() - releases;
//如果当前线程不为独占模式的持有者,则抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
// 设置当前拥有独占访问权的线程。空参数表示没有线程拥有访问权限。
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
}
}
跟 tryAcquire
一样,这个方法是需要独占模式的自定义同步器去实现的。正常来说,tryRelease
都会成功的,因为这是独占模式,该线程来释放资源,那么它肯定已经拿到独占资源了,直接减掉相应量的资源即可(state-=arg
),也不需要考虑线程安全的问题。但要注意它的返回值,上面已经提到了,release是根据 tryRelease
的返回值来判断该线程是否已经完成释放掉资源了!所以自义定同步器在实现时,如果已经彻底释放资源(state=0
),要返回true,否则返回false。
(2)unparkSuccessor(Node)
此方法用于唤醒等待队列中下一个线程。
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
//这里,node一般为当前线程所在的结点。
private void unparkSuccessor(Node node) {
/*
* 如果状态为负(有效节点)(即可能需要信号),尝试清除信号。
* 如果失败了,或者状态被等待线程改变了,这是可以的。
*/
int ws = node.waitStatus;
if (ws < 0)
//置零当前线程所在的结点状态,允许失败。
compareAndSetWaitStatus(node, ws, 0);
/*
* 要取消驻留的线程保存在后继节点中,后继节点通常是下一个节点。
* 但如果被取消或明显为空,则从tail向后遍历以找到实际未被取消的后继对象。
*/
//找到下一个需要唤醒的结点s
Node s = node.next;
if (s == null || s.waitStatus > 0) {//如果为空或已取消
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
//从这里可以看出,<=0的结点,都是还有效的结点。
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);//唤醒
}
}
这个函数并不复杂。一句话概括:用unpark唤醒等待队列中最前边的那个未放弃线程,这里我们也用s来表示吧。此时,再和acquireQueued联系起来,s被唤醒后,进入if (p == head && tryAcquire(arg))的判断(即使p!=head也没关系,它会再进入shouldParkAfterFailedAcquire寻找一个安全点。这里既然s已经是等待队列中最前边的那个未放弃线程了,那么通过shouldParkAfterFailedAcquire的调整,s也必然会跑到head的next结点,下一次自旋p==head就成立啦),然后s把自己设置成head标杆结点,表示自己已经获取到资源了,acquire也返回了!