博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
史前最详细的:Java线程池核心流程解析
阅读量:3965 次
发布时间:2019-05-24

本文共 9198 字,大约阅读时间需要 30 分钟。

1. 前言

线程池是JAVA开发中最常使用的池化技术之一,可以减少线程资源的重复创建与销毁造成的开销。

2. 灵魂拷问:怎么做到线程重复利用?

很多同学会联想到连接池,理所当然的说:需要的时候从池中取出线程,执行完任务再放回去。

如何用代码实现呢?

此时就会发现,调用线程的start方法后,生命周期就不由父线程直接控制了。线程的run方法执行完成就销毁了,所谓的“取出”和“放回”只不过是想当然的操作。

这里先说答案:生产者消费者模型

3. ThreadPoolExecutor的实现

v2-cf29d51dadce2c71641276d686b4916e_720w.jpg

 

3.1 结构

首先看下ThreadPoolExecutor的继承结构

顶级接口是Executor,定义execute方法

ExecutorService添加了submit方法,支持返回future获取执行结果,以及线程池运行状态的相关方法

本文着重讲线程池的执行流程,因此将暂时忽略线程池的状态相关的代码,也建议新手看源码时从核心流程看起。

3.2 核心方法:execute()

public void execute(Runnable command) {        if (command == null)            throw new NullPointerException();        int c = ctl.get();        // 判断是否小于核心线程数        if (workerCountOf(c) < corePoolSize) {            //添加worker,添加成功则退出            if (addWorker(command, true))                return;            c = ctl.get();        }        // 核心线程数已用完则放入队列        if (isRunning(c) && workQueue.offer(command)) {            int recheck = ctl.get();            // 双重检查,避免入队完成后,所有线程已销毁,导致没有消费者消费当前任务            if (! isRunning(recheck) && remove(command))                reject(command);            else if (workerCountOf(recheck) == 0)                addWorker(null, false);        }        // 队列已满则开启非核心线程,达到最大线程数则使用拒绝策略        else if (!addWorker(command, false))            reject(command);    }

 

execute方法就是一个生产的过程,主要分为开启线程和入队

开启线程会传入command(即当前任务),开启的线程会立即消费该任务

入队的任务则会由Worker消费

主要关注corePoolSize,maximumPoolSize,queueSize(任务队列长度),workerCount(当前worker数量)这几个参数,可以总结为以下:

v2-b7c186b9e03865d3b4bbd1743e73d5d3_720w.jpg

3.2 消费者:Worker

v2-e0844adffc623a180eb5e4d1572272db_720w.jpg

 

Worker类实现Runnable接口,继承AQS,主要先关注thread和firstTask两个属性和run方法

Worker(Runnable firstTask) {    setState(-1);    this.firstTask = firstTask;    this.thread = getThreadFactory().newThread(this);}

 

从Worker的构造方法可以看出,thread就是线程池中真正消费任务的线程,创建时会传入this(即Worker对象),而Worker实现了Runnable,因此线程运行时就是执行了Worker的run方法。

final void runWorker(Worker w) {    Thread wt = Thread.currentThread();    Runnable task = w.firstTask;    w.firstTask = null;    w.unlock();    boolean completedAbruptly = true;    try {        // 重点关注        while (task != null || (task = getTask()) != null) {            // ···            try {                beforeExecute(wt, task);                Throwable thrown = null;                try {                    // 重点关注                    task.run();                } catch (RuntimeException x) {                    thrown = x; throw x;                } catch (Error x) {                    thrown = x; throw x;                } catch (Throwable x) {                    thrown = x; throw new Error(x);                } finally {                    afterExecute(task, thrown);                }            } finally {                // 重点关注                task = null;                // ···            }        }        completedAbruptly = false;    } finally {        processWorkerExit(w, completedAbruptly);    }}

 

再来看run方法,直接调用了ThreadPoolExecutor的runWorker方法,runWorker中有一个while循环,循环体执行了task.run方法

task首先会获取Worker中的firstTask属性,并将其置为null,因此firstTask只会执行一次,后续task将通过getTask方法获取。

因此Worker的工作流程可以概括为:消费完Worker的firstTask后,循环执行getTask获取任务并消费,获取不到task时,就退出循环,线程销毁。

此处便可以看出生产者消费者模型了。

private Runnable getTask() {    boolean timedOut = false;    for (;;) {        int c = ctl.get();        // ···        int wc = workerCountOf(c);        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;        if ((wc > maximumPoolSize || (timed && timedOut))            && (wc > 1 || workQueue.isEmpty())) {            if (compareAndDecrementWorkerCount(c))                // 此处返回null,runWorker将退出循环                return null;            continue;        }        try {            Runnable r = timed ?                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :                workQueue.take();            if (r != null)                return r;            timedOut = true;        } catch (InterruptedException retry) {            timedOut = false;        }    }}

 

runWorker方法退出循环的条件是getTask返回null。

观察getTask,只有同时满足以下情况时才会返回null

v2-bc217e38d3167e641d9335ae0661ebc5_720w.jpg

返回的task是通过workQueue.poll和workQueue.take得到的

两者执行时线程均会挂起,直至workQueue中有新的任务

不同之处在于poll方法阻塞keepAliveTime时间后会自动唤醒并返回null,此时timeOut置为true,即满足条件1

private boolean addWorker(Runnable firstTask, boolean core) {    retry:    for (;;) {        int c = ctl.get();        // ···        for (;;) {            int wc = workerCountOf(c);            if (wc >= CAPACITY ||                wc >= (core ? corePoolSize : maximumPoolSize))                return false;            if (compareAndIncrementWorkerCount(c))                break retry;            c = ctl.get();            // ···        }    }    boolean workerStarted = false;    boolean workerAdded = false;    Worker w = null;    try {        w = new Worker(firstTask);        final Thread t = w.thread;        if (t != null) {            // ···            if (workerAdded) {                t.start();                workerStarted = true;            }        }    } finally {        // ···    }    return workerStarted;}

 

了解了Worker之后,再来看execute中调用的addWorker方法

方法有两个参数,firstTask即为Worker的firstTask,core则标记需要新增的是否是核心线程

retry循环与线程池状态相关,内层for循环则是重复尝试cas增加线程,若大于corePoolSize或者maximumPoolSize则新增失败,cas成功后,new一个Worker并start

3.3 总结

v2-91f972220b18aa41d6c0762ff8a54554_720w.jpg

回到最初的问题,线程是如何做到重复利用的?

并不存在取出线程使用完再归还的操作,线程启动后进入循环,主动获取任务执行,退出循环则线程销毁。

execute方法控制新增Worker和任务入队

附:手写简易线程池

public class MyThreadPool implements Executor {    private int corePoolSize;    private int maximumPoolSize;    private BlockingQueue
queue; //记录当前工作线程数 private AtomicInteger count; private long keepAliveTime; private RejectHandler rejectHandler; public MyThreadPool(int corePoolSize, int maximumPoolSize, BlockingQueue
queue, long keepAliveTime, RejectHandler rejectHandler) { this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.queue = queue; this.keepAliveTime = keepAliveTime; this.rejectHandler = rejectHandler; count = new AtomicInteger(0); } @Override public void execute(Runnable task) { int ct = count.get(); //核心线程数未满,尝试增加核心线程 if (ct < corePoolSize && count.compareAndSet(ct, ct + 1)) { new Worker(task).start(); return; } //入队 if (queue.offer(task)) { return; } //队列已满,尝试增加非核心线程 if (ct < maximumPoolSize && count.compareAndSet(ct, ct + 1)) { new Worker(task).start(); return; } //已达最大线程数,拒绝 rejectHandler.reject(task); } class Worker extends Thread { Runnable firstTask; public Worker(Runnable firstTask) { this.firstTask = firstTask; } @Override public void run() { Runnable task = firstTask; firstTask = null; while (true) { try { //getTask会阻塞 if (task != null || (task = getTask()) != null) { task.run(); } else { //getTask超时才会进入,直接退出,线程销毁 break; } } catch (InterruptedException e) { e.printStackTrace(); } finally { //置空,否则不能getTask task = null; } } } } Runnable getTask() throws InterruptedException { //标记是否超时过 boolean timedOut = false; while (true) { int ct = count.get(); //超出核心线程数才进入超时逻辑,即使timeOut由于线程poll超时过一次变成true,执行到这里如果不超出corePoolSize,可以再次进入take分支 if (ct > corePoolSize) { //超出核心线程数 if (timedOut) { //已超时过,尝试减少工作线程数,失败会continue,然后重新比较corePoolSize,重试减少线程数 if (count.compareAndSet(ct, ct - 1)) { return null; } else { continue; } } Runnable task = queue.poll(keepAliveTime, TimeUnit.MILLISECONDS); if (task == null) { //poll超时才进入 timedOut = true; continue; } return task; } else { //必然能获取到task return queue.take(); } } } public static interface RejectHandler { void reject(Runnable r); } public static void main(String[] args) { MyThreadPool pool = new MyThreadPool(2, 5, new LinkedBlockingQueue<>(100), 2000, r -> { System.out.println(r + ": reject"); }); for (int i = 0; i < 3; i++) { final int x = i; new Thread(() -> { for (int j = 0; j < 5; j++) { final int y = j; pool.execute(() -> { try { Thread.sleep(3000L); } catch (InterruptedException e) { e.printStackTrace(); } LocalDateTime now = LocalDateTime.now(); System.out.println(String.format("线程i=%s, j=%s,执行结束: %s", x, y, now.format(DateTimeFormatter.ISO_DATE_TIME))); }); } }).start(); } }}

 

 

 

有完整的Java初级,高级对应的学习路线和资料!专注于java开发。分享java基础、原理性知识、JavaWeb实战、spring全家桶、设计模式、分布式及面试资料、开源项目,助力开发者成长!

 

欢迎关注微信公众号:码邦主

 

转载地址:http://kvkki.baihongyu.com/

你可能感兴趣的文章
Linux 精萃
查看>>
sed 精萃
查看>>
awk 精萃
查看>>
awk 注释
查看>>
GROUPING SETS、ROLLUP、CUBE
查看>>
数据类型和变量
查看>>
表连接(JOIN)
查看>>
游标(Cursor)
查看>>
复合语句(compound statement)
查看>>
DB2 物化查询表
查看>>
IF 语句
查看>>
循环语句
查看>>
DB2 临时表
查看>>
ITERATE、LEAVE、GOTO和RETURN
查看>>
异常处理
查看>>
存储过程
查看>>
动态SQL(Dynamic SQL)
查看>>
在存储过程之间传递数据
查看>>
迁移存储过程
查看>>
GET DIAGNOSTIC 语句
查看>>