掘金 后端 ( ) • 2024-04-14 15:19

为什么需要使用这个技术?

  1. 因为目前我们公司出现出现个别接口变慢的情况,有些是为了执行大量 IO 操作和调用其他的服务,具有 I/O 密集型特点。

    1. 执行 IO 操作会导致程序堵塞,CPU 可能会处于空闲状态,因为 CPU 在等待数据到来或者写入的过程中没有其他计算任务。

如下图

sequenceDiagram    
    当前线程 ->>+ 服务 1: 请求数据
    服务 1 ->> 当前线程: 返回数据: 300ms

    当前线程 ->> 查询数据: 请求数据
    查询数据 ->> 当前线程: 返回数据: 300ms
    当前线程 ->> 服务 2: 请求数据
    服务2 ->> 当前线程: 返回数据:200ms
    

上面请求完成需要至少 300ms + 300ms + 200ms = 800ms

如果改用并行方式大概需要 max(300ms, 300ms, 200ms) = 300ms 左右(因为CPU有上下文切换的消耗)

CompletableFuture 与 Future 关系是什么?

简单理解:其实他们的关系就差个 Completable

Completable:这个词强调了 CompletableFuture 与普通 Future 的不同之处。"Completable" 意味着这个对象不仅可以代表一个异步操作的结果,还可以在操作完成时附加额外的动作或计算。

  • 它提供了更丰富的操作,比如可以附加多个回调函数(通过 thenApply, thenAccept, thenRun 等方法),这些回调会在异步操作完成时执行。
  • 它支持更复杂的组合和链接操作,可以通过 thenCompose 方法将多个 CompletableFuture 对象串联起来,形成一个复杂的异步处理流程。
  • 它还提供了异常处理的能力,可以通过 exceptionallyhandle 方法来处理异步操作中发生的异常。

快速开始

无依赖任务

描述

无依赖任务是指多个任务间不存在依赖关系即每个任务输入均不依赖与其他任务的输出。

stateDiagram-v2 任务1 任务2 任务3

实现

CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
            return "我是任务1返回值";
});

        CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
            return "我是任务2返回值";
});

CompletableFuture<String> task3 = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return "我是任务3返回值";
});

// 它能够等待所有传入的 CompletableFuture 任务完成
CompletableFuture.allOf(task1, task2, task3).thenAccept(v -> {
         System.out.println("任务1返回值:" + task1.join());
         System.out.println("任务2返回值:" + task2.join());
         System.out.println("任务3返回值:" + task3.join());
}).join();

或者

public static String getTask(String task) {
        try {
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return "我是任务" + task + "返回值";
}

public static void main(String[] args) {
        List<String> allResult = Stream.of(1, 2, 3)
                .map(i -> CompletableFuture.supplyAsync(() -> getTask(String.valueOf(i))))
                .collect(Collectors.toList())
                .stream().map(CompletableFuture::join)
                .collect(Collectors.toList());
        System.out.println(allResult);
}

存在依赖任务

存在依赖任务是指多个任务间依赖关系即某一个或某几个任务的输出是另外的任务的输入

stateDiagram-v2
    任务1 --> 任务2: 结果
    任务3 --> 任务4: 结果
    任务2 --> 任务4: 结果
    
    
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
       System.out.println("任务1");
       return "我是任务1返回值";
});

CompletableFuture<String> task3 = CompletableFuture.supplyAsync(() -> {
      System.out.println("任务3");
      return "我是任务3返回值";
});


CompletableFuture<String> task2 = task1.thenApply(res -> {
     System.out.println("任务2");
     return "我是任务2返回值";
});

// task4
CompletableFuture.allOf(task2, task3)
      .thenAccept((v) -> {
           System.out.println("任务1返回值:" + task1.join());
           System.out.println("任务2返回值:" + task2.join());
           System.out.println("任务3返回值:" + task3.join());
}).join();

CompletableFuture 原因&原理

CompletableFuture 出现原因

  • Future 用于表示异步计算的结果,只能通过阻塞或者轮询的方式获取结果,而且不支持设置回调方法,Java 8 之前若要设置回调一般会使用 guava 的 ListenableFuture,回调的引入又会导致臭名昭著的回调地狱。
  • CompletableFuture 对 Future 进行了扩展,可以通过设置回调的方式处理计算结果,同时也支持组合操作,支持进一步的编排,同时一定程度解决了回调地狱的问题。

CompletableFuture 原理

completableFutureClass.png

CompletableFuture 实现了两个接口(如上图所示):Future、CompletionStage。Future 表示异步计算的结果,CompletionStage 用于表示异步执行过程中的一个步骤(Stage),这个步骤可能是由另外一个 CompletionStage 触发的,随着当前步骤的完成,也可能会触发其他一系列 CompletionStage 的执行。从而我们可以根据实际业务对这些步骤进行多样化的编排组合,CompletionStage 接口正是定义了这样的能力,我们可以通过其提供的 thenAppy、thenCompose 等函数式编程方法来组合编排这些步骤。

异常处理

由于异步执行的任务在其他线程上执行,而异常信息存储在线程栈中,因此当前线程除非阻塞等待返回结果,否则无法通过 try\catch 捕获异常。

CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
	  log.info("任务1")
      return "我是任务1返回值";
}).exceptionally(e -> {
	// 通过 exceptionally 捕获异常,打印日志并返回默认值
      log.error("任务1异常:", e)
      return "";
});

使用时注意点

尽可能不要使用默认线程池

  1. 多线程需要编写的时候最需要清楚当前写的程序跑在哪一个线程上面
  2. 默认线程池也没有办法做到线程池的隔离

线程死锁问题

在父级任务和子级任务用同一个线程池即: 线程池循环引用会导致死锁.

ExecutorService threadPool1 = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100));
  CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
    return CompletableFuture.supplyAsync(() -> {
        System.out.println("child");
        return "child";
      }, threadPool1).join();//子任务
}, threadPool1);

当父任务同时来了 10 个, 这时候没有空闲线程而子任务因没有可执行线程被堵塞, 父任务就会一直等待.

最后

如果想一起交流请加我微信号: -xiaou- 加后发送 技术群, 拉你进一个无广告无推销的技术交流群.