无题

Java虚拟线程的内幕

1. java虚拟线程的创建方法

1. 通过虚拟线程工厂创建

1
2
3
4
ThreadFactory factory = Thread.ofVirtual()
.name("work-virtual-", 1)
.factory();
factory.newThread(() -> {});

2. 通过线程的静态方法创建

1
Thread.startVirtualThread(() -> {});

3. 通过线程池对象

1
2
3
4
5
6
7
8
ThreadFactory factory = Thread.ofVirtual()
.name("work-virtual-", 1)
.factory();
factory.newThread(() -> {});

// 每个任务对应一个虚拟线程
ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor();
ExecutorService executorService1 = Executors.newThreadPerTaskExecutor(factory);

2. 虚拟线程的原理

1. 虚拟线程的创建流程

不论外部暴露的是何种虚拟线程创建接口,底层都是调用 java.lang.ThreadBuilders#newVirtualThread 方法创建虚拟线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) {
super(name, characteristics, /*bound*/ false);
Objects.requireNonNull(task);

// choose scheduler if not specified
if (scheduler == null) {
Thread parent = Thread.currentThread();
if (parent instanceof VirtualThread vparent) { // 父线程为虚拟线程则使用父线程的调度器
scheduler = vparent.scheduler;
} else {
scheduler = DEFAULT_SCHEDULER; // 使用系统默认的调度器
}
}

this.scheduler = scheduler; // 设置虚拟线程的调度器
this.cont = new VThreadContinuation(this, task); // 将创建的虚拟线程对象和任务包装成一个协程对象
this.runContinuation = this::runContinuation; // 定义一个执行协程任务的方法
}
  • DEFAULT_SCHEDULER : 默认使用的是 ForkJoinPool 来对虚拟线程任务的执行
  • VThreadContinuation : 继承自 Continuation ,协程对象

2. 虚拟线程的启动流程

1. 线程的 start()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 重写了线程的start方法
void start(ThreadContainer container) { // 线程根容器, 记录java进程中线程信息,方便追踪java虚拟线程执行的问题
if (!compareAndSetState(NEW, STARTED)) { // 变更当前线程状态为已启动
throw new IllegalThreadStateException("Already started");
}

// bind thread to container
assert threadContainer() == null; // 确保当前虚拟线程不存在线程容器,避免重复追踪线程信息
setThreadContainer(container); // 设置当前的线程容器为传入的线程容器

// start thread
boolean addedToContainer = false;
boolean started = false;
try {
container.onStart(this); // 调用线程容器的启动方法注册当前虚拟线程到线程容器中
addedToContainer = true;

// scoped values may be inherited
inheritScopedValueBindings(container); // 线程本地变量的继承, 类似于 InheritableThreadLocal

// submit task to run thread
submitRunContinuation(); // 把虚拟线程创建中的 runContinuation 可执行Runnable 放在线程池调度器 scheduler 中执行
started = true;
} finally {
if (!started) {
setState(TERMINATED); // 虚拟线程启动失败则终止虚拟线程
afterTerminate(addedToContainer, /*executed*/false); // 终止之后的通知 jvm 卸载虚拟线程 notifyJvmtiUnmount, 并将虚拟线程从容器中移除
}
}
}

2. runContinuation 运行协程的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private void runContinuation() {
// the carrier must be a platform thread
if (Thread.currentThread().isVirtual()) { // 虚拟线程才可进行协程调用逻辑
throw new WrongThreadException();
}

// set state to RUNNING
int initialState = state(); // 获取当前虚拟线程的状态
if (initialState == STARTED && compareAndSetState(STARTED, RUNNING)) {
// first run
// 第一次执行则变更虚拟线程状态为 RUNNING[正在 运行]
} else if (initialState == RUNNABLE && compareAndSetState(RUNNABLE, RUNNING)) {
// consume parking permit
setParkPermit(false);
} else {
// not runnable
return;
}

// notify JVMTI before mount
notifyJvmtiMount(/*hide*/true); // 通知挂载虚拟线程

try {
cont.run(); // 协程 jdk.internal.vm.Continuation#run 执行协程
} finally {
if (cont.isDone()) {
afterTerminate(); // 协程执行结束善后工作
} else {
afterYield(); // 协程未执行结束保留现场
}
}
}

