掘金 后端 ( ) • 2024-04-25 14:04

多线程应用&异步任务编排执行

初识多线程

在Java中,多线程的使用场景是比较丰富的,包括Web服务器,ORM框架等,我们每天都是使用的一些框架和组件上都存在多线程的身影,一般使用多线程都会使用线程连接池进行使用,而并非直接通过new Thread、implements Runable重写run()、implements Callable重写call(),从而保证了线程的复用,减少了不必要的资源开销。

创建线程连接池的方式

常用的创建连接池的方式有如下三种:

Executors工厂方法创建

    /**
     * 使用 Executors 工厂方法创建:
     *
     * Executors 类提供了一些方便的工厂方法来创建不同类型的线程池。
     */
    //ExecutorService threadPool = Executors.newFixedThreadPool(int nThreads);
    //ExecutorService threadPool = Executors.newSingleThreadExecutor();
    //ExecutorService threadPool = Executors.newCachedThreadPool();
    //ExecutorService threadPool = Executors.newScheduledThreadPool(int corePoolSize);

针对各创建方式的具体含义可以查看Executors详解

ThreadPoolExecutor 构造函数创建

    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
            5, // 核心线程数
            10, // 最大线程数
            1, // 空闲线程存活时间为1秒
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(100), // 任务队列大小为100
            Executors.defaultThreadFactory(), // 使用默认线程工厂
            new ThreadPoolExecutor.AbortPolicy() // 拒绝策略为中止策略
    );

针对ThreadPoolExecutor构造方法的更多信息可以查看Executors

ScheduledThreadPoolExecutor

         ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(2);
         //scheduleAtFixedRate 固定频率执行任务 周期起点为任务开始
         scheduledThreadPoolExecutor.scheduleAtFixedRate(()->{
             try {
                 TimeUnit.SECONDS.sleep(1);
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
             System.out.println("scheduleAtFixedRate 周期起点为任务开始");
             //初始延迟:1s  周期:1s
         },1,1, TimeUnit.SECONDS);
 ​
         //scheduledWithFixedDelay 固定延迟执行任务,周期起点为任务结束
         scheduledThreadPoolExecutor.scheduleWithFixedDelay(()->{
             try {
                 TimeUnit.SECONDS.sleep(1);
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
             System.out.println("scheduledWithFixedDelay 周期起点为任务结束 ");
             //初始延迟:1s  周期:1s
         },1,1, TimeUnit.SECONDS);


可以看出,上述这个是针对定时执行的线程而定制的线程池。

scheduleAtFixedRate 以任务开始为周期起点,比如说一个任务执行要0.5s,每隔1s执行,相当于执行完任务过0.5s又开始执行任务

scheduledWithFixedDelay 以任务结束为周期起点,比如说一个任务执行要0.5s,每隔1s执行,相当于执行完任务过1s才开始执行任务

此处推荐阅读Executor详解

并行执行&编排执行

在项目开发中,后端服务对外提供API接口一般都会关注响应时长。但是某些情况下,由于业务规划逻辑的原因,我们的接口可能会是一个聚合信息处理类的处理逻辑,比如我们从多个不同的地方获取数据,然后汇总处理为最终的结果再返回给调用方,这种情况下,往往会导致我们的接口响应特别的慢。

定义一些任务

package cn.org.xiaosheng.service;

/**
 * @author XiaoSheng
 * @date 2024-04-24
 * @dec 任务
 */
public class MyService {

    public Integer getThreeData() {
        // 模拟调用第三方接口耗时
        try {
            Thread.sleep(1000);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return 1;
    }

    public Integer doServic() {
        // 模拟调用第三方接口耗时
        try {
            Thread.sleep(3000);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return 3;
    }

    public Integer doServic2() {
        // 模拟调用第三方接口耗时
        try {
            Thread.sleep(2000);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return 5;
    }


}

普通串行方式

    /**
     * 直接执行各任务(串行方式), 不采用异步方式
     */
    public void directCall() {
        MyService myService = new MyService();
        System.out.println("直接执行任务前: " + System.currentTimeMillis());
        Integer threeData = myService.getThreeData();
        Integer integer = myService.doServic();
        Integer integer1 = myService.doServic2();
        Integer integer2 = Stream.of(threeData, integer, integer).filter(Objects::nonNull).min(Integer::compareTo).get();
        System.out.println("直接执行完成后: " + System.currentTimeMillis() + "返回值为:" + integer2);

    }
    public static void main(String[] args) {
        MyThreadTest myThreadTest = new MyThreadTest();
        //myThreadTest.directCall();  耗时大概: 6s
    }
直接执行任务前: 1714013137428
直接执行完成后: 1714013143490返回值为:1

执行完成,通过查看响应时间,发现耗时大概是6s

线程池submit提交异步任务

    /**
     * 通过多线程进行任务分解
     */
    public void threadPoolTest1() {
        System.out.println("多线程执行任务开始前: " + System.currentTimeMillis());
        ThreadPoolExecutor threadPool = new MyThreadPoolExecutor().getThreadPool();
        MyService myService = new MyService();
        Future<Integer> submit = threadPool.submit(() -> {
            return myService.getThreeData();
        });

        Future<Integer> submit1 = threadPool.submit(() -> {
            return myService.doServic2();
        });

        Future<Integer> submit2 = threadPool.submit(() -> {
            return myService.doServic();
        });
        Integer integer = Stream.of(submit, submit1, submit2).map(intCode -> {
            try {
                return intCode.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            return null;
        }).filter(Objects::nonNull).min(Integer::compareTo).get();
        // 注意: Future需要调用get进行线程阻塞获取值, 各线程才真正进行执行, 否则,按照Future机制不会执行相关业务操作。
        System.out.println("多线程执行任务结束后: " + System.currentTimeMillis() + "返回值为: " + 1);
    }

这里需要注意,因为Future它的阻塞机制,也就是说后续对其异步执行的响应结果存在get()操作阻塞取值,提交的异步任务才会被执行返回,若没有进行阻塞,主线程会直接往下进行执行,不会被异步线程中的任务阻塞。

    public static void main(String[] args) {
        MyThreadTest myThreadTest = new MyThreadTest();
        //myThreadTest.directCall();  // 耗时大概: 6s
        myThreadTest.threadPoolTest1(); // 耗时大概: 3s 取决于耗时最大的查询操作了
    }

程序执行结果:

多线程执行任务开始前: 1714013424582
多线程执行任务结束后: 1714013427646返回值为: 1

CompletableFuture

    public Integer completableFuture() {
        System.out.println("使用CompletableFuture方法前: " + System.currentTimeMillis());
        MyService myService = new MyService();
        CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> myService.doServic());
        CompletableFuture<Integer> integerCompletableFuture1 = CompletableFuture.supplyAsync(() -> myService.doServic2());
        CompletableFuture<Integer> integerCompletableFuture2 = CompletableFuture.supplyAsync(() -> myService.getThreeData());
        Integer integer =  Stream.of(integerCompletableFuture, integerCompletableFuture1, integerCompletableFuture2)
                .map(CompletableFuture::join).sorted(Integer::compareTo).findFirst().get();
        System.out.println("使用CompletableFuture方法后: " + System.currentTimeMillis() + "返回值为: " + integer);
        return integer;
    }
    public static void main(String[] args) {
        MyThreadTest myThreadTest = new MyThreadTest();
        //myThreadTest.directCall();  // 耗时大概: 6s
        //myThreadTest.threadPoolTest1(); // 耗时大概: 3s 取决于耗时最大的查询操作了
        myThreadTest.completableFuture();
    }

执行结果:

使用CompletableFuture方法前: 1714014403127
使用CompletableFuture方法后: 1714014406191返回值为: 1

最后看结果大概也是3s左右。

也就是说异步执行,确实能够大大的降低接口响应时间,尤其是对那种耗时长,步骤多的接口,效果提升尤为明显,且响应时间取决于最大耗时步骤所需执行时间。

而如果我们想要动手进行优化的时候呢,就会涉及到串行处理改并行处理的问题。在JAVA中并行处理的能力支持已经相对完善,通过对CompletableFuture的合理利用,可以让我们面对这种聚合类处理的场景会更加的得心应手。

CompletableFuture深入了解

CompletableFurure基本信息

JAVA8之后加入的新成员,CompletableFuture的实现与使用上,也处处体现出了函数式异步编程的味道。一个CompletableFuture对象可以被一个环节接一个环节的处理、也可以对两个或者多个CompletableFuture进行组合处理或者等待结果完成。通过对CompletableFuture各种方法的合理使用与组合搭配,可以让我们在很多的场景都可以应付自如。

CompletableFuture 用于表示一个还未完成的异步计算。它提供了一种简单的机制来编写异步代码。

定义: CompletableFuture 可以用来表示将来某一时刻会完成的任务,可以通过 get 方法获取结果,如果任务完成,get 会立即返回结果,如果任务尚未完成,get 会阻塞直到任务完成。

API:

  • completableFuture.complete(result): 手动设置成功完成的结果,并通知等待的线程。

  • completableFuture.completeExceptionally(ex): 手动设置失败的异常,并通知等待的线程。

  • completableFuture.get(): 阻塞当前线程直到计算完成并返回结果。

  • completableFuture.get(long timeout, TimeUnit unit): 阻塞当前线程直到计算完成、超时或抛出异常。

  • completableFuture.join(): 等同于 get(),但不抛出异常,而是返回 null

  • CompletableFuture.runAsync(Runnable task): 异步地执行任务,使用默认的 ForkJoinPool。

  • CompletableFuture.supplyAsync(Supplier<U> supplier): 异步地执行返回结果的任务,使用默认的 ForkJoinPool。

  • completableFuture.thenApply(Function<T,U> fn): 当前 CompletableFuture 完成后,应用该函数返回一个新的 CompletableFuture。

  • completableFuture.thenAccept(Consumer<T> action): 当前 CompletableFuture 完成后,执行该动作,无返回值。

  • completableFuture.thenRun(Runnable action): 当前 CompletableFuture 完成后,执行该操作,无参数和返回值。

  • completableFuture.exceptionally(Function<Throwable,T> fn): 如果计算过程中发生异常,则应用该函数来处理异常并返回一个新的结果。

  • completableFuture.handle(BiFunction<T,Throwable,U> fn): 处理计算的结果或异常,根据计算是否成功来应用不同的函数。

  • completableFuture.whenComplete(BiConsumer<T,Throwable> action): 无论正常或异常情况,都执行该操作。

  • completableFuture.cancel(boolean mayInterruptIfRunning): 尝试取消此任务的执行,如果已经开始,则根据给定的参数可中断执行。

示例代码

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
 
public class CompletableFutureExample {
    public static void main(String[] args) {
        // 创建一个 CompletableFuture 实例
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            // 异步执行的任务
            System.out.println("异步任务执行中...");
        });
 
        // 当异步任务完成时打印结果
        future.thenRun(() -> System.out.println("异步任务完成"));
 
        try {
            // 等待异步任务完成并获取结果
            future.get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

Future与CompletableFuture

Future在应对一些简单且相互独立的异步执行场景很便捷,但是在一些复杂的场景,比如同时需要多个有依赖关系的异步独立处理的时候,或者是一些类似流水线的异步处理场景时,就显得力不从心了。比如:

  • 同时执行多个并行任务,等待最快的一个完成之后就可以继续往后处理
  • 多个异步任务,每个异步任务都需要依赖前一个异步任务执行的结果再去执行下一个异步任务,最后只需要一个最终的结果
  • 等待多个异步任务全部执行完成后触发下一个动作执行
  • ...

所以呢, 在JAVA8开始引入了全新的CompletableFuture类,它是Future接口的一个实现类。也就是在Future接口的基础上,额外封装提供了一些执行方法,用来解决Future使用场景中的一些不足,对流水线处理能力提供了支持。

当我们需要进行异步处理的时候,我们可以通过CompletableFuture.supplyAsync方法,传入一个具体的要执行的处理逻辑函数,这样就轻松的完成了CompletableFuture的创建与触发执行。

方法名称 作用描述 supplyAsync 静态方法,用于构建一个CompletableFuture<T>对象,并异步执行传入的函数,允许执行函数有返回值T。 runAsync 静态方法,用于构建一个CompletableFuture<Void>对象,并异步执行传入函数,与supplyAsync的区别在于此方法传入的是Callable类型,仅执行,没有返回值
public void testCreateFuture(String product) {
    // supplyAsync, 执行逻辑有返回值PriceResult
    CompletableFuture<PriceResult> supplyAsyncResult =
            CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouBaoPrice(product));
    // runAsync, 执行逻辑没有返回值
    CompletableFuture<Void> runAsyncResult =
            CompletableFuture.runAsync(() -> System.out.println(product));
}

特别补充:

supplyAsync或者runAsync创建后便会立即执行,无需手动调用触发。

任务编排

在流水线处理场景中,往往都是一个任务环节处理完成后,下一个任务环节接着上一环节处理结果继续处理。CompletableFuture用于这种流水线环节驱动类的方法有很多,相互之间主要是在返回值或者给到下一环节的入参上有些许差异,使用时需要注意区分:

image-20240425112046123

具体的方法的描述归纳如下:

方法名称 作用描述 thenApply 对CompletableFuture的执行后的具体结果进行追加处理,并将当前的CompletableFuture泛型对象更改为处理后新的对象类型,返回当前CompletableFuture对象。 thenCompose 与thenApply类似。区别点在于:此方法的入参函数返回一个CompletableFuture类型对象。 thenAccept 与thenApply方法类似,区别点在于thenAccept返回void类型,没有具体结果输出,适合无需返回值的场景。 thenRun 与thenAccept类似,区别点在于thenAccept可以将前面CompletableFuture执行的实际结果作为入参进行传入并使用,但是thenRun方法没有任何入参,只能执行一个Runnable函数,并且返回void类型

因为上述thenApplythenCompose方法的输出仍然都是一个CompletableFuture对象,所以各个方法是可以一环接一环的进行调用,形成流水线式的处理逻辑:

执行异常处理

期望总是美好的,但是实际情况却总不尽如人意。在我们编排流水线的时候,如果某一个环节执行抛出异常了,会导致整个流水线后续的环节就没法再继续下去了,比如下面的例子:

public void testExceptionHandle() {
    CompletableFuture.supplyAsync(() -> {
        throw new RuntimeException("supplyAsync excetion occurred...");
    }).thenApply(obj -> {
        System.out.println("thenApply executed...");
        return obj;
    }).join();
}

执行之后会发现,supplyAsync抛出异常后,后面的thenApply并没有被执行。

那如果我们想要让流水线的每个环节处理失败之后都能让流水线继续往下面环节处理,让后续环节可以拿到前面环节的结果或者是抛出的异常并进行对应的应对处理,就需要用到handlewhenCompletable方法了。

先看下两个方法的作用描述:

方法名称 作用描述 handle 与thenApply类似,区别点在于handle执行函数的入参有两个,一个是CompletableFuture执行的实际结果,一个是是Throwable对象,这样如果前面执行出现异常的时候,可以通过handle获取到异常并进行处理。 whenComplete 与handle类似,区别点在于whenComplete执行后无返回值

我们对上面一段代码示例修改使用handle方法来处理:

    /**
     * 测试异常
     */
    public void completableFutureExection() {
        CompletableFuture.supplyAsync(() -> {
            throw new RuntimeException("supplyAsync excetion occurred...");
        }).handle((obj, e) -> {
            if (e != null) {
                System.out.println("thenApply executed, exception occurred...");
            }
            return obj;
        }).join();
    }

再执行可以发现,即使前面环节出现异常,后面环节也可以继续处理,且可以拿到前一环节抛出的异常信息:

thenApply executed, exception occurred...

CompletableFuture组合

前面一直在介绍流水线式的处理场景,但是很多时候,流水线处理场景也不会是一个链路顺序往下走的情况,很多时候为了提升并行效率,一些没有依赖的环节我们会让他们同时去执行,然后在某些环节需要依赖的时候,进行结果的依赖合并处理,类似如下图的效果。

image-20240425115534038

CompletableFuture相比于Future的一大优势,就是可以方便的实现多个并行环节的合并处理。相关涉及方法介绍归纳如下:

方法名称 作用描述 thenCombine 将两个CompletableFuture对象组合起来进行下一步处理,可以拿到两个执行结果,并传给自己的执行函数进行下一步处理,最后返回一个新的CompletableFuture对象。 thenAcceptBoth 与thenCombine类似,区别点在于thenAcceptBoth传入的执行函数没有返回值,即thenAcceptBoth返回值为CompletableFuture<Void>。 runAfterBoth 等待两个CompletableFuture都执行完成后再执行某个Runnable对象,再执行下一个的逻辑,类似thenRun。 applyToEither 两个CompletableFuture中任意一个完成的时候,继续执行后面给定的新的函数处理。再执行后面给定函数的逻辑,类似thenApply。 acceptEither 两个CompletableFuture中任意一个完成的时候,继续执行后面给定的新的函数处理。再执行后面给定函数的逻辑,类似thenAccept。 runAfterEither 等待两个CompletableFuture中任意一个执行完成后再执行某个Runnable对象,可以理解为thenRun的升级版,注意与runAfterBoth对比理解。 allOf 静态方法,阻塞等待所有给定的CompletableFuture执行结束后,返回一个CompletableFuture<Void>结果。 anyOf 静态方法,阻塞等待任意一个给定的CompletableFuture对象执行结束后,返回一个CompletableFuture<Void>结果。

结果等待与获取

在执行线程中将任务放到工作线程中进行处理的时候,执行线程与工作线程之间是异步执行的模式,如果执行线程需要获取到共工作线程的执行结果,则可以通过get或者join方法,阻塞等待并从CompletableFuture中获取对应的值。image-20240425115817347

getjoin的方法功能含义说明归纳如下:

方法名称 作用描述 get() 等待CompletableFuture执行完成并获取其具体执行结果,可能会抛出异常,需要代码调用的地方手动try...catch进行处理。 get(long, TimeUnit) 与get()相同,只是允许设定阻塞等待超时时间,如果等待超过设定时间,则会抛出异常终止阻塞等待。 join() 等待CompletableFuture执行完成并获取其具体执行结果,可能会抛出运行时异常,无需代码调用的地方手动try...catch进行处理。

从介绍上可以看出,两者的区别就在于是否需要调用方显式的进行try...catch处理逻辑,使用代码示例如下:

public void testGetAndJoin(String product) {
    // join无需显式try...catch...
    PriceResult joinResult = CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouXiXiPrice(product))
            .join();
    
    try {
        // get显式try...catch...
        PriceResult getResult = CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouXiXiPrice(product))
                .get(5L, TimeUnit.SECONDS);
    } catch (Exception e) {
        e.printStackTrace();
    }
}

Aysn重载方法差异

我们在使用CompletableFuture的时候会发现,有很多的方法,都会同时有两个以Async命名结尾的方法版本。以前面我们用的比较多的thenCombine方法为例:

  1. thenCombine(CompletionStage, BiFunction)
  2. thenCombineAsync(CompletionStage, BiFunction)
  3. thenCombineAsync(CompletionStage, BiFunction, Executor)

从参数上看,区别并不大,仅第三个方法入参中多了线程池Executor对象。看下三个方法的源码实现,会发现其整体实现逻辑都是一致的,仅仅是使用线程池这个地方的逻辑有一点点的差异:

   public <U,V> CompletableFuture<V> thenCombine(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn) {
        return biApplyStage(null, other, fn);
    }

    public <U,V> CompletableFuture<V> thenCombineAsync(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn) {
        return biApplyStage(asyncPool, other, fn);
    }

    public <U,V> CompletableFuture<V> thenCombineAsync(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn, Executor executor) {
        return biApplyStage(screenExecutor(executor), other, fn);
    }

有兴趣的可以去翻一下此部分的源码实现,这里概括下三者的区别:

  1. thenCombine方法,沿用上一个执行任务所使用的线程池进行处理
  2. thenCombineAsync两个入参的方法,使用默认的ForkJoinPool线程池中的工作线程进行处理
  3. themCombineAsync三个入参的方法,支持自定义线程池并指定使用自定义线程池中的线程作为工作线程去处理待执行任务。

现在,我们知道了方法名称带有Async和不带Async的实现策略上的差异点就在于使用哪个线程池来执行而已。那么,对我们实际的指导意义是啥呢?实际使用的时候,我们怎么判断自己应该使用带Async结尾的方法、还是不带Async结尾的方法呢?

image-20240425132112320

上面是Async结尾方法默认使用的ForkJoinPool创建的逻辑,这里可以看出,默认的线程池中的工作线程数是CPU核数 - 1,并且指定了默认的丢弃策略等,这里就是核心的要点。

所以说,符合以下几个条件的时候,可以考虑使用带有Async后缀的方法,指定自定义线程池:

  • 默认线程池的线程数满足不了实际诉求
  • 默认线程池的类型不符合自己业务诉求
  • 默认线程池的队列满处理策略不满足自己诉求

并发和并行的区别

并发

所谓并发,其关注的点是服务器的吞吐量情况,也就是服务器可以在单位时间内同时处理多少个请求。并发是通过多线程的方式来实现的,充分利用当前CPU多核能力,同时使用多个进程去处理业务,使得同一个机器在相同时间内可以处理更多的请求,提升吞吐量。

image-20240425132558358

所有的操作在一个线程中串行推进,如果有多个线程同image-20240425132701232步处理,则同时有多个请求可以被处理。但是因为是串行处理,所以如果某个环节需要对外交互时,比如等待网络IO的操作,会使得当前线程处于阻塞状态,直到资源可用时被唤醒继续往后执行。

对于高并发场景,服务器的线程资源是非常宝贵的。如果频繁的处于阻塞则会导致浪费,且线程频繁的阻塞、唤醒切换动作,也会加剧整体系统的性能损耗。所以并发这种多线程场景,更适合CPU密集型的操作。

并行

所谓并行,就是将同一个处理流程没有相互依赖的部分放到多个线程中进行同时并行处理,以此来达到相对于串行模式更短的单流程处理耗时的效果,进而提升系统的整体响应时长吞吐量

image-20240425132811988

基于异步编程实现的并行操作也是借助线程池的方式,通过多线程同时执行来实现效率提升的。与并发的区别在于:并行通过将任务切分为一个个可独立处理的小任务块,然后基于系统调度策略,将需要执行的任务块分配给空闲可用工作线程去处理,如果出现需要等待的场景(比如IO请求)则工作线程会将此任务先放下,继续处理后续的任务,等之前的任务IO请求好了之后,系统重新分配可用的工作线程来处理

image-20240425132951459

根据上面的示意图介绍可以看出,异步并行编程,对于工作线程的利用率上升,不会出现工作线程阻塞的情况,但是因为任务拆分、工作线程间的切换调度等系统层面的开销也会随之加大。

前面介绍了下并发与并行两种模式的特点、以及各自的优缺点。所以选择采用并发还是并行方式来提升系统的处理性能,还需要结合实际项目场景来确定。

综合而言

  1. 如果业务处理逻辑是CPU密集型的操作,优先使用基于线程池实现并发处理方案(可以避免线程间切换导致的系统性能浪费)。
  2. 如果业务处理逻辑IO密集型的耗时场景、且相互之间没有依赖,比如本地IO操作、网络IO请求等等,这种情况优先选择使用并行处理策略(可以避免宝贵的线程资源被阻塞等待)。