线程池源码


参考美团技术分享:https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html

一、线程池基本使用

1、三种线程池

//线程池只有单个线程,不管获取几次线程,都是同一个
ExecutorService threadPool = Executors.newSingleThreadExecutor();

//创建一个固定大小的线程池,即线程池包含指定数量的线程
ExecutorService threadPool = Executors.newFixedThreadPool(5);

//创建一个可伸缩的线程池,会根据cpu的性能调整线程池的大小
ExecutorService threadPool = Executors.newCachedThreadPool();

源码

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

2、七大参数

public ThreadPoolExecutor(int corePoolSize,//核心线程数
                          int maximumPoolSize,//最大线程
                          long keepAliveTime,//超过核心线程数的空闲线程最大存活时间
                          TimeUnit unit,//keepAliveTime时间单位
                          BlockingQueue<Runnable> workQueue,//阻塞队列
                          ThreadFactory threadFactory,//线程工厂
                          RejectedExecutionHandler handler//拒绝策略
                         ) {}

最大线程数大于核心线程数的部分必须在核心线程和阻塞队列满了之后才会创建

所以如果阻塞队列是无界队列(即最大容量是 Integer.MAX_VALUE

案例:

银行有5个业务办理窗口,1 和 2 是一直处于开启的状态,3 、4、5 是当 1、2 窗口处理不来的情况下(1,2 窗口都有人,且候客区已满,并且还有人进来),才开启的。

当 3、4、5 窗口经过一定时间没人办理业务时,会关闭。

候客区有 3 个座位。

当 5 个窗口全部处于开启状态,并且候客区已满座,此时再来人时必须有处理策略。

image-20210428195253767

(1) corePoolSize:核心线程池大小,即一直处于可用状态的线程数,类比窗口12
(2) maximumPoolSize:最大核心线程池大小,类比5个窗口。(大于核心线程数的部分必须在核心线程和阻塞队列满了之后才会创建)
(3) keepAliveTime:线程最大空闲时间(大于核心线程数的部分超时了没人调用就会释放)类比345一段时间无人办理,就关闭。
(4) unit:超时单位
(5) workQueue:阻塞队列,等待获取线程的程序,类比候客区,注意:需要设置队列最大容量
(6) threadFactory:线程工厂,用于创建线程(将runable包装成thread)
(7) handler:拒绝策略,类比窗口和候客区都满了,此时对来人的策略。
对照源码解析,可以得出以下结论:

1.如果线程池中线程数量 < 核心线程数,新建一个线程执行任务;

2.如果线程池中线程数量 >= 核心线程数,则将任务放入任务队列

3.如果线程池中线程数量 >= 核心线程数 且 < maxPoolSize,且任务队列满了,则创建新的线程;

4.如果线程池中线程数量 > 核心线程数,当线程空闲时间超过了keepalive时,则会销毁线程;由此可见线程池的任务队列如果是无界队列,那么设置线程池最大数量是无效的;

5.如果线程池中的任务队列满了,而且线程数达到了maxPoolSize,并且没有空闲的线程可以执行新的任务,这时候再提交任务就会执行拒绝策略

3、四种拒绝策略

img

拒绝策略:
(1) 不处理,抛出异常
new ThreadPoolExecutor.AbortPolicy() // 银行满了,还有人进来,不处理这个人的,抛出异常
(2) 让请求的线程去执行,即主线程要获取线程,此时线程池已达到最大承载,让主线程本身去运行要获取的线程里的代码
new ThreadPoolExecutor.CallerRunsPolicy() // 哪来的去哪里!
(3) new ThreadPoolExecutor.DiscardPolicy() //队列满了,丢掉任务,不会抛出异常!
(4) new ThreadPoolExecutor.DiscardOldestPolicy() //阻塞队列满了,丢弃阻塞队列头部的任务(最早的),也不会抛出异常!

最大承载:最大核心线程数+阻塞队列,类比最多能接待几个人

第四种演示:

// 最大线程数 3,阻塞队列长度 2,最大承载5 ,启动6个线程,研究第6个线程处理方式
public static void main(String[] args) throws InterruptedException {
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                                3,3,20, TimeUnit.SECONDS,
                                  new LinkedBlockingQueue<>(2),
                                Executors.defaultThreadFactory(),
                                  new ThreadPoolExecutor.DiscardOldestPolicy());

    try {
        for (int i = 1; i <= 6; i++) {
            final int temp = i;
            threadPool.execute(()->{
                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "---ok-编号" + temp);
            });
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        threadPool.shutdown();
    }
}
pool-1-thread-2---ok-编号2
pool-1-thread-1---ok-编号1
pool-1-thread-3---ok-编号3
pool-1-thread-1---ok-编号6
pool-1-thread-2---ok-编号5

多次运行结果:编号1、2、3顺序随机,编号5、6顺序随机,编号4从没出现

说明:该策略是丢弃阻塞队列头部的任务,并把新的任务添加到阻塞队列末尾。

4、submit和execute

submit 能获取到返回值,execute 不能

5、为什么不用Executors 创建线程

注:摘自阿里巴巴开发手册

image-20210428193609940

6、类继承关系

image-20230829160144484

  • Executor:只提供了一个执行任务的接口,用户无需关注如何创建线程,如何调度线程,只需要提供一个Runnable对象

  • ExecutorService:在执行任务的基础上,新增了提交任务、线程池生命周期的管控等接口

  • AbstractExecutorService:抽象类,将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可

  • ThreadPoolExecutor:一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务

二、如何设置合理参数

1、corePoolSize

应先获取服务器的 CPU 核心数,通过以下代码

Runtime.getRuntime().availableProcessors();        //当前计算机CPU核数

之后根据线程任务属于 CPU 密集型,还是 IO 密集型进行配置

CPU 密集型任务

  • CPU 密集型:指任务存在大量的循环或计算,对 CPU 的压力较大,为避免大量的线程切换造成的性能损耗
最大线程数  =  CPU 核心数 + 1

加一是为了保证当线程由于操作系统或其他原因导致暂停时,剩下的线程的补上,保证 CPU 时钟周期不浪费

IO 密集型任务

IO 密集型:指任务存在大量 IO (如数据库操作),CPU 大部分时间在阻塞等待,故线程数应该大一些,所以该模型下使用多线程能大大提升性能

IO 密集型有两个常用的公式

参考公式1:

CPU 核数 * 2

大部分书中推荐配置

参考公式2:(阻塞系数在 0.8 ~ 0.9 之间)

CPU 核数 / (1 - 阻塞系数)

比如 8 核 CPU

8 / (1 - 0.9) = 80 个线程数

某大厂使用的公式

具体应用那种公式得看实际生产环境的情况,可在根据情况自行选择

定时任务

线程数量:如果是定时任务,并不考虑即时性,配置1-2个线程都是可以的

回收线程资源:若场景以天、周等等大时间为单位,需要将线程池中的线程都设置为非核心线程,不然会一直占用系统内存等等资源,其只是定期跑一跑而已,没有必要一直占用资源。

还有一种8020原则,不做介绍

2、最大线程数

原则上就是性能最高线程数,因为此时性能已经是最高,再设置比他大的线程数反而性能变低。极端情况下才会使用到最大线程数,正常情况下不应频繁出现超过核心线程数的创建。

核心线程数 + 1

3、阻塞队列

考虑到在实际应用中我们获取并发性的场景主要是两种:

  • (1)并行执行子任务,提高响应速度。这种情况下,应该使用同步队列,没有什么任务应该被缓存下来,而是应该立即执行。
  • (2)并行执行大批次任务,提升吞吐量。这种情况下,应该使用有界队列,使用队列去缓冲大批量的任务,队列容量必须声明,防止任务无限制堆积。根据系统任务的峰值设置阻塞队列的容量,尽量不要让线程池动用核心线程池外的其他线程。

4、最大空闲时间

使用默认的 60 秒即可

5、拒绝策略

一般推荐采用 AbortPolicy 抛出异常即可,详细见上文的四种拒绝策略

6、线程工厂

可以自定义线程池中线程的创建方式:

  • 1.可以设置有意见的线程名,这样方便我们开发调试,问题日志查找及定位。
  • 2.可以设置守护线程。
  • 3.设置线程优先级
  • 4.处理未捕获的异常:
/**
 * 自定义线程工厂
 *
 * 线程工厂可以设置线程信息
 */
public class MyThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    /**
     * 构造函数传入我们想业务需要的线程名字threadName,方便发生异常是追溯
     * @param threadName
     */
    public MyThreadFactory(String threadName) {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                Thread.currentThread().getThreadGroup();
        if (threadName == null || threadName.isEmpty()){
            threadName = "pool";
        }
        namePrefix = threadName +
                poolNumber.getAndIncrement() +
                "-thread-";
    }

    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                namePrefix + threadNumber.getAndIncrement(),
                0);
        //守护线程
        if (t.isDaemon())
            t.setDaemon(true);
        //线程优先级
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        /**
         * 处理未捕捉的异常
         * 在执行一个任务时,线程可能会由于未捕获的异常而终止,默认处理是将异常打印到控制台。
         * 但这种处理方式有时并非你所想要的,存放如文件或者db会更合适。
         */
        t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread t, Throwable e) {
                System.out.println("do something to handle uncaughtException");
            }
        });
        return t;
    }
}

三、线程池状态管理

1、线程池如何维护状态

线程池是如何管理自身的运行状态以及线程池中的线程数呢?

线程池运行的状态,并不是用户显式设置的,而是伴随着线程池的运行,由内部来维护。线程池内部使用一个变量维护两个值:运行状态(runState)和线程数量 (workerCount)。在具体实现中,线程池将运行状态(runState)、线程数量 (workerCount)两个关键参数的维护放在了一起。

ctl这个AtomicInteger类型,是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段, 它同时包含两部分的信息:线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),高3位保存runState,低29位保存workerCount,两个变量之间互不干扰。用一个变量去存储两个值,可避免在做相关决策时,出现不一致的情况,不必为了维护两者的一致,而占用锁资源。通过阅读线程池源代码也可以发现,经常出现要同时判断线程池运行状态和线程数量的情况。线程池也提供了若干方法去供用户获得线程池当前的运行状态、线程个数。这里都使用的是位运算的方式,相比于基本运算,速度也会快很多。

// ctl:高三位表示线程池运行状态,低29位表示线程池线程运行数量
// 一个变量存储两个值的好处是不必费心思(比如加锁)去维护两个状态的一致性
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

// 线程运行数量占用字节= 32 - 3 = 29
private static final int COUNT_BITS = Integer.SIZE - 3;
// 线程运行数量最大值 = 536870911
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
// 1110 0000 0000 0000 0000 0000 0000 0000
private static final int RUNNING    = -1 << COUNT_BITS;
// 0000 0000 0000 0000 0000 0000 0000 0000
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// 0010 0000 0000 0000 0000 0000 0000 0000
private static final int STOP       =  1 << COUNT_BITS;
// 0100 0000 0000 0000 0000 0000 0000 0000
private static final int TIDYING    =  2 << COUNT_BITS;
// 0110 0000 0000 0000 0000 0000 0000 0000
private static final int TERMINATED =  3 << COUNT_BITS;


// 以下参数 c 为 ctl.get()
// 获取线程池当前的运行状态(~:按位取反,即0变成1,1变成0。)
private static int runStateOf(int c){ 
    return c & ~CAPACITY; 
}

// 获取线程池当前运行的线程数量
private static int workerCountOf(int c){ 
    return c & CAPACITY; 
}

// 通过线程池状态和运行的线程数量获取ctl
private static int ctlOf(int rs, int wc){ 
    return rs | wc; 
}

-1的移位计算

-1 原码
1000 0000 0000 0000 0000 0000 0000 0001

-1 反码
1111 1111 1111 1111 1111 1111 1111 1110

-1 补码
1111 1111 1111 1111 1111 1111 1111 1111

(-1) << 29
1110 0000 0000 0000 0000 0000 0000 0000

2、线程池的状态流转

线程池本身是有状态的,以下是线程池的5种状态

运行状态 状态描述
RUNNING 线程池可以接收到新的任务提交,并且还可以正常处理阻塞队列中的任务。
SHUTDOWN 不再接收新的任务提交,线程池可以继续处理阻塞队列中的任务。
STOP 不再接收新的任务,同时还会丢弃阻塞队列中的既有任务;此外,它还会中断正在处理中的任务。
TIDYING 所有的任务都执行完毕后,(同时也涵盖了阻塞队列中的任务),当前线程池中的活动都线程数量降为0,将会调用terminated方法。
TERMINATED 线程池的终止状态,当terminated方法执行完毕后,线程池会进入该状态

线程池状态流转

状态转换 状态转换方式
RUNNING -> SHUTDOWN 当调用了线程池的shutdown方法时,或者当finalize方法被隐式调用后(该方法内部会调用shutdown方法)
RUNNING,SHUTDOWN -> STOP 当调用了线程池的shutdownNow方法时
SHUTDOWN -> TIDYING 在线程池与阻塞队列均变为空时
STOP -> TIDYING 在线程池变为空时
TIDYING->TERMINATED 在terminated方法被执行完毕时