3. jdk.internal.vm.Continuation#run 协程运行方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public final void run() {
while (true) {
mount(); // 设置协程为已挂在状态
JLA.setScopedValueCache(scopedValueCache); // 设置域值缓存

if (done) // 协程执行结束则抛出异常
throw new IllegalStateException("Continuation terminated");

Thread t = currentCarrierThread(); // 获取当前虚拟线程所在的java线程
if (parent != null) {
if (parent != JLA.getContinuation(t))
throw new IllegalStateException();
} else
this.parent = JLA.getContinuation(t);
JLA.setContinuation(t, this); // 设置协程到java线程中

try {
boolean isVirtualThread = (scope == JLA.virtualThreadContinuationScope());
if (!isStarted()) { // is this the first run? (at this point we know !done) // 是否第一次执行协程
enterSpecial(this, false, isVirtualThread); // 执行协程 - native方法
} else {
assert !isEmpty();
enterSpecial(this, true, isVirtualThread); // 执行协程,从上次运行的位置开始执行 - native方法 , 该方法最关键,执行该方法中,native层会调用java方法 jdk.internal.vm.Continuation#enter,执行协程中的真正逻辑
}
} finally {
fence(); // 内存屏障,避免指令重排
try {
assert isEmpty() == done : "empty: " + isEmpty() + " done: " + done + " cont: " + Integer.toHexString(System.identityHashCode(this));
JLA.setContinuation(currentCarrierThread(), this.parent); // 将父协程设置到java线程中
if (parent != null)
parent.child = null; // 重置父协程的孩子为空

postYieldCleanup(); // 如果协程执行结束则清除当前协程的底栈块为空

unmount(); // 设置协程为未挂在状态
if (PRESERVE_SCOPED_VALUE_CACHE) { // 是否保存协程的域值
scopedValueCache = JLA.scopedValueCache();
} else {
scopedValueCache = null;
}
JLA.setScopedValueCache(null); // 设置当前线程的域值为空
} catch (Throwable e) { e.printStackTrace(); System.exit(1); }
}
// we're now in the parent continuation



// 当前协程片段执行结束之后
assert yieldInfo == null || yieldInfo instanceof ContinuationScope;
if (yieldInfo == null || yieldInfo == scope) {
this.parent = null;
this.yieldInfo = null;
return;
} else { // 继续执行父协程 Continuation ,下面会详细介绍
parent.child = this;
parent.yield0((ContinuationScope)yieldInfo, this);
parent.child = null;
}
}
}

4. native层调用该方法执行协程 jdk.internal.vm.Continuation#enter

1
2
3
4
5
6
7
8
9
10
// This method runs in the "entry frame".
// A yield jumps to this method's caller as if returning from this method.
try {
c.enter0(); // 该方法调用虚拟线程的run方法执行虚拟线程逻辑
} finally {
c.finish();
}

// 虚拟线程的run执行业务逻辑
vthread.run(task);

5. java.lang.VirtualThread#run(java.lang.Runnable)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
private void run(Runnable task) {
assert state == RUNNING;

// first mount
mount(); // 将当前虚拟线程挂载到java 线程
notifyJvmtiStart(); // 通知java虚拟机工具接口虚拟线程启动执行

// emit JFR event if enabled
// 是否需要发送虚拟线程启动事件
if (VirtualThreadStartEvent.isTurnedOn()) {
var event = new VirtualThreadStartEvent();
event.javaThreadId = threadId();
event.commit();
}

Object bindings = scopedValueBindings(); // 获取当前线程绑定的域值
try {
runWith(bindings, task); // 真正执行虚拟线程任务逻辑
} catch (Throwable exc) {
dispatchUncaughtException(exc); // 调用虚拟线程异常处理逻辑
} finally {
try {
// pop any remaining scopes from the stack, this may block
StackableScope.popAll(); // 弹出所有当前线程的作用域堆栈

// emit JFR event if enabled
if (VirtualThreadEndEvent.isTurnedOn()) { // 发送虚拟线程执行结束事件
var event = new VirtualThreadEndEvent();
event.javaThreadId = threadId();
event.commit();
}

} finally {
// last unmount
notifyJvmtiEnd(); // 通知java虚拟机工具接口虚拟线程结束
unmount(); // 将当前虚拟线程从java 线程卸载

// final state
setState(TERMINATED); // 设置虚拟线程状态终止
}
}
}

3. 虚拟线程的park流程

