无题 发表于 2025-10-08 更新于 2025-10-08
长沙
无题 John Doe 2025-10-08 2025-10-08 Java虚拟线程的内幕
1. java虚拟线程的创建方法 1. 通过虚拟线程工厂创建 1 2 3 4 ThreadFactory factory = Thread.ofVirtual() .name("work-virtual-" , 1 ) .factory(); factory.newThread(() -> {});
2. 通过线程的静态方法创建 1 Thread.startVirtualThread(() -> {});
3. 通过线程池对象 1 2 3 4 5 6 7 8 ThreadFactory factory = Thread.ofVirtual() .name("work-virtual-" , 1 ) .factory(); factory.newThread(() -> {}); ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor();ExecutorService executorService1 = Executors.newThreadPerTaskExecutor(factory);
2. 虚拟线程的原理 1. 虚拟线程的创建流程 不论外部暴露的是何种虚拟线程创建接口,底层都是调用 java.lang.ThreadBuilders#newVirtualThread 方法创建虚拟线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) { super (name, characteristics, false ); Objects.requireNonNull(task); if (scheduler == null ) { Thread parent = Thread.currentThread(); if (parent instanceof VirtualThread vparent) { scheduler = vparent.scheduler; } else { scheduler = DEFAULT_SCHEDULER; } } this .scheduler = scheduler; this .cont = new VThreadContinuation (this , task); this .runContinuation = this ::runContinuation; }
DEFAULT_SCHEDULER : 默认使用的是 ForkJoinPool 来对虚拟线程任务的执行
VThreadContinuation : 继承自 Continuation ,协程对象
2. 虚拟线程的启动流程 1. 线程的 start() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 void start (ThreadContainer container) { if (!compareAndSetState(NEW, STARTED)) { throw new IllegalThreadStateException ("Already started" ); } assert threadContainer () == null ; setThreadContainer(container); boolean addedToContainer = false ; boolean started = false ; try { container.onStart(this ); addedToContainer = true ; inheritScopedValueBindings(container); submitRunContinuation(); started = true ; } finally { if (!started) { setState(TERMINATED); afterTerminate(addedToContainer, false ); } } }
2. runContinuation 运行协程的方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 private void runContinuation () { if (Thread.currentThread().isVirtual()) { throw new WrongThreadException (); } int initialState = state(); if (initialState == STARTED && compareAndSetState(STARTED, RUNNING)) { } else if (initialState == RUNNABLE && compareAndSetState(RUNNABLE, RUNNING)) { setParkPermit(false ); } else { return ; } notifyJvmtiMount(true ); try { cont.run(); } finally { if (cont.isDone()) { afterTerminate(); } else { afterYield(); } } }
3. jdk.internal.vm.Continuation#run 协程运行方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 public final void run () { while (true ) { mount(); JLA.setScopedValueCache(scopedValueCache); if (done) throw new IllegalStateException ("Continuation terminated" ); Thread t = currentCarrierThread(); if (parent != null ) { if (parent != JLA.getContinuation(t)) throw new IllegalStateException (); } else this .parent = JLA.getContinuation(t); JLA.setContinuation(t, this ); try { boolean isVirtualThread = (scope == JLA.virtualThreadContinuationScope()); if (!isStarted()) { enterSpecial(this , false , isVirtualThread); } else { assert !isEmpty(); enterSpecial(this , true , isVirtualThread); } } finally { fence(); try { assert isEmpty () == done : "empty: " + isEmpty() + " done: " + done + " cont: " + Integer.toHexString(System.identityHashCode(this )); JLA.setContinuation(currentCarrierThread(), this .parent); if (parent != null ) parent.child = null ; postYieldCleanup(); unmount(); if (PRESERVE_SCOPED_VALUE_CACHE) { scopedValueCache = JLA.scopedValueCache(); } else { scopedValueCache = null ; } JLA.setScopedValueCache(null ); } catch (Throwable e) { e.printStackTrace(); System.exit(1 ); } } assert yieldInfo == null || yieldInfo instanceof ContinuationScope; if (yieldInfo == null || yieldInfo == scope) { this .parent = null ; this .yieldInfo = null ; return ; } else { parent.child = this ; parent.yield0((ContinuationScope)yieldInfo, this ); parent.child = null ; } } }
4. native层调用该方法执行协程 jdk.internal.vm.Continuation#enter 1 2 3 4 5 6 7 8 9 10 try { c.enter0(); } finally { c.finish(); } vthread.run(task);
5. java.lang.VirtualThread#run(java.lang.Runnable) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 private void run (Runnable task) { assert state == RUNNING; mount(); notifyJvmtiStart(); if (VirtualThreadStartEvent.isTurnedOn()) { var event = new VirtualThreadStartEvent (); event.javaThreadId = threadId(); event.commit(); } Object bindings = scopedValueBindings(); try { runWith(bindings, task); } catch (Throwable exc) { dispatchUncaughtException(exc); } finally { try { StackableScope.popAll(); if (VirtualThreadEndEvent.isTurnedOn()) { var event = new VirtualThreadEndEvent (); event.javaThreadId = threadId(); event.commit(); } } finally { notifyJvmtiEnd(); unmount(); setState(TERMINATED); } } }
3. 虚拟线程的park流程 1. Thread.sleep(); 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 void sleepNanos (long nanos) throws InterruptedException { assert Thread.currentThread() == this && nanos >= 0 ; if (getAndClearInterrupt()) throw new InterruptedException (); if (nanos == 0 ) { tryYield(); } else { try { long remainingNanos = nanos; long startNanos = System.nanoTime(); while (remainingNanos > 0 ) { parkNanos(remainingNanos); if (getAndClearInterrupt()) { throw new InterruptedException (); } remainingNanos = nanos - (System.nanoTime() - startNanos); } } finally { setParkPermit(true ); } } } void parkNanos (long nanos) { assert Thread.currentThread() == this ; if (getAndSetParkPermit(false ) || interrupted) return ; if (nanos > 0 ) { long startTime = System.nanoTime(); boolean yielded = false ; Future<?> unparker = scheduleUnpark(this ::unpark, nanos); setState(PARKING); try { yielded = yieldContinuation(); } finally { assert (Thread.currentThread() == this ) && (yielded == (state() == RUNNING)); if (!yielded) { assert state () == PARKING; setState(RUNNING); } cancel(unparker); } if (!yielded) { long deadline = startTime + nanos; if (deadline < 0L ) deadline = Long.MAX_VALUE; parkOnCarrierThread(true , deadline - System.nanoTime()); } } }
2. LockSupport.park(); 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 void park () { assert Thread.currentThread() == this ; if (getAndSetParkPermit(false ) || interrupted) return ; boolean yielded = false ; setState(PARKING); try { yielded = yieldContinuation(); } finally { assert (Thread.currentThread() == this ) && (yielded == (state() == RUNNING)); if (!yielded) { assert state () == PARKING; setState(RUNNING); } } if (!yielded) { parkOnCarrierThread(false , 0 ); } }
4. 虚拟线程的unpark流程 1. Thread.sleep(); 休眠时间到的unpark操作 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 Future<?> unparker = scheduleUnpark(this ::unpark, nanos); void unpark () { Thread currentThread = Thread.currentThread(); if (!getAndSetParkPermit(true ) && currentThread != this ) { int s = state(); if (s == PARKED && compareAndSetState(PARKED, RUNNABLE)) { if (currentThread instanceof VirtualThread vthread) { vthread.switchToCarrierThread(); try { submitRunContinuation(); } finally { switchToVirtualThread(vthread); } } else { submitRunContinuation(); } } else if (s == PINNED) { synchronized (carrierThreadAccessLock()) { Thread carrier = carrierThread; if (carrier != null && state() == PINNED) { U.unpark(carrier); } } } } }
2. LockSupport.unpark();
5.