📖JAVA线程池源码解析与使用shi

发布: 2020-12-04
热度: 40
趋势: 40
权重: 0
🎯

频繁的创建与销毁线程是非常浪费系统资源的行为,多线程编程中是必要考虑到线程复用,线程池就是实现线程复用的一种方式,看看JAVA的线程池如何让使用都有哪些关键参数

ThreadPoolExecutor

在 JAVA 中,线程池最核心的类是 java.uitl.concurrent.ThreadPoolExecutor 。

线程池的主要作用就是存放线程,当需要使用线程时,优先从池中取用一个空闲的线程。

线程池主要实现了线程的复用,无需频繁的创建和销毁线程。

因此,线程池也要具备动态扩容、超时销毁、排队等待等功能。

类图与实现

ThreadPoolExecutor 类图如下:

SC-255-tdpooljava1.png

  1. Executor 接口:顶层,仅定义了一个方法(用于执行任务)
  2. ExecutorService 接口:继承并拓展 Executor 接口,添加一些新的方法,如操控线程池生命周期的 shutDown()、shutDownNow()等,可异步跟踪执行任务生成返回值 Future 方法、submit()等
  3. AbstractExecutorService 抽象类:实现了 ExecutorService 中声明的方法,除了 execute(Runnable command)

构造函数

ThreadPoolExecutor 提供了 4 个构造函数,如下图:

SC-256-tdpooljava2.png

前三个构造函数,基本上是调用了第四个构造函数,通过构造函数基本上知道了数据结构和各个核心树形的意义。

    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters and default thread factory.
     *
     * @param corePoolSize 核心线程数量
     * @param maximumPoolSize 最大线程数量
     * @param keepAliveTime 非核心线程的最大存活时间,仅对超出核心线程之外生效
     * @param unit 用于keepAliveTime的时间单位,如秒、小时、天等
     * @param workQueue 等待执行的任务队列,提供了三种队列
     * @param threadFactory 线程工厂类,主要用来创建线程
     * @param handler 拒绝策略,线程和队列都满了的情况触发
     * @throws IllegalArgumentException 抛出参数异常
     * @throws NullPointerException 抛出空指针异常
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

工作原理

上文大致介绍了,线程池核心类的实现,本节主要深入学习下线程池的工作原理。

线程池状态

线程池的状态主要使用原子操作类 AtomicInteger ctl 来标识。

ctl 其中高 3 位表示线程池状态,低 29 位表示目前的线程数量。

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits(高3位)
    // 高3位为1,可以接收新任务,处理队列中等待的任务
    private static final int RUNNING    = -1 << COUNT_BITS;
    // 高3位为0,不再接收新任务,处理队列中等待的任务
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    // 高3位为001,不再接收新任务,不处理队列中等待的任务,尝试终端运行中任务
    private static final int STOP       =  1 << COUNT_BITS;
    // 高3位为010,所有任务被终止,workerCount为0,此状态将调用terminated()方法
    private static final int TIDYING    =  2 << COUNT_BITS;
    // 高3位为100,terminated()方法调用完成后变成此状态
    private static final int TERMINATED =  3 << COUNT_BITS;

相关方法:

  • runStateOf(int c) :获取高 3 位保存的线程池状态
  • workerCountOf(int c):获取低 29 位的线程数量
  • ctlOf(int rs, int wc):rs 表示 runState,wc 表示 workerCount,打包合并成 ctl

初始化线程

线程池默认情况下是不进行线程的初始化的(当然提供了预创建的初始化方法)。

  • prestartCoreThread():初始化一个核心线程
  • prestartAllCoreThreads():初始化所有核心线程

线程初始化一般发生在提交一个任务的时候,集中在 execute()方法。

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        // 如果当前线程小于核心线程,创建一个新线程
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 线程池是否在运行
        // 如果在运行,队列是否允许插入,插入成功,再次验证线程池是否运行
        // 如果不在运行,移除插入的任务,然后抛出拒绝策略
        // 如果在运行,没有线程了,创建一个新线程(注意是空线程)
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 添加非核心线程失败,就直接拒绝
        else if (!addWorker(command, false))
            reject(command);
    }

addWorker()

