线程池 ( ThreadPoolExecutor)
构造函数
1 | public ThreadPoolExecutor(int corePoolSize, |
corePoolSize
表示线程池的常驻核心线程数。如果设置为 0,则表示在没有任何任务时,销毁线程池;如果大于 0,即使没有任务时也会保证线程池的线程数量等于此值。但需要注意,此值如果设置的比较小,则会频繁的创建和销毁线程;如果设置的比较大,则会浪费系统资源,所以开发者需要根据自己的实际业务来调整此值maximumPoolSize
表示线程池在任务最多时,最大可以创建的线程数。官方规定此值必须大于 0,也必须大于等于corePoolSize
,此值只有在任务比较多,且不能存放在任务队列时,才会用到keepAliveTime
表示线程的存活时间,当线程池空闲时并且超过了此时间,多余的线程就会销毁,直到线程池中的线程数量销毁到等于corePoolSize
为止,如果maximumPoolSize
等于corePoolSize
,那么线程池在空闲的时候也不会销毁任何线程unit
表示存活时间的单位,它是配合keepAliveTime
参数共同使用的workQueue
表示线程池执行的任务队列,当线程池的所有线程都在处理任务时,如果来了新任务就会缓存到此任务队列中排队等待执行threadFactory
表示线程的创建工厂,此参数一般用的比较少,我们通常在创建线程池时不指定此参数,它会使用默认的线程创建工厂的方法来创建线程:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27// 默认的线程创建工厂,需要实现 ThreadFactory 接口
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
// 创建线程
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false); // 创建一个非守护线程
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY); // 线程优先级设置为默认值
return t;
}
}RejectedExecutionHandler
表示指定线程池的拒绝策略,当线程池的任务已经在缓存队列workQueue
中存储满了之后,并且不能创建新的线程来执行此任务时,就会用到此拒绝策略,它属于一种限流保护的机制ctl
:控制状态的属性1
2
3
4
5
6
7
8
9
10
11
12
13
14
15private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }用一个 AtomicInteger 包装两个字段:
- 高 3 位保存 runState,低 29 位保存 workerCount
用一个变量去存储两个值,可避免在做相关决策时,出现不一致的情况,不必为了维护两者的一致,而占用锁资源
workerCount: 有效线程数
runState: 线程池的运行状态
- 定义
- RUNNING: 接受新任务并处理排队的任务
- SHUTDOWN: 拒绝接受新任务, 但是会处理还在排队的任务
- STOP: 拒绝接受新任务, 也不处理排队中任务, 并且会中断正在执行的任务
- TIDYING: 所有任务都已经停止, workerCount 为 0, 转换为状态 TIDYING 的线程将运行 terminated() 方法
- TERMINATED: terminated() 执行完毕
这些值之间的数字顺序很重要,可以进行有序的比较
runState 随着时间逐步增加,但不一定达到每个状态, 过渡的顺序为:
- RUNNING -> SHUTDOWN, 在调用
shutdown()
时,可能隐藏在finalize()
中调用 - (RUNNING or SHUTDOWN) -> STOP, 在调用
shutdownNow()
时 - SHUTDOWN -> TIDYING, 当队列和池子内的任务都为空时
- STOP -> TIDYING, 当池子内的任务为空时
- TIDYING -> TERMINATED, 当 terminated() 执行完毕时
- RUNNING -> SHUTDOWN, 在调用
线程在
awaitTermination()
中等待, 将在状态变为 TERMINATED 时返回
- 定义
线程数量
线程不是越多越好!
- 线程在java中是一个对象,更是操作系统的资源,创建、销毁需要消耗资源;
- 线程过多,会消耗很多的内存;
- 操作系统需要频繁切换线程上下文,影响性能。
如何确定数量
计算型任务:cpu数量的1-2倍;
IO型任务:根据具体的IO阻塞时长考虑
工作流程
API
- 接口Executor:定义了执行任务的
execute()
; - 接口ExecutorService:继承了接口Executor,拓展了Callable、Future、关闭方法;
- 接口ScheduledExecutorService:继承了接口ExecutorService,增加了定时任务相关方法;
- 实现类ThreadPoolExecutor:基础、标准的线程池实现;
- 实现类ScheduledThreadPoolExecutor:继承了实现类ThreadPoolExecutor,实现了ScheduledExecutorService中相关定时任务的方法。
创建一个核心线程数量5,最大数量10,加开线程存活5秒,等待队列3的线程池,最大容纳13个任务:new ThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(3));
execute
1 | public void execute(Runnable command) { |
- 是否达到核心线程数量?没达到,创建一个工作线程来执行任务;
- 工作队列是否已经满?没满,则将新提交的任务存储到队列;
- 是否达到线程池最大数量?没达到,则创建一个新的工作线程来执行任务(加开的线程如果没有任务会自动销毁);
- 最后,拒绝执行。
addWorker
firstTask
,线程应首先运行的任务,如果没有则可以设置为 nullcore
,判断是否可以创建线程的阀值(最大值),如果等于 true 则表示使用 corePoolSize 作为阀值,false 则表示使用 maximumPoolSize 作为阀值```java
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && // 线程池是否已停止 ! (rs == SHUTDOWN && // 线程池是否正在停止 firstTask == null && ! workQueue.isEmpty()) // 线程是否用于执行剩余任务 ) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || // 线程数是否超过容量 wc >= (core ? corePoolSize : maximumPoolSize)) // 是否超过判断的阀值 return false; if (compareAndIncrementWorkerCount(c)) // CAS 尝试登记线程数 break retry; // 登记成功 c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) // 判断线程池状态运行过程中是否有改变 continue retry; // else CAS failed due to workerCount change; retry inner loop }
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); // 持有引用 int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; // 更新创建过的最大线程数 workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); // 启动线程, 而线程的 run 方法就是执行 runWorker() workerStarted = true; } }
} finally {
if (! workerStarted) addWorkerFailed(w);
}
return workerStarted;
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
![addworker](https://cdn.jsdelivr.net/gh/JNhua/blog_images@master/img/20201029110745.png)
## Worker
### 构造函数
```java
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
final Thread thread; // Worker 持有的线程
Runnable firstTask; // 初始化的任务,可以为 null
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
......
}
runWorker
1 | final void runWorker(Worker w) { |
getTask
1 | private Runnable getTask() { |
继承 AQS 原因分析
Worker 是通过继承 AQS,使用 AQS 来实现独占锁这个功能。没有使用可重入锁 ReentrantLock,而是使用 AQS,为的就是实现不可重入的特性去反应线程现在的执行状态
- lock 方法一旦获取了独占锁,表示当前线程正在执行任务中
- 如果正在执行任务,则不应该中断线程
- 如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断
- 线程池在执行 shutdown 方法或 tryTerminate 方法时会调用 interruptIdleWorkers 方法来中断空闲的线程,interruptIdleWorkers 方法会使用 tryLock 方法来判断线程池中的线程是否是空闲状态;如果线程是空闲状态则可以安全回收
- 之所以设置为不可重入,是因为我们不希望任务在调用像 setCorePoolSize 这样的线程池控制方法时重新获取锁。如果使用 ReentrantLock,它是可重入的,这样如果在任务中调用了如 setCorePoolSize 这类线程池控制的方法,会中断正在运行的线程。
- setCorePoolSize方法:
1 | public void setCorePoolSize(int corePoolSize) { |
- interruptIdleWorkers
1 | private void interruptIdleWorkers(boolean onlyOne) { |
如果允许重入,w.tryLock()
为true
,线程就把自己打断了。
此外,在构造方法中执行了setState(-1);,把 state 变量设置为 -1,是因为 AQS 默认的 state 是0,如果刚创建了一个 Worker 对象,还没有执行任务时,这时就不应该被中断:
1
2
3
4
5
6
7
8
9
10
11
12
13protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}- tryAcquire 方法是根据 state 是否是 0 来判断的,所以,
setState(-1);
将 state 设置为 -1 是为了禁止在执行任务前对线程进行中断 - 在 runWorker 方法中会先调用 Worker 对象的 unlock 方法将 state 设置为 0, 允许中断和 lock
- tryAcquire 方法是根据 state 是否是 0 来判断的,所以,
相关参数
1 | // 用于操作 workers |
- 为什么workers使用HashSet和ReentraintLock而不使用并发的set ?
- 简化了统计数据,比如说将 worker 添加到 workers 后还需要判断是否需要更新 largestPoolSize 等,workers 只在获取到 mainLock 的情况下才会进行读写
- mainLock 也用于在中断线程
interruptIdleWorkers
的时候串行执行,否则可能会并发进行线程中断,引起不必要的中断高峰。否则退出中的线程会并发地中断那些还没有被中断的线程。
停止
shutdown:不接收新任务,等待任务执行结束;
shutdownNow:立即结束所有线程,队列中线程不再执行,不接收新任务,返回未结束任务列表(队列中的)。
1 | public void shutdown() { |
回收
processWorkerExit
- 线程池中线程的销毁依赖 JVM 的垃圾回收,当线程池决定哪些线程需要回收时,只需要将其引用消除即可
- Worker 被创建出来后,就会不断地进行轮询,然后获取任务去执行,核心线程可以无限等待获取任务,非核心线程要限时获取任务
- 当 Worker 无法获取到任务,也就是获取的任务为空时,循环会结束,Worker 会主动消除自身在线程池内的引用
- 线程回收的工作在 processWorkerExit 方法内完成
1 | private void processWorkerExit(Worker w, boolean completedAbruptly) { |
tryTerminate : 根据状态判断是否结束
1 | final void tryTerminate() { |
schedule
- scheduleAtFixedRate:如果执行时间大于周期时间,在上一个任务执行完毕后,立即执行下一个;
- scheduleWithFixedDelay:如果执行时间大于周期时间,在上一次完毕后,再重新计时。
相关问题
ThreadPoolExecutor 的执行方法有几种?它们有什么区别?
execute()
VSsubmit()
都是用来执行线程池任务,它们最主要的区别是
submit()
方法可以接收线程池执行的返回值,而execute()
不能接收返回值```java
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 10, 10L,TimeUnit.SECONDS, new LinkedBlockingQueue(20));
// execute 使用
executor.execute(new Runnable() {@Override public void run() { System.out.println("Hello, execute."); }
});
// submit 使用
Futurefuture = executor.submit(new Callable () { @Override public String call() throws Exception { System.out.println("Hello, submit."); return "Success"; }
});
System.out.println(future.get());1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
* `execute()` 方法属于 `Executor` 接口的方法,而 `submit() `方法则是属于 `ExecutorService` 接口的方法
* 在 `submit() `中处理的任务如果抛出异常, 只有在调用返回的 `Future `对象 `get `方法时才会抛出
### **拒绝策略的分类有哪些? 如何自定义拒绝策略?**
* 自带的拒绝策略有 4 种:
* **AbortPolicy**,终止策略,线程池会抛出异常并终止执行,它是**默认**的拒绝策略
* **CallerRunsPolicy**,把任务交给当前线程来执行
* **DiscardPolicy**,忽略此任务(最新的任务)
* **DiscardOldestPolicy**,忽略最早的任务(最先加入队列的任务)
* 自定义拒绝策略
* 自定义拒绝策略只需要新建一个` RejectedExecutionHandler` 对象,然后重写它的 `rejectedExecution() `方法即可
```java
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 3, 10,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(2),
new RejectedExecutionHandler() { // 添加自定义拒绝策略
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 业务处理方法
System.out.println("执行自定义拒绝策略");
}
});
for (int i = 0; i < 6; i++) {
executor.execute(() -> {
System.out.println(Thread.currentThread().getName());
});
}
线程池的工作队列有哪些?
- ArrayBlockingQueue, 是一个用数组实现的有界阻塞队列,按 FIFO 排序任务, 支持公平锁和非公平锁
- LinkedBlockingQueue, 基于链表结构的阻塞队列,按 FIFO 排序任务,容量可以选择进行设置,不设置的话,将是一个无边界的阻塞队列,最大长度为 Integer.MAX_VALUE,吞吐量通常要高于 ArrayBlockingQuene
- DelayQueue, 是一个任务定时周期的延迟执行的队列。根据指定的执行时间从小到大排序,否则根据插入到队列的先后排序
- PriorityBlockingQueue, 是具有优先级的无界阻塞队列, 不能保证同优先级元素的顺序
- SynchronousQueue, 一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于 LinkedBlockingQueue
- SynchronousQueue, 一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于 LinkedBlockingQueue
- LinkedBlockingDeque, 一个由链表结构组成的双向阻塞队列,队列头尾都可以插入和移除元素, 多线程并发时, 可以将锁的竞争最多 降到一半
ThreadPoolExecutor 如何实现扩展?
- 通过重写
beforeExecute()
和afterExecute()
方法,我们可以在扩展方法中添加日志或者实现数据统计,比如统计线程的执行时间
关于 Executors 内的线程池对象
- Executors 源码中
Executors.newFixedThreadPool()
、Executors.newSingleThreadExecutor()
和Executors.newCachedThreadPool()
等方法的底层都是通过ThreadPoolExecutor
实现的FixedThreadPool
(固定数目线程的线程池)- 适用于处理 CPU 密集型的任务,确保 CPU 在长期被工作线程使用的情况下,尽可能的少的分配线程
- 特点
- 核心线程数和最大线程数大小一样
- keepAliveTime 为 0
- 阻塞队列为 LinkedBlockingQueue
- CachedThreadPool (可缓存线程的线程池)
- 适用于并发执行大量短期的小任务
- 特点
- 核心线程数为 0
- 最大线程数为 Integer.MAX_VALUE
- 阻塞队列为 SynchronousQueue
- 非核心线程空闲存活时间为 60 秒
- SingleThreadExecutor (单线程的线程池)
- 适用于串行执行任务的场景,一个任务一个任务地执行
- 特点
- 核心线程数为 1
- 最大线程数也为 1
- 阻塞队列是 LinkedBlockingQueue
- keepAliveTime 为 0
- ScheduledThreadPool (定时及周期执行的线程池)
- 周期性执行任务的场景,需要限制线程数量的场景
- 特点
- 最大线程数为 Integer.MAX_VALUE
- 阻塞队列是 DelayedWorkQueue
- keepAliveTime 为 0
- scheduleAtFixedRate() 按某种速率周期执行
- scheduleWithFixedDelay() 在某个延迟后执行
- 在阿里巴巴的《 Java 开发手册 》中是这样规定的:
- 线程池不允许使用
Executors
去创建,而是通过ThreadPoolExecutor
的方式,这样的处理方式让写的读者更加明确线程池的运行规则,规避资源耗尽的风险。 Executors
返回的线程池对象的弊端如下:FixedThreadPool
和SingleThreadPool
:允许的请求队列长度为Integer.MAX_VALUE
,可能会堆积大量的请求,从而导致 OOMCachedThreadPool
和ScheduledThreadPool
:允许的创建线程数量为Integer.MAX_VALUE
,可能会创建大量的线程,从而导致 OOM
- 线程池不允许使用