文档

Java™教程
隐藏目录
Fork/Join
路径:Java基础类
课程:并发性
章节:高级并发对象
子章节:执行器

分叉/合并

fork/join框架是ExecutorService接口的一个实现,可以帮助你充分利用多个处理器。它专为可以递归地分解为较小部分的工作而设计。其目标是利用所有可用的处理能力来提高应用程序的性能。

与任何ExecutorService实现一样,fork/join框架将任务分配给线程池中的工作线程。fork/join框架之所以与众不同,是因为它使用了一种工作窃取算法。当工作线程没有要执行的任务时,它们可以从其他仍在忙碌的线程中窃取任务。

fork/join框架的核心是ForkJoinPool类,它是AbstractExecutorService类的扩展。ForkJoinPool实现了核心的工作窃取算法,可以执行ForkJoinTask进程。

基本用法

使用fork/join框架的第一步是编写执行一部分工作的代码。你的代码应该类似于以下伪代码:

if (我的工作部分足够小)
  直接执行工作
else
  将我的工作分成两部分
  调用这两部分并等待结果

将这段代码封装在一个ForkJoinTask子类中,通常使用其更专门的类型之一,即RecursiveTask(可以返回结果)或RecursiveAction

当你的ForkJoinTask子类准备好后,创建代表所有要执行的工作的对象,并将其传递给ForkJoinPool实例的invoke()方法。

模糊以提高清晰度

为了帮助你理解fork/join框架的工作原理,考虑以下示例。假设你想要对图像进行模糊处理。原始的图像由整数数组表示,每个整数包含单个像素的颜色值。模糊的目标图像也由具有与源相同大小的整数数组表示。

通过逐个像素地处理源数组来执行模糊处理。每个像素与其周围的像素取平均值(红、绿和蓝色分量取平均),并将结果放入目标数组中。由于图像是一个大数组,这个过程可能需要很长时间。你可以利用fork/join框架在多处理器系统上实现并发处理。以下是一种可能的实现:

public class ForkBlur extends RecursiveAction {
    private int[] mSource;
    private int mStart;
    private int mLength;
    private int[] mDestination;
  
    // 处理窗口大小;应为奇数。
    private int mBlurWidth = 15;
  
    public ForkBlur(int[] src, int start, int length, int[] dst) {
        mSource = src;
        mStart = start;
        mLength = length;
        mDestination = dst;
    }

    protected void computeDirectly() {
        int sidePixels = (mBlurWidth - 1) / 2;
        for (int index = mStart; index < mStart + mLength; index++) {
            // 计算平均值。
            float rt = 0, gt = 0, bt = 0;
            for (int mi = -sidePixels; mi <= sidePixels; mi++) {
                int mindex = Math.min(Math.max(mi + index, 0),
                                    mSource.length - 1);
                int pixel = mSource[mindex];
                rt += (float)((pixel & 0x00ff0000) >> 16)
                      / mBlurWidth;
                gt += (float)((pixel & 0x0000ff00) >>  8)
                      / mBlurWidth;
                bt += (float)((pixel & 0x000000ff) >>  0)
                      / mBlurWidth;
            }
          
            // 重新组装目标像素。
            int dpixel = (0xff000000     ) |
                   (((int)rt) << 16) |
                   (((int)gt) <<  8) |
                   (((int)bt) <<  0);
            mDestination[index] = dpixel;
        }
    }
  
  ...

现在你需要实现抽象的compute()方法,该方法可以直接执行模糊操作,也可以将其拆分为两个更小的任务。简单的数组长度阈值有助于确定是执行工作还是拆分工作。

protected static int sThreshold = 100000;

protected void compute() {
    if (mLength < sThreshold) {
        computeDirectly();
        return;
    }
    
    int split = mLength / 2;
    
    invokeAll(new ForkBlur(mSource, mStart, split, mDestination),
              new ForkBlur(mSource, mStart + split, mLength - split,
                           mDestination));
}

如果上述方法在RecursiveAction类的子类中,那么在ForkJoinPool中设置任务运行是很简单的,具体步骤如下:

  1. 创建一个代表所有工作的任务。

    // 源图像像素在src中
    // 目标图像像素在dst中
    ForkBlur fb = new ForkBlur(src, 0, src.length, dst);
    
  2. 创建将运行任务的ForkJoinPool

    ForkJoinPool pool = new ForkJoinPool();
    
  3. 运行任务。

    pool.invoke(fb);
    

完整的源代码,包括一些额外的代码用于创建目标图像文件,请参阅ForkBlur示例。

标准实现

除了使用fork/join框架来实现在多处理器系统上并发执行自定义算法的任务(例如前一节中的ForkBlur.java示例),Java SE中还有一些通用的功能已经使用fork/join框架进行了实现。其中之一是在Java SE 8中引入的,被java.util.Arrays类用于其parallelSort()方法。这些方法类似于sort(),但是通过fork/join框架利用并发性。在多处理器系统上,对大型数组进行并行排序比顺序排序更快。然而,这些方法如何利用fork/join框架超出了Java教程的范围。有关此信息,请参阅Java API文档。

fork/join框架的另一个实现是在java.util.streams包中使用的,该包是计划在Java SE 8发布时的Lambda项目的一部分。有关更多信息,请参见Lambda表达式部分。


上一页: 线程池
下一页: 并发集合