多线程【最新】
写在前面
源于[河北王校长]的学习笔记,图片发烧友,全是PPT的内容,其中有一些自发的笔记内容或者示例和体会,实际字数会有些多
然而质量很高,亦有深度,相信看完一定会有更深的领悟和体会[膜拜大佬😄]
之前笔者也有对多线程的文章,但是之前学习深度有限,仅限于混个眼熟,最近沉淀学习过程中逐渐有了深度,强推这篇作为入门和深入多线程的文章
什么是不可变对象,对写并发有什么帮助
不可变对象(Immutable Object)是指一旦创建就不能被修改的对象。在 Java 中,不可变对象的属性值在对象创建后不可更改,所有的字段都是 final 的,并且没有提供任何修改字段值的方法。
不可变对象具有以下特点:
- 线程安全性:不可变对象天生是线程安全的,因为它的状态不可变,不会出现数据竞争和并发修改的问题。
- 无需同步:由于不可变对象是线程安全的,所以在多线程环境下无需进行同步操作,可以减少锁的竞争和线程切换的开销,提高程序的性能。
- 易于缓存和重用:由于不可变对象的状态不可变,可以安全地在多个线程之间共享和重用,不需要担心对象状态被修改导致程序出现问题。
- 安全性:不可变对象不会被意外修改,可以避免程序中出现一些意外的错误和 bug,提高程序的稳定性和可靠性。
- 优化机会:不可变对象可以在编译期间进行优化,例如可以对对象进行常量折叠、逃逸分析等优化,提高程序的执行效率。
在写并发代码时,使用不可变对象可以大大简化程序的编写和调试,避免了因为状态共享而引入的复杂的同步和锁机制,降低了代码的复杂性和出错的可能性。因此,不可变对象对于写并发代码是非常有帮助的。
线程-概述
说说并发和并行的区别
- 并发(Concurrency): 并发是指在同一时间段内,多个任务交替执行的现象。这些任务可能在时间上有重叠,但在任意时刻只能有一个任务在执行。并发通常用于处理多个任务之间的交互和通信,提高系统资源利用率和响应速度。在单核处理器上,通过操作系统的时间片轮转调度实现任务的并发执行;在多核处理器上,则可以通过线程或进程的并发执行来利用多个核心。
- 并行(Parallelism): 并行是指在同一时间段内,多个任务同时执行的现象。这些任务在不同的处理器核心或计算单元上并行运行,每个任务都可以独立执行,无需等待其他任务的完成。并行通常用于加速任务的执行速度,提高系统的吞吐量和性能。在多核处理器上,通过将任务分配给不同的核心并行执行来实现并行计算。
简而言之,并发是指多个任务之间可能交替执行,而并行是指多个任务在同一时刻同时执行。并发更注重于任务之间的交互和通信,适用于处理多任务的场景;而并行更注重于任务的加速和效率提升,适用于需要高性能计算的场景。在实际应用中,通常会同时考虑并发和并行,以达到更好的系统性能和资源利用率。
说说进程和线程的区别
进程:运行中的程序,是动态的,一个进程包含多条线程。
线程:进程中的一条执行路径\子任务,线程更轻量级,能减少并发执行的时间和空间开销。
把进程当做资源分配的基本单元,把线程当做CPU调度\执行的基本单元,同一个进程的多个线程之间共享资源(代码段、数据段、打开的文件等)。
什么是线程调度器和时间分片
线程调度器(Thread Scheduler)是操作系统中的一部分,负责管理和调度系统中的线程,决定何时、如何执行哪个线程。线程调度器通常根据一定的调度策略来安排线程的执行顺序,以便实现最优的系统性能和资源利用率。
时间分片(Time Slicing)是线程调度器的一种调度策略,也称为抢占式调度(Preemptive Scheduling)。在时间分片调度策略中,每个线程被分配一个时间片(Time Slice),当线程的时间片用完时,调度器会暂停当前线程的执行,并将 CPU 时间分配给其他可运行的线程。这样,系统中的多个线程可以交替执行,实现了多任务并发执行的效果。
时间分片调度策略的优点是能够充分利用 CPU 时间,提高系统的响应速度和吞吐量。同时,由于每个线程都有自己的时间片,因此可以避免某个线程长时间占用 CPU 而导致其他线程无法执行的情况,提高了系统的公平性和稳定性。
需要注意的是,时间分片调度策略是操作系统层面的调度机制,因此不同的操作系统可能有不同的实现方式和调度算法。在 Java 中,线程调度器通常由 JVM 和操作系统共同管理,具体的调度策略可能受到底层操作系统的影响。
JAVA 中用到的线程调度算法是什么
在Java中,采用的是抢占式调度模型。
抢占式调度算法允许操作系统在任意时刻中断当前正在执行的线程,并根据一定的调度策略选择另一个优先级更高的线程执行。这样可以确保高优先级的任务能够及时执行,避免低优先级的任务长时间占用 CPU。
线程优先级和守护线程
线程状态转换
什么情况线程会进入 WAITING 状态
线程在 Java 中可能因为多种原因进入 WAITING 状态。WAITING 状态表示线程正在等待某个特定的条件发生,而不是等待某个资源的释放。以下是一些导致线程进入 WAITING 状态的常见情况:
- 调用了 Object.wait() 方法: 当线程调用某个对象的 wait() 方法时,它会进入 WAITING 状态,直到其他线程调用相同对象的 notify() 或 notifyAll() 方法唤醒它。
- 调用了 Thread.join() 方法: 当一个线程调用另一个线程的 join() 方法时,它会进入 WAITING 状态,直到被等待的线程执行完成。
- 调用了 LockSupport.park() 方法: 当线程调用 LockSupport.park() 方法时,它会进入 WAITING 状态,直到被其他线程调用 unpark() 方法唤醒。
- 调用了 Condition.await() 方法: 当线程调用 Condition.await() 方法时,它会进入 WAITING 状态,直到被关联的锁的条件满足。
- 调用了 Object.wait(long timeout) 方法: 当线程调用带有超时参数的 wait() 方法时,在等待超过指定的超时时间后,它会自动唤醒并离开 WAITING 状态。
- 调用了 Thread.sleep(long millis) 方法: 当线程调用 sleep() 方法时,它会进入 TIMED_WAITING 状态,而不是 WAITING 状态。但如果指定的睡眠时间到期,线程将从 TIMED_WAITING 状态恢复到 RUNNABLE 状态,如果在此期间调用 interrupt() 方法,线程将从 TIMED_WAITING 状态恢复到 RUNNABLE 状态。
总之,WAITING 状态表示线程正在等待某个特定的条件发生,而不是等待某个资源的释放。这些条件包括等待其他线程的通知、等待其他线程执行完成、等待条件满足等。
线程状态源码
上图的线程状态,实际是Thread类中内部的枚举类,每一个状态对应一个枚举,而源码中只有六个枚举,其中RUNNABLE包含了图中的READY和RUNNING
然而,上图中的枚举值还需一些特别的关注,主要看注释信息
- BLOCKED
这里着重看BLOCKED,可以发现【阻塞状态】原来只针对于【monitor lock】也就是监视器锁,只有用【synchronized】关键字的加锁释放锁才会进入这种状态
WAITING
详细注释信息,进入WAITING状态正在等待【特定操作】去唤醒
TIMEWAITING
有三个线程T1,T2,T3,如何保证顺序执行
- **通信:**join()、CountDownLatch、CyclicBarrier、Semaphore
- **排队:**线程池、CompletableFuture
public class SequentialExecution {
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
System.out.println("Thread T1 is executing");
});
Thread t2 = new Thread(() -> {
try {
t1.join(); // 等待线程 t1 执行完成
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Thread T2 is executing");
});
Thread t3 = new Thread(() -> {
try {
t2.join(); // 等待线程 t2 执行完成
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Thread T3 is executing");
});
t1.start();
t2.start();
t3.start();
}
}
线程的init解析
概述
触发时机
当通过new Thread()
创建线程对象时,会调用Thread
类的构造方法,而构造方法内部会进一步调用init()
方法来完成线程的初始化工作。这是线程对象创建的核心流程。init
方法的作用- 继承父线程属性:
init
方法会从父线程(即创建该线程的当前线程)继承Daemon
状态、优先级、上下文类加载器(ContextClassLoader
),以及可继承的ThreadLocal
变量(即inheritableThreadLocals
)。 - 初始化线程特有字段:例如线程名称、线程组、线程ID等。若未显式指定线程组,默认使用父线程的线程组。
- 处理
InheritableThreadLocal
:如果父线程的inheritableThreadLocals
不为空,会通过ThreadLocal.createInheritedMap
复制到子线程,实现父子线程间的变量传递。
- 继承父线程属性:
关键代码逻辑
init
方法的调用链如下:javapublic Thread() { init(null, null, "Thread-" + nextThreadNum(), 0); // 默认构造方法触发init } private void init(ThreadGroup g, Runnable target, String name, long stackSize) { init(g, target, name, stackSize, null, true); // 最终调用重载的init方法 }
其中,
inheritThreadLocals
参数默认为true
,表示继承父线程的InheritableThreadLocal
变量。与线程启动的关系
init
仅完成线程对象的初始化,而线程的实际执行需要通过调用start()
方法启动。start()
会触发本地方法start0()
,最终由JVM创建系统级线程并调用run()
方法。
解析
解释:“一个新构造的线程对象,由其parent线程来进行空间分配”
线程的start解析
start的含义
synchronized相关
synchronized的实现原理
无论是ACC_SYNCHRONIZED还是monitorenter、monitorexit都是基于Monitor实现的
- 同步方法:ACC_SYNCHRONIZED
同步方法的常量池中存在 ACC_SYNCHRONIZED 标志。
如果线程想要访问同步方法时,会检查是否有 ACC_SYNCHRONIZED,如果有设置,需要先获取监视器锁,同时执行完成需要释放监视器锁;当其他线程来执行同步方法时,会因无法获得监视器锁而阻塞。
对于获得监视器锁的线程发生异常时,如果内部未处理,会在异常抛到方法外之前,自动释放监视器锁。
- 同步代码块:monitorenter 和 monitorexit
同步代码块使用 monitorenter 和 monitorexit 两个指令实现。
monitorenter 指令指向同步代码块的开始位置(加锁),monitorexit 指令指向的结束位置(释放锁)
每个对象维护着一个记录被锁次数的计数器:
未被锁定的对象的计数器为 0,当一个线程获得锁\执行monitorenter后,该计数器自增变为 1 ,当同一个线程再次获得该对象的锁的时候\可重入锁,计数器再次自增。当同一个线程释放锁\执行monitorexit 后,计数器再自减。当计数器为 0 的时候。锁将被释放,其他线程便可以获得锁。
说说synchronized与ReentrantLock的区别
(1)相同点:两者都是可重入锁
可重入锁:在一个线程中可以多次获取同一把锁
(2)不同点:
- synchronized 依赖于 JVM 层面,而 ReentrantLock 依赖于 API 层面
sychronized是⼀个关键字,ReentrantLock是⼀个类
sychronized是⾃动的加锁与释放锁,ReentrantLock需要程序员⼿动加锁与释放锁
- synchronized只有非公平锁,而ReentrantLock 可以设置公平和非公平锁
- Sychronized通过 监视锁Monitor 控制,ReentrantLock通过 同步状态state 来标识锁的状态
- ReentrantLock 比 Synchronized 增加了一些高级功能:响应中断、超时等待
等待可中断:正在等待的线程可以选择放弃等待,改为处理其他事情;通过lock.Interruptibly()实现。
可实现选择性通知:在使用notify()/notifyAll()方法进行通知时,被通知的线程是由 JVM 选择的,用ReentrantLock类结合Condition实例可以实现“选择性通知”。
synchronized-字节码角度概览
锁升级-Mark Word
- 无锁和偏向锁区别在于【是否是偏向锁】
- 轻量级和重量级锁区别在于【锁标志位】
锁升级-偏向锁
使用偏向锁需要用【使用前提】,如果不遵守前提则无法使用偏向锁
关于【使用前提】的叙述
锁升级-从偏向锁开始可能会发生什么(偏向锁的撤销以及升级)
面试牢记【synchronized锁只能升级,不能降级,但是有锁的释放状态】
针对上图的第四点(不需要背,这只是一个示例,面试的时候别扯这些,很可能会被认为你是错误的或者较真),其实我们可以领悟到,锁的升级过程是在【线程运行过程当中以及争抢过程当中】发生的,一些线程死亡或者说竞争结束以后相当于又开始了新的一轮
总结
面试的时候秉持【只要在偏向锁的过程中有竞争就升级为轻量级锁】,但是如果深挖就主要考虑上面的1~3点,问就是很复杂。
锁升级-轻量级锁的加锁和解锁
锁升级-轻量级锁膨胀成为重量级锁
...
死锁的避免方式
::: tips 前置 PID的作用: 前面显示的数字是Java进程的唯一标识符(PID),与操作系统中的进程ID一致 在死锁场景中,通过jps
快速定位目标Java进程的PID,再用jstack
分析该进程的线程状态 :::
经典答案(过于理论化)
接地气的话术
死锁检查示例
使用命令“jps”输出当前的堆栈信息,如果有死锁的话可以发现
比如这里12472发生了死锁,可以使用“jstack 12472”
ObjectMonitor的属性
synchronized锁(也就是这个Object),也即监视器锁,其中有属性分析如下:
比如waitset,里面放的就是通过这个锁对象调用wait以后进入WAITING状态的线程,他们需要被其他线程调用notify或者notifyall去唤醒
下面来看一个经典的控制两个线程打印奇偶数
public class OddEvenPrinter {
private static final Object lock = new Object();
private static int count = 1;
private static final int MAX = 10;
public static void main(String[] args) {
// 奇数线程
new Thread(() -> {
synchronized (lock) {
while (count <= MAX) {
if (count % 2 == 1) {
System.out.println("奇数线程: " + count);
count++;
lock.notify(); // 唤醒偶数线程
} else {
try {
lock.wait(); // 让出锁并等待
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
}).start();
// 偶数线程
new Thread(() -> {
synchronized (lock) {
while (count <= MAX) {
if (count % 2 == 0) {
System.out.println("偶数线程: " + count);
count++;
lock.notify(); // 唤醒奇数线程
} else {
try {
lock.wait(); // 让出锁并等待
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
}).start();
}
}
解析:
- 锁释放与重新获取:- 调用
wait()
时,线程会立即释放锁,允许其他线程进入同步块。被notify()
唤醒后,线程需**重新竞争锁 **,成功获取锁后才会从wait()
返回,继续执行后续代码 - 上下文恢复: 线程唤醒后,JVM 会根据程序计数器恢复执行位置,同时重新检查
while
循环条件(防止虚假唤醒),确保逻辑正确性
为什么需要while
而非if
【虚假唤醒机制】
- 虚假唤醒:即使没有
notify()
,线程也可能被唤醒(如系统调度或中断)。while
循环会重复检查条件,避免因虚假唤醒导致逻辑错误 - 数据一致性:条件可能在被唤醒后再次变化(如多个消费者线程竞争资源),
while
能保证条件最终满足
CPU的用户态和内核态
Java内存模型-JMM
说说你对JMM内存模型的理解
1. 线程的内存使用原理
JMM规定了所有的变量都存储在主内存(Main Memory)中。每个线程还有自己的工作内存,线程的工作内存中保存了该线程使用到的变量的主内存的副本拷贝,线程对变量的所有操作(读取、赋值等)都必须在工作内存中进行,而不能直接读写主内存中的变量(volatile变量仍然有工作内存的拷贝,但是由于它特殊的操作顺序性规定,所以看起来如同直接在主内存中读写访问一般)。不同的线程之间也无法直接访问对方工作内存中的变量,线程之间值的传递都需要通过主内存来完成。 特殊点:
- volatile变量虽有工作内存副本,但因内存屏障机制,操作效果等同于直接读写主内存
- 工作内存是JMM抽象概念,实际可能对应CPU缓存、寄存器等硬件结构
2. JMM的设计目的
解决多线程共享内存时的三大问题:
- 可见性问题:线程修改变量后其他线程无法及时感知
- 有序性问题:编译器/处理器优化导致指令执行顺序与代码顺序不一致
- 原子性问题:非原子操作可能被线程切换打断
3. JMM三大特征
(1)原子性
定义:一组操作要么全部执行,要么全部不执行
实现方式:
monitorenter
/monitorexit
字节码指令(对应synchronized
关键字)java.util.concurrent.atomic
包的原子类(基于CAS)- Lock接口的实现类(如ReentrantLock)
(2)可见性
定义:线程修改变量后,新值对其他线程立即可见
实现方式:
volatile
关键字(强制读写主内存)synchronized
同步块(解锁前将变量刷回主内存)final
变量(初始化完成后保证可见性)
(3)有序性
定义:程序执行顺序符合代码的先后顺序
保障机制:
volatile
禁止指令重排序happens-before
原则(包括锁规则、volatile规则、线程启动规则等8项)final
变量的初始化安全
4. happens-before原则(关键规则)
- 程序顺序规则:同一线程内,书写在前面的操作happens-before后面的操作
- 锁规则:解锁操作happens-before后续的加锁操作
- volatile规则:写操作happens-before后续的读操作
- 线程启动规则:线程的start()调用happens-before该线程的所有操作
- 传递性规则:若A happens-before B,B happens-before C,则A happens-before C
JMM-基础
说白了就是每一个线程都有一个用于自己【读取和操作】共享内存的本地副本,而JMM规定了【读取和操作】本地副本时与主内存中共享变量的关系
JMM-指令重排序【重排序的三种类型是核心】
指令重排序的三种类型:
- 编译器优化的重排序
- 指令级并行的重排序
- 内存系统的重排序
JMM-内存屏障
JMM-Happens-Before原则以及as-if-serial语义
总结:
- 两者都是为了在不改变程序的执行结果的大前提下,尽可能提高程序执行的并行度。
- 前者是保证同步的多线程程序,后者是保证单线程程序
JMM-锁的获取与释放对于内存的语义【核心理解】
简而言之:
获得锁就是要把共享变量从主内存中刷新到自己的本地内存(目的是获取最新的值),所以也是获取共享资源然后操作;
释放锁就是要把自己对该共享变量的操作刷回住内存中去(保证其他线程读取到最新的数据),所以也是操作完共享资源以后刷回主存释放资源
volatile关键字
这里要注意的关键点:
- 缓冲行:缓存可以分配的最小存储单位,一个缓存行里面可能会有多个变量
保证可见性
总结:
- 加有volatile关键字的变量在汇编以后会有
lock
的前缀指令 - 将当前处理器缓存行的数据写回到主内存【数据写回】
- 数据写回的操作会使在其他CPU里缓存了该内存地址的数据无效【置缓存无效,强制重新读取最新数据】
原理:
volatile关键字
- 主要作用:
- 保证变量的可见性(一个线程修改后,其他线程能立即看到)
- 防止指令重排序
- 实现原理:通过插入内存屏障(Memory Barrier)指令,强制CPU从主内存读取/写入数据
- 主要作用:
嗅探技术(Snooping)
- 是硬件层面的缓存一致性协议
- 主要作用:在多核处理器系统中维护各个CPU核心缓存的一致性
- 工作原理:每个CPU核心监听(snoop)总线上的缓存操作,当发现其他核心修改了共享数据时,会使自己的缓存行失效
两者关系
volatile
在软件层面提供内存可见性保证,而嗅探是硬件层面实现缓存一致性的机制volatile
的实现可能依赖于底层硬件的嗅探机制- 嗅探技术为
volatile
等同步机制提供了硬件支持 - 在支持嗅探的系统中,
volatile
的性能开销相对较小,因为硬件已经提供了高效的缓存一致性维护
禁止指令重排序
记忆方式:借鉴下图方式
- volatile读(load操作)
- volatile写(store操作)
总结:
- volatile写操作:写前写后
- volatile读操作:读后读后
原理:
实际情况:上述是原则性,但实际情况根据情景删去不必要添加的屏障,下面为例,看实际操作去看即可,但是对于不可预期的操作就得按照原则来添加屏障
内存语义增强
先看之前【普通读写操作和volatile读写操作间没有禁止指令重排序时】可能存在的问题如下
在添加相关规则后如下
双重检查锁问题根源和解决方式
下图关键是:通过new关键字创建一个对象时对应三条指令,但这三条指令之间是有顺序以来的如果发生了指令重排就会有问题
- 分配对象的内存空间
- 初始化对象
- 设置instance指向刚分配的内存地址
针对如上问题,有两种解决方式
- 单例关键字直接加volatile
2.基于类初始化的解决方案(静态内部类)
关键【JVM初始化期间会获取一个锁🔒】
Lock
自己实现一把锁,代码如下,收获颇丰
自定义SelfLock锁如下:
package practice.lock;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
* 自己实现一把锁
* 1. 实现Lock接口
* 2. 添加继承自AQS的内部类,重写tryAcquire和tryRelease
*/
public class SelfLock implements Lock {
/**
* 锁
*/
private final Sync sync = new Sync();
private static class Sync extends AbstractQueuedSynchronizer {
/**
* 加锁
*
* @param acquires the acquire argument. This value is always the one
* passed to an acquire method, or is the value saved on entry
* to a condition wait. The value is otherwise uninterpreted
* and can represent anything you like.
* @return
*/
@Override
protected boolean tryAcquire(int acquires) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
/**
* 解锁
*
* @param arg the release argument. This value is always the one
* passed to a release method, or the current state value upon
* entry to a condition wait. The value is otherwise
* uninterpreted and can represent anything you like.
* @return
*/
@Override
protected boolean tryRelease(int arg) {
// 当前线程释放锁,说明当前线程是持有锁的
if (getState() == 0) {
// 非法监视器状态
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}
/**
* 获取condition对象
*
* @return
*/
Condition newCondition() {
return new ConditionObject();
}
public boolean isLocked() {
return getState() == 1;
}
}
@Override
public void lock() {
/**
* 模板方法,这里回调用tryAcquire
* public final void acquire(int arg) {
* if (!tryAcquire(arg) &&
* acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
* selfInterrupt();
* }
*/
sync.acquire(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
/**
* 模板方法,这里回调用tryRelease
* public final boolean release(int arg) {
* if (tryRelease(arg)) {
* Node h = head;
* if (h != null && h.waitStatus != 0)
* unparkSuccessor(h);
* return true;
* }
* return false;
* }
*/
sync.release(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
/**
* 模板方法,执行tryAcquire
* public final void acquireInterruptibly(int arg)
* throws InterruptedException {
* if (Thread.interrupted())
* throw new InterruptedException();
* if (!tryAcquire(arg))
* doAcquireInterruptibly(arg);
* }
*/
sync.acquireInterruptibly(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
public boolean isLocked() {
return sync.isLocked();
}
public boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
}
测试类如下:
package practice.lock;
import java.util.concurrent.locks.Lock;
/**
* 自定义锁测试类
*/
public class SelfLockTest {
private static final Lock lock = new SelfLock();
public static void main(String[] args) {
Thread threadA = new Thread(() -> tryLock());
threadA.setName("A");
Thread threadB = new Thread(() -> tryLock());
threadB.setName("B");
threadA.start();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
threadB.start();
}
public static void tryLock() {
System.out.println(Thread.currentThread().getName() + ": I want to in");
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + ": I got lock");
while (true) {
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
测试如下:
总结:AQS可以实现扩展性非常强的锁,通过自定义的过程可以总结如下,具体参照上面自定义锁的实现
- 先定义自己的锁类,需要实现Lock接口的方法,这些方法的具体实现需要依靠AQS子类的具体逻辑
- 定义【继承自AQS】的成员,自定义锁的本质是操作AQS其中的state,而具体逻辑则完全自定义
Lock 锁由来及特性和常用API
关键:
- 可以尝试非阻塞获取锁,获取到就持有锁否则也不会阻塞
- 可以响应中断,当获取锁的线程被中断时,中断异常将会被抛出然后锁被释放
- 获取锁可以设置超时时间而不是一直阻塞
说下对AQS的理解
AQS 是很多同步器的基础框架,比如 ReentrantLock、CountDownLatch 和 Semaphore 等都是基于 AQS 实现的。除此之外,我们还可以基于 AQS,定制出我们所需要的同步器。是一个用来构建锁和同步器的抽象框架,只需要继承 AQS 就可以很方便的实现我们自定义的多线程同步器、锁
(1)实现原理
AQS使用一个int成员变量来表示同步状态,通过内置的FIFO队列来完成获取资源线程的排队工作,使用CAS对该同步状态进行原子操作实现对其值的修改。
当一个线程释放锁或者同步器时,AQS会通过LockSupport.unpark()方法将等待队列中的第一个线程唤醒,并让其重新尝试获取锁或者同步器。
LockSupport 相比Object 类中的 wait()、notify()、notifyAll()区别 :
- wait/notify/notifyAll 必须在 synchronized 中使用。
- LockSupport 操作更精准,可以准确地唤醒某一个线程。
Java LockSupport.park("线程阻塞的原因"); LockSupport.unpark(new Thread()A);
(4)同步队列使用双向链表的原因
主要在于单链表和双向链表的区别:查询、修改
- 支持中断 acquireInterruptibly()允许线程在做资源竞争时中断
对于中断,将线程节点从AQS同步队列中移除,十分适合双向链表,修改节点的prev和next即可,如果是单向链表,需要从头来寻找删除节点的前驱节点。
- 挂起支持
没有竞争到锁的线程加入到同步队列,并且阻塞等待,前提是当前线程所在节点的前置节点是正常状态\等待状态,这样设计是为了避免链表中存在异常线程导致无法唤醒后续线程的问题,如果没有指针指向前置节点,就需要从Head 节点开始遍历,性能非常低。
- 线程判断 isQueued(Thread thread) 判断thread线程是否正在AQS的等待队列中等待获取同步状态
从队列的队尾开始向前遍历,检查节点所代表的线程是不是传入的线程
选择队尾的原因在于:类似于公平锁,新加入的线程会被添加到队列尾部,从尾部开始可能更快地找到最近加入的线程,从后往前便利确实可能提前返回的概率更大一些
- 减少并发竞争
在AQS中,很多方法的在遍历这个FIFO的队列的时候,是从尾部开始的
在高并发环境中,AQS中的队列的头部一定是最频繁访问和修改的区域,因为头部节点是释放同步状态或是被唤醒的线程的首选位置,从尾部开始遍历可以减少在头部节点上的竞争,尤其是在执行那些不需要立即修改头部节点状态的遍历操作时。
- 支持条件队列
条件队列需要能够从等待队列中移动节点到条件队列,以及反向操作,双向链表使得操作更加直接和高效
队列同步器-AQS 底层原理
最直接的
基于模板方法模式
同步器:双向链表构成的双端队列
- 如果获取同步状态成功,这时只有这一个线程操作,不需要cas操作,直接set
- 如果获取同步状态失败,这些失败的节点都要插入到尾部,存在线程安全问题
源码解读 ReentrantLock-lock()
ReentrantLock 中默认使用非公平锁,下面基于 AQS(AbstractQueuedSynchronizer,抽象队列同步器 )来说明其加锁逻辑:
- 初始尝试获取锁 非公平锁的
lock
方法首先会进行一次CAS
(Compare And Swap,比较并交换)操作尝试获取锁:
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
通过compareAndSetState(0, 1)
尝试将 AQS 中的状态变量state
从0改为1。如果修改成功,代表获取锁成功,通过 setExclusiveOwnerThread(Thread.currentThread())
将当前线程设置为锁的独占线程。(表示当前拥有锁的线程是当前线程)
- 未获取到锁时的处理 若初始
CAS
操作失败,即state
不为0,说明锁已被其他线程持有,此时调用acquire(1)
方法(看上面的代码):
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- 再次尝试获取锁(
tryAcquire
),tryAcquire
方法最终会调用nonfairTryAcquire
:- 若
state
为0,说明之前持有锁的线程已释放锁,再次通过CAS
尝试获取锁,成功则将当前线程设为独占线程并返回true
- 若
state
不为0,且当前线程是锁的独占线程(可重入情况),将state
加上acquires
(一般为1),更新state
后返回true
- 其他情况返回
false
,表示获取锁失败
- 若
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 如果刚才占有锁的线程已经释放了,状态即恢复为0
if (c == 0) {
// 如果是公平锁,这里唯一的区别就是增加条件 !hasQueuedPredecessors()
// 也就是当前等待队列没有节点时可以直接cas
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果占有锁的线程恰好是自己,那说明是可重入的逻辑
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 以上条件均不符合,说明仍有其他线程占有锁,尝试获取即失败
return false;
}
- 加入等待队列( addWaiter ): 如果
tryAcquire
第二次尝试获取锁失败,通过addWaiter(Node.EXCLUSIVE)
将当前线程封装成一个独占模式的Node节点 ,尝试加入等待队列尾部。- 先判断队列尾节点
tail
是否为null,不为null时,通过CAS
将当前节点设为队尾 - 若
tail
为null或CAS
操作失败,调用enq
方法,通过自旋方式初始化队列并将节点加入队列
- 先判断队列尾节点
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
- 在队列中等待并尝试获取锁(
acquireQueued
): 节点入队后,调用acquireQueued
方法让线程在队列中持续等待获取锁:- 不断检查当前节点的前驱节点是否为头节点
head
,若是且再次尝试获取锁(tryAcquire
)成功 ,将当前节点设为头节点并返回 - 若前驱节点不是
head
或tryAcquire
失败,通过shouldParkAfterFailedAcquire
方法判断是否该阻塞当前线程( 即挂起当前线程),满足条件则调用parkAndCheckInterrupt
阻塞线程,等待被唤醒后再次尝试获取锁。 shouldParkAfterFailedAcquire
:检查是否需要阻塞(避免忙等待),并清理无效的前驱节点parkAndCheckInterrupt
:通过LockSupport.park()
挂起线程,进入阻塞状态(此时线程卡在死循环内,但不再消耗 CPU)- 被唤醒后(例如前驱节点释放锁时唤醒),线程会继续循环,再次尝试获取锁
- 不断检查当前节点的前驱节点是否为头节点
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
// 中断检查标志
boolean interrupted = false;
// 死循环
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
总结
非公平锁在加锁时,不遵循先来后到顺序,线程启动后直接尝试获取锁,有机会“插队”获取锁,减少了线程调度开销,但可能导致部分线程等待时间过长。
源码解读 ReentrantLock-lock()公平锁逻辑【核心方法】
有了上面对非公平锁的逻辑源码解读之后,对于公平锁我们就很快上手了,下面是非公平锁和公平锁的核心区别,即 hasQueuedPredecessors
方法,表示当前队列中是否有等待的线程,只要这个条件符合,那么当前线程就不能尝试获取锁
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState(); // 获取当前锁状态(0 表示未占用,>0 表示被占用)
if (c == 0) { // 锁未被占用
if (!hasQueuedPredecessors() && // 关键:检查是否有线程在排队(公平性核心)
compareAndSetState(0, acquires)) { // CAS 尝试获取锁
setExclusiveOwnerThread(current); // 设置当前线程为锁持有者
return true;
}
}
else if (current == getExclusiveOwnerThread()) { // 锁已被当前线程持有(可重入)
int nextc = c + acquires; // 增加重入次数
if (nextc < 0) // 溢出检查(理论上不可能)
throw new Error("Maximum lock count exceeded");
setState(nextc); // 更新重入次数
return true;
}
return false; // 获取锁失败
}
下面即这个核心方法,关键在于return的条件,下面给出分情况表述
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
h != t
- 判断头节点
head
和尾节点tail
是否相等。 - 如果
h == t
,说明队列为空(或只有一个虚拟节点),没有其他线程在排队,直接返回false
。 - 如果
h != t
,继续检查后续条件。
- 判断头节点
(s = h.next) == null
- 获取头节点的后继节点
s
(即队列中的第一个实际等待线程)。 - 如果
s == null
,说明有其他线程正在竞争入队(可能是并发操作导致中间状态),此时保守认为存在前驱节点,返回true
。
- 获取头节点的后继节点
s.thread != Thread.currentThread()
- 如果
s
不为null
,检查s
节点关联的线程是否是当前线程。 - 如果不是当前线程,说明队列中有其他线程更早等待,返回
true
(需要排队)。 - 如果是当前线程,说明当前线程已经是队列中的第一个等待者,可以尝试获取锁,返回
false
。
- 如果
ReentrantReadWriteLock原理
下图中的【锁降级】不是我们学synchronized的时候【锁升级】的反转意义,我们知道对于synchronized来说没有锁降级这一说,这里的【降级】特别指的是【写锁降级成为读锁】
ReentrantReadWriteLock-读写状态设计【state】
一个state
变量如何区分读写两种状态的锁呢?如下图,高低位区分,写锁低位,读锁高位
因此在ReentrantReadWriteLock里一定有专门针对高效 读取高低位的方法,区分读写锁的状态
ReentrantReadWriteLock-写锁的获取与释放
主要理解的一点:写锁除了对自己以外的线程都是互斥的。这是因为对于自己这个线程来说是【可重入性】,对于其他写线程互斥;而对于读锁来说【 要保证写锁的操作对读锁的可见性】,因此对于读锁也是互斥的
ReentrantReadWriteLock-读锁的获取与释放
ReentrantReadWriteLock-锁降级
LockSupport工具类【使用即可】
这个工具类的实现直接使用Unsafe类的操作,源码上没有什么复杂内容,必要时使用即可
针对LockSupport的叙述
- LockSupport 是什么?
LockSupport
是 Java 并发包中的一个底层线程阻塞/唤醒工具类,它直接基于 Unsafe
类实现,提供以下核心方法:【直接操作线程】
park()
:阻塞当前线程(除非有许可证)。unpark(Thread thread)
:唤醒指定线程(发放许可证)。parkNanos(long nanos)
:限时阻塞。
特点:
- 无锁机制:不依赖
synchronized
或ReentrantLock
。 - 精准控制:直接针对线程操作(类似
Thread.suspend/resume
的安全版)。 - 许可证机制:
unpark
先于park
调用时,park
不会阻塞。
- 基本使用示例
Thread thread = new Thread(() -> {
System.out.println("线程即将阻塞");
LockSupport.park(); // 阻塞当前线程
System.out.println("线程被唤醒");
});
thread.start();
Thread.sleep(1000);
LockSupport.unpark(thread); // 唤醒指定线程
输出:
线程即将阻塞
线程被唤醒
- 实际使用示例
ReentrantLock
的底层阻塞/唤醒(通过 AQS)间接依赖 LockSupport
,例如:
- 阻塞:
AQS.acquire()
最终调用LockSupport.park()
。 - 唤醒:
AQS.release()
调用LockSupport.unpark()
。
- 总结
LockSupport
:
适合需要直接控制线程阻塞/唤醒的场景(如实现自定义同步器)。ReentrantLock
:
基于LockSupport
和 AQS 实现的高级别锁,提供更丰富的功能(可重入、公平性、条件变量)。
使用建议:
- 优先用
ReentrantLock
或AQS
衍生的工具类(如CountDownLatch
)。 - 仅在实现底层同步组件时直接使用
LockSupport
。
Condition
使用案例【模拟生产者消费者模型】
package com.boot.jdk;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class LockCondition<T> {
private Object[] items;
// 添加的下标,删除的下标和数组当前数量
private int addIndex, removeIndex, count;
private Lock lock = new ReentrantLock(); //创建锁
private Condition notEmpty = lock.newCondition();//创建condition。 第1个等待队列
private Condition notFull = lock.newCondition(); //创建condition 第2个等待队列
// 阻塞队列都是如此实现?
// 队列头和队列尾都是分别创建了一个condition,为了将队列的双端的等待队列进行区分,互不影响
public LockCondition(int size) {
items = new Object[size];
}
// 添加一个元素,如果数组满,则添加线程进入等待状态,直到有"空位"【生产者】
public void add(T t) throws InterruptedException {
lock.lock();
try {
while (count == items.length) // 如果生产已满
notFull.await(); //生产者暂定生产,去等待队列吧
items[addIndex] = t;
if (++addIndex == items.length)
addIndex = 0;
++count;
notEmpty.signal(); //告诉消费者线程开始消费吧
} finally {
lock.unlock();
}
}
// 由头部删除一个元素,如果数组空,则删除线程进入等待状态,直到有新添加元素【消费者】
@SuppressWarnings("unchecked")
public T remove() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await(); //没东西了,消费者的线程无法进行消费,进入等待队列
Object x = items[removeIndex];
if (++removeIndex == items.length)
removeIndex = 0;
--count;
notFull.signal();// 通知生产者线程赶紧工作。
return (T) x;
} finally {
lock.unlock();
}
}
}
Condition原理概述
有了对AQS的理解,这里看Condition会很容易
Condition和ReentrantLock的关系【核心原理,看完包加深理解】
Condition
是一个接口,其默认实现(如 ConditionObject
)是 AQS 的内部类。例如:
public class ReentrantLock {
private final Sync sync; // Sync 是 AQS 的子类
public Condition newCondition() {
return sync.newCondition(); // 返回 AQS 内部的 ConditionObject
}
}
“ReentrantLock加锁的队列是一个AQS的同步队列,Condition的await加入的是Condition的等待队列,这两个队列机制是一样的【因为Condition的实现类往往是AQS的内部类】,但是不是同一个队列【指向不同的引用】”
- 两个独立的队列,但机制相似
- AQS 同步队列(锁队列):当线程调用
lock.lock()
竞争锁失败时,会进入这个双向链表结构的队列等待锁释放(比如ReentrantLock
的排队机制)。 - Condition 等待队列:当线程持有锁后调用
condition.await()
,会释放锁并进入这个单向链表 【为什么是单向考虑到了场景和性能等,不需要纠结】结构的队列(ConditionObject
内部维护的队列),等待被signal()
唤醒。
- AQS 同步队列(锁队列):当线程调用
- 协作流程(关键!)
线程A 持有锁,调用
condition.await()
:线程A会释放锁,并把自己包装成节点加入Condition
的等待队列。其他线程(如线程B)此时可以抢锁。线程B 持有锁,调用
condition.signal()
:AQS 会把Condition
等待队列中的头节点(比如线程A)转移到 AQS 同步队列的尾部,让该A线程等待重新竞争锁。线程B释放锁后,线程A才有机会从await()
处唤醒并继续执行。
- 为什么需要两个队列?
- 职责分离:
- AQS 同步队列管锁的竞争(谁拿到锁谁执行)。
- Condition 队列管条件的等待(满足条件再唤醒,避免忙等)。
- 灵活性:一个锁可以关联多个
Condition
(例如生产者消费者模型中的“非空”“非满”两个条件)。
- 职责分离:
原子操作类【掌握使用即可】
经常使用的是原子的基本类型和引用类型,对于数组类型和字段修改基本不会用到。
对于数组引用类,只是复制了传参的array对象,操作并不对array有影响,只是一个备份数据
public class ArrayAtomicTest {
static int[] array = new int[2];
public static void main(String[] args) {
array[0] = 10;
array[1] = 20;
AtomicIntegerArray aia = new AtomicIntegerArray(array);
aia.compareAndSet(0, 10, 11);
System.out.println(aia.get(0));//11
System.out.println(array[0]);//10
}
}
Java并发工具类
CountdownLatch
CountDownLatch
的设计初衷是用于线程间的同步协调,不需要显式加锁(如 synchronized
或 ReentrantLock
),其内部已通过 AQS(AbstractQueuedSynchronizer) 实现了线程安全的计数器操作。
注意点:
- 不可重置性:
CountDownLatch
的计数器归零后无法重置,若需重复使用,应选择CyclicBarrier
- 异常处理:确保
countDown()
在finally
块中调用,避免线程异常导致计数器未递减
可以看到CoundownLatch依然是依靠AQS实现,state
在这里就是初始化的CountdownLatch的倒计数
CyclicBarrier
CyclicBarrier和CountdownLatch在理论上能完成一样的需求,但是在选型上有区别,使用示例: 【总结来说就是CyclicBarrier可以做更复杂的场景,在线程全部到达屏障的时候可以执行一个Runnable】
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierExample {
public static void main(String[] args) {
// 创建CyclicBarrier,参数1:等待的线程数,参数2:所有线程到达后执行的任务
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
System.out.println("\n所有线程已完成计算,开始汇总结果...");
System.out.println("汇总完成!\n");
});
// 创建并启动3个线程
for (int i = 1; i <= 3; i++) {
new Thread(new Worker(barrier, i), "线程-" + i).start();
}
}
static class Worker implements Runnable {
private final CyclicBarrier barrier;
private final int threadNum;
public Worker(CyclicBarrier barrier, int threadNum) {
this.barrier = barrier;
this.threadNum = threadNum;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + " 开始计算...");
// 模拟计算耗时
Thread.sleep((long) (Math.random() * 2000));
int result = threadNum * 100;
System.out.println(Thread.currentThread().getName() + " 计算完成..");
// 等待其他线程
barrier.await();
// 所有线程到达屏障后继续执行
System.out.println(Thread.currentThread().getName() + " 继续执行后续任务...");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
Semaphore【信号量机制】
核心方法:acquire()
,release()
,基于AQS(AbstractQueuedSynchronizer)实现,通过CAS操作减少许可计数。若计数为0,线程加入等待队列【类似锁】
信号量,非常重要,可以做很多场景,比如数据库连接限量,限流器的实现,生产消费模式,限制并发访问数这些场景
// [简单的限制并发线程数场景]
import java.util.concurrent.Semaphore;
public class BasicSemaphoreExample {
private static final Semaphore semaphore = new Semaphore(3); // 允许3个线程同时访问
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
new Thread(new Task(i)).start();
}
}
static class Task implements Runnable {
private final int taskId;
public Task(int id) {
this.taskId = id;
}
@Override
public void run() {
try {
System.out.println("任务 " + taskId + " 等待获取许可");
semaphore.acquire(); // 获取许可
System.out.println("任务 " + taskId + " 已获取许可,开始执行");
Thread.sleep(2000); // 模拟任务执行
System.out.println("任务 " + taskId + " 执行完成,释放许可");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release(); // 释放许可
}
}
}
}
// [生产者消费者模型示例]
import java.util.concurrent.Semaphore;
public class ProducerConsumer {
private static final int BUFFER_SIZE = 10;
private final Semaphore empty = new Semaphore(BUFFER_SIZE); // 空槽信号量
private final Semaphore full = new Semaphore(0); // 满槽信号量
private final Semaphore mutex = new Semaphore(1); // 互斥信号量
private int[] buffer = new int[BUFFER_SIZE];
private int in = 0, out = 0;
public void produce(int item) throws InterruptedException {
empty.acquire(); // 等待空槽
mutex.acquire(); // 获取互斥锁
buffer[in] = item;
in = (in + 1) % BUFFER_SIZE;
System.out.println("生产: " + item);
mutex.release();
full.release(); // 增加满槽
}
public int consume() throws InterruptedException {
full.acquire(); // 等待满槽
mutex.acquire(); // 获取互斥锁
int item = buffer[out];
out = (out + 1) % BUFFER_SIZE;
System.out.println("消费: " + item);
mutex.release();
empty.release(); // 增加空槽
return item;
}
}
Exchanger
Exchanger作为线程间数据交换的通道,使用上利用exchange()
的阻塞特性,
// [数据交换示例]
import java.util.concurrent.Exchanger;
public class BasicExchangerExample {
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
Thread thread1 = new Thread(() -> {
String data1 = "来自线程1的数据";
System.out.println("线程1正在等待交换数据...");
try {
String received = exchanger.exchange(data1);
System.out.println("线程1 收到了数据: " + received);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Thread thread2 = new Thread(() -> {
String data2 = "来自线程2的数据";
System.out.println("线程2正在等待交换数据...");
try {
String received = exchanger.exchange(data2);
System.out.println("线程2 收到了数据: " + received);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
thread1.start();
thread2.start();
}
}
线程安全的队列【核心:阻塞队列,铺垫线程池】
ConcurrentLinkedQueue【非阻塞队列,了解即可】
注意点
入队和出队都是类似的原理 tail
是一个近似指针,并不严格指向队列的实时尾节点,而是“接近”尾节点。入队操作优先尝试快速路径(直接链接到 tail.next
),失败时退化为遍历找尾节点。这种设计通过牺牲部分读操作(遍历)来减少写操作(CAS 更新 tail
),从而提升高并发下的性能。
- 如果
tail
是当前尾节点(即tail.next == null
),直接将新节点链接到tail.next
。 - 如果
tail
不是尾节点(即tail.next != null
),则通过遍历找到真正的尾节点(可能是tail.next
或更后面的节点),再链接新节点。
阻塞队列【核心,重点】
JDK 7提供了7个阻塞队列:
- ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
- LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。
- PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
- DelayQueue:一个使用优先级队列实现的无界阻塞队列。
- SynchronousQueue:一个不存储元素的阻塞队列。
- LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
- LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
下图是阻塞队列的一个一般API表述,根据不同的实现可能具体方法有一些偏差,这里要注意的是put
和take
阻塞式的插入或者移除
ArrayBlockingQueue 与 LinkedBlockingQueue
不同点:
- 锁的使用:前者使用了一把全局锁【无论
take
还是put
】,读写相互影响;后者对take
和put
有单独的锁,存取互不影响 。因此从这个角度来看,后者更适合高并发和高吞吐量,单位时间内处理更多读写请求。 - 数据结构:前者是Object数组,内存空间连续不需要额外的空间,后者是Node节点串联的链表,因此对jvm内存空间消耗需要垃圾回收,并且地址不连续
- 基于数据结构有会产生影响:前者是数组,初始化需要指定长度;后者基于链表,不需要初始化长度
效率?如何选择?【没有办法界定,要看实际情况】
- 有条件的话,对两种队列进行压测【性能测试】
- 写多读少:建议ArrayQueue【因为数据量多】
- 读多写少:建议LinkedQueue【因为数据量少】
- 队列定长:建议ArrayQueue
PriorityBlockingQueue 和 DelayQueue
前者基于优先级队列【实现一些接口去排序】,使用上和前面说的ArrayBlockingQueue差不多,而DelayQueue
是这里要说的重点
Delayed接口和DelayQueue是Java并发包中用于实现延迟任务调度的核心组件,广泛应用于定时任务、缓存过期、订单超时等场景。
一、Delayed接口解析
Delayed接口是DelayQueue队列中元素必须实现的接口,它定义了延迟任务的核心行为。Delayed接口继承自Comparable接口,定义如下:
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}
getDelay(TimeUnit unit)
该方法返回对象的剩余延迟时间:
- 返回值为0或负数表示延迟时间已到
- 正数表示还需要等待的时间
- 参数TimeUnit指定返回值的单位
compareTo(Delayed o)
从Comparable接口继承,用于定义延迟任务的排序规则:
- 通常按照到期时间(executeTime)排序
- 到期时间越早的任务优先级越高
典型实现示例:
public class DelayTask implements Delayed {
private long executeTime; // 执行时间戳
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(executeTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return Long.compare(this.executeTime, ((DelayTask)o).executeTime);
}
}
实现时需要注意:
- 必须同时实现getDelay()和compareTo()方法
- compareTo()要保持与getDelay()一致的排序逻辑
二、DelayQueue实现原理
DelayQueue是一个支持延迟获取元素的无界阻塞队列,内部基于PriorityQueue和ReentrantLock实现。
1.有如下特性:
- 延迟获取:元素只有到期后才能被取出
- 无界队列:没有容量限制(但需注意内存问题)
- 线程安全:内部使用锁机制保证线程安全
- 自动排序:元素按到期时间排序,最早到期的在队首
- 关键内部结构
- lock:可重入锁,保证线程安全
- q:优先队列,实际存储元素并排序
- leader:采用Leader-Follower模式优化,减少不必要竞争
- available:条件变量,用于线程等待/唤醒
public class DelayQueue<E extends Delayed> {
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
private Thread leader = null;
private final Condition available = lock.newCondition();
}
使用案例
// 1. 定义延迟任务
public class DelayTask implements Delayed, Runnable {
private String taskName;
private long executeTime;
public DelayTask(String name, long delay, TimeUnit unit) {
this.taskName = name;
this.executeTime = System.currentTimeMillis() + unit.toMillis(delay);
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(executeTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return Long.compare(this.executeTime, ((DelayTask)o).executeTime);
}
@Override
public void run() {
System.out.println("执行任务: " + taskName);
}
}
// 2. 使用DelayQueue
public class DelayQueueDemo {
public static void main(String[] args) {
DelayQueue<DelayTask> queue = new DelayQueue<>();
// 添加延迟任务
queue.put(new DelayTask("Task1", 2, TimeUnit.SECONDS));
queue.put(new DelayTask("Task2", 5, TimeUnit.SECONDS));
queue.put(new DelayTask("Task3", 1, TimeUnit.SECONDS));
// 消费者线程
new Thread(() -> {
while (true) {
try {
DelayTask task = queue.take();
task.run();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
SynchronousQueue
SynchronousQueue是一种零容量的阻塞队列,它不存储任何元素
- 直接传递机制:每个插入操作(put)必须等待一个对应的移除操作(take),反之亦然
- 零容量:内部没有任何存储元素的能力,size()方法总是返回0,peek()总是返回null
- 线程安全:通过乐观锁(compare-and-set)实现线程安全
- 公平性策略:支持公平和非公平两种模式
LinkedTransferQueue
LinkedBlockingDeque
Fork/Join框架
Fork/Join框架,与传统线程池有何不同
Fork/Join框架是Java 7中引入的一个并行执行框架,用于解决递归划分任务并行执行的问题。它与传统的线程池有几点不同之处:
- 任务分解方式: Fork/Join框架采用的是“分治”的思想,即将一个大任务划分为多个小任务并行执行,直到任务足够小无法再分解为止。而传统的线程池通常是将多个任务提交到线程池中,并由线程池中的工作线程进行执行。
- 工作窃取算法: Fork/Join框架中的工作线程采用了一种称为“工作窃取”(Work-Stealing)的算法。每个工作线程都维护着一个自己的双端队列,当自己的队列为空时,它会去其他线程的队列中“窃取”任务执行,以提高并行执行效率。传统线程池中的工作线程则通常采用固定的分配方式,任务分配给哪个线程由线程池的调度器决定。
- 任务类型限制: Fork/Join框架适用于任务之间存在明显的父子关系,即一个大任务可以划分为多个子任务并行执行,子任务执行完毕后可以合并结果。而传统线程池可以执行任意类型的任务,没有限制。
- 任务提交方式: Fork/Join框架中的任务是通过ForkJoinPool的invoke()方法提交的,而传统的线程池通常是通过ExecutorService的submit()方法或execute()方法提交任务的。
总的来说,Fork/Join框架适用于解决大型递归任务并行执行的问题,它采用了特殊的任务分解方式和工作窃取算法,能够提高任务的并行度和执行效率。传统的线程池则更加灵活,适用于执行各种类型的任务。
工作窃取算法
使用Fork/Join框架
线程池及Executor框架
线程池中核心线程数量大小怎么设置
公式只是用于参考,影响因素有很多,真实的业务场景还是要根据业务需求和压测反馈去调整参数【所以推荐走后门的方式设置监控,以及配置中心动态的调整参数】
- IO密集型应用:大部分的耗时是用来进行I/O操作 最佳线程数 = CPU核心数*2 + 1
- CPU密集型应用:大部分时间都在进行CPU计算 最佳线程数 = CPU核心数+1
判断线程池任务执行完成的方式
- 使用 Future:
- 如果任务是通过 submit() 方法提交给线程池的,可以通过 Future 对象来判断任务是否完成。调用 Future 对象的 isDone() 方法可以判断任务是否执行完成。
- 使用 CompletionService:
- CompletionService 是 Java 提供的用于异步执行任务并获取结果的工具类。可以通过 CompletionService 的 poll() 或 take() 方法来获取已完成的任务。
- 监控线程池状态:
- 可以通过监控线程池的状态来判断任务是否执行完成。例如,通过线程池的 getActiveCount() 方法获取活动线程数,如果活动线程数为 0,则说明所有任务已执行完成。
- 使用计数器:
- 可以使用计数器(如 CountDownLatch)来统计任务的完成情况。每个任务执行完成时,计数器减一,当计数器值为 0 时,说明所有任务已执行完成。
- 周期性检查任务状态:
- 可以周期性地检查任务的执行状态,例如使用定时任务或者轮询方式。当所有任务都执行完成时,可以根据需要执行后续操作。
线程池中submit()和execute()方法的区别
在Java线程池中,submit()
和execute()
都是用于向线程池提交任务的方法,但它们有几个关键区别:
主要区别
返回值:
execute()
方法没有返回值(void)submit()
方法返回一个Future
对象,可以用来获取任务执行结果或检查任务状态
异常处理:
execute()
方法中抛出的异常会直接由执行任务的线程抛出submit()
方法会将异常封装在Future
对象中,需要通过Future.get()
方法获取
接受参数类型:
execute()
只接受Runnable
接口的实现submit()
可以接受Runnable
或Callable
接口的实现
使用示例
ExecutorService executor = Executors.newFixedThreadPool(2);
// 使用execute()
executor.execute(() -> {
System.out.println("Task executed via execute()");
});
// 使用submit() with Runnable
Future<?> future1 = executor.submit(() -> {
System.out.println("Task executed via submit() with Runnable");
});
// 使用submit() with Callable
Future<String> future2 = executor.submit(() -> {
return "Result from Callable";
});
try {
System.out.println(future2.get()); // 获取Callable的返回结果
} catch (Exception e) {
e.printStackTrace();
}
何时使用哪个
- 当不需要任务执行结果时,可以使用
execute()
- 当需要获取任务执行结果或需要处理任务抛出的异常时,使用
submit()
- 当任务有返回值时,必须使用
submit()
并传入Callable
对象
两者都会将任务添加到线程池的工作队列中,由线程池中的线程执行,核心区别在于对任务结果和异常的处理方式。
JDK提供的预设线程池【建议自定义】
为什么建议自定义还要有预设的呢???单纯是为了让我们创建线程池的时候方便一些,不能代表他们的实用性。
Fixed
有界队列和无界队列的区别确实与容量限制有关,但关键在于是否严格限制容量,而不是是否传入参数。
- 有界队列(Bounded Queue)
- 必须指定固定容量,队列大小严格受限。
- 示例:
new ArrayBlockingQueue<>(100)
→ 队列最多容纳100个元素,插入第101个元素时会阻塞或报错。
- 无界队列(Unbounded Queue)
- 无需严格限制容量,可以动态扩容(理论上限通常是
Integer.MAX_VALUE
)。 - 示例:
new LinkedBlockingQueue<>()
→ 默认允许添加最多Integer.MAX_VALUE
个元素,几乎不会因容量满而阻塞。
- 注意事项
- 无界队列的风险:
虽然使用方便,但可能因生产者速度过快导致内存溢出(OOM),需谨慎使用。 - 显式指定容量:
即使使用LinkedBlockingQueue
,也可以通过new LinkedBlockingQueue<>(100)
将其变为有界队列。
- 无界队列的风险:
Single【Fixed的特殊情况】
Cached【根据需要创建新的线程】
- “传球手队列”:SynchronousQueue
- 核心线程数为0:有任务的时候直接创建新线程
Scheduled【定时场景】
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
}
使用【延迟队列】,之前我们对延迟队列有很深的理解,说白了就是根据延迟时间做优先级排列,队首元素是最先到期的任务。