掘金 后端 ( ) • 2024-04-26 14:07

前言

虚拟线程(Virtual Thread)(Go语言里叫协程),是Java 19引入的一种轻量级线程,在Java21 转正。

在理解虚拟线程前,我们先回顾一下线程的特点:

  • 线程是由操作系统创建并调度的资源;
  • 线程切换会耗费大量CPU时间;
  • 一个系统能同时调度的线程数量是有限的,通常在几百至几千级别。

因此,我们说线程是一种重量级资源,我们为了增加系统的吞吐量,要不断增加线程的数量,但机器的线程是昂贵的、可用线程数量也是有限的。即使我们使用了各种线程池来最大化线程的性价比,但是线程往往会在 CPU、网络或者内存资源耗尽之前成为我们应用程序的性能提升瓶颈,不能最大限度的释放硬件应该具有的性能。

虚拟线程就是为了解决以上问题,最大限度释放硬件性能,但虚拟线程最适合具有高延迟的任务,例如 I/O 操作、等待锁或线程将花费大量时间等待的任何其他操作,而需要连续计算的CPU密集型场景,并不适合虚拟线程。

以下是平台线程与虚拟线程的关系图 image.png

创建虚拟线程的方式

三种方式效果均一样

/**
 * 方式1:传入Runnable实例并立刻运行
 */
@Test
public void test1() {
    // 方式1:传入Runnable实例并立刻运行:
    Thread vt = Thread.startVirtualThread(() -> {
        System.out.println("Start virtual thread...");
    });

    //这样也行
    Thread.ofVirtual().start(() -> {
        System.out.println("Start virtual thread...");
    });
}

/**
 * 方式2:传入Runnable实例并后置运行
 */
@Test
public void test2() {
    Thread vt = Thread.ofVirtual().unstarted(() -> {
        System.out.println("Start virtual thread...");
    });
    vt.start();
}

/**
 * 方式3:创建ThreadFactory
 */
@Test
public void test3() {
    ThreadFactory tf = Thread.ofVirtual().factory();
    Thread vt = tf.newThread(() -> {
        System.out.println("Start virtual thread...");
    });
    vt.start();
}


/**
 * 方式4:使用ExecutorService  (但是每次submit仍然是新创建虚拟线程 java.util.concurrent.ThreadPerTaskExecutor)
 */
@Test
public void test4() {
    ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();

    executor.submit(() -> {
        System.out.println("Start virtual thread...");
        return true;
    });
}

使用虚拟线时还需要线程池吗

答案是,不需要,使用虚拟线程时,每次都创建新的即可,以下是官方文档描述

Do not pool virtual threads

Developers will typically migrate application code to the virtual-thread-per-task ExecutorService from a traditional thread-pool based ExecutorService. A thread pool, like any resource pool, is intended to share expensive resources, but virtual threads are not expensive so there is never a need to pool them.

Developers sometimes use thread pools to limit concurrent access to limited resources. For example, if a service cannot handle more than 20 concurrent requests then making all requests to the service via tasks submitted to a thread pool of size 20 will ensure that. This idiom has become ubiquitous because the high cost of platform threads has made thread pools ubiquitous, but do not be tempted to pool virtual threads in order to limit concurrency. Instead use constructs specifically designed for that purpose, such as semaphores.

In conjunction with thread pools, developers sometimes use thread-local variables to share expensive resources among multiple tasks that share the same thread. For example, if a database connection is expensive to create then you can open it once and store it in a thread-local variable for later use by other tasks in the same thread. If you migrate code from using a thread pool to using a virtual thread per task, be wary of usages of this idiom since creating an expensive resource for every virtual thread may degrade performance significantly. Change such code to use alternative caching strategies so that expensive resources can be shared efficiently among a very large number of virtual threads.

这里有一篇更详细的例子说明:https://zhuanlan.zhihu.com/p/671154148

最佳实现

  1. 若IO密集型的项目中使用到CompletableFuture,可以直接将自定义线程池替换成Executors.newVirtualThreadPerTaskExecutor()
  2. 使用虚拟线程时,synchronized 改为 ReentrantLock,以减少虚拟线程被固定到平台线程
  3. 使用虚拟线程时,不需要池化
  4. 虚拟线程池适合IO密集型应用,CPU密集型还是需要用平台线程池
  5. 结合java21的结构化并发写法,代码更具可读性,StructuredTaskScope底层用的就是虚拟线程
try (var scope = new StructuredTaskScope<>()) {
    // 使用fork方法派生线程来执行子任务
    StructuredTaskScope.Subtask<String> future1 = scope.fork(() -> "111");
    StructuredTaskScope.Subtask<Integer> future2 = scope.fork(() -> 222);

    // 等待线程完成
    scope.join();
    // 结果的处理可能包括处理或重新抛出异常

    System.out.println(future1.get());
    System.out.println(future2.get());

} // close
catch (InterruptedException e) {
    throw new RuntimeException(e);
}

参考文章: https://mp.weixin.qq.com/s/yyApBXxpXxVwttr01Hld6Q