1. Thread.sleep();

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
void sleepNanos(long nanos) throws InterruptedException {
assert Thread.currentThread() == this && nanos >= 0;
if (getAndClearInterrupt()) // 清除打断标记
throw new InterruptedException();
if (nanos == 0) {
tryYield(); // 暂停该Continuation执行,并将当前运行虚拟线程的java线程栈帧改变,恢复到Continuation.run()的下一行继续执行java线程正在执行的任务
} else {
// park for the sleep time
try {
long remainingNanos = nanos;
long startNanos = System.nanoTime();
while (remainingNanos > 0) {
parkNanos(remainingNanos); // 根据时间进行park操作
if (getAndClearInterrupt()) {
throw new InterruptedException();
}
remainingNanos = nanos - (System.nanoTime() - startNanos); // 避免虚拟线程虚假唤醒
}
} finally {
// may have been unparked while sleeping
setParkPermit(true); // 设置允许park操作
}
}
}


// 根据时间进行park操作
void parkNanos(long nanos) {
assert Thread.currentThread() == this;

// complete immediately if parking permit available or interrupted
if (getAndSetParkPermit(false) || interrupted)
return;

// park the thread for the waiting time
if (nanos > 0) {
long startTime = System.nanoTime();

boolean yielded = false;
Future<?> unparker = scheduleUnpark(this::unpark, nanos); // 添加定时任务, nanos毫秒后执行虚拟线程的park操作,恢复虚拟线程继续执行
setState(PARKING);
try {
yielded = yieldContinuation(); // may throw // 暂停该Continuation执行,并将当前运行虚拟线程的java线程栈帧改变,恢复到Continuation.run()的下一行继续执行java线程正在执行的任务
} finally {
// 运行到下面这里说明协程被唤醒,继续执行虚拟线程任务
assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
if (!yielded) {
assert state() == PARKING;
setState(RUNNING);
}
cancel(unparker); // 取消定时任务
}

// park on carrier thread for remaining time when pinned
if (!yielded) { // 暂停Continuation失败
long deadline = startTime + nanos;
if (deadline < 0L)
deadline = Long.MAX_VALUE;
parkOnCarrierThread(true, deadline - System.nanoTime()); // park java线程,这里java线程阻塞
}
}
}

2. LockSupport.park();

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 底层调用虚拟线程的java.lang.VirtualThread#park方法
void park() {
assert Thread.currentThread() == this;

// complete immediately if parking permit available or interrupted
if (getAndSetParkPermit(false) || interrupted)
return;

// park the thread
boolean yielded = false;
setState(PARKING);
try {
yielded = yieldContinuation(); // may throw // 暂停该Continuation执行,并将当前运行虚拟线程的java线程栈帧改变,恢复到Continuation.run()的下一行继续执行java线程正在执行的任务
} finally {
assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
if (!yielded) {
assert state() == PARKING;
setState(RUNNING);
}
}

// park on the carrier thread when pinned
if (!yielded) {
parkOnCarrierThread(false, 0); // park java线程,这里java线程阻塞
}
}

4. 虚拟线程的unpark流程

1. Thread.sleep(); 休眠时间到的unpark操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// sleep的休眠时间会将unprk操作放在定时调度器里面,时间到了自动unpark操作
// 如下代码
Future<?> unparker = scheduleUnpark(this::unpark, nanos); // 添加定时任务, nanos毫秒后执行虚拟线程的park操作,恢复虚拟线程继续执行


void unpark() {
Thread currentThread = Thread.currentThread(); // 获取当前线程
if (!getAndSetParkPermit(true) && currentThread != this) { // 设置当前虚拟线程允许park操作,解释:当前虚拟线程是从park中唤醒执行,并且执行该虚拟线程的线程不是该虚拟线程
int s = state(); // 获取该虚拟线程的状态
if (s == PARKED && compareAndSetState(PARKED, RUNNABLE)) { // 当前线程的状态是 PARKED, 将虚拟线程的状态设置为 RUNNABLE 状态
if (currentThread instanceof VirtualThread vthread) { // 如果是虚拟线程执行虚拟线程的unpark操作
vthread.switchToCarrierThread(); // 切换回java线程执行,隐藏虚拟线程的栈帧
try {
submitRunContinuation(); // 提交运行Continuation
} finally {
switchToVirtualThread(vthread); // java线程切换回虚拟线程线程执行,显示虚拟线程的栈帧
}
} else {
submitRunContinuation(); // unpark执行的线程是java线程则直接提交运行Continuation
}
} else if (s == PINNED) { // 如果为PINNED状态 -- 虚拟线程park,并且java线程也park
// unpark carrier thread when pinned.
synchronized (carrierThreadAccessLock()) {
Thread carrier = carrierThread;
if (carrier != null && state() == PINNED) {
U.unpark(carrier); // unpark java线程
}
}
}
}
}

2. LockSupport.unpark();

1
// 底层方法与上面的相同

5.