为什么要使用线程池?
线程池做的工作主要是控制运行的线程的数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量超出数量的线程排队等候,等其它线程执行完毕,再从队列中取出任务来执行。
线程池主要特点为:线程复用;控制最大并发数;管理线程。
第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
第二:提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
第三:提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控
线程池如何使用
架构说明
线程的四种创建方法:
- Thread
- Runnable
- Callable
- 线程池
Java中的线程池是通过Executor框架实现 Executors(辅助工具类) 池化技术,最后关闭池子就行
核心类:ThreadPoolExecutor
永远传参传接口
线程池的重要参数
源码查看:
class ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
@NotNull TimeUnit unit,
@NotNull BlockingQueue<Runnable> workQueue,
@NotNull ThreadFactory threadFactory,
@NotNull RejectExecutionHandler handler
}
七大参数
- 1.
corePoolSize
:线程池中的常驻核心线程数 - 2.
maximumPoolSize
:线程池能够容纳同时执行的最大线程数,此值必须大于等于1. - 3.
keepAliveTime
:多余的空闲线程的存活时间。当前线程池数量超过corePoolSize
时,当空闲时间达到keepAliveTime
值时,多余空闲线程会被销毁直到只剩下corePoolSize
个线程为止。 - 4.
unit
:keepAliveTime
的单位 - 5.
workQueue
:任务队列,被提交但尚未被执行的任务。(阻塞队列) - 6.
threadFactory
:表示生成线程池中工作线程的线程工厂,用于创建线程一般用默认的即可。 - 7.
handler
:拒绝策略,表示当队列满了并且工作线程大于等于线程池的最大线程数的时候,应该如何做。
线程池的底层工作原理
非常重要
1 在创建了线程池后,等待提交过来的任务请求。
2 当调用execute()方法添加一个请求任务时,线程池会做如下判断:
2.1 如果正在运行的线程数量小于 corePoolSize,那么马上创建马上创建线程运行这个任务。
2.2 如果正在运行的线程数量大于或等于corePoolSize,那么将这个任务放入队列。
2.3 如果这个时候队列满了且正在运行的线程数量还小于maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务。(队列出列,刚来的去队列中)
2.4 如果队列满了且正在运行的线程数量大于或等于maximumPoolSize,那么线程池
会启动饱和拒绝策略来执行。
3.当一个线程完成任务时,它会从队列中取下一个任务来执行。
4.当一个线程无事可做超过一定的时间(keepAlilveTime)时,线程池会判断:
如果当前运行的线程数大于corePoolSize,那么这个线程就被停掉。
所以线程池的所有任务完成后它最终会收缩到corePoolSize的大小
用银行的例子来说明:
-
N个窗口 :
corePoolSize
-
总共有K个窗口:
maximumPoolSize
-
候客厅 :
BlockedQueue
1 银行开门,等待顾客上门办理业务
2 今天开N个窗口,有N个柜员提供服务。
2.1 如果顾客数小于等于N,可以立即为每一个顾客办理业务,不用等待。
2.2 如果又来了M个顾客,M个顾客在候客厅坐得下,那么顾客就在候客厅等候
2.3 如果候客厅坐满了,银行大堂经理发现窗口不够用,叫人过来加班,开启其他没有工作的窗口。
2.4 如果候客厅满了,开启的工作窗口也是最大数K,就会启动拒绝策略。(今天人太多了,明天来吧;或者您等会再来看看,等人少的时候来)
3 当一个顾客的业务办理完成后,叫一下个顾客来办理业务
4 当人数很少的时候,额外加班的窗口会慢慢关闭,留下N个窗口。
线程池的拒绝策略
等待队列也已经排满了,再也塞不下新任务了同时,线程池中的max线程也达到了,无法继续为新任务服务。 这时候我们就需要拒绝策略机制合理的处理这个问题。
四种拒绝策略:
AbortPolicy
(默认):直接抛出RejectedExecutionException
异常阻止系统正常运行。CallerRunsPolicy
:“调用者运行“一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者( main线程 ),从而降低新调用者的流量(谁让你找我的,你就找谁)DiscardoldestPolicy
:抛弃队列中等待最久的任务,然后把当前任务加入队列中尝试再次提交当前任务。DiscardPolicy
:直接丢弃任务,不予任何处理也不抛出异常。如果允许任务丢失,这是最好的一种方案。
这四种内置拒绝策略均实现了RejectExecutionHandler
接口
面试题目:你在工作中单一的/固定数的/可变的三种创建线程池的方法,你用那个多?
阿里开发手册
【强制】线程资源必须通过线程池提供,不允许在应用中自行显式创建线程。 说明:使用线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源的开销,解决资源不足的问题。如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。
【强制】线程池不允许使用Executors去创建,而是通过ThreadPooLExecutor
的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
说明:Executors返回的线程池对象的弊端如下:
1)FixedThreadPool
和SingleThreadPool
:允许的请求队列长度为Integer.MAX_VALUE
,可能会堆积大量的请求,从而导致OOM
(out of memory)。
2)CachedThreadPool
和ScheduledThreadPool
:允许的创建线程数量为Integer.MAX_VALUE
,可能会创建大量的线程,从而导致OOM
。
配置线程池
Runtime.getRuntime().availableProcessors()
查看核心数
corePoolSize
:1或者0
如何设置 maximumPoolSize
,分情况考虑,要看业务是CPU密集型还是IO密集型
CPU密集型
CPU密集的意思是该任务需要大量的运算,而没有阻塞,CPU一直全速运行。
CPU密集任务只有在真正的多核CPU上才可能得到加速(通过多线程),而在单核CPU上,无论你开几个模拟的多线程该任务都不可能得到加速,因为CPU总的运算能力就那些。
CPU密集型任务配置尽可能少的线程数量:
一般公式:CPU核数+1个线程的线程池
IO密集型
比如:数据库读写
由于IO密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如CPU核数*2
I0密集型,即该任务需要大量的IO,即大量的阻塞。
在单线程上运行I0密集型的任务会导致浪费大量的CPU运算能力浪费在等待。
所以在IO密集型任务中使用多线程可以大大的加速程序运行,即使在单核CPU上,这种加速主要就是利用了被浪费掉的阻塞时间。
IO密集型时,大部分线程都阻塞,故需要多配置线程数:
参考公式:CPU核数/1-阻塞系数阻塞系数在0.8~0.9之间比如8核CPU:8/(1-0.9)=80个线程数
附录
内置的四种拒绝策略源码
/* Predefined RejectedExecutionHandlers */
// ThreadPoolExecutor.java
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
/**
* Does nothing, which has the effect of discarding task r.
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
使用 ThreadPooLExecutor
创建线程池
public class MyThreadPoolDemo {
public static void main(String[] args){
//通过 ThreadPooLExecutor 创建线程池
ExecutorService threadPool = new ThreadPoolExecutor(2,
5,
1L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy());//这里的策略可以改变
try{
for(int i=1;i<=11;i++){
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+"\t 办理业务");
});
}
}catch (Exception e){
e.printStackTrace();
}finally {
threadPool.shutdown();
}
}
private static void threadPoolInit() {
ExecutorService threadPool = Executors.newCachedThreadPool();//一池N个线程
//模拟多个用户来办理业务,每个用户就是一个来自外部的请求线程
try{
for(int i=1;i<=10;i++){
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+"\t 办理业务");
});
}
}catch (Exception e){
e.printStackTrace();
}finally {
threadPool.shutdown();
}
}
}