10问10答:你真的了解线程池吗 线程是什么


10问10答:你真的了解线程池吗 线程是什么

文章插图
《Java开发手册》中强调 , 线程资源必须通过线程池提供 , 而创建线程池必须使用ThreadPoolExecutor 。手册主要强调利用线程池避免两个问题 , 一是线程过渡切换 , 二是避免请求过多时造成OOM 。但是如果参数配置错误 , 还是会引发上面的两个问题 。所以本节我们主要是讨论ThreadPoolExecutor的一些技术细节 , 并且给出几个常用的最佳实践建议 。
我在查找资料的过程中 , 发现有些问题存在争议 。后面发现 , 一部分原因是因为不同JDK版本的现实是有差异的 。因此 , 下面的分析是基于当下最常用的版本JDK1.8 , 并且对于存在争议的问题 , 我们分析源码 , 源码才是最准确的 。
1 corePoolSize=0会怎么样
这是一个争议点 。我发现大部分博文 , 不论是国内的还是国外的 , 都是这样回答这个问题的:
提交任务后 , 先判断当前池中线程数是否小于corePoolSize , 如果小于 , 则创建新线程执行这个任务 。否者 , 判断等待队列是否已满 , 如果没有满 , 则添加到等待队列 。否者 , 判断当前池中线程数是否大于maximumPoolSize , 如果大于则拒绝 。否者 , 创建一个新的线程执行这个任务 。按照上面的描述 , 如果corePoolSize=0 , 则会判断等待队列的容量 , 如果还有容量 , 则排队 , 并且不会创建新的线程 。
—— 但其实 , 这是老版本的实现方式 , 从1.6之后 , 实现方式就变了 。我们直接看execute的源码(submit也依赖它) , 我备注出了关键一行:
int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);// 注意这一行代码 , 添加到等待队列成功后 , 判断当前池内线程数是否为0 , 如果是则创建一个firstTask为null的worker , 这个worker会从等待队列中获取任务并执行 。else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);线程池提交任务后 , 首先判断当前池中线程数是否小于corePoolSize 。如果小于则尝试创建新的线程执行该任务;否则尝试添加到等待队列 。如果添加队列成功 , 判断当前池内线程数是否为0 , 如果是则创建一个firstTask为null的worker , 这个worker会从等待队列中获取任务并执行 。如果添加到等待队列失败 , 一般是队列已满 , 才会再尝试创建新的线程 。但在创建之前需要与maximumPoolSize比较 , 如果小于则创建成功 。否则执行拒绝策略 。答
上述问题需区分JDK版本 。在1.6版本之后 , 如果corePoolSize=0 , 提交任务时如果线程池为空 , 则会立即创建一个线程来执行任务(先排队再获取);如果提交任务的时候 , 线程池不为空 , 则先在等待队列中排队 , 只有队列满了才会创建新线程 。
所以 , 优化在于 , 在队列没有满的这段时间内 , 会有一个线程在消费提交的任务;1.6之前的实现是 , 必须等队列满了之后 , 才开始消费 。
2 线程池创建之后 , 会立即创建核心线程么
之前有人问过我这个问题 , 因为他发现应用中有些Bean创建了线程池 , 但是这个Bean一般情况下用不到 , 所以咨询我是否需要把这个线程池注释掉 , 以减少应用运行时的线程数(该应用运行时线程过多 。)

不会 。从上面的源码可以看出 , 在刚刚创建ThreadPoolExecutor的时候 , 线程并不会立即启动 , 而是要等到有任务提交时才会启动 , 除非调用了prestartCoreThread/prestartAllCoreThreads事先启动核心线程 。
prestartCoreThread:Starts a core thread, causing it to idly wait for work. This overrides the default policy of starting core threads only when new tasks are executed.prestartAllCoreThreads:Starts all core threads.3 核心线程永远不会销毁么
这个问题有点tricky 。首先我们要明确一下概念 , 虽然在JavaDoc中也使用了“core/non-core threads”这样的描述 , 但其实这是一个动态的概念 , JDK并没有给一部分线程打上“core”的标记 , 做什么特殊化的处理 。这个问题我认为想要探讨的是闲置线程终结策略的问题 。
在JDK1.6之前 , 线程池会尽量保持corePoolSize个核心线程 , 即使这些线程闲置了很长时间 。这一点曾被开发者诟病 , 所以从JDK1.6开始 , 提供了方法allowsCoreThreadTimeOut , 如果传参为true , 则允许闲置的核心线程被终止 。
请注意这种策略和corePoolSize=0的区别 。我总结的区别是:
corePoolSize=0:在一般情况下只使用一个线程消费任务 , 只有当并发请求特别多、等待队列都满了之后 , 才开始用多线程 。allowsCoreThreadTimeOut=true && corePoolSize>1:在一般情况下就开始使用多线程(corePoolSize个) , 当并发请求特别多 , 等待队列都满了之后 , 继续加大线程数 。但是当请求没有的时候 , 允许核心线程也终止 。所以corePoolSize=0的效果 , 基本等同于allowsCoreThreadTimeOut=true && corePoolSize=1 , 但实现细节其实不同 。

