
一道题理清 JUC 底层原理
前置知识:
理解多线程/并发/锁的知识,但是对底层原理不理解的。
Java 内存模型(堆/栈模型)
synchronized
本文主要探讨:
volatile
/ CAS /AtomicInteger
LockSupport
/ AQS
0. LeetCode1114 按序打印
https://leetcode.cn/problems/print-in-order/
给你一个类:
public class Foo {
public void first() { print("first"); }
public void second() { print("second"); }
public void third() { print("third"); }
}
三个不同的线程 A、B、C 将会共用一个 Foo
实例。
线程 A 将会调用
first()
方法线程 B 将会调用
second()
方法线程 C 将会调用
third()
方法
请设计修改程序,以确保 second()
方法在 first()
方法之后被执行,third()
方法在 second()
方法之后被执行。
提示:
尽管输入中的数字似乎暗示了顺序,但是我们并不保证线程在操作系统中的调度顺序。
你看到的输入格式主要是为了确保测试的全面性。
示例 1:
输入:nums = [1,2,3]
输出:"firstsecondthird"
解释:
有三个线程会被异步启动。输入 [1,2,3] 表示线程 A 将会调用 first() 方法,线程 B 将会调用 second() 方法,线程 C 将会调用 third() 方法。正确的输出是 "firstsecondthird"。
示例 2:
输入:nums = [1,3,2]
输出:"firstsecondthird"
解释:
输入 [1,3,2] 表示线程 A 将会调用 first() 方法,线程 B 将会调用 third() 方法,线程 C 将会调用 second() 方法。正确的输出是 "firstsecondthird"。
提示:
nums
是[1, 2, 3]
的一组排列
class Foo {
public Foo() {
}
public void first(Runnable printFirst) throws InterruptedException {
// printFirst.run() outputs "first". Do not change or remove this line.
printFirst.run();
}
public void second(Runnable printSecond) throws InterruptedException {
// printSecond.run() outputs "second". Do not change or remove this line.
printSecond.run();
}
public void third(Runnable printThird) throws InterruptedException {
// printThird.run() outputs "third". Do not change or remove this line.
printThird.run();
}
}
1. 从内存可见性入手
1.1 初探想法
题目这个 Foo
类有三个方法,主程序会开三个线程分别执行这三个方法。
好,那我们就在主程序里直接让这三个方法同步执行不就好了?
thread1.start();
thread1.join();
thread2.start();
thread2.join();
thread3.start();
thread3.join();
然后一下笔…… 不对啊,这里也没有主方法啊?
再定睛一看,好家伙,原来是题目里已经把主程序设定好三个线程会被异步启动了。
主方法类似这样:
public class Main {
public static void main(String[] args) {
// 创建 Foo 的实例
Foo foo = new Foo();
// 创建三个线程
Thread thread1 = new Thread(() -> foo.first(() -> System.out.print("first")));
Thread thread2 = new Thread(() -> foo.second(() -> System.out.print("second")));
Thread thread3 = new Thread(() -> foo.third(() -> System.out.print("third")));
// 启动线程
thread1.start();
thread2.start();
thread3.start();
// 等待线程完成执行
try {
thread1.join();
thread2.join();
thread3.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
也就是说,这三个方法一定是被同时执行的。
那怎么办呢?
我们可以设置一个共享变量 flag, 类似于一个“开关”,值为0时执行first,值为1时执行second,值为2时执行third。
这样,我们先让flag=0,first执行完时设置为1去执行second,second执行完时设置为2去执行third;
而在flag值没有匹配时,使用while死循环把线程拦住不要继续执行:
class Foo {
private int flag = 0;
public void first(Runnable printFirst) throws InterruptedException {
// printFirst.run() outputs "first". Do not change or remove this line.
printFirst.run();
flag++;
}
public void second(Runnable printSecond) throws InterruptedException {
while(flag != 1) {}
// printSecond.run() outputs "second". Do not change or remove this line.
printSecond.run();
flag++;
}
public void third(Runnable printThird) throws InterruptedException {
while (flag != 2) {}
// printThird.run() outputs "third". Do not change or remove this line.
printThird.run();
}
}
IDEA一运行,没问题,保存到LeetCode提交……哎,超时了??
1.2 堆中的内存不可见问题
为啥会超时呢?
因为flag值被first方法修改为1之后,second值并没有看到flag值被修改为1,如果我们能从控制台打印,它依然是0;
哎奇怪了,堆的内存对所有线程应该是共享的啊,为什么会有线程不可见的问题呢?
这是因为现代计算机为了高效,往往会在高速缓存区中缓存共享变量,因为cpu访问缓存区比访问内存要快得多。
线程之间的共享变量存在主内存中,每个线程都有一个私有的本地内存,存储了该线程以读、写共享变量的副本。本地内存是Java内存模型的一个抽象概念,并不真实存在。它涵盖了缓存、写缓冲区、寄存器等。
从图中可以看出: 1. 所有的共享变量都存在主内存中。 2. 每个线程都保存了一份该线程使用到的共享变量的副本。 3. 如果线程A与线程B之间要通信的话,必须经历下面2个步骤: 1. 线程A将本地内存A中更新过的共享变量刷新到主内存中去。 2. 线程B到主内存中去读取线程A之前已经更新过的共享变量。
所以,线程A无法直接访问线程B的工作内存,线程间通信必须经过主内存。
注意,根据JMM的规定,线程对共享变量的所有操作都必须在自己的本地内存中进行,不能直接从主内存中读取。
所以线程B并不是直接去主内存中读取共享变量的值,而是先在本地内存B中找到这个共享变量,发现这个共享变量已经被更新了,然后本地内存B去主内存中读取这个共享变量的新值,并拷贝到本地内存B中,最后线程B再读取本地内存B中的新值。
那么怎么知道这个共享变量的被其他线程更新了呢?这就是JMM的功劳了,也是JMM存在的必要性之一。JMM通过控制主内存与每个线程的本地内存之间的交互,来提供内存可见性保证。
而上述题目超时的原因就是,【这个共享变量被其它线程更新】 和 【线程A知道这个共享变量被其他线程更新】,中间会有一段时间,正是这一段时间会可能让程序超时。
1.3 volatile
那么问题的解决办法就很清晰了,就是让【这个共享变量被更新】时,立刻让【所有线程知道这个共享变量被更新】。
volatile
就是用来做这个用的。
private volatile int flag = 0;
当一个线程对volatile
修饰的变量进行写操作时,JMM会立即把该线程对应的本地内存中的共享变量的值刷新到主内存;当一个线程对volatile
修饰的变量进行读操作时,JMM会把立即该线程对应的本地内存置为无效,从主内存中读取共享变量的值。
我们把共享变量加上这个关键字,在题目里运行:
class Foo {
private volatile int flag = 0;
public void first(Runnable printFirst) throws InterruptedException {
// printFirst.run() outputs "first". Do not change or remove this line.
printFirst.run();
flag++;
}
public void second(Runnable printSecond) throws InterruptedException {
while(flag != 1) {}
// printSecond.run() outputs "second". Do not change or remove this line.
printSecond.run();
flag++;
}
public void third(Runnable printThird) throws InterruptedException {
while (flag != 2) {}
// printThird.run() outputs "third". Do not change or remove this line.
printThird.run();
}
}
1.4 拓展:synchronized 的可见性
我们也可以用之前学过的 synchronized 解决这个问题。
我们之前知道synchronized
是有原子性的,多个线程同时执行这个同步代码块,只有一个线程能够持有该同步块的锁,因此它对共享变量的操作是原子的,其他线程必须等到当前线程释放锁才能继续执行。
除此之外,其实 synchronized
也有可以保证可见性的机制。
当你使用 synchronized
来同步代码块时,Java 会确保进入和退出同步代码块的线程之间的内存可见性。也就是说:
当一个线程进入一个同步块并对共享变量进行写操作时,这个修改会被刷新到主内存中。
当另一个线程进入同一个同步块并读取共享变量时,它将看到同步块中所有变量的最新值。
因此,即使这个共享变量没有声明为 volatile
,在 synchronized
语句块内进行的读写操作也能够保证该变量的可见性。这是因为 synchronized
会确保在进入同步块之前,线程会从主内存中加载数据,并且在退出同步块时,线程会将数据刷新回主内存。
(你同步执行这块代码肯定要保证这里的共享变量都得读主存里的)
于是我尝试了给每个方法都加上 synchronized
class FooS {
private final Object lock = new Object();
private int flag = 0;
public void first(Runnable printFirst) throws InterruptedException {
synchronized (lock) {
// printFirst.run() outputs "first". Do not change or remove this line.
printFirst.run();
flag++;
}
}
public void second(Runnable printSecond) throws InterruptedException {
synchronized (lock) {
while (flag != 1) {
}
// printSecond.run() outputs "second". Do not change or remove this line.
printSecond.run();
flag++;
}
}
public void third(Runnable printThird) throws InterruptedException {
synchronized (lock) {
while (flag != 2) {
}
// printThird.run() outputs "third". Do not change or remove this line.
printThird.run();
}
}
}
一运行,又超时了?
这是因为它的同步机制会让这三个方法只能有一个竞争到锁,而非确定是第一个先竞争到,如果第二个或第三个先竞争到了,因为flag的值不对,会卡死在死循环。
我们可以用 wait()
和 notifyAll()
这些机制进行锁的释放和重新竞争:
class Foo {
private final Object lock = new Object();
private int flag = 0;
public void first(Runnable printFirst) throws InterruptedException {
synchronized (lock) {
// printFirst.run() outputs "first". Do not change or remove this line.
printFirst.run();
flag++;
lock.notifyAll();
}
}
public void second(Runnable printSecond) throws InterruptedException {
synchronized (lock) {
while (flag != 1) {
lock.wait();
}
// printSecond.run() outputs "second". Do not change or remove this line.
printSecond.run();
flag++;
lock.notifyAll();
}
}
public void third(Runnable printThird) throws InterruptedException {
synchronized (lock) {
while (flag != 2) {
lock.wait();
}
// printThird.run() outputs "third". Do not change or remove this line.
printThird.run();
}
}
}
// 这里为什么必须是 notifyAll 呢?
// 线程顺序问题:
//
//比如 first() 执行后,通过 notify() 唤醒一个线程,假设唤醒的是 third(),而 second() 仍然在等待 firstFinished == true。这种情况下,third() 线程会被唤醒,但它会立刻重新进入 wait(),直到条件满足,这就浪费了 notify() 的唤醒机会,导致其他线程的唤醒时机可能不符合预期。
//死锁风险:
//
//如果线程被唤醒时,它发现自己的条件没有满足(例如 firstFinished == false 或 secondFinished == false),它会再次进入等待。这样就可能发生一个线程在错误的顺序上被唤醒并进入阻塞状态,导致其他线程无法继续执行。
所以我们可以用 synchronized 机制解决这样的问题。
好的,这道题我们就可以先解决到这里了,但是关于JUC,还有更多的事情可以做……
2. CAS 与原子类
2.1 volatile 的缺陷
我们换一种执行方式,如果我们在 main
中换一种执行方法:三个异步线程,两个线程执行 first,另外一个线程执行 second:
public class Main {
public static void main(String[] args) {
// 同步执行20w次
for (int i = 0; i < 200000; i++) {
// 创建 Foo 的实例
FooV foo = new FooV();
Thread thread1 = new Thread(() -> {
foo.first(() -> {});
});
Thread thread2 = new Thread(() -> {
foo.first(() -> {});
});
// 启动线程
thread1.start();
thread2.start();
// 等待线程完成执行
try {
thread1.join();
thread2.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
if(foo.flag != 2){
System.out.println("ohNo! flag = " + foo.flag);
}
}
}
}
考虑上述 volatile 的实现(flag暂时变成public):
class Foo {
public volatile int flag = 0;
public void first(Runnable printFirst) {
// printFirst.run() outputs "first". Do not change or remove this line.
printFirst.run();
flag++;
}
}
我们预期的结果是,first-1 会把 flag++,first-2 再把 flag++,此时flag应该会从0加到2,所以那一句“ohNo”肯定不会输出。
然而,在这20w运行的次数中,我们惊讶的发现,控制台输出了3次“ohNo”:
ohNo! flag = 1
ohNo! flag = 1
ohNo! flag = 1
Process finished with exit code 0
明明两个线程对flag加了两次,为什么最后结果还是1呢?volatile
关键字在 Java 中确保了 内存可见性,即当一个线程修改了 volatile
变量的值,其他线程能够立即看到这个修改。然而,volatile
并不能保证 原子性,也就是说,它不能保证一个线程在修改 flag
时的操作是不可分割的(原子操作)。
在你的代码中,每个线程都在调用 first()
方法并执行 flag++
,这个 flag++
操作并不是原子性的。flag++
实际上是三个步骤:
读取
flag
的当前值。修改
flag
的值。写入 更新后的值到内存。
由于 flag
是共享变量,且没有同步机制来保证这些步骤的原子性,所以多个线程可能在相同时间读取相同的 flag
值,并且在没有互斥锁的情况下同时进行修改。这样就会发生竞态条件。
举个例子:
线程 1 读取
flag
的值为 0。线程 2 也读取
flag
的值为 0。线程 1 增加
flag
,写回 1。线程 2 也增加
flag
,写回 1。
最后,flag
只增加了 1,而不是 2。这是因为两个线程的操作并没有互斥地进行,它们都看到 flag
为 0,然后分别写回了值 1,导致第二次修改的值覆盖了第一次修改的值。
解决它很简单,加 synchronized
就行了:
class Foo {
public volatile int flag = 0;
public synchronized void first(Runnable printFirst) {
printFirst.run();
flag++;
}
}
不过这次我们要引用一点更轻量级一点的方式。
2.2 CAS
看下面这个实现:
import sun.misc.Unsafe;
import java.lang.reflect.Field;
class Foo {
public volatile int flag = 0; // 需要进行 CAS 操作的变量
// 获取 Unsafe 实例
private static final Unsafe UNSAFE;
private static final long FLAG_OFFSET;
static {
try {
// 获取 Unsafe 实例
Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
UNSAFE = (Unsafe) field.get(null);
// 获取 flag 字段的偏移量
FLAG_OFFSET = UNSAFE.objectFieldOffset(Foo.class.getDeclaredField("flag"));
} catch (Exception e) {
throw new Error(e);
}
}
// 使用 CAS 方法实现 flag++
public void first(Runnable printFirst) {
printFirst.run();
// CAS 操作
int oldFlag;
do {
oldFlag = flag;
} while (!UNSAFE.compareAndSwapInt(this, FLAG_OFFSET, oldFlag, oldFlag + 1));
}
}
这……这他妈是啥?别急,让我们来看一看。
让我们注意这个地方:
// 原方法
public void first(Runnable printFirst) {
printFirst.run();
flag++;
}
// 使用 CAS 方法实现 flag++
public void first(Runnable printFirst) {
printFirst.run();
// CAS 操作
int oldFlag;
do {
oldFlag = flag;
} while (!UNSAFE.compareAndSwapInt(this, FLAG_OFFSET, oldFlag, oldFlag + 1));
}
flag++ 怎会变成如此这样?这就得从CAS这个操作(就是上面的 compareAndSwapInt
)解释起:
2.2.1 CAS 的概念
CAS的全称是:比较并交换(Compare And Swap)。在CAS中,有这样三个值:
V:要更新的变量(var)
E:预期值(expected)
N:新值(new)
比较并交换的过程如下:
判断V是否等于E,如果等于,将V的值设置为N;如果不等,说明已经有其它线程更新了V,则当前线程放弃更新,什么都不做。
所以这里的预期值E本质上指的是“旧值”。
我们以一个简单的例子来解释这个过程:
如果有一个多个线程共享的变量
i
原本等于5,我现在在线程A中,想把它设置为新的值6;我们使用CAS来做这个事情;
首先我们用i去与5对比,发现它等于5,说明没有被其它线程改过,那我就把它设置为新的值6,此次CAS成功,
i
的值被设置成了6;如果不等于5,说明
i
被其它线程改过了(比如现在i
的值为2),那么我就什么也不做,此次CAS失败,i
的值仍然为2。
在这个例子中,i
就是V,5就是E,6就是N。
那有没有可能我在判断了i
为5之后,正准备更新它的新值的时候,被其它线程更改了i
的值呢?
不会的。因为CAS是一种原子操作,它是一种系统原语,是一条CPU的原子指令,从CPU层面保证它的原子性
当多个线程同时使用CAS操作一个变量时,只有一个会胜出,并成功更新,其余均会失败,但失败的线程并不会被挂起,仅是被告知失败,并且允许再次尝试,当然也允许失败的线程放弃操作。
2.2.2 Unsafe 类
前面提到,CAS是一种原子操作。那么Java是怎样来使用CAS的呢?我们知道,在Java中,如果一个方法是native的,那Java就不负责具体实现它,而是交给底层的JVM使用c或者c++去实现。
在Java中,有一个Unsafe
类,它在sun.misc
包中。它里面是一些native
方法,其中就有几个关于CAS的:
boolean compareAndSwapObject(Object o, long offset,Object expected, Object x);
boolean compareAndSwapInt(Object o, long offset,int expected,int x);
boolean compareAndSwapLong(Object o, long offset,long expected,long x);
当然,他们都是public native
的。
Unsafe 中对 CAS 的实现是C++写的,它的具体实现和操作系统、CPU都有关系。
Linux的X86下主要是通过cmpxchgl
这个指令在CPU级完成CAS操作的,但在多处理器情况下必须使用lock
指令加锁来完成。当然不同的操作系统和处理器的实现会有所不同,大家可以自行了解。
当然,Unsafe类里面还有其它方法用于不同的用途。比如支持线程挂起和恢复的park
和unpark
, LockSupport类底层就是调用了这两个方法。还有支持反射操作的allocateInstance()
方法。
2.2.3 源码解析
// 使用 CAS 方法实现 flag++
public void first(Runnable printFirst) {
printFirst.run();
// CAS 操作
int oldFlag;
do {
oldFlag = flag;
} while (!UNSAFE.compareAndSwapInt(this, FLAG_OFFSET, oldFlag, oldFlag + 1));
}
我们来一步步解析这段源码。
对象 o
首先,对象o
是this
,也就是一个Foo
对象。
偏移量 offset
然后offset
是一个常量VALUE
。这个常量是在static
块中声明的:
// 获取 flag 字段的偏移量
FLAG_OFFSET = UNSAFE.objectFieldOffset(Foo.class.getDeclaredField("flag"));
同样是调用的Unsafe
的方法。从方法名字上来看,是得到了一个对象字段偏移量。
用于获取某个字段相对Java对象的“起始地址”的偏移量。
一个java对象可以看成是一段内存,各个字段都得按照一定的顺序放在这段内存里,同时考虑到对齐要求,可能这些字段不是连续放置的,
用这个方法能准确地告诉你某个字段相对于对象的起始内存地址的字节偏移量,因为是相对偏移量,所以它其实跟某个具体对象又没什么太大关系,跟class的定义和虚拟机的内存模型的实现细节更相关。
expected 和 x
继续看源码。前面我们讲到,CAS是“无锁”的基础,它允许更新失败。所以经常会与while循环搭配,在失败后不断去重试。
这里声明了一个 oldFlag ,被赋为原来的值,作为 expected
传入CAS,而新的值是oldFlag + 1
,作为 x
传入CAS;
循环(也叫自旋)
这里使用的是do-while循环。这种循环不多见,它的目的是保证循环体内的语句至少会被执行一遍。
再回到循环条件上来,可以看到它是在不断尝试去用CAS更新。如果更新失败,就继续重试。
那为什么要把获取“旧值”v的操作放到循环体内呢?其实这也很好理解。前面我们说了,CAS如果旧值V不等于预期值E,它就会更新失败。说明旧的值发生了变化。那我们当然需要返回的是被其他线程改变之后的旧值了,因此放在了do循环体内。
我们再执行10w甚至60w,甚至变成三个线程,就都不会出问题了。
public class Main {
public static void main(String[] args) {
boolean f = false;
// 创建 Foo 的实例
for (int i = 0; i < 600000; i++) {
FooV foo = new FooV();
Thread thread1 = new Thread(() -> {
foo.first(() -> {});
});
Thread thread2 = new Thread(() -> {
foo.first(() -> {});
});
Thread thread3 = new Thread(() -> {
foo.first(() -> {});
});
// 启动线程
thread1.start();
thread2.start();
thread3.start();
// 等待线程完成执行
try {
thread1.join();
thread2.join();
thread3.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
if(foo.flag != 3){
System.out.println("ohNo! flag = " + foo.flag);
f = true;
}
}
System.out.println("是否发生了意外情况 = " + f);
}
}
是否发生了意外情况 = false
Process finished with exit code 0
2.2.4 拓展:乐观锁悲观锁
像这样,类似 CAS 的“无锁”机制,其实是一种乐观锁。
悲观锁阻塞,乐观锁自旋。 —— 刘硕
悲观锁举例(后面会着重实现):
public class PessimisticLockExample {
private final ReentrantLock lock = new ReentrantLock();
public void doSomething() {
lock.lock(); // 获取锁
try {
// 临界区代码
} finally {
lock.unlock(); // 释放锁
}
}
}
乐观锁举例:
public class OptimisticLockExample {
private final AtomicInteger value = new AtomicInteger(0);
public void increment() {
int oldValue;
int newValue;
do {
oldValue = value.get(); // 读取当前值
newValue = oldValue + 1; // 计算新值
} while (!value.compareAndSet(oldValue, newValue)); // CAS 更新
}
}
2.3 原子类
可是……假如我又要可见性,又要乐观锁,我自己就要使用 Unsafe
类手搓这么一堆东西吗?
别急,Java 提供了一些打包好的原子类,在java.util.concurrent.atomic
包下面。在JDK 11中,有如下17个类:
其中本题可以使用的 AtomicInteger
的源码和上述十分类似,可以看这篇文章:https://redspider.gitbook.io/concurrent/di-er-pian-yuan-li-pian/10#id-10.4-yuan-zi-cao-zuo-atomicinteger-lei-yuan-ma-jian-xi
import java.util.concurrent.atomic.AtomicInteger;
class Foo {
public AtomicInteger flag = new AtomicInteger(0);
public void first(Runnable printFirst) {
printFirst.run();
flag.incrementAndGet(); // 原子地增加 flag
}
}
public class Main {
public static void main(String[] args) {
boolean f = false;
// 创建 Foo 的实例
for (int i = 0; i < 600000; i++) {
Foo foo = new Foo();
Thread thread1 = new Thread(() -> {
foo.first(() -> {});
});
Thread thread2 = new Thread(() -> {
foo.first(() -> {});
});
Thread thread3 = new Thread(() -> {
foo.first(() -> {});
});
// 启动线程
thread1.start();
thread2.start();
thread3.start();
// 等待线程完成执行
try {
thread1.join();
thread2.join();
thread3.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
if(foo.flag.get() != 3){
System.out.println("ohNo! flag = " + foo.flag.get());
f = true;
}
}
System.out.println("是否发生了意外情况 = " + f);
}
}
LeetCode1114原题的官方题解给出的就是用原子类方法:
class Foo {
private AtomicInteger firstJobDone = new AtomicInteger(0);
private AtomicInteger secondJobDone = new AtomicInteger(0);
public Foo() {}
public void first(Runnable printFirst) throws InterruptedException {
// printFirst.run() outputs "first".
printFirst.run();
// mark the first job as done, by increasing its count.
firstJobDone.incrementAndGet();
}
public void second(Runnable printSecond) throws InterruptedException {
while (firstJobDone.get() != 1) {
// waiting for the first job to be done.
}
// printSecond.run() outputs "second".
printSecond.run();
// mark the second as done, by increasing its count.
secondJobDone.incrementAndGet();
}
public void third(Runnable printThird) throws InterruptedException {
while (secondJobDone.get() != 1) {
// waiting for the second job to be done.
}
// printThird.run() outputs "third".
printThird.run();
}
}
3. LockSupport 阻塞机制
刚才我们用 CAS 实现了乐观锁自旋,而等待线程变多时,每个线程不断获取锁会使得 CPU 消耗过高。
这时候我们就需要用阻塞机制,对应的就是 LockSupport 工具类,主要用于线程的阻塞和唤醒操作。它提供了 park()
和 unpark()
两个核心方法,分别用于阻塞当前线程和唤醒指定线程。(我们暂时不考虑 synchronized)
3.1 LockSupport 核心功能
LockSupport 可以基于线程对象来对线程进行管理。
线程阻塞与唤醒:
park()
:阻塞当前线程,直到该线程被unpark()
唤醒或线程被中断。unpark(Thread thread)
:唤醒指定的线程。如果该线程已经被阻塞,则解除阻塞;如果该线程尚未被阻塞,则下次调用park()
时不会阻塞。
许可机制:
每个线程都与一个许可(permit)关联,初始值为 0。调用
unpark()
会将许可设置为 1,而park()
会检查许可是否为 1,如果是则立即返回,否则阻塞线程。许可的语义是“是否持有许可”,0 表示否,1 表示是。
LockSupport 的设计基于许可证机制,类似于高速公路的通行卡。线程在调用 park()
时,如果许可为 0,则阻塞;调用 unpark()
时,许可被设置为 1,唤醒线程。这种机制使得 unpark()
可以在 park()
之前调用,避免了传统 wait()
和 notify()
的顺序限制。
import java.util.concurrent.locks.LockSupport;
public class LockSupportExample {
public static void main(String[] args) {
Thread thread = new Thread(() -> {
System.out.println("Thread is running...");
LockSupport.park(); // 阻塞当前线程
System.out.println("Thread is resumed.");
});
thread.start();
try {
Thread.sleep(2000); // 主线程等待2秒
} catch (InterruptedException e) {
e.printStackTrace();
}
LockSupport.unpark(thread); // 唤醒被阻塞的线程
}
}
LeetCode 1114 也可以使用 LockSupport 控制线程来解决:
class Foo {
private volatile Thread thread1, thread2, thread3; // 不加volatile线程读取不到其他线程的写修改导致超时
public Foo() {
thread1 = null;
thread2 = null;
thread3 = null;
}
public void first(Runnable printFirst) throws InterruptedException {
thread1 = Thread.currentThread();
// printFirst.run() outputs "first". Do not change or remove this line.
printFirst.run();
// 先实例化,再 unpark
while (thread2 == null){}
LockSupport.unpark(thread2);
}
public void second(Runnable printSecond) throws InterruptedException {
thread2 = Thread.currentThread();
LockSupport.park();
// printSecond.run() outputs "second". Do not change or remove this line.
printSecond.run();
while (thread3 == null){}
LockSupport.unpark(thread3);
}
public void third(Runnable printThird) throws InterruptedException {
thread3 = Thread.currentThread();
LockSupport.park();
// printThird.run() outputs "third". Do not change or remove this line.
printThird.run();
}
}
3.2 悲观锁的实现
我们再次回到 2.1 部分的代码:
public class Main {
public static void main(String[] args) {
boolean f = false;
int n = 60;
for (int i = 0; i < 60000; i++) {
// 创建 Foo 的实例
FooLockSupport foo = new FooLockSupport();
List<Thread> threads = new ArrayList<>();
for (int j = 0; j < n; j++) {
threads.add(new Thread(foo::first));
}
for (Thread t : threads) {
t.start();
}
for (Thread t : threads) {
try {
t.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
if(foo.getCount() != n){
System.out.println("ohNo! count = " + foo.getCount());
f = true;
}
}
System.out.println("是否发生了意外情况 = " + f);
}
}
class FooLockSupport {
private volatile int count = 0;
public int getCount(){
return count;
}
public void first() {
LockSupport.park();
count++;
// 当前线程执行完 count++ 了,如何唤醒其他线程?
}
}
此时,三个线程需要原子性地对 count 进行自增操作,而 LockSupport 可以对没有获取到锁的线程进行阻塞,从而实现悲观锁。
我们可以在执行 count++ 之前对当前线程进行阻塞,执行完 count++ 后,再唤醒其他线程。这种方式有两个问题:
如果没有线程竞争,会被阻塞
当前线程执行完 count++ 了,如何唤醒其他线程?
class FooLockSupport {
private volatile int count = 0;
public int getCount(){
return count;
}
public void first() {
// 如果没有线程竞争,不能被阻塞
LockSupport.park();
count++;
// 当前线程执行完 count++ 了,如何唤醒其他线程?
}
}
针对第一个问题,我们需要多引入一个 AtomicBoolean 变量充当“锁”,线程需要通过一次 CAS 操作尝试修改变量,若第一次成功,就直接执行count++操作,没有成功就直接阻塞:
class FooLockSupport {
private volatile int count = 0;
// 引入一个 AtomicBoolean 变量充当“锁”
private final AtomicBoolean locked = new AtomicBoolean(false);
public int getCount(){
return count;
}
public void first() {
// 线程需要通过一次 CAS 操作尝试获得修改变量
if (!locked.compareAndSet(false, true)) {
LockSupport.park();
}
count++;
// 将变量修改回去
locked.compareAndSet(true, false);
// 当前线程执行完 count++ 了,如何唤醒其他线程?
}
}
针对第二个问题就会更复杂,我们需要用一个数据结构来存储正在阻塞的线程对象。
class FooLockSupport {
private volatile int count = 0;
public int getCount(){
return count;
}
// 引入一个 AtomicBoolean 变量充当“锁”
private final AtomicBoolean locked = new AtomicBoolean(false);
// 使用一个数据结构暂存正在阻塞的线程对象(该数据结构仅为示例,会出现并发安全问题)
private final Deque<Thread> blockedThreads = new ArrayDeque<>();
public void first() {
if (!locked.compareAndSet(false, true)) {
blockedThreads.add(Thread.currentThread());
LockSupport.park();
}
count++;
locked.compareAndSet(true, false);
// 执行完相关逻辑后,从这个数据结构中取出来线程,唤醒这个线程
LockSupport.unpark(blockedThreads.pollFirst());
}
}
这个数据结构面临以下问题:
使用一个什么样的数据结构?
双向链表,有一个 dummy 头节点。
释放锁时,需要释放几个线程?
释放一个线程。如果释放所有线程,会依然出现线程竞争,加大 CPU 使用率。
释放锁时,释放哪一个线程?
可以释放最先加入的线程,我们管这样的锁叫公平锁。反之,就是非公平锁(抢占式)。
引用的变化也是一个线程不安全的操作,如何实现线程安全的引用变化?
使用
AtomicReference
原子类。
首先我们定义一个 Node 类(每个属性应该加上 volatile,这里简化了):
class Node {
Node next;
Node prev;
Thread thread;
}
// head 节点是 dummy 节点
private final AtomicReference<Node> head = new AtomicReference<>(new Node());
// tail 节点是最新加入的节点
private final AtomicReference<Node> tail = new AtomicReference<>(head.get());
针对没有拿到锁,被阻塞的线程,需要 CAS 自旋加入到这个链表当中:
if (!locked.compareAndSet(false, true)) {
// current 当前线程要加入的节点
Node current = new Node();
current.thread = Thread.currentThread();
while(true) {
// 当前线程每次 cas 时,都需要拿到最新的尾节点
Node tailCurrent = tail.get();
if (tail.compareAndSet(tailCurrent, current)) {
System.out.println(Thread.currentThread().getName() + "加入到了链表尾");
current.prev = tailCurrent;
tailCurrent.next = current;
break;
}
}
// 然后阻塞
LockSupport.park();
}
// 阻塞结束,执行业务逻辑
count++;
业务逻辑执行完后,我们需要把最先加入的节点取出,唤醒这个线程:
count++;
// 执行完相关逻辑后,从这个数据结构中取出来一个线程
Node headNode = head.get();
Node next = headNode.next;
// 先把 locked 的原子变量设为 false
// 执行到这段代码时一定是单线程的,所以不用cas
locked.set(false);
if (next!= null) {
// 把最先加入的节点取出,唤醒这个线程
System.out.println(Thread.currentThread().getName() + "唤醒了" + next.thread);
LockSupport.unpark(next.thread);
}
线程唤醒了,但是这个线程Node还没有从数据结构里取出,所以应该在LockSupport.park()下面再进行补充:
if (!locked.compareAndSet(false, true)) {
// current 当前线程要加入的节点
Node current = new Node();
current.thread = Thread.currentThread();
while(true) {
// 当前线程每次 cas 时,都需要拿到最新的尾节点
Node tailCurrent = tail.get();
if (tail.compareAndSet(tailCurrent, current)) {
System.out.println(Thread.currentThread().getName() + "加入到了链表尾");
current.prev = tailCurrent;
tailCurrent.next = current;
break;
}
}
// 然后阻塞
LockSupport.park();
// 阻塞结束,把当前线程 Node 从数据结构中取出,并且自己也要 cas 到 lock 变量
if (current.prev == head.get() && locked.compareAndSet(false, true)) {
head.set(current);
current.prev.next = null;
current.prev = null;
break;
}
}
// 阻塞结束,执行业务逻辑
count++;
如果当前线程阻塞后,再也没有其他线程进行唤醒,那么当前线程就会永远阻塞在这里了,所以我们可以继续优化,先让线程自己尝试唤醒自己一次,如果自己拿到了锁,那就直接不阻塞了:
if (!locked.compareAndSet(false, true)) {
Node current = new Node();
current.thread = Thread.currentThread();
while(true) {
Node tailCurrent = tail.get();
if (tail.compareAndSet(tailCurrent, current)) {
System.out.println(Thread.currentThread().getName() + "加入到了链表尾");
current.prev = tailCurrent;
tailCurrent.next = current;
break;
}
}
while (true) {
if (current.prev == head.get() && locked.compareAndSet(false, true)) {
System.out.println(Thread.currentThread().getName() + "唤醒了自己" + current);
head.set(current);
current.prev.next = null;
current.prev = null;
break;
}
LockSupport.park();
}
}
完整代码如下:
class FooLockSupport {
private volatile int count = 0;
public int getCount(){
return count;
}
// 引入一个 AtomicBoolean 变量充当“锁”
private final AtomicBoolean locked = new AtomicBoolean(false);
// 使用一个数据结构暂存正在阻塞的线程对象
private final AtomicReference<Node> head = new AtomicReference<>(new Node());
private final AtomicReference<Node> tail = new AtomicReference<>(head.get());
class Node {
Node next;
Node prev;
Thread thread;
}
public void first() {
if (!locked.compareAndSet(false, true)) {
Node current = new Node();
current.thread = Thread.currentThread();
while(true) {
Node tailCurrent = tail.get();
if (tail.compareAndSet(tailCurrent, current)) {
System.out.println(Thread.currentThread().getName() + "加入到了链表尾");
current.prev = tailCurrent;
tailCurrent.next = current;
break;
}
}
while (true) {
if (current.prev == head.get() && locked.compareAndSet(false, true)) {
System.out.println(Thread.currentThread().getName() + "唤醒了自己" + current);
head.set(current);
current.prev.next = null;
current.prev = null;
break;
}
LockSupport.park();
}
}
// 执行业务逻辑
count++;
// 执行完相关逻辑后,从这个数据结构中取出来一个线程
Node headNode = head.get();
Node next = headNode.next;
locked.set(false);
if (next!= null) {
System.out.println(Thread.currentThread().getName() + "唤醒了" + next.thread);
LockSupport.unpark(next.thread);
}
}
}
这种实现中,有可能会出现后到的线程先拿到锁的情况,因此我们可以再拓展下公平锁的实现。
3.3 公平锁的实现
要想实现公平锁很简单,我们把所有的线程都先放进等待链表中,然后按照链表的顺序拿出来输出,就达到了先来先处理。
代码层面删除一开始的if条件即可。
public void first() {
// 公平锁的实现
// if (!locked.compareAndSet(false, true)) {
Node current = new Node();
current.thread = Thread.currentThread();
while(true) {
Node tailCurrent = tail.get();
if (tail.compareAndSet(tailCurrent, current)) {
System.out.println(Thread.currentThread().getName() + "加入到了链表尾");
current.prev = tailCurrent;
tailCurrent.next = current;
break;
}
}
while (true) {
if (current.prev == head.get() && locked.compareAndSet(false, true)) {
System.out.println(Thread.currentThread().getName() + "唤醒了自己" + current);
head.set(current);
current.prev.next = null;
current.prev = null;
break;
}
LockSupport.park();
}
// }
count++;
// 执行完相关逻辑后,从这个数据结构中取出来一个线程
Node headNode = head.get();
Node next = headNode.next;
locked.set(false);
if (next!= null) {
System.out.println(Thread.currentThread().getName() + "唤醒了" + next.thread);
LockSupport.unpark(next.thread);
}
}
3.4 可重入锁的实现
class FooLockSupport {
private volatile int count = 0;
private final AtomicBoolean locked = new AtomicBoolean(false);
private final AtomicReference<Node> head = new AtomicReference<>(new Node());
private final AtomicReference<Node> tail = new AtomicReference<>(head.get());
private volatile Thread owner = null; // 记录锁的持有者
private volatile int holdCount = 0; // 记录重入次数
public int getCount() {
return count;
}
public void first() {
Thread currentThread = Thread.currentThread();
// 如果当前线程是锁的持有者,则重入
if (owner == currentThread) {
holdCount++;
count++;
return;
}
// 尝试获取锁
if (!locked.compareAndSet(false, true)) {
// 锁已被其他线程持有,加入等待队列
Node current = new Node();
current.thread = currentThread;
while (true) {
Node tailCurrent = tail.get();
if (tail.compareAndSet(tailCurrent, current)) {
current.prev = tailCurrent;
tailCurrent.next = current;
break;
}
}
// 等待被唤醒
while (true) {
if (current.prev == head.get() && locked.compareAndSet(false, true)) {
head.set(current);
current.prev.next = null;
current.prev = null;
break;
}
LockSupport.park();
}
}
// 获取锁成功,设置持有者和重入次数
owner = currentThread;
holdCount = 1;
count++;
// 释放锁
releaseLock();
}
private void releaseLock() {
Thread currentThread = Thread.currentThread();
if (owner != currentThread) {
throw new IllegalMonitorStateException("当前线程未持有锁");
}
holdCount--;
if (holdCount == 0) {
// 完全释放锁
owner = null;
locked.set(false);
// 唤醒下一个等待线程
Node headNode = head.get();
Node next = headNode.next;
if (next != null) {
LockSupport.unpark(next.thread);
}
}
}
class Node {
Node next;
Node prev;
Thread thread;
}
}
4. 拓展:AQS
刚才用 LockSupport + 队列实现了一个锁,那有没有被封装好的数据结构呢?这就是 AbstractQueuedSynchronizer(AQS) —— 抽象队列同步器。
抽象:抽象类,只实现一些主要逻辑,有些 protected 方法由子类实现;
队列:使用先进先出(FIFO)队列存储数据;
同步:实现了同步的功能。
许多同步类实现都依赖于它,如常用的 ReentrantLock/Semaphore/CountDownLatch...
另外,关于 AQS 为什么使用抽象类而不是接口:
需要共享状态:
AQS 的核心是state
变量,这是一个共享状态,抽象类可以定义成员变量,而接口不能。提供默认实现:
AQS 提供了队列管理、线程阻塞等通用逻辑,这些逻辑对所有同步器都是通用的,抽象类可以定义具体方法,而接口在 Java 8 之前不能。单继承:
AQS 的设计目标是为同步器提供一个基础框架,而不是定义一组行为规范,因此使用抽象类更合适。
4.1 AQS 框架机制
public abstract class AbstractQueuedSynchronizer {
private volatile int state;
// 构造方法等...
// 双向队列
private transient volatile Node head;
private transient volatile Node tail;
static final class Node {
// 构造方法等...
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
}
// 模板方法模式 其他所有方法都是 final
protected abstract boolean tryAcquire(int arg); // 其实有抛出异常的实现,这里仅为方便说明
protected abstract boolean tryRelease(int arg);
protected abstract int tryAcquireShared(int arg);
protected abstract boolean tryReleaseShared(int arg);
// 重要 public 方法:独占模式资源的获取与释放
public final void acquire(int arg) { ... }
public final boolean release(int arg) { ... }
}
AQS 的核心是一个 双向队列(CLH 队列),用于管理等待获取资源的线程。每个线程会被封装成一个 Node 节点,节点中包含了线程的状态(如是否被取消、是否在等待等)以及前驱和后继节点的引用。
AQS 核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制 AQS 是基于 CLH 锁 (Craig, Landin, and Hagersten locks) 进一步优化实现的。
AQS 将每条请求共享资源的线程封装成一个 CLH 变体队列的一个结点(Node)来实现锁的分配。在 CLH 变体队列中,一个节点表示一个线程,它保存着线程的引用(thread)、 当前节点在队列中的状态(waitStatus)、前驱节点(prev)、后继节点(next)。
AQS 通过一个 int 类型的 state 变量 来表示资源的状态。state 的具体含义由子类定义:
对于独占锁(如 ReentrantLock),state 通常表示锁的持有状态(0 表示未锁定,1 表示锁定)。
对于共享锁(如 Semaphore),state 通常表示可用资源的数量。
当线程尝试获取资源时,AQS 会调用子类实现的 tryAcquire
方法(独占模式)或 tryAcquireShared
方法(共享模式):
如果获取成功,线程继续执行。
如果获取失败,线程会被封装成 Node 节点,加入等待队列,并进入阻塞状态。
当线程释放资源时,AQS 会调用子类实现的 tryRelease
方法(独占模式)或 tryReleaseShared
方法(共享模式):
如果释放成功,AQS 会从等待队列中唤醒一个或多个线程,使其重新尝试获取资源。
AQS 的等待队列是一个双向链表,通过以下机制管理线程的阻塞和唤醒:
入队:当线程获取资源失败时,会被封装成 Node 节点并加入队列尾部。
出队:当资源被释放时,AQS 会从队列头部唤醒一个或多个线程,使其重新尝试获取资源。
AQS Node 中的 waitStatus
状态类似于 状态机 ,通过不同状态来表明 Node 节点的不同含义,并且根据不同操作,来控制状态之间的流转。
4.2 节点 waitStatus 状态
waitStatus 大致分三种状态:
等于 0:加入队列的新节点的初始状态;
小于 0(如
SIGNAL
):节点处于有效状态,表示节点需要某种形式的处理,处理完成后释放锁,需要唤醒后继结点。大于 0(如
CANCELLED
):节点由于超时、中断等原因被取消处理。
此外,waitstatus 这个变量也需要加上 volatile,除了可能的可见性问题(其实正常的 CLH 队列是不涉及可见性的问题的),我们还要解决重排序问题。JMM 的 Happens-Before(先行发生)规则有一条针对 volatile 关键字的规则:“volatile 变量的写操作发生在该变量的后续读之前”。
4.3 acquire() 独占获取资源
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
selfInterrupt();
}
}
在 acquire()
中,线程会先尝试获取共享资源;如果获取失败,会将线程封装为 Node 节点加入到 AQS 的等待队列中;加入队列之后,会让等待队列中的线程尝试获取资源,并且会对线程进行阻塞操作。分别对应以下三个方法:
tryAcquire()
:尝试获取锁(模板方法),AQS
不提供具体实现,由子类实现。addWaiter()
:如果获取锁失败,会将当前线程封装为 Node 节点加入到 AQS 的 CLH 变体队列中等待获取锁。acquireQueued()
:对线程进行阻塞、唤醒,并调用tryAcquire()
方法让队列中的线程尝试获取锁。
4.3.1 tryAcquire()
模板方法(抽象方法),供子类实现。
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
如 ReentrantLock 的实现中,是通过 CAS 更新 state 变量。state == 0 表示资源没有被占用。state > 0 表示资源被占用(此时 state 表示重入次数),如果线程更新 state 变量成功,就表明获取到了资源, 因此将持有资源的线程设置为当前线程即可。
我们重点来看没有获取到资源的情况。
4.3.2 addWaiter()
在通过 tryAcquire() 方法尝试获取资源失败之后,会调用 addWaiter() 方法将当前线程封装为 Node 节点加入 AQS 内部的队列中。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
// 如果 pred != null,则证明 tail 节点已经被初始化,直接将 Node 节点加入队列即可。
if (pred != null) {
node.prev = pred;
// 通过 CAS 控制并发安全。
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 若队列为空,则初始化队列,并将新创建的 Node 节点加入队列。
enq(node);
return node;
}
在执行 addWaiter() 时,如果发现 pred == null ,即 tail 指针为 null,则证明队列没有初始化,需要调用 enq() 方法初始化队列,并将 Node 节点加入到初始化后的队列中,代码如下:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
// 通过 CAS 操作保证队列初始化的并发安全
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 与 addWaiter() 方法中节点入队的操作相同
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
4.3.3 acquireQueued()
在 acquireQueued()
方法中,主要做两件事情:
尝试获取资源: 当前线程加入队列之后,如果发现前继节点是
head
节点,说明当前线程是队列中第一个等待的节点,于是调用tryAcquire()
尝试获取资源。阻塞当前线程 :如果尝试获取资源失败,就需要阻塞当前线程,等待被唤醒之后获取资源。
// AQS:令队列中的节点尝试获取锁,并且对线程进行阻塞。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 1、尝试获取锁。
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
// 返回线程的中断状态
return interrupted;
}
// 2、判断线程是否可以阻塞,如果可以,则阻塞当前线程。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 3、如果获取锁失败,就会取消获取锁,将节点状态更新为 CANCELLED。
if (failed)
cancelAcquire(node);
}
}
1、尝试获取资源
在 acquireQueued()
方法中,尝试获取资源总共有 2 个步骤:
p == head
:表明当前节点的前继节点为head
节点。此时当前节点为 AQS 队列中的第一个等待节点。tryAcquire(arg) == true
:表明当前线程尝试获取资源成功。
在成功获取资源之后,就需要将当前线程的节点 从等待队列中移除 。移除操作为:将当前等待的线程节点设置为 head
节点(head
节点是虚拟节点,并不参与排队获取资源)。
2、阻塞当前线程
在 AQS
中,当前节点的唤醒需要依赖于上一个节点。如果上一个节点取消获取锁,它的状态就会变为 CANCELLED
,CANCELLED
状态的节点没有获取到锁,也就无法执行解锁操作对当前节点进行唤醒。因此在阻塞当前线程之前,需要跳过 CANCELLED
状态的节点。
通过 shouldParkAfterFailedAcquire()
方法来判断当前线程节点是否可以阻塞:
// AQS:判断当前线程节点是否可以阻塞。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 1、前继节点状态正常,直接返回 true 即可。
if (ws == Node.SIGNAL)
return true;
// 2、ws > 0 表示前继节点的状态异常,即为 CANCELLED 状态,需要跳过异常状态的节点。
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 3、如果前继节点的状态不是 SIGNAL,也不是 CANCELLED,就将状态设置为 SIGNAL。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
4.3.4 线程中断状态
在 AQS
中,acquire
方法通过调用 acquireQueued
实现独占资源的获取。acquireQueued
方法会在自旋等待资源的过程中检查线程的中断状态。如果线程被中断,它会执行以下逻辑:
中断标志设置:
如果线程在等待资源时被中断,
acquireQueued
会设置线程的中断标志(Thread.interrupted()
),并返回true
。
中断处理:
acquire
方法会根据acquireQueued
的返回值判断是否需要处理中断。如果需要处理中断,它会调用selfInterrupt()
方法重新设置线程的中断标志。
public final void acquire(int arg) {
if (!tryAcquire(arg) && // 尝试获取资源
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 加入等待队列并自旋等待
selfInterrupt(); // 处理中断
}
final boolean acquireQueued(final Node node, int arg) {
boolean interrupted = false;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) { // 尝试获取资源
setHead(node);
p.next = null; // help GC
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && // 检查是否需要阻塞
parkAndCheckInterrupt()) // 阻塞并检查中断
interrupted = true; // 设置中断标志
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); // 阻塞线程
return Thread.interrupted(); // 检查并清除中断标志
}
static void selfInterrupt() {
Thread.currentThread().interrupt(); // 重新设置中断标志
}
自旋等待时中断:
如果线程在自旋等待资源时被中断,
parkAndCheckInterrupt()
会返回true
,并将中断标志传递给acquireQueued
。acquireQueued
会设置interrupted
标志,并继续尝试获取资源。
成功获取资源后中断:
如果线程在获取资源后被中断,
acquireQueued
会返回true
,acquire
方法会调用selfInterrupt()
重新设置线程的中断标志。
中断的意义:
中断标志的设置是为了让调用方知道线程在等待资源时被中断,从而可以决定是否需要处理中断逻辑(例如取消任务、清理资源等)。
4.4 release() 独占释放资源
public final boolean release(int arg) {
// 1、尝试释放锁
if (tryRelease(arg)) {
Node h = head;
// 2、唤醒后继节点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
在唤醒后继节点之前,需要判断是否可以唤醒后继节点,判断条件为: h != null && h.waitStatus != 0
。这里解释一下为什么要这样判断:
h == null
:表明head
节点还没有被初始化,也就是 AQS 中的队列没有被初始化,因此无法唤醒队列中的线程节点。h != null && h.waitStatus == 0
:表明头节点刚刚初始化完毕(节点的初始化状态为 0),后继节点线程还没有成功入队,因此不需要对后续节点进行唤醒。(当后继节点入队之后,会将前继节点的状态修改为SIGNAL
,表明需要对后继节点进行唤醒)h != null && h.waitStatus != 0
:其中waitStatus
有可能大于 0,也有可能小于 0。其中> 0
表明节点已经取消等待获取资源,< 0
表明节点处于正常等待状态。
参考
【强烈推荐】深入浅出 Java 多线程:https://redspider.gitbook.io/concurrent
【【不背八股】80行代码手写AQS实现Lock】 https://www.bilibili.com/video/BV1afAKeBEJR/?share_source=copy_web&vd_source=0acc90ba529bf1b28fdeb3351912e2f2
JavaGuide https://javaguide.cn/java/concurrent/aqs.html
ChatGPT / 腾讯元宝 / DeepSeek 与 小凡雪梨