百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

Java线程池深度揭秘(java 线程池详解)

cac55 2024-09-20 12:42 33 浏览 0 评论

作为 Java 程序员,无论是技术面试、项目研发或者是学习框架源码,不彻底掌握 Java 多线程的知识,做不到心中有数,干啥都没底气,尤其是技术深究时往往略显发憷。

坐稳扶好,通过今天的分享,能让你轻松 get 如下几点。

1. Executor 框架家族简介;

2. 源码解读:线程池状态以及状态流转;

3. 源码解读:部分成员变量及方法;

4. 源码解读:任务提交submit方法背后;

5. 源码揭秘之后的反思;

6. 寄语。

Executor 家族简介

一图胜千言,脑中有图心不慌。

(一)Executor 接口。

public interface Executor {
    void execute(Runnable command);
}

Executor 是一个接口(主要用于定义规范),定义了 execute 方法,用于接收 Runnable 对象。

(二)ExecutorService 接口。

public interface ExecutorService extends Executor {
    // ... ...
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
    // ... ...
}

ExecutorService 也是一个接口,继承了 Executor 接口,增加了更多方法,相当于扩展了 Executor 接口的功能,例如定义了 submit() 系列方法,支持任务执行后得到返回结果。

(三)AbstractExecutorService 抽象类。

public abstract class AbstractExecutorService implements ExecutorService {
    // ... ...
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
    // ... ...
}

AbstractExecutorService 是一个抽象类,实现了 ExecutorService 接口中的部分方法,例如提供了任务提交的 submit 方法的默认实现,而 submit 方法最终会调用 execute 方法。

不过 AbstractExecutorService 并没有实现 execute 方法,相当于为子类留了个口子,让子类去灵活扩展(钩子函数)。

(四)ScheduledExecutorService 接口。

public interface ScheduledExecutorService extends ExecutorService {
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);

    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);

    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
}

ScheduledExecutorService 接口继承了 ExecutorService,增加定时调度的方法,使其成为一个可定时调度任务的接口,相当于扩展了 ExecutorService 的功能。

(五)ScheduledThreadPoolExecutor 类。

public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {
    // ... ...
}

ScheduledThreadPoolExecutor 类继承自 ThreadPoolExecutor 类,并且实现了 ScheduledExecutorService 接口,变成一个可定时调度任务的线程池。

(六)ThreadPoolExecutor 类。

public class ThreadPoolExecutor extends AbstractExecutorService {
    // ... ...
}

ThreadPoolExecutor 继承 AbstractExecutorService 抽象类,并实现了 execute 等一系列方法。

(七)Executors 类。

public class Executors {
    // ... ...
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
    }    
    // ... ...
}

研发人员可以通过 Executors 工厂类来创建线程池并返回一个ExecutorService 对象,而内部几乎全是对 ThreadPoolExecutor 的封装。


通过 Executor 的家族简单认识,应该能感觉到 ThreadPoolExecutor 类的重要性,所以接下来要重点对 ThreadPoolExecutor 类的源码进行剖析。

源码解读:线程池状态以及状态流转

上面注释截图来源于 ThreadPoolExecutor 的源码,别懵圈,仔细看差不多都能懂,能够看出线程池的五种状态以及对应的状态流转。


不知道你能看懂多少,看不懂也没关系,接下来把上面的注释用图呈现给大家。通过源码中的注释,能够勾勒出如下线程池的状态流转图(好的注释是多么的重要啊,感叹号!)。

源码解读:部分成员变量及方法

/**
 * ctl 是一个 AtomicInteger 类型的原子对象。
 * 其实设计很有意思:ctl 共包括 32 位(高 3 位表示"线程池状态",低 29 位表示"线程池中的线程数量")。
 * 个人感觉:线程池状态与线程数量合二为一,用一个变量来表示,来减少锁竞争,提高并发效率。
 */
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
/** 表示线程池线程数的位数:32 - 3 = 29 位 */
private static final int COUNT_BITS = Integer.SIZE - 3;
/** 表示最大线程容量(000,11111111111111111111111111111)*/
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits(运行状态保存在 int 值的高 3 位)
/** 111,00000000000000000000000000000 */
private static final int RUNNING    = -1 << COUNT_BITS;
/** 000,00000000000000000000000000000 */
private static final int SHUTDOWN   =  0 << COUNT_BITS;
/** 001,00000000000000000000000000000 */
private static final int STOP       =  1 << COUNT_BITS;
/** 010,00000000000000000000000000000 */
private static final int TIDYING    =  2 << COUNT_BITS;
/** 011,00000000000000000000000000000 */
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
/** 获取线程池的运行状态 */
private static int runStateOf(int c)     { return c & ~CAPACITY; }
/** 线程池内有效线程的数量 (workerCount) */
private static int workerCountOf(int c)  { return c & CAPACITY; }
/** 线程池的状态和线程的数量组装,成为 ctl */
private static int ctlOf(int rs, int wc) { return rs | wc; }

仔细去看上面的代码,注释已经很清晰啦。重点关注 ctl 变量,这个变量将线程池自身状态和线程数量,融合在这一个变量中,其中高 3 位表示线程池状态,低 29 位表示线程池中的线程数量,这样在多线程环境下更易保证线程池自身状态和线程数量的统一,不得不佩服源代码作者 Doug Lea,可谓是设计甚妙!

源码解读:任务提交 submit 方法背后

疑问?当调用 submit() 方法,把一个任务提交给线程池去处理的时候,线程池的处理过程是什么样的呢?

通过开篇对 Executor 的家族简介,能够看到 submit() 方法最终会调用 ThreadPoolExecutor 的 execute 方法,走进源码好好看看 execute 方法都做了啥?

重点关注源码中的注释(红框圈住部分),若看懂注释,提交任务时线程池对应的处理,也就懂了一半啦(感触:好的编码规范真的好重要,业务开发时,核心代码一定要有注释)。

若依然很懵逼,一图胜千言,那就继续上图吧。

了解上图的整体流程,再去看看源码就彻悟啦。

public void execute(Runnable command) {
    //【Step 0. 如果任务为空则抛出 NPE 异常】
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get();
    //【Step 1. 判断核心线程是否已满】
    // 1.1. 判断当前线程数是否已经达到核心线程数的限制
    if (workerCountOf(c) < corePoolSize) {
        // 1.2. 如果未达到核心线程数的限制,则会直接添加一个核心线程,并指定首次执行的任务,进行任务处理
        if (addWorker(command, true))
            return;
        // 1.3. 如果添加失败,则刷新线程池的状态和线程的数量对应的变量 ctl
        c = ctl.get();
    }
    //【Step 2. 判断阻塞队列是否已满】
    // 2.1. 检查线程池是否是运行状态,然后将任务添加到等待队列
    if (isRunning(c) && workQueue.offer(command)) {
        // 2.2. 任务成功添加到等待队列,再次刷新 ctl
        int recheck = ctl.get();
        // 2.3. 添加任务到等待队列成功后,如果线程池不是运行状态,则将刚添加的任务从队列移除并执行拒绝策略
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 2.4. 判断当前线程数量,如果线程数量为 0,则添加一个非核心线程,并且不指定首次执行任务
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //【Step 3. 判断最大线程数量是否已经达到】
    // 3.1. 添加非核心线程,指定首次执行任务,如果添加失败,执行异常策略
    else if (!addWorker(command, false))
        reject(command);
}

结合注释去读代码,应该都能搞懂。很显然 execute 方法中,多处都调用了 addWorker 方法,接下来简单剖析一下 addWorker 方法。

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        // ... ...
        for (;;) {
            // ... ...
            // 通过 CAS 自旋,增加线程数 +1,增加成功跳出双层循环,继续往下执行
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // ... ...
        }
    }
    // 到这儿,说明已经成功的将线程数 +1 了,但是真正的线程还没有被添加
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 添加线程,Worker 是继承了 AQS,实现了 Runnable 接口的包装类
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // ... ...
                // 添加新增的 Worker
                workers.add(w);
                int s = workers.size();
                if (s > largestPoolSize)
                    largestPoolSize = s;
                workerAdded = true;
                // ... ...
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 启动 Worker
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

为了简明扼要,方法酌情进行了删减。addWorker 方法主要是通过双重 for 循环进行线程数 +1,然后创建 Worker,并进行添加到 HashSet<Worker> workers 列表中,然后调用 t.start() 启动 Worker。

那么接下来再一起看看 Worker 里面都做了啥?

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable {
    // ... ...
    final Thread thread;
    Runnable firstTask;

    /**
     * 通过指定的 firstTask 任务创建 Worker 对象
     */
    Worker(Runnable firstTask) {
        setState(-1);
        this.firstTask = firstTask;
        // 通过当前 Worker 对象创建对应的线程对象 t,
        // 所以调用 t.start() 时最终会调用 Worker 的 run 方法
        this.thread = getThreadFactory().newThread(this);
    }

    public void run() {
        // run 方法最终会调用 ThreadPoolExecutor 的 runWorker 方法
        runWorker(this);
    }
    // ... ...
}