在JDK1.6之后 , 如果allowsCoreThreadTimeOut=true , 核心线程也可以被终止 。
4 如何保证线程不被销毁
首先我们要明确一下线程池模型 。线程池有个内部类Worker , 它实现了Runnable接口 , 首先 , 它自己要run起来 。然后它会在合适的时候获取我们提交的Runnable任务 , 然后调用任务的run()接口 。一个Worker不终止的话可以不断执行任务 。
我们前面说的“线程池中的线程” , 其实就是Worker;等待队列中的元素 , 是我们提交的Runnable任务 。
每一个Worker在创建出来的时候 , 会调用它本身的run()方法 , 实现是runWorker(this) , 这个实现的核心是一个while循环 , 这个循环不结束 , Worker线程就不会终止 , 就是这个基本逻辑 。
在这个while条件中 , 有个getTask()方法是核心中的核心 , 它所做的事情就是从等待队列中取出任务来执行:如果没有达到corePoolSize , 则创建的Worker在执行完它承接的任务后 , 会用workQueue.take()取任务、注意 , 这个接口是阻塞接口 , 如果取不到任务 , Worker线程一直阻塞 。如果超过了corePoolSize , 或者allowCoreThreadTimeOut , 一个Worker在空闲了之后 , 会用workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)取任务 。注意 , 这个接口只阻塞等待keepAliveTime时间 , 超过这个时间返回null , 则Worker的while循环执行结束 , 则被终止了 。final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {// 看这里 , 核心逻辑在这里while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted.This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);// Are workers subject to culling?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {// 注意 , 核心中的核心在这里Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}答
实现方式非常巧妙 , 核心线程(Worker)即使一直空闲也不终止 , 是通过workQueue.take()实现的 , 它会一直阻塞到从等待队列中取到新的任务 。非核心线程空闲指定时间后终止是通过workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)实现的 , 一个空闲的Worker只等待keepAliveTime , 如果还没有取到任务则循环终止 , 线程也就运行结束了 。
引申思考
Worker本身就是个线程 , 它再调用我们传入的Runnable.run() , 会启动一个子线程么?如果你还没有答案 , 再回想一下Runnable和Thread的关系 。
5 空闲线程过多会有什么问题
笼统地回答是会占用内存 , 我们分析一下占用了哪些内存 。首先 , 比较普通的一部分 , 一个线程的内存模型:
虚拟机栈本地方法栈程序计数器【10问10答:你真的了解线程池吗 线程是什么】我想额外强调是下面这几个内存占用 , 需要小心:
ThreadLocal:业务代码是否使用了ThreadLocal?就算没有 , Spring框架中也大量使用了ThreadLocal , 你所在公司的框架可能也是一样 。局部变量:线程处于阻塞状态 , 肯定还有栈帧没有出栈 , 栈帧中有局部变量表 , 凡是被局部变量表引用的内存都不能回收 。所以如果这个线程创建了比较大的局部变量 , 那么这一部分内存无法GC 。TLAB机制:如果你的应用线程数处于高位 , 那么新的线程初始化可能因为Eden没有足够的空间分配TLAB而触发YoungGC 。答
线程池保持空闲的核心线程是它的默认配置 , 一般来讲是没有问题的 , 因为它占用的内存一般不大 。怕的就是业务代码中使用ThreadLocal缓存的数据过大又不清理 。
如果你的应用线程数处于高位 , 那么需要观察一下YoungGC的情况 , 估算一下Eden大小是否足够 。如果不够的话 , 可能要谨慎地创建新线程 , 并且让空闲的线程终止;必要的时候 , 可能需要对JVM进行调参 。
6 keepAliveTime=0会怎么样
这也是个争议点 。有的博文说等于0表示空闲线程永远不会终止 , 有的说表示执行完立刻终止 。还有的说等于-1表示空闲线程永远不会终止 。其实稍微看一下源码知道了 , 这里我直接抛出答案 。

