简单说就是管理线程的池子
帮我们管理线程,提高线程的可管理性(统一的分配、调优、监控)
重复利用,降低资源消耗
提高响应速度
Java中的线程池是如何实现的?
线程池中线程被抽象为静态内部类Worker,是基于AQS实现的,存放在HashSet中。
Worker线程管理
线程池为了掌握线程的状态并维护线程的生命周期,设计了线程池内的工作线程Worker
要被执行的线程存放在BlockingQueue中;
基本思想就是从workQueue中取出要执行的任务,放在worker中处理
通过ThreadPoolExecutor创建
线程池参数:
corePoolSize: 线程池核心线程数最大值
maximumPoolSize: 线程池最大线程数大小
keepAliveTime: 线程池中非核心线程空闲的存活时间大小
unit: 线程空闲存活时间单位
workQueue: 存放任务的阻塞队列
threadFactory: 用于设置创建线程的工厂,可以给创建的线程设置有意义的名字,可方便排查问题。
handler: 线城池的饱和策略事件,主要有四种类型
默认情况下,线程池在初始的时候,线程数为0。当接收到一个任务时,如果线程池中存活的线程数小于corePoolSize核心线程,则新建一个线程。另外,如果想在线程初始化时候就有核心线程,可以调用prestartCoreThread()或prestartAllCoreThread(),前者是初始一个,后者是初始全部。
ThreadPoolExecutor运行流程
任务缓冲模块是线程池能够管理任务的核心部分。线程池的本质是对任务和线程的管理,而做到这一点最关键的思想就是将任务和线程两者解耦,不让两者直接关联,才可以做后续的分配工作。线程池中是以生产者消费者模式,通过一个阻塞队列来实现的。阻塞队列缓存任务,工作线程从阻塞队列中获取任务。
线程池原理
提交一个任务,线程池里存活的核心线程数小于corePoolSize时,线程池会创建一个核心线程去处理提交的任务
如果线程池核心线程数已满,即线程数已经等于corePoolSize,一个新提交的任务,会被放进任务队列workQueue排队等待执行。
当线程池里面存活的线程数已经等于corePoolSize了,并且任务队列workQueue也满,判断线程数是否达到maximumPoolSize,即最大线程数是否已满,如果没到达,创建非核心线程执行提交的任务;如果当前的线程数达到了maximumPoolSize,还有新的任务过来的话,直接采用拒绝策略处理。
拒绝策略
AbortPolicy直接抛出异常阻止线程运行;(默认的)
DiscardPolicy 直接丢弃任务,不做处理
DiscardOldestPolicy 丢弃队列里最老的任务,将当前这个任务继续提交给线程池
CallerRunsPolicy 交给线程池调用所在的线程进行处理
如果线程池中的一个线程运行时出现了异常,会发生什么?
如果提交任务的时候使用了submit,则返回的feature里会存有异常信息,但是如果是execute则会打印出异常栈。但是不会给其他线程造成影响。之后线程池会删除该线程,会新增加一个worker。
有哪些线程池?
newFixedThreadPool (固定数目线程的线程池)
阻塞队列为无界队列LinkedBlockingQueue。阻塞队列是无界的,这样就提交任务高峰期有可能会造成任务一直堆积在队列里,超出内存容量最终导致内存溢出。所以要配合上合适的策略
适用于处理CPU密集型的任务,适用执行长期的任务
newCachedThreadPool(可缓存线程的线程池)
阻塞队列是SynchronousQueue
适用于并发执行大量短期的小任务
工作机制:
提交任务
因为没有核心线程,所以任务直接加到SynchronousQueue队列。
判断是否有空闲线程,如果有,就去取出任务执行。
如果没有空闲线程,就新建一个线程执行。
执行完任务的线程,还可以存活60秒,如果在这期间,接到任务,可以继续活下去;否则,被销毁。
风险点
阻塞队列采用的SynchronousQueue,所以是不存储等待任务的,并且最大线程数的值是Integer.MAX_VALUE。所以当任务提交量高峰时,相当于无限制的创建线程。并且空闲时间是60秒,QPS高峰期最终会将服务器资源耗尽,所以真正实际应用中不建议使用。所以要配合上合适的策略
newSingleThreadExecutor(单线程的线程池)
阻塞队列是LinkedBlockingQueue。任务队列是无界的LinkedBlockingQueue,存在任务队列无限添加造成OOM的风险。
适用于串行执行任务的场景,一个任务一个任务地执行
newScheduledThreadPool(定时及周期执行的线程池)
阻塞队列是DelayedWorkQueue
周期性执行任务的场景,需要限制线程数量的场景
线程池的最大线程数也是Integer.MAX_VALUE,可以理解为会无限创建线程。存在将资源耗尽的风险,所以一般场景下不建议使用。所以要配合上合适的策略
newWorkStealingPool(一个具有抢占式操作的线程池)
参数中传入的是一个线程并发的数量,这里和之前就有很明显的区别,前面4种线程池都有核心线程数、最大线程数等等,而这就使用了一个并发线程数解决问题。这个线程池不会保证任务的顺序执行,也就是 WorkStealing 的意思,抢占式的工作,哪个线程抢到任务就执行。
有哪几种工作队列?
ArrayBlockingQueue
(有界队列)是一个用数组实现的有界阻塞队列,按FIFO排序量
LinkedBlockingQueue
(可设置容量队列)基于链表结构的阻塞队列,按FIFO排序任务,容量可以选择进行设置,不设置的话,将是一个无边界的阻塞队列,最大长度为Integer.MAX_VALUE,
吞吐量通常要高于ArrayBlockingQuene;newFixedThreadPool线程池使用了这个队列
使用无界队列的线程池会导致内存飙升吗?:
会的,newFixedThreadPool使用了无界的阻塞队列LinkedBlockingQueue,如果线程获取一个任务后,任务的执行时间比较长(比如,上面demo设置了10秒),会导致队列的任务越积越多,导致机器内存使用不停飙升, 最终导致OOM。
DelayQueue
(延迟队列)是一个任务定时周期的延迟执行的队列。根据指定的执行时间从小到大排序,否则根据插入到队列的先后排序。newScheduledThreadPool线程池使用了这个队列。
PriorityBlockingQueue
(优先级队列)是具有优先级的无界阻塞队列;
SynchronousQueue
(同步队列)一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,newCachedThreadPool线程池使用了这个队列
线程池的异常处理
try-catch捕获异常
submit执行, Future.get()接收异常
重写ThreadPoolExecutor.afterExecute方法,处理传递的异常引用
实例化时,传入自己的 ThreadFactory,设置Thread UncaughtExceptionHandler处理未检测的异堂
线程池的状态
RUNNING: 能接受新提交的任务,并且也能处理阻塞队列中的任务。
SHUTDOWN: 关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务。
STOP: 不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。
TIDYING: 所有的任务都已终止了,workerCount(有效线程数)为0。
TERMINATED: 在terminated()方法执行完后进入该状态。
维护线程池的生命周期方法
线程池运行的状态,并不是用户显式设置的,而是伴随着线程池的运行,由内部来维护。线程池内部使用一个变量维护两个值:运行状态(runState)和线程数量 (workerCount)。
在具体实现中,线程池将运行状态(runState)、线程数量 (workerCount)两个关键参数的维护放在了一起。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
ctl这个AtomicInteger类型,是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段, 它同时包含两部分的信息:线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),高3位保存runState,低29位保存workerCount,两个变量之间互不干扰。用一个变量去存储两个值,可避免在做相关决策时,出现不一致的情况,不必为了维护两者的一致,而占用锁资源。通过阅读线程池源代码也可以发现,经常出现要同时判断线程池运行状态和线程数量的情况。线程池也提供了若干方法去供用户获得线程池当前的运行状态、线程个数。这里都使用的是位运算的方式,相比于基本运算,速度也会快很多。
美团的文章:
https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html