图3 线程池生命周期

四、任务执行源码

1、execute执行流程图

image-20230831191403380

注意:在判断核心线程数时,就算有空闲线程也不会复用,会优先将核心线程数全都创建出来后,在丢到阻塞队列中被空闲线程获取

线程池在内部实际上构建了一个生产者消费者模型,将线程和任务两者解耦,并不直接关联,从而良好的缓冲任务,复用线程。线程池的运行主要分成两部分:任务管理、线程管理。

  • 任务管理部分充当生产者的角色,当任务提交后,线程池会判断该任务后续的流转:(1)直接申请线程执行该任务;(2)缓冲到队列中等待线程执行;(3)拒绝该任务。
  • 线程管理部分是消费者,它们被统一维护在线程池内,根据任务请求进行线程的分配,当线程执行完任务后则会继续获取新的任务去执行,最终当线程获取不到任务的时候,线程就会被回收。

2、execute()源码

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get();
    // 如果当前工作线程数小于核心线程数,在addWorker中创建工作线程并执行任务
    if (workerCountOf(c) < corePoolSize) {

        // 在工作线程小于核心线程数时,新的任务来了,就算有空闲线程,也不会复用,而是创建新线程,
        // 直到工作线程 = 核心线程数为止
        if (addWorker(command, true))
            return;
        // 添加到工作线程失败,证明其他线程的操作导致核心线程数满了,重置c
        c = ctl.get();
    }

    // 核心线程数已满(即核心线程全部创建完成)
    // 此时即使有空闲线程,新任务也会先进入阻塞队列
    // 线程池状态为运行中,并且任务能成功添加到阻塞队列
    if (isRunning(c) && workQueue.offer(command)) {

        // 双重检查,因为从上次get()到进入此方法,线程池可能已成为SHUTDOWN状态
        int recheck = ctl.get();

        // 如果线程池不为运行中, 则从队列移除任务
        if (! isRunning(recheck) && remove(command))
            // 如果移除任务成功,触发拒绝任务
            reject(command);
        else if (workerCountOf(recheck) == 0)
            // 线程池处于执行状态,但工作线程数为 0,
            // 此时workQueue中还有待执行的任务(上面刚加入的任务)
            // 则新增一个addWorker,消费workqueue中的任务
            // 防止存在有任务但是没有线程执行的情况
            // 第一个参数为 null ,不指定工作线程首个执行的任务
            addWorker(null, false);
    }

    // 阻塞队列已满
    // 尝试增加工作线程执行command
    else if (!addWorker(command, false))
        // 如果当前线程池为SHUTDOWN状态或者线程池已饱和,拒绝任务
        reject(command);
}

3、新增工作线程addWorker()

检查是否可以根据当前池状态和给定的边界(core或maximum)添加新的工作线程。

如果是,则相应地调整工作线程的数量,如果可能的话,创建并启动一个新的工作线程,并将firstTask作为其第一个任务运行。

如果池已停止或有资格关闭,则此方法返回false。

如果线程工厂在被要求创建线程时失败,它还返回false。

如果线程创建失败,要么是由于线程工厂返回null,要么是由于异常(通常是thread .start()中的OutOfMemoryError),我们将干净地回滚。

参数:firstTask,新线程应该首先运行的任务(如果没有则为空)。当线程少于corePoolSize时(在方法execute()中),或者当队列满时(在这种情况下,我们必须绕过队列),worker被创建为初始化的第一个任务来绕过队列。

private final ReentrantLock mainLock = new ReentrantLock();

