JAVA多线程(二十二)Java多线程之WorkStealingPool工作窃取线程池

1.JAVA多线程(二十二)Java多线程之WorkStealingPool工作窃取线程池

1.1 工作窃取线程池WorkStealingPool

在Java 8中,引入了一种新型的线程池,作为newWorkStealingPool()来补充现有的线程池。WorkStealingPool线程池,来维持相应的并行级别,它会通过工作窃取的方式,使得多核的 CPU 不会闲置,总会有活着的线程让 CPU 去运行。

顾名思义,它基于 工作窃取算法,其中任务可以生成其他较小的任务,这些任务将添加到并行处理线程的队列中。如果一个线程完成了工作并且无事可做,则可以从另一线程的队列中“窃取”工作。
通过源代码查看WorkStealingPool实现:

    /**
     * Creates a work-stealing thread pool using all
     * {@link Runtime#availableProcessors available processors}
     * as its target parallelism level.
     * @return the newly created thread pool
     * @see #newWorkStealingPool(int)
     * @since 1.8
     */
    public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

    /**
     * Creates a thread pool that maintains enough threads to support
     * the given parallelism level, and may use multiple queues to
     * reduce contention. The parallelism level corresponds to the
     * maximum number of threads actively engaged in, or available to
     * engage in, task processing. The actual number of threads may
     * grow and shrink dynamically. A work-stealing pool makes no
     * guarantees about the order in which submitted tasks are
     * executed.
     *
     * @param parallelism the targeted parallelism level
     * @return the newly created thread pool
     * @throws IllegalArgumentException if {@code parallelism <= 0}
     * @since 1.8
     */
    public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

在WorkStealingPool实现中:

  • parallelism => Runtime.getRuntime().availableProcessors() - 这是JVM可用的处理器数。
  • handler => ForkJoinPool.defaultForkJoinWorkerThreadFactory - 返回新线程的默认线程工厂。
  • asyncMode => true – 使其在aysnc模式下工作,并为分叉的任务设置FIFO顺序,这些任务永远不会从其工作队列中加入。

在这里插入图片描述

1.2 WorkStealingPool使用样例

  1. 单线程化线程池WorkStealingPool特点:
    • WorkStealingPool是守护线程,使用ForkJoinPool实现的WorkStealingPool根据当前操作系统的CPU有几个核就会创建几个线程。
    • 应用场景:WorkStealingPool能够合理的使用CPU进行对任务操作(并行操作)适合使用在很耗时的操作。
package com.yuanxw.chapter22;

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
 * 使用WorkStealingPool工作窃取线程池,创建Callable类型的多线程,并最终返回结果
 * https://blog.csdn.net/yuan_xw/article/details/103730628
 */
public class WorkStealingPoolExample {

    public static void main(String[] args) throws InterruptedException {
        System.out.println("获得JAVA虚拟机可用的最大CPU处理器数量:" + Runtime.getRuntime().availableProcessors());
        ExecutorService executorService = Executors.newWorkStealingPool();
        /**
         * call方法存在返回值futureTask的get方法可以获取这个返回值。
         * 使用此种方法实现线程的好处是当你创建的任务的结果不是立即就要时,
         * 你可以提交一个线程在后台执行,而你的程序仍可以正常运行下去,
         * 在需要执行结果时使用futureTask去获取即可。
         */
        List<Callable<String>> callableList = IntStream.range(0, 20).boxed().map(i -> (Callable<String>) () -> {
            TimeUnit.SECONDS.sleep(3);
            System.out.println(String.format("当前【%s】线程正在执行>>>", Thread.currentThread().getName()));
            return "callable type thread task:" + i;
        }).collect(Collectors.toList());

        // 执行给定的任务,返回持有他们的状态和结果的所有完成的期待列表。
        executorService.invokeAll(callableList).stream().map(futureTask-> {
            try {
                return futureTask.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            return null;
        }).forEach(System.out::println);
    }
}

执行结果:

获得JAVA虚拟机可用的最大CPU处理器数量:8
当前【ForkJoinPool-1-worker-0】线程正在执行>>>
当前【ForkJoinPool-1-worker-6】线程正在执行>>>
当前【ForkJoinPool-1-worker-1】线程正在执行>>>
当前【ForkJoinPool-1-worker-3】线程正在执行>>>
当前【ForkJoinPool-1-worker-5】线程正在执行>>>
当前【ForkJoinPool-1-worker-7】线程正在执行>>>
当前【ForkJoinPool-1-worker-2】线程正在执行>>>
当前【ForkJoinPool-1-worker-4】线程正在执行>>>
当前【ForkJoinPool-1-worker-1】线程正在执行>>>
当前【ForkJoinPool-1-worker-3】线程正在执行>>>
当前【ForkJoinPool-1-worker-2】线程正在执行>>>
当前【ForkJoinPool-1-worker-5】线程正在执行>>>
当前【ForkJoinPool-1-worker-6】线程正在执行>>>
当前【ForkJoinPool-1-worker-0】线程正在执行>>>
当前【ForkJoinPool-1-worker-7】线程正在执行>>>
当前【ForkJoinPool-1-worker-4】线程正在执行>>>
当前【ForkJoinPool-1-worker-1】线程正在执行>>>
当前【ForkJoinPool-1-worker-2】线程正在执行>>>
当前【ForkJoinPool-1-worker-5】线程正在执行>>>
当前【ForkJoinPool-1-worker-3】线程正在执行>>>
callable type thread task:0
callable type thread task:1
callable type thread task:2
callable type thread task:3
callable type thread task:4
callable type thread task:5
callable type thread task:6
callable type thread task:7
callable type thread task:8
callable type thread task:9
callable type thread task:10
callable type thread task:11
callable type thread task:12
callable type thread task:13
callable type thread task:14
callable type thread task:15
callable type thread task:16
callable type thread task:17
callable type thread task:18
callable type thread task:19

– 以上为《JAVA多线程(二十二)Java多线程之WorkStealingPool工作窃取线程池》,如有不当之处请指出,我后续逐步完善更正,大家共同提高。谢谢大家对我的关注。

——厚积薄发(yuanxw)


版权声明:本文为yuan_xw原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。