掘金 后端 ( ) • 2024-04-18 10:56

Java的Fork/Join框架是一种用于并行执行任务的框架,它基于“分而治之”的原则。在这个模型中,一个大任务被分割(fork)成若干个小任务,如果任务足够小,就直接执行并返回结果;否则,继续分割。最后,将小任务的结果合并(join)成大任务的结果。Fork/Join框架主要设计用来进行数据密集型的并行计算,利用多核处理器的优势来提高性能。

核心概念

  • ForkJoinPoolFork/Join框架的线程池,用于执行ForkJoinTask任务。
  • ForkJoinTask:这是一个代表任务的类,有两个重要的子类:
    • RecursiveAction:用于没有返回结果的任务。
    • RecursiveTask:用于有返回结果的任务。

工作原理

ForkJoinPool采用工作窃取(work-stealing)算法来分配任务。每个工作线程都有自己的双端队列,用来存放分配给自己的任务。当一个线程完成了自己队列中的所有任务时,它可以从其他线程的队列末尾“窃取”一个任务来执行,从而有效地利用处理器核心,减少空闲时间。

源码解析(简化)

ForkJoinTaskforkjoin方法是框架的核心。fork方法将任务提交给ForkJoinPool以异步执行。join方法则等待任务执行完成并获取结果。

public abstract class ForkJoinTask<V> implements Future<V> {
    // Forks the task into a pool.
    public final ForkJoinTask<V> fork() {
        ((ForkJoinWorkerThread) Thread.currentThread()).getPool().externalPush(this);
        return this;
    }
    
    // Joins the task, waiting for completion.
    public final V join() {
        if (doJoin() != NORMAL)
            return getExceptionalCompletion();
        return getRawResult();
    }
    // Other methods...
}

代码演示

下面是一个使用Fork/Join框架的例子,我们将使用RecursiveTask来计算一个数组的和。

import java.util.concurrent.RecursiveTask;
import java.util.concurrent.ForkJoinPool;

public class SumArray extends RecursiveTask<Long> {
    private final long[] array;
    private final int start;
    private final int end;
    private static final int THRESHOLD = 10000; // 分割任务的阈值

    public SumArray(long[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        int length = end - start;
        if (length <= THRESHOLD) {
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += array[i];
            }
            return sum;
        } else {
            int middle = start + (end - start) / 2;
            SumArray leftTask = new SumArray(array, start, middle);
            SumArray rightTask = new SumArray(array, middle, end);
            leftTask.fork(); // 异步执行左侧任务
            long rightResult = rightTask.compute(); // 同步执行右侧任务
            long leftResult = leftTask.join(); // 等待左侧任务完成,并获取其结果
            return leftResult + rightResult;
        }
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        long[] array = new long[20000];
        // 初始化数组...
        SumArray task = new SumArray(array, 0, array.length);
        Long result = pool.invoke(task);
        System.out.println("Sum: " + result);
    }
}

这个例子中,SumArray任务会递归地将数组分成更小的部分,直到部分的大小不超过阈值THRESHOLD,然后直接计算这些小部分的和。通过这种方式,Fork/Join框架能够利用多核处理器并行地计算出整个数组的和。

注意事项

  • 合理的任务分割:阈值(如上面示例中的THRESHOLD)的设定非常关键,它决定了任务是继续分割还是直接执行。不合理的阈值会影响性能。
  • 避免任务过度分割:过度分割会导致大量的任务创建和调度开销,可能会降低性能。
  • 处理异常Fork/Join框架中的异常处理需要特别注意,因为子任务的异常需要被显式捕获和处理。

总结,Fork/Join框架是一个强大的工具,用于提高多核处理器上数据密集型任务的并行处理能力。正确地使用它可以显著提高应用程序的性能,但也需要注意任务分割的策略和异常处理。