/**
* 包含池中所有工作线程的设置。只在持有mainLock时访问。
*/
private final HashSet<Worker> workers = new HashSet<Worker>();

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 先做一个到底应不应该创建线程的判断
        // 这个判断条件可以简化为 (rs >= SHUTDOWN && 
        // (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty()))
        //
        // 分为三种情况:1.rs > SHUTDOWN (线程池状态处于STOP、TIDYING、TERMINATED)
        //             2.rs >= SHUTDOWN && firstTask != null 
        //             3.rs >= SHUTDOWN && workQueue.isEmpty()
        // 在这三种情况下,都不会再创建新的线程
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            // 线程池中正在运行的线程的数量
            int wc = workerCountOf(c);
            // 判断线程是否已达上限
            // 如果添加corePoolSize中的线程,判断是否超过corePoolSize的上限
            // 如果添加maximumPoolSize中的线程,判断是否超过maximumPoolSize的上限
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 新增线程数,如果成功,则跳出循环
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            // 线程池的状态是否等于最开始的状态
            if (runStateOf(c) != rs)
                // 不等于表示线程池有改变,需要重新执行一遍之前的操作 CAS
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {

        // 使用指定的线程工厂创建线程,并将传入的任务作为第一个执行的任务
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();

                    // 添加到线程池
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 启动线程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

4、内部类Worker

public class ThreadPoolExecutor extends AbstractExecutorService {    


    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {

        /** 工作线程 */
        final Thread thread;
        /** 线程首个执行的任务 */
        Runnable firstTask;
        /** 任务计数器,统计该线程执行了多少任务 */
        volatile long completedTasks;

        /**
         * 从ThreadFactory创建给定的线程和第一个任务
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** 将主运行循环委托给外部runWorker  */
        public void run() {
            runWorker(this);
        }

        // ...
    }


    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {

                    // 空方法,可由开发者继承并实现,从而在任务执行前切入
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 执行任务
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // 空方法,可由开发者继承并实现,从而在任务执行后切入
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    // 任务计数器加一
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
}

5、阻塞获取任务getTask()

任务的执行有两种可能:一种是任务直接由新创建的线程执行。另一种是线程从任务队列中获取任务然后执行,执行完任务的空闲线程会再次去从队列中申请任务再去执行。第一种情况仅出现在线程初始创建的时候,第二种是线程获取任务绝大多数的情况。

线程需要从任务缓存模块中不断地取任务执行,帮助线程从阻塞队列中获取任务,实现线程管理模块和任务管理模块之间的通信。这部分策略由getTask方法实现,其执行流程如下图所示:

图6 获取任务流程图

getTask这部分进行了多次判断,为的是控制线程的数量,使其符合线程池的状态。如果线程池现在不应该持有那么多线程,则会返回null值。工作线程Worker会不断接收新任务去执行,而当工作线程Worker接收不到任务的时候,就会开始被回收。

public class ThreadPoolExecutor extends AbstractExecutorService {    

    /**
    * 执行阻塞或定时等待任务,取决于当前的配置设置,或者返回null,则这个工作者必须退出,因为:
    * 1.有超过maximumPoolSize的工作器(由于调用setMaximumPoolSize)。
    * 2. 存储池停止运行。
    * 3.池已关闭,队列为空。
    * 4. 该工作线程超时等待任务,超时的工作线程将被终止(即allowCoreThreadTimeOut || workerCount &gt;corePoolSize),如果队列不为空,则此worker不是池中的最后一个线程。
    */
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            // 如果线程池状态>=SHUTDOWN并且工作队列为空 或 线程池状态>=STOP
            // 则返回null,让当前worker被销毁
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            // 当前线程是否允许超时销毁的标志
            // 允许超时销毁:当线程池允许核心线程超时 或 工作线程数>核心线程数
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                // 从阻塞队列头部获取元素,根据是否允许超时,调用不同方法
                // 如果是工作线程数>核心线程数,则当前线程定义为非核心线程,使用超市等待poll(...)
                // 当超时之后,线程执行完成,销毁
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
}

6、任务执行前后切入

public class ThreadPoolExecutor extends AbstractExecutorService {

    /**
    * 方法在给定线程中执行给定的Runnable之前调用。
    * 此方法由执行任务r的线程t调用,并可用于重新初始化ThreadLocals或执行日志记录。
    *
    * 这个实现不做任何事情,但可以在子类中自定义。注意:要正确嵌套多个覆盖,
    * 子类通常应该先调用超类的实现,即在此方法的末尾执行超类的beforeExecute。
    */
    protected void beforeExecute(Thread t, Runnable r) { }

    /**
    * 方法在给定Runnable执行完成时调用。此方法由执行任务的线程调用。
    * 如果非空,则Throwable是未捕获的RuntimeException或Error,导致执行突然终止。
    *
    * 这个实现不做任何事情,但可以在子类中自定义。注意:要正确嵌套多个覆盖,
    * 子类通常应该调用超类。即在此方法的开头执行超类的afterExecute。
    *
    * 注意:当操作显式地或通过submit等方法包含在任务(如FutureTask)中时,
    * 这些任务对象捕获并维护计算异常,因此它们不会导致突然终止,并且内部异常不会传递给此方法。
    * 如果你想在这个方法中捕获这两种类型的失败,你可以进一步探测这种情况,
    * 就像在这个示例子类中,如果任务被终止,它要么打印直接原因,要么打印底层异常:
    */
    protected void afterExecute(Runnable r, Throwable t) { }

    /**
    * 线程池销毁扩展
    */ 
    protected void terminated() { }
}

五、动态线程池

1、修改线程池参数

JDK原生线程池ThreadPoolExecutor提供了如下几个public的setter方法

// 设置核心线程数
public void setCorePoolSize(int corePoolSize){}

// 设置最大线程数
public void setMaximumPoolSize(int maximumPoolSize){}

// 设置线程最大空闲时间
public void setKeepAliveTime(long time, TimeUnit unit){}

// 设置拒绝策略
public void setRejectedExecutionHandler(RejectedExecutionHandler handler){}

// 设置线程工厂
public void setThreadFactory(ThreadFactory threadFactory){}

JDK允许线程池使用方通过ThreadPoolExecutor的实例来动态设置线程池的核心策略,以setCorePoolSize为方法例,在运行期线程池使用方调用此方法设置corePoolSize之后,线程池会直接覆盖原来的corePoolSize值,并且基于当前值和原始值的比较结果采取不同的处理策略。对于当前值小于当前工作线程数的情况,说明有多余的worker线程,此时会向当前idle的worker线程发起中断请求以实现回收,多余的worker在下次idel的时候也会被回收;对于当前值大于原始值且当前队列中有待执行任务,则线程池会创建新的worker线程来执行队列任务,setCorePoolSize具体流程如下:

图20 setCorePoolSize方法执行流程

2、获取线程池信息

// 当前线程池中的线程数
public int getPoolSize(){}

// 线程池中曾经同时存在的最大线程数
public int getLargestPoolSize() {}

// 当前活动线程数(正在执行任务)
public int getActiveCount() {}

// 返回提交的任务总数 = 已完成任务数 + 工作线程数 + 阻塞队列长度
public long getTaskCount() {}

// 返回已成功执行的任务数
public long getCompletedTaskCount() {}

// 获取阻塞队列
public BlockingQueue<Runnable> getQueue(){}

// 返回核心线程数
public int getCorePoolSize() {}

// 最大线程数
public int getMaximumPoolSize() {}

// 最大空闲时间
public long getKeepAliveTime(TimeUnit unit) {}

// 拒绝策略
public RejectedExecutionHandler getRejectedExecutionHandler(){}

// 线程工厂
public ThreadFactory getThreadFactory() {}

3、整体设计

动态化线程池的核心设计包括以下三个方面:

  1. 简化线程池配置:线程池构造参数有8个,但是最核心的是3个:corePoolSize、maximumPoolSize,workQueue,它们最大程度地决定了线程池的任务分配和线程分配策略。考虑到在实际应用中我们获取并发性的场景主要是两种:(1)并行执行子任务,提高响应速度。这种情况下,应该使用同步队列,没有什么任务应该被缓存下来,而是应该立即执行。(2)并行执行大批次任务,提升吞吐量。这种情况下,应该使用有界队列,使用队列去缓冲大批量的任务,队列容量必须声明,防止任务无限制堆积。所以线程池只需要提供这三个关键参数的配置,并且提供两种队列的选择,就可以满足绝大多数的业务需求
  2. 参数可动态修改:为了解决参数不好配,修改参数成本高等问题。在Java线程池留有高扩展性的基础上,封装线程池,允许线程池监听同步外部的消息,根据消息进行修改配置。将线程池的配置放置在平台侧,允许开发同学简单的查看、修改线程池配置。
  3. 增加线程池监控:对某事物缺乏状态的观测,就对其改进无从下手。在线程池执行任务的生命周期添加监控能力,帮助了解线程池状态。

图17 动态化线程池整体设计

4、美团动态线程池

官方文档:https://dynamictp.cn/guide/introduction/background.html

阻塞队列只有 VariableLinkedBlockingQueue 类型可以修改capacity,该类型功能和LinkedBlockingQueue相似,只是capacity不是final类型,可以修改, VariableLinkedBlockingQueue参考RabbitMq的实现


  目录