上文可知,创建线程主要调用了 addWorker()。

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            // 判断线程池的状态,如果大于或等SHUTDOWN,不处理提交的任务,直接返回
            if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                            firstTask == null &&
                            ! workQueue.isEmpty()))
                return false;
            // 判断当前需要创建的线程是否为核心线程,入参core为true,且当前线程数小于corePoolSize,跳出循环,开始创建新线程
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
                // 自旋方法,goto语法
            }
        }
        // 创建线程
        boolean workerStarted = false;
        boolean workerAdded = false;
        ThreadPoolExecutor.Worker w = null;
        try {
            w = new ThreadPoolExecutor.Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                // 获取线程池主锁
                // 工作线程通过Woker类实现,ReentrantLock锁保证线程安全
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
                    if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // 添加线程到workers中(线程池中)
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                // 启动新建的线程
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

线程池底层

为什么 worker 可以承载线程?

SC-257-tdpooljava3.png

Worker 类继承了 AQS,并实现了 Runnable 接口。

它有两个重要的成员变量:firstTask 和 thread。

  • firstTask 用于保存第一次新建的任务
  • thread 是在调用构造方法时通过 ThreadFactory 来创建的线程,是用来处理任务的线程

上文提到将 worker 添加到 workers 。

    /**
     * Set containing all worker threads in pool. Accessed only when
     * holding mainLock.
     */
    private final HashSet<Worker> workers = new HashSet<Worker>();

workers 是一个 hashSet。

所以,线程池底层的存储结构其实就是一个 HashSet。

任务队列策略

workQueue,它用来存放等待执行的任务。

workQueue 的类型为 BlockingQueue,通常可以取下面三种类型:

  1. ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小
  2. LinkedBlockingQueue:基于链表的先进先出队列,如创建没有指定大小,默认为 Integer.MAX_VALUE
  3. synchronousQueue:不会保存提交的任务,直接新建一个线程来执行新来的任务

任务拒绝策略

线程池的任务缓存队列已满并且线程池中的线程数目达到 maximumPoolSize。

如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:

  • AbortPolicy:丢弃任务,抛出 RejectedExecutionException 异常。
  • DiscardPolicy:丢弃任务,不抛出异常。
  • DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
  • CallerRunsPolicy:由调用线程处理该任务

使用示例

public static void main(String[] args) {
    // 完整构造
    ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,  new ArrayBlockingQueue<Runnable>(5));
    // 常见快速构造
    // 容量固定为1
    ExecutorService justOne = Executors.newSingleThreadExecutor();
    // 容量最大
    ExecutorService intMax =  Executors.newCachedThreadPool();
    // 指定容量
    ExecutorService inputNo = Executors.newFixedThreadPool(10);
    // 使用示例
    for(int i=0;i<15;i++){
        MyTask myTask = new MyTask(i);
        executor.execute(myTask);
        System.out.println("PoolSize:"+executor.getPoolSize()+",WaitQueue:"+ executor.getQueue().size()+",Down:"+executor.getCompletedTaskCount());
    }
    executor.shutdown();
}

线程池大小建议

需要根据任务的类型来配置线程池大小:

如果是 CPU 密集型任务,就需要尽量压榨 CPU,参考值可以设为 NCPU+1

如果是 IO 密集型任务,参考值可以设置为 2*NCPU

当然,这只是一个参考值。

小结

线程池本质是一个 hashSet。

多余的任务会放在阻塞队列中。

阻塞队列满了后,才会触发非核心线程的创建。

非核心线程只是临时过来打杂的,空闲了,自己关闭。

线程池提供了两个钩子(beforeExecute,afterExecute),继承线程池,在执行任务前后做一些事情。

线程池原理关键技术:锁(lock,cas)、阻塞队列、hashSet(资源池)

当前文章暂无讨论,留下脚印吧!
大纲
  • ThreadPoolExecutor
    • 类图与实现
    • 构造函数
  • 工作原理
    • 线程池状态
    • 初始化线程
      • addWorker()
      • 线程池底层
    • 任务队列策略
    • 任务拒绝策略
  • 使用示例
  • 线程池大小建议
  • 小结
提交成功,请等待审核通过后全面展示!

发表评论

昵称
邮箱
链接
签名
评论

温馨提示:系统将通过浏览器临时记忆您曾经填写的个人信息且支持修改,评论提交后仅自己可见,内容需要经过审核后方可全面展示。

选择头像