通过 Worker 的构造函数能够了解到,会通过创建的 Worker 对象去构建线程对象,当线程对象启动时最终会调用 runWorker 方法。

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    // 取出需要执行的任务
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // 如果 task 不是 null 或者去 workQueue 队列中取到待执行的任务不为 null
        while (task != null || (task = getTask()) != null) {
            // ... ...
            try {
                // 开始执行任务前的钩子方法
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                    // ... ...
                } finally {
                    // 任务执行后的钩子方法
                    afterExecute(task, thrown);
                }
            } finally {
                // ... ...
            }
        }
        completedAbruptly = false;
    } finally {
        // Worker 退出
        processWorkerExit(w, completedAbruptly);
    }
}

runWorker 方法,首先会取出要执行的任务 task,如果为空则会调用 getTask 方法从任务队列中获取,然后调用任务对应的 run 方法进行执行,另外预置了 beforeExecute、afterExecute 两个钩子函数,让研发人员监控线程执行成为可能。

另外,线程池中的线程如何从队列中获取待执行的任务的呢?走进 getTask 方法看一看。

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    // 这块体现了:线程池的线程是复用的,通过循环去获取队列中的任务去执行。
    for (;;) {
        int c = ctl.get();
        // ... ...
        int wc = workerCountOf(c);
        // allowCoreThreadTimeOut: 是否允许核心线程超时.
        // 如果设置为 false,那么线程池在达到 corePoolSize 个工作线程之前,不会让闲置的工作线程退出。
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        // ... ...
        try {
            // 从 workQueue 队列中取待执行的任务,根据 timed 来选择等待时间
            Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

为了便于理解,源码做了部分删减。重点关注从任务队列中获取待执行任务的对象的方法调用:workQueue.poll()、workQueue.take() ,前者是移除并返回队列中的头部元素,如果队列为空,则返回 null,而后者是移除并返回队列中的头部元素,如果队列为空,则阻塞。


烟未灭,酒过半 ... ... 源码探讨就谈到这儿... ...

源码揭秘之后的反思

(一)钩子函数的使用场景

场景一:

如上面自定义的 MyThreadPoolExecutor,可以让日志打印线程及线程数等等信息。意味着研发人员可以扩展 ThreadPoolExecutor,对钩子函数 beforeExecute、afterExecute 进行实现,进而可以知晓线程池内部的调度细节,可以有效进行监控,针对故障排查应该很有帮助。

场景二:

AbstractExecutorService 并没有实现 execute 方法,而是为子类 ThreadPoolExecutor 留了个口子,让子类去灵活扩展(钩子函数)。

仔细想想业务开发时,诸多的使用场景,何尝不是如此呢?

(二)线程池的 submit 方法与 execute 方法啥区别呢?

execute 方法,适用于不需要关注返回值的场景,只需要将线程丢到线程池中去执行就可以了。

而 submit() 方法,适用于需要关注返回值的场景,不过最终会调用 execute() 方法。

考虑到性能提升,如果不需要关注返回值,则建议直接调用 execute() 方法,因为那样会屏蔽很多中间调度。

(三)线程池状态与线程数量用一个 ctl 变量表示的好处?

线程池状态和线程数量合二为一,用一个原子变量来表示,来减少锁竞争,提高并发效率。

(四)清晰的注释是否有必要?

通过探秘源码,很多图都是根据源码注释勾勒出来的。可以看出清晰的注释,对于核心流程而言真的很重要,一定要养成良好的编码习惯,关键业务逻辑、核心流程,建议一定要写好注释,利人又利己,何乐而不为之。

(五)Executor 家族框架,若写基础框架时,是否有借鉴意义呢?

个人感觉很有借鉴意义,因为无论业务开发还是基础服务,总会看到类似模式框架的身影,总会有大牛模仿着造轮子,所以闲暇之余可以抽象一下。

寄语写最后

本次,主要对 Executor 家族进行了简单介绍,并着重对线程池背后的 ThreadPoolExecutor 类进行深度剖析,知其然知其所以然,希望对大家有帮助。

好了,本次就谈到这里,一起聊技术、谈业务、架构,少走弯路,不踩大坑。欢迎关注「一猿小讲」,会持续输出原创精彩分享,敬请期待!

推荐阅读:

Java程序跑的快,全要靠线程带

fastjson的这些坑,你误入了没?

Java 8 的这些特性,你知道吗?

相关推荐

微信新表情怎么更新哪里更新 微信新表情包安卓苹果更新方法一览

根据微信官方的消息,微信新增“裂开”等6个小表情。IT之家了解到,截止发稿时仍有很多用户反映没有收到小表情的更新,微信官方表示“不急,今夜都会有的”。根据微博网友的留言,这6个小表情的名称...

谷歌调整Android 15最低硬件要求:存储容量提升至32GB

IT之家4月15日消息,谷歌近期对Android系统的最低硬件要求进行了调整,其中最明显的变动的是Android15的存储容量要求从Android14的16GB提升至32...

微信8.0怎么更新安卓 微信8.0怎么更新不了 如何更新微信8.0安卓

微信8.0更新方法也是值得研究的,ios的话不用说,可以在商店直接升级,但是安卓的呢,是怎么更新的,在哪里可以更新到安卓版,下面就来介绍下安卓在哪更新。微信8.0更新方法一览更新内容一览:1、首屏页有...

谷歌升级安卓Files文件管理器,支持以ZIP格式压缩文件、文件夹

IT之家9月10日消息,科技媒体AndroidAuthority昨日(9月9日)发布博文,表示谷歌旗下Files应用将支持以ZIP格式压缩文件、文件夹。该媒体逆向编译最新的...

安卓 7.0,魅族 15 Plus 喜迎 Flyme 8 稳定版更新

IT之家9月4日消息据网友投递,魅族15Plus已获得Flyme8.0.5.0A稳定版更新推送。此次更新基于安卓7.0,更新包体积约284.7MB,更新内容包括游戏模式4.1、优...

一加7T+7T Pro「Android 10.0 稳定版」H2OS-全量包发布-可救砖

一加7T和一加7TPro官方终于全量包推送稳定版本了,对比之前的稳定版来说,修复了太多的BUG,其实小编在体验一加氢OS的感受下,并没感觉到什么BUG,系统还是非常稳定的,只不过太过简洁,习惯了MI...

大疆 OM 6 和 OM SE 智能手机云台曝光

IT之家9月21日消息,大疆OSMO官方宣布,将于9月22日21点发布“灵机随行”新品,根据此前爆料,预计为DJIOM6和OMSE手机云台。目前大疆DJIOM...

【性价比入门大耳台式组合】说说飞傲FT1+K11R2R

眼瞅着就要到了2024的Q4,大半年下来,才子伴乐谈写了不少分享,飞傲的份额可能是各个厂牌里面最高的,前三季度他们“按部就班”推出了一系列新品,远高于其他国内品牌。今天要说的主角,是他们近期推出的两款...

暑期畅享高质量音乐生活,创新科技818新品首发狂欢不停

炎热的八月已经过去了一半,各位小伙伴的暑期生活过的怎么样呢?是顶着火辣的太阳出门逛街,还是在家吹着空调过上清爽舒适的宅家生活,相信不少人都是选择后者,当然,整天躺在家里只是刷微博开黑聊天的话,久而久之...

唱机秘笈,让你的生活从双11开始,慢下来

双11马上就来啦!是不是已经在盖楼大战中感受到了狂欢前夕弥漫的紧张感?锋梭1元预定,最高抵扣200元的活动仍在继续,决战前夕锋梭要给你最后的法宝——一份唱机秘笈,让它帮你选出双11最适合你的黑胶唱机...

西门子smart200和西门子1200ModbusTCP通讯交流

我们上节课程基于博途V16讲了西门子1200PLC和1500PLC的ModbusTCP通讯,这节课程我们讲smart系列PLC和1200系列PLC的ModbusTCP通讯,作这个实验项目必须满足以下...

国产半桥驱动芯片SLM2110S试用记(s2104半桥驱动芯片)

IR公司的IR2110S,是一片非常成熟的半桥驱动芯片,大量应用于半桥或全桥开关电源或逆变电源上。我在300-3000W的逆变器上曾用过很多,性能一直不错,电路简单,驱动卡可以做得很小,且短路保护功能...

好声音爱好者的福音,创新天猫乐活季优惠强势来袭

作为好声音爱好者,在家追剧、看电影、玩游戏的时候,你一定对声音设备有很高的要求。或者遇到居家办公处理工作情况,流畅的沟通也十分重要。因此一套专业好用的声音设备显得尤为重要。但是功能齐全、性价比高的声音...

爬虫神器-亮数据,可以轻松解锁各种网站~

网络爬虫是一种常见的数据采集技术,与屏幕抓取不同,屏幕抓取只复制屏幕上显示的像素,网络爬虫提取的是底层的HTML代码,以及存储在数据库中的数据。一般使用抓包工具获取HTML,然后使用网页解析工具提取数...

Excel实用技巧:抓取网页实时数据(excel抓取网页部分数据)

Excel是一个强大的数据处理和分析工具,可以用于处理各种类型的数据。如果你需要在Excel中获取实时数据,那么本文将为你介绍如何利用Excel抓取网页实时数据。一、安装PowerQuery插件Po...

取消回复欢迎 发表评论: