专业编程教程与实战项目分享平台

网站首页 > 技术文章 正文

java juc forkjoin 并行流计算详解

ins518 2024-11-05 11:16:23 技术文章 8 ℃ 0 评论

Oracle的官方给出的定义是:Fork/Join框架是一个实现了ExecutorService接口的多线程处理器。它可以把一个大的任务划分为若干个小的任务并发执行,充分利用可用的资源,进而提高应用的执行效率。

主要有两步:1、拆分 2、合并

它的模型大致是这样的:线程池中的每个线程都有自己的工作队列,当自己队列中的任务都完成以后,会从其它线程的工作队列中窃取一个任务执行,这样可以充分利用CPU资源。



import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.stream.LongStream;

/**
 *  场景:求和 (1~1_000_000_000L)
 *  ForkJoinTask代表一个可以并行、合并的任务。ForkJoinTask是一个抽象类,它有两个抽象子类:
 *  RecursiveAction和RecursiveTask
 */
@Data
public class ForkJoinDemo extends RecursiveTask<Long> {

    private Long start;
    private Long end;
    private Long splitValue = 100000L;


    public ForkJoinDemo(Long start, Long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        if( (end - start) < splitValue ){
            Long temp = 0L;
            for (Long i = start ; i <= end  ; i++ ){
                temp += i;
            }

            return temp;
        }else{
            // 求出中间值
            Long middle = (end + start) / 2 ;
            //任务拆分1
            ForkJoinDemo forkJoinLeft = new ForkJoinDemo(start ,middle );
            ForkJoinTask<Long> left =  forkJoinLeft.fork();
            //任务拆分2
            ForkJoinDemo forkJoinRight = new ForkJoinDemo( middle+1 ,end );
            ForkJoinTask<Long> right =  forkJoinRight.fork();
            //返回拆分后的结果
            return left.join() + right.join();
        }
    }

    /**
     *  使用普通循环去计算
     */
    public static void test1(){
        Long sum = 0L;
        Long startTime = System.currentTimeMillis();
        for (Long i=0L ; i<=1_000_000_000L ; i++){
            sum += i;
        }
        Long endTime = System.currentTimeMillis();
        System.out.println( "计算结果为=" +  sum + "使用时间为:"+(endTime-startTime) + "ms");
    }

    /**
     * 使用forkjoin 计算
     */
    public static void test2(){
        ForkJoinPool forkJoinPool = null;
        try {
            forkJoinPool = new ForkJoinPool();
            Long startTime = System.currentTimeMillis();
            ForkJoinDemo forkJoinDemo =  new ForkJoinDemo( 0L , 1_000_000_000L  );
            //submit 为有返回值的提交
            ForkJoinTask<Long> forkJoinTask = forkJoinPool.submit( forkJoinDemo );
            Long sum = forkJoinTask.get() ;
            Long endTime = System.currentTimeMillis();
            System.out.println( "计算结果为=" +  sum + "使用时间为:"+(endTime-startTime) + "ms");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            forkJoinPool.shutdown();
        }
    }

    /**
     * 使用并行流计算
     */
    public static void test3(){
        Long startTime = System.currentTimeMillis();
        Long sum = LongStream.rangeClosed(0,1_000_000_000).parallel().reduce(0,Long::sum);
        Long endTime = System.currentTimeMillis();
        System.out.println( "计算结果为=" +  sum + "使用时间为:"+(endTime-startTime) + "ms");
    }

    public static void main(String[] args) {
        //普通方式计算方法 5167ms
        test1();

        //采用forkjoin方式求和 4073ms
        test2();

        //采用并行流的方式去计算 125ms
        test3();
    }
}



核心是ForkJoinPool执行者实现了ForkJoinTask接口的实例。ForkJoinTask对象支持创建子任务来等待子任务完成。当一个任务正在等待另一个任务完成并且有待执行的任务时,executor就能够通过”偷取”任务,在内部的线程池里分发任务。

ForkJoinTask对象主要有两个重要的方法:

  • fork()方法允许ForkJoinTask任务异步执行
  • join()方法允许一个ForkJoinTask等待另一个ForkJoinTask执行完成。


使用fork / join框架应遵循一些指导原则:

  • 使用尽可能少的线程池 - 在大多数情况下,最好的决定是为每个应用程序或系统使用一个线程池
  • 使用合理的阈值将ForkJoingTask拆分为子任务,如上述代码的splitValue,有优化空间
  • 避免在 ForkJoingTask中出现阻塞的IO任务

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表