在JDK1.8中 , keepAliveTime=0表示非核心线程执行完立刻终止 。
默认情况下 , keepAliveTime小于0 , 初始化的时候才会报错;但如果allowsCoreThreadTimeOut , keepAliveTime必须大于0 , 不然初始化报错 。
7 怎么进行异常处理
很多代码的写法 , 我们都习惯按照常见范式去编写 , 而没有去思考为什么 。比如:
如果我们使用execute()提交任务 , 我们一般要在Runable任务的代码加上try-catch进行异常处理 。如果我们使用submit()提交任务 , 我们一般要在主线程中 , 对Future.get()进行try-catch进行异常处理 。—— 但是在上面 , 我提到过 , submit()底层实现依赖execute() , 两者应该统一呀 , 为什么有差异呢?下面再扒一扒submit()的源码 , 它的实现蛮有意思 。
首先 , ThreadPoolExecutor中没有submit的代码 , 而是在它的父类AbstractExecutorService中 , 有三个submit的重载方法 , 代码非常简单 , 关键代码就两行:
public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();RunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask);return ftask;}public <T> Future<T> submit(Runnable task, T result) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task, result);execute(ftask);return ftask;}public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task);execute(ftask);return ftask;}正是因为这三个重载方法 , 都调用了execute , 所以我才说submit底层依赖execute 。通过查看这里execute的实现 , 我们不难发现 , 它就是ThreadPoolExecutor中的实现 , 所以 , 造成submit和execute的差异化的代码 , 不在这 。那么造成差异的一定在newTaskFor方法中 。这个方法也就new了一个FutureTask而已 , FutureTask实现RunnableFuture接口 , RunnableFuture接口继承Runnable接口和Future接口 。而Callable只是FutureTask的一个成员变量 。
所以讲到这里 , 就有另一个Java基础知识点:Callable和Future的关系 。我们一般用Callable编写任务代码 , Future是异步返回对象 , 通过它的get方法 , 阻塞式地获取结果 。FutureTask的核心代码就是实现了Future接口 , 也就是get方法的实现:
public V get() throws InterruptedException, ExecutionException {int s = state;if (s <= COMPLETING)// 核心代码s = awaitDone(false, 0L);return report(s);}private int awaitDone(boolean timed, long nanos)throws InterruptedException {final long deadline = timed ? System.nanoTime() + nanos : 0L;WaitNode q = null;boolean queued = false;// 死循环for (;;) {if (Thread.interrupted()) {removeWaiter(q);throw new InterruptedException();}int s = state;// 只有任务的状态是’已完成‘ , 才会跳出死循环if (s > COMPLETING) {if (q != null)q.thread = null;return s;}else if (s == COMPLETING) // cannot time out yetThread.yield();else if (q == null)q = new WaitNode();else if (!queued)queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);else if (timed) {nanos = deadline - System.nanoTime();if (nanos <= 0L) {removeWaiter(q);return state;}LockSupport.parkNanos(this, nanos);}elseLockSupport.park(this);}}get的核心实现是有个awaitDone方法 , 这是一个死循环 , 只有任务的状态是“已完成” , 才会跳出死循环;否则会依赖UNSAFE包下的LockSupport.park原语进行阻塞 , 等待LockSupport.unpark信号量 。而这个信号量只有当运行结束获得结果、或者出现异常的情况下 , 才会发出来 。分别对应方法set和setException 。这就是异步执行、阻塞获取的原理 , 扯得有点远了 。
回到最初我们的疑问 , 为什么submit之后 , 通过get方法可以获取到异常?原因是FutureTask有一个Object类型的outcome成员变量 , 用来记录执行结果 。这个结果可以是传入的泛型 , 也可以是Throwable异常:
public void run() {if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {Callable<V> c = callable;if (c != null && state == NEW) {V result;boolean ran;try {result = c.call();ran = true;} catch (Throwable ex) {result = null;ran = false;setException(ex);}if (ran)set(result);}} finally {// runner must be non-null until state is settled to// prevent concurrent calls to run()runner = null;// state must be re-read after nulling runner to prevent// leaked interruptsint s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}}// get方法中依赖的 , 报告执行结果private V report(int s) throws ExecutionException {Object x = outcome;if (s == NORMAL)return (V)x;if (s >= CANCELLED)throw new CancellationException();throw new ExecutionException((Throwable)x);}FutureTask的另一个巧妙的地方就是借用RunnableAdapter内部类 , 将submit的Runnable封装成Callable 。所以就算你submit的是Runnable , 一样可以用get获取到异常 。

不论是用execute还是submit , 都可以自己在业务代码上加try-catch进行异常处理 。我一般喜欢使用这种方式 , 因为我喜欢对不同业务场景的异常进行差异化处理 , 至少打不一样的日志吧 。如果是execute , 还可以自定义线程池 , 继承ThreadPoolExecutor并复写其afterExecute(Runnable r, Throwable t)方法 。或者实现Thread.UncaughtExceptionHandler接口 , 实现void uncaughtException(Thread t, Throwable e);方法 , 并将该handler传递给线程池的ThreadFactory 。但是注意 , afterExecute和UncaughtExceptionHandler都不适用submit 。因为通过上面的FutureTask.run()不难发现 , 它自己对Throwable进行了try-catch , 封装到了outcome属性 , 所以底层方法execute的Worker是拿不到异常信息的 。8 线程池需不需要关闭

一般来讲 , 线程池的生命周期跟随服务的生命周期 。如果一个服务(Service)停止服务了 , 那么需要调用shutdown方法进行关闭 。所以ExecutorService.shutdown在Java以及一些中间件的源码中 , 是封装在Service的shutdown方法内的 。
如果是Server端不重启就不停止提供服务 , 我认为是不需要特殊处理的 。
9 shutdown和shutdownNow的区别

shutdown => 平缓关闭 , 等待所有已添加到线程池中的任务执行完再关闭 。shutdownNow => 立刻关闭 , 停止正在执行的任务 , 并返回队列中未执行的任务 。本来想分析一下两者的源码的 , 但是发现本文的篇幅已经过长了 , 源码也贴了不少 。感兴趣的朋友自己看一下即可 。
10 Spring中有哪些和ThreadPoolExecutor类似的工具

10问10答:你真的了解线程池吗 线程是什么

文章插图
这里我想着重强调的就是SimpleAsyncTaskExecutor , Spring中使用的@Async注解 , 底层就是基于SimpleAsyncTaskExecutor去执行任务 , 只不过它不是线程池 , 而是每次都新开一个线程 。
另外想要强调的是Executor接口 。Java初学者容易想当然的以为Executor结尾的类就是一个线程池 , 而上面的都是反例 。我们可以在JDK的execute方法上看到这个注释:
/** * Executes the given command at some time in the future.The command * may execute in a new thread, in a pooled thread, or in the calling * thread, at the discretion of the {@code Executor} implementation. */所以 , 它的职责并不是提供一个线程池的接口 , 而是提供一个“将来执行命令”的接口 。真正能代表线程池意义的 , 是ThreadPoolExecutor类 , 而不是Executor接口 。
最佳实践总结
【强制】使用ThreadPoolExecutor的构造函数声明线程池 , 避免使用Executors类的 newFixedThreadPool和newCachedThreadPool 。【强制】 创建线程或线程池时请指定有意义的线程名称 , 方便出错时回溯 。即threadFactory参数要构造好 。【建议】建议不同类别的业务用不同的线程池 。【建议】CPU密集型任务(N+1):这种任务消耗的主要是CPU资源 , 可以将线程数设置为N(CPU核心数)+1 , 比CPU核心数多出来的一个线程是为了防止线程偶发的缺页中断 , 或者其它原因导致的任务暂停而带来的影响 。一旦任务暂停 , CPU就会处于空闲状态 , 而在这种情况下多出来的一个线程就可以充分利用CPU的空闲时间 。【建议】I/O密集型任务(2N):这种任务应用起来 , 系统会用大部分的时间来处理I/O交互 , 而线程在处理I/O的时间段内不会占用CPU来处理 , 这时就可以将CPU交出给其它线程使用 。因此在I/O密集型任务的应用中 , 我们可以多配置一些线程 , 具体的计算方法是2N 。【建议】workQueue不要使用无界队列 , 尽量使用有界队列 。避免大量任务等待 , 造成OOM 。【建议】如果是资源紧张的应用 , 使用allowsCoreThreadTimeOut可以提高资源利用率 。【建议】虽然使用线程池有多种异常处理的方式 , 但在任务代码中 , 使用try-catch最通用 , 也能给不同任务的异常处理做精细化 。【建议】对于资源紧张的应用 , 如果担心线程池资源使用不当 , 可以利用ThreadPoolExecutor的API实现简单的监控 , 然后进行分析和优化 。
10问10答:你真的了解线程池吗 线程是什么

文章插图
线程池初始化示例:
private static final ThreadPoolExecutor pool;static {ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("po-detail-pool-%d").build();pool = new ThreadPoolExecutor(4, 8, 60L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(512),threadFactory, new ThreadPoolExecutor.AbortPolicy());pool.allowCoreThreadTimeOut(true);}threadFactory:给出带业务语义的线程命名 。corePoolSize:快速启动4个线程处理该业务 , 是足够的 。maximumPoolSize:IO密集型业务 , 我的服务器是4C8G的 , 所以4*2=8 。keepAliveTime:服务器资源紧张 , 让空闲的线程快速释放 。pool.allowCoreThreadTimeOut(true):也是为了在可以的时候 , 让线程释放 , 释放资源 。workQueue:一个任务的执行时长在100~300ms , 业务高峰期8个线程 , 按照10s超时(已经很高了) 。10s钟 , 8个线程 , 可以处理10 1000ms / 200ms 8 = 400个任务左右 , 往上再取一点 , 512已经很多了 。handler:极端情况下 , 一些任务只能丢弃 , 保护服务端 。作者 | 风楼
本文为阿里云原创内容 , 未经允许不得转载 。

    推荐阅读