forkJoin框架

简介

fork/join框架是java7提供的一个用于并行执行的任务框架,是一个把大人物分割成若干小任务,最终汇总每个小任务后得到大任务结果的框架

  • fork递归将任务分解为较小的独立子任务,知道它们足够简单以便执行任务
  • Join将所有子任务的结果递归地连接成单个结果,或者在返回void的任务的情况下,程序只是等待每个子任务执行完毕

比如计算 1+2+......+10000,可以分割成10个子任务,每个子任务对1000个数进行求和,最终汇总这10个子任务的结果。如下图所示:

Fork/Join的特性:

ForkJoinPool不是为了替代 ExecutorService,而是它的补充,在某些应用场景下性能比ExecutorService更好

ForkJoinPool主要用于实现“分而治之”的算法,特别是分治之后递归调用的函数,例如 quick sort 等;

ForkJoinPool 最适合的是计算密集型的任务。

ForkJoinPool的宗旨是使用少量的线程来处理大量的任务,而CPU密集型任务,当一个大任务分解成多个子任务后,多个线程获取到多个处理器的时间分片,可以并行的执行子任务。

fork/join框架是java并发工具包中一种可以将一个大任务拆分成小任务来异步执行的工具,jdk1.7引入

fork/join框架主要包含三个模块

  • 任务对象ForkJoinTask(包括RecursiveTask,RecursivesiveAction和CountedCompleter)
  • 执行fork/Join的任务线程forkJoinWorkerThread
  • 线程池forkJoinpool

这三者的关系是forkJoinPool可以通过池中的ForkJoinWorkerT想read来处理forkJoinTask任务

ForkJoinPool 只接收 ForkJoinTask 任务(在实际使用中,也可以接受runnable、Callable任务,但在真正运行时,也会把这些任务封装成ForkJoinTask类型的任务),RecursiveTask是ForkJoinTask的子类,是一个可递归执行的ForkJoinTask,RecursiveAction是一个无返回值的RecursiveTask,CountedCompleter在任务执行后会触发一个自定义的钩子函数

在实际运用中,我们一般都会继承 RecursiveTaskRecursiveActionCountedCompleter 来实现我们的业务需求,而不会直接继承 ForkJoinTask 类。

案例

使用Fork/Join框架首先要考虑的就是如何分割任务,和分割任务的粒度,这里我们考虑每个子任务最多执行两个数相加,

数组取最大值,拆分成俩俩进行比对


代码

package test2;

import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
//因为需要有integer的返回值所以集成RecursiveTask
public class FKDemo extends RecursiveTask<Integer> {
    int[] arr;
    FKDemo(int[] array){
        arr=array;
    }
    public static void main(String[] args) {
        //实现求最大值
        ForkJoinPool pool = new ForkJoinPool();
        FKDemo fkDemo = new FKDemo(new int[]{1,23,46,7,89,43,67});
        Integer invoke = pool.invoke(fkDemo);
        System.out.println(invoke);
    }

    @Override
    protected Integer compute() {
        //这个方法里应该判断数据的大小进行拆分,然后做实际的业务操作
        if (arr.length<=2){
            //如果小于1返回自己,如果大于返回较大的数
            if (arr.length==1){
                return arr[0];
            }else{
                return Math.max(arr[0],arr[1]);
            }
        }else {
            int mid=arr.length/2;
            FKDemo left = new FKDemo(Arrays.copyOf(arr, mid));
            FKDemo right =new FKDemo(Arrays.copyOfRange(arr, mid,arr.length));
            //把拆分出来的任务投入到forkJoinPool里
            invokeAll(left,right);
            return Math.max(left.join(),right.join());
        }
    }
} 

通过parallelism可以指定线程池的数量,默认是线程数-1(有超线程的cpu就是核心*2,就是线程数了)

思想

差异

首先,使用ForkJoinPool能够使用数量有限的线程来完成非常多的具有父子关系的任务,比如使用4个线程来完成超过200万个任务。但是,使用ThreadPoolExecutor时,是不可能完成的,因为ThreadPoolExecutor中的Thread无法选择优先执行子任务,需要完成200万个具有父子关系的任务时,也需要200万个线程,很显然这是不可行的,也是很不合理的!!

工作窃取算法

假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,比如A线程负责处理A队列里的任务。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。

充分利用线程进行并行计算,减少了线程间的竞争

在某些情况下还是存在竞争,比如双端队列里只有一个任务时,该算法会消耗更多的系统资源,比如创建多线程和多个双端队列

具体思路如下

  • 每个线程都有自己的一个workQueue,该工作队列是一个双端队列
  • 队列支持三个功能,push,pop,poll
  • 划分的子任务调用fork时,都会被push到自己的队列中
  • 默认情况下,工作线程从自己的双端队列获取出任务执行
  • 当自己的队列为空时,线程随机从另一个线程的队列末尾调用poll方法窃取

局限性

对于forkjoin框架而言,当一个任务正在等待她使用join框架操作创建的子任务时,执行这个任务的工作线程查找其他未被执行的任务,并开始执行这些未被执行的任务,线程充分利用它们的时间来提高应用程序的性能,为了实现这个目标,forkJoin框架操作来同步机制,如果使用了其他同步机制,则在同步操作时,工作线程就不能去执行其他任务了,比如,在forkJoin框架中让任务进行了休眠,那么,在睡眠内,正在执行这个线程的工作线程不会执行其他任务了

在forkJoin框架中,所拆分的任务不应该去执行IO操作,比如读写数据文件

任务不能抛出检查异常,必须通过毕业的代码来处理这些异常

执行流程

上图可以看出,forkJoinPool中的执行分俩种

  • 直接通过 FJP 提交的外部任务(external/submissions task),存放在 workQueues 的偶数槽位;
  • 通过内部 fork 分割的子任务(Worker task),存放在 workQueues 的奇数槽位。

Last modification:December 15, 2022
如果觉得我的文章对你有用,请随意赞赏