五:线程池
5.1 线程池概述
5.1.1 为什么要使用线程池
1.在业务开发过程中,为了提升效率,可以将一个业务通过多线程的方式去执行。
比如有一个较大的任务,可以将任务分成几块,分别交给几个线程去执行,最终做一个汇总即可。
比如做业务操作时,需要发送短信或邮件,可以采用异步的方式,其实就是构建另一个线程去执行。
2.如果不采用线程池,那么每次使用线程的时候,都需要先创建一个线程,然后使用完再销毁,这样会对系统造成额外的开销 。在处理过程中,到底由多线程执行了多少个任务,以及每个线程的开销都无法统计和管理。
3.所以需要一个线程池去管理这些内容。线程池和连接池的概念类似,都是在一个java的集合中存储大量的线程对象,每次需要执行异步操作或多线程操作时,不需要重新创建线程,重新从集合中拿取线程对象直接执行方法就可以了。
5.1.2 JDK中线程池
JDK中提供了线程池的类。
在线程池创建初期,可以将任务提交到线程池,会根据一些机制来执行这些任务:
- 任务直接被执行
- 任务被暂时存储起来,等到有空闲线程再处理
- 任务被直接拒绝
JDK提供的线程池中记录了每个线程处理了多少个任务,以及整个线程池处理了多少个任务,同时还可以针对任务执行前后做一些钩子函数的实现(类似spring的前置处理和后置处理)。
比如可以在任务执行前后做一些日志记录,这样可以多记录一些信息,方便后面统计线程池执行任务时的一些内容参数。
5.2 JDK中自带线程池
5.2.1 概述
JDK中基于Executors提供了很多线程池。但是一般不会使用。
5.2.2 newFixedThreadPool
5.2.2.1 结构
固定线程数线程池。这个线程池的线程数是固定的
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
构建时需要给newFixedThreadPool设置一个nThreads属性,而nThreads其实就是设置线程个数。当前线程池的本质其实就是使用ThreadPoolExecutor。
线程池构建好后,线程个数已经固定好了,但是线程时懒加载的。即刚创建好的线程池中其实是没有线程的,当有一个任务进来时,就会创建一个线程来执行这个任务。当任务数量超过线程总数时,任务就会进入LinkedBlockingQueue中进行等待。
5.2.2.2 使用
java:
package com.xqm.juc.threadPool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Test01_newFixedThreadPool {
public static void main(String[] args) {
ExecutorService threadPool=Executors.newFixedThreadPool(2);
// 提交任务使用的是execute
for (int i = 0; i < 5; i++) {
threadPool.execute(() -> {
System.out.println("当前执行的线程的线程名为:" + Thread.currentThread().getName());
});
}
}
}
结果:
当前执行的线程的线程名为:pool-1-thread-2
当前执行的线程的线程名为:pool-1-thread-1
当前执行的线程的线程名为:pool-1-thread-2
当前执行的线程的线程名为:pool-1-thread-1
当前执行的线程的线程名为:pool-1-thread-2
5.2.3 newSingleThreadExecutor
单例线程池。线程池中只有一个核心线程数在处理任务。
5.2.3.1 结构
// 构建单例线程池
// 适合顺序处理的一些业务
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
// 核心线程数为1 最大线程数为1 存活时间为0L
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
// 多余的任务存放在LinkedBlockingQueue
new LinkedBlockingQueue<Runnable>()));
}
外层的FinalizableDelegatedExecutorService:
// 包装类
static class FinalizableDelegatedExecutorService extends DelegatedExecutorService {
// 这里和正常的ThreadPoolExecutor没区别
FinalizableDelegatedExecutorService(ExecutorService executor) {
super(executor);
}
// finalize是当前对象被GC回收之前执行的方法
protected void finalize() {
// 当前FinalizableDelegatedExecutorService的目的是为了在当前线程池被GC回收之前
// 可以被执行shutdown方法
// shutdown方法是将当前线程池停止,并且干掉工作线程/核心线程
super.shutdown();
}
}
但是不能基于这种方法保证线程池一定会执行shutdown,finaliza在执行时,是守护线程,这种线程无法保证一定可以执行完毕。
因此在使用线程池时,如果线程池是基于业务构建的,在使用完毕后,一定要手动执行shutdown,否则会造成JVM中一堆线程。
5.2.3.2 使用
package com.xqm.juc.threadPool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Test02_newSingleThreadExecutor {
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(()->{
System.out.println("当前线程的名称为:"+Thread.currentThread().getName());
});
executorService.execute(()->{
System.out.println("当前线程的名称为:"+Thread.currentThread().getName());
});
executorService.execute(()->{
System.out.println("当前线程的名称为:"+Thread.currentThread().getName());
});
executorService.execute(()->{
System.out.println("当前线程的名称为:"+Thread.currentThread().getName());
});
}
}
结果
当前线程的名称为:pool-1-thread-1
当前线程的名称为:pool-1-thread-1
当前线程的名称为:pool-1-thread-1
当前线程的名称为:pool-1-thread-1
会发现执行任务的只有一个线程。
5.2.3.3 测试shutdown
不调用shutdown,查看线程是否被回收
package com.xqm.juc.threadPool;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class Test02_newSingleThreadExecutor_02 {
public static void main(String[] args) throws InterruptedException, IOException {
newThreadPool();
System.gc();
// 保证GC执行完毕
TimeUnit.SECONDS.sleep(5);
// 保证JVM不会停止,等待控制台录入信息
// 可以看到,即便调用gc,线程仍然存在,并没有被回收
System.in.read();
}
private static void newThreadPool(){
ExecutorService threadPool = Executors.newFixedThreadPool(200);
for (int i = 0; i < 200; i++) {
final int a=i;
threadPool.execute(()->{
System.out.println(a);
});
}
}
}
结果:
可以看到,即便时执行了System.gc(),线程仍然在后台。
调用shutdown,查看线程是否被回收:
package com.xqm.juc.threadPool;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class Test02_newSingleThreadExecutor_02 {
public static void main(String[] args) throws InterruptedException, IOException {
newThreadPool();
System.gc();
// 保证GC执行完毕
TimeUnit.SECONDS.sleep(5);
// 保证JVM不会停止,等待控制台录入信息
// 可以看到,即便调用gc,线程仍然存在,并没有被回收
System.in.read();
}
private static void newThreadPool(){
ExecutorService threadPool = Executors.newFixedThreadPool(200);
for (int i = 0; i < 200; i++) {
final int a=i;
threadPool.execute(()->{
System.out.println(a);
});
}
threadPool.shutdown();
}
}
结果:
调用完之后,发现线程已经不在了。
注意:如果是局部变量,仅限当前使用的线程池,在使用完毕后记得shutdown,避免线程无法结束。
如果是全局的线程池,用完之后不要shutdown,因为其他业务也要使用当前线程池。
5.2.4 newCachedThreadPool
可缓存的线程池。
5.2.4.1 结构
当第一次提交任务到线程池时,会直接构建一个工作线程。当这个工作线程执行完任务后,60s内没有新的任务,就会结束线程。如果60s内有任务,会继续执行。如果没有线程是空闲的,就会构建线程去执行。
最大的特点就是,只要有任务进来,就一定会有工作线程去处理。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
// 60s
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
5.2.4.2 测试
package com.xqm.juc.threadPool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Test03_newCachedThreadPool {
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 100; i++) {
final int m=i;
executorService.execute(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(m);
});
}
executorService.shutdown();
}
}
5.2.5 newScheduledThreadPool
定时任务线程池。这个线程池可以以一定周期去执行一个任务或者延迟多久去执行一个任务。
5.2.5.1 结构
从源码中可以看出,是基于ScheduledThreadPoolExecutor构建的线程池。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
构建的是ScheduledThreadPoolExecutor类型的线程池。
public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService {}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
所以本质上还是正常的线程池,只不过在原来的基础上实现了定时任务的功能。
原理上是基于DelayQueue实现的延迟执行,周期性执行是任务执行完毕后,再次扔回到阻塞队列中。
5.2.5.2 测试
package com.xqm.juc.threadPool;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class Test04_newScheduledThreadPool {
public static void main(String[] args) {
ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(10);
// 正常执行
threadPool.execute(()->{
System.out.println("这是正常执行的threadPool");
});
// 延迟执行,当前任务延迟5s后执行
threadPool.schedule(()->{
System.out.println(Thread.currentThread().getName()+"这是延迟5s执行的线程");
},5,TimeUnit.SECONDS);
// 周期执行,这个是按3s周期执行的,因为3s大于1s。这个方法计算下次执行时间,是按任务开始之后执行
threadPool.scheduleAtFixedRate(()->{
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"这是周期执行的线程");
// 延迟2s后执行,然后以3s周期一直执行,因为休眠3s后超过了1s
},2,1,TimeUnit.SECONDS);
// 周期执行,这个是按4s周期执行的,这个方法计算下次执行时间,是按任务结束之后执行
threadPool.scheduleWithFixedDelay(()->{
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"这是周期执行的线程");
// 延迟5s后执行,然后以4s周期一直执行
},5,1,TimeUnit.SECONDS);
}
}
结果:
这是正常执行的threadPool
pool-1-thread-2这是周期执行的线程
pool-1-thread-1这是延迟5s执行的线程
pool-1-thread-3这是周期执行的线程
pool-1-thread-1这是周期执行的线程
pool-1-thread-2这是周期执行的线程
pool-1-thread-4这是周期执行的线程
pool-1-thread-3这是周期执行的线程
pool-1-thread-5这是周期执行的线程
pool-1-thread-1这是周期执行的线程
pool-1-thread-1这是周期执行的线程
pool-1-thread-1这是周期执行的线程
pool-1-thread-1这是周期执行的线程
pool-1-thread-1这是周期执行的线程
pool-1-thread-1这是周期执行的线程
pool-1-thread-1这是周期执行的线程
pool-1-thread-1这是周期执行的线程
scheduleAtFixedRate特点:这个方法在计算下次执行时间时,是从任务刚刚开始时就计算的。
scheduleWithFixedDelay特点:这个方法计算下次执行时间,是按任务结束之后开始计算时间。
5.2.6 newSingleThreadScheduledExecutor
单例定时任务线程池。实际和定时任务线程池相同,只不过默认核心线程池为1,并且用完之后就扔掉。
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
5.2.7 newWorkStealingPool
newWorkStealingPool和之前的线程池有很大区别。
之前定长、缓存、单例、定期线程池都是基于ThreadPoolExecutor去实现的。
5.2.7.1 结构
newWorkStealingPool是基于ForkJoinPool构建出来的。
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
5.2.7.2 ThreadPoolExecutor和ForkJoinPool的区别
ThreadPoolExecutor的核心点:
在ThreadPoolExecutor中,只有一个阻塞队列存放当前任务。
ForkJoinPool的核心点:
当有一个特别大的任务时,如果采用上面方式,这个大任务只能让某一个线程去执行,其他线程继续等待。ForkJoinPool的第一个特点就是将大任务拆分成小任务,放到当前线程的阻塞队列中,其他空闲线程就可以去处理有任务的线程的阻塞队列中的任务。
案例:
单线程处理一个任务,计算总和。
package com.xqm.juc.threadPool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Test05_newWorkStealingPool {
static int[] num=new int[500_000_000];
static {
for (int i = 0; i < num.length; i++) {
num[i]=(int) (Math.random()*1000);
}
}
public static void main(String[] args) {
// 单线程累加5亿数据
System.out.println("计算数组总和");
long start=System.nanoTime();
int total=0;
for (int i = 0; i < num.length; i++) {
total+=num[i];
}
long end=System.nanoTime();
//单线程运算结果为:642796714,计算时间为:164259600
System.out.println("单线程运算结果为:"+total+",计算时间为:"+(end-start));
}
}
结果
计算数组总和
单线程运算结果为:642796714,计算时间为:164259600
ForkJoinPool计算任务
package com.xqm.juc.threadPool;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
public class Test05_newWorkStealingPool02 {
static int[] num = new int[500_000_000];
static {
for (int i = 0; i < num.length; i++) {
num[i] = (int) (Math.random() * 1000);
}
}
public static void main(String[] args) {
// 单线程累加5亿数据
System.out.println("计算数组总和");
long start = System.nanoTime();
int total = 0;
for (int i = 0; i < num.length; i++) {
total += num[i];
}
long end = System.nanoTime();
//单线程运算结果为:642796714,计算时间为:164259600
System.out.println("单线程运算结果为:" + total + ",计算时间为:" + (end - start));
// 多线程累加5亿数据
// 使用forkJoinPool时,不推荐使用Runnable和Callable,而是使用另外两种任务的描述方式
// Runnable(没有返回结果)->RecursiveAction
// Callable(有返回结果)->RecursiveTask
ForkJoinPool forkJoinPool = (ForkJoinPool) Executors.newWorkStealingPool(10);
long startTime = System.nanoTime();
ForkJoinTask<Integer> task = forkJoinPool.submit(new SumRecursiveTask(0, num.length - 1));
Integer result = task.join();
long endTime = System.nanoTime();
System.out.println("分而治之运算结果为:" + result + ",计算时间为:" + (endTime - startTime));
}
static class SumRecursiveTask extends RecursiveTask<Integer> {
//指定一个线程处理哪个位置的数据
private int start, end;
//步长 5千万,10个线程
private int MAX_STRIDE = 50_000_000;
public SumRecursiveTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
// 在这个方法中,需要设置好任务拆分的逻辑和聚合的逻辑
int sum = 0;
int stride = end - start;
if (stride <= MAX_STRIDE) {
// 可以处理任务
for (int i = start; i <= end; i++) {
sum+=num[i];
}
} else {
// 将任务进行拆分
int middle=(end+start) / 2;
// 声明为2个任务
SumRecursiveTask leftTask = new SumRecursiveTask(start, middle);
SumRecursiveTask rightTask = new SumRecursiveTask(middle + 1, end);
// 分别执行两个任务
leftTask.fork();
rightTask.fork();
// 等待结果并且获取sum
sum= leftTask.join()+ rightTask.join();
}
return sum;
}
}
}
结果:
计算数组总和
单线程运算结果为:646785622,计算时间为:140342600
分而治之运算结果为:646785622,计算时间为:80721300
注意:也不是所有任务都能拆分,必须任务大,耗时长,并且任务可以被拆分和聚合的运算才行。
5.3 ThreadPoolExecutor基础
5.3.1 线程池的七个核心参数
前面的Executors中构建线程池的方式,大多数都是基于ThreadPoolExecutor去new出来的。
在ThreadPoolExecutor中,有7个核心参数,每个参数都有决定性的作用。
public ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// 初始化线程
}
- corePoolSize:核心线程数。当前任务执行结束后,线程不会被销毁。
- maximumPoolSize:最大线程数。代表当前线程池中最多有多少线程,非核心线程数=最大线程数-核心线程数。非核心线程数在任务执行完之后会进销毁。
- keepAliveTime:存活时间/等待时间,非核心线程数在阻塞队列位置等待时间。
- unit:等待时间的单位。
- workQueue:阻塞队列。任务在没有核心工作线程处理时,会扔到阻塞队列中进行等待。
- threadFactory:线程工厂。用来构建线程。设置Thread的一些信息。
- handler:拒绝策略。当线程池无法处理任务时,阻塞队列已满的情况,会执行拒绝策略,默认有四种拒绝策略,也可以自定义。
拒绝策略:(当核心线程数满了之后,往阻塞队列中放任务,阻塞队列满了之后,会创建线程到最大线程数,最大线程数也满了情况下,就会执行拒绝策略)
-
AbortPolicy:会直接抛出异常
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); }
-
DiscardPolicy:会直接丢弃最新的任务,并且没有提示。也就是什么都不做。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { }
-
DiscardOldestPolicy:丢弃阻塞队列中最早的任务,通常是存活时间最长的任务。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { // poll取出队列中第一个任务 e.getQueue().poll(); e.execute(r); } }
-
CallerRunsPolicy:将这个任务交给提交任务的线程执行,这样任务既不会被丢弃,当前线程也不会提交新的任务了
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } }
5.3.2 为什么要自定义线程池
- 如果使用JDK自带的线程池,可以设置的核心参数最多只有两个,这样会导致对线程池的控制力度很粗。所以需要自己自定义线程池。
5.3.3 ThreadPoolExecutor使用
package com.xqm.juc.threadPool;
import java.util.concurrent.*;
public class Test06_ThreadPoolExecutor {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 1.构建线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2, 5,
10, TimeUnit.SECONDS
, new ArrayBlockingQueue<>(4),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
// 线程工场
Thread thread = new Thread(r);
thread.setName("test-ThreadPoolExecutor");
return thread;
}
// 默认的拒绝策略:new ThreadPoolExecutor.DiscardPolicy());
// 使用lambda表达式来自定义拒绝策略
}, (r, executor) -> {
System.out.println("这是拒绝策略");
});
// 2.让线程池处理没有返回结果的任务
threadPoolExecutor.execute(()->{
System.out.println("没有返回结果的任务");
});
// 3.让线程池处理有返回结果的任务
Future<Object> future = threadPoolExecutor.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
return "有返回结果";
}
});
Object o = future.get();
System.out.println("有返回结果的任务---"+o);
// 如果是局部变量的线程池,用完要shutdown
threadPoolExecutor.shutdown();
}
}
5.4 ThreadPoolExecutor源码分析
5.4.1 线程池核心属性
核心属性就是ctl的值,高3位为线程状态值,低29位为工作线程数量值。
// 线程池的核心属性
// ctl就是int类型的数值,内部是基于AtomicInteger套了一层,进行运算是,是原子性的
// ctl表示线程池中两个核心状态:
// 一是线程池的状态:ctl的高3位,代表线程池的状态
// 二是工作线程的数量:ctl的低29位,表示工作线程的个数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// Integer.SIZE:获取Integer的bit位个数,也就是32
// 声明了一个常量,COUNT_BITS=29
private static final int COUNT_BITS = Integer.SIZE - 3;
// CAPACITY=1左移29位,然后减1
// 00000000 00000000 00000000 00000001
// 00100000 00000000 00000000 00000000
// 00011111 11111111 11111111 11111111
// 所以CAPACITY就是工作线程的最大值
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 线程池状态的表示
// 当前线程状态中,只有RUNNING状态代表线程池没问题,可以正常接受任务处理
// -1 << COUNT_BITS:高三位为111,代表RUNNING,可以处理阻塞队列中的任务,可以正常接收任务处理
private static final int RUNNING = -1 << COUNT_BITS;
// 0 << COUNT_BITS: 高三位为000,代表SHUTDOWN,可以处理阻塞队列中任务,但是不会接收新的任务
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 1 << COUNT_BITS: 高三位为001,代表STOP,不会接收新的任务,正在处理中的任务线程会被打断,
// 只是会进行中断位的标记,具体是否能打断还得看是否有中断状态.同时阻塞队列中任务一个不管.
private static final int STOP = 1 << COUNT_BITS;
// 2 << COUNT_BITS:高三位为010,代表TIDYING,这个状态是从SHUTDOWN或者STOP转换过来的,代表
// 当前线程池马上关闭,就是一个过渡状态.
private static final int TIDYING = 2 << COUNT_BITS;
// 3 << COUNT_BITS:高三位为011,代表TERMINATED,这个状态是从TIDYING转换过来的,只需要执行一个
// TERMINATED方法.
private static final int TERMINATED = 3 << COUNT_BITS;
线程池的状态示意图:
获取线程状态和工作线程数的方法:
// 使用下面两个的方法时,需要传递ctl的值过来
// 拿到高3位的值.也就是线程池运行的状态.c为ctl的值,CAPACITY为00011111 11111111 11111111 11111111
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 拿到低29位的值,也就是工作线程的数量.CAPACITY为00011111 11111111 11111111 11111111
private static int workerCountOf(int c) { return c & CAPACITY; }
5.4.2 线程池有参构造
最主要是核心线程数可以为0的。
// 线程池的有参构造方法.无论调用重载的哪个方法,最终都会调用这个放.
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// 健壮性校验
// 核心线程数不能为负数,但是可以为为0,比如newCachedThreadPool就是核心线程数为0
if (corePoolSize < 0 ||
// 最大线程数必须大于0
maximumPoolSize <= 0 ||
// 最大线程数必须大于等于核心线程数
maximumPoolSize < corePoolSize ||
// 非核心线程数的最大空闲时间/存活时间,必须大于等于0
keepAliveTime < 0)
throw new IllegalArgumentException();
// 如果阻塞队列为null,或者线程工场,或者拒绝策略不能为null
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
// 这行内容不需要过多关注,系统资源访问策略,和线程池核心业务关系不大.
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
// 各种赋值,赋值到成员变量
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
// 在JUC包下,凡是涉及到时间的,基本都是用纳秒来计算
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
5.4.3 线程池的execute方法
execute方法是提交任务到线程池的核心方法。
// 提交任务到线程池的核心方法
// command就是提交过来的任务
public void execute(Runnable command) {
// 提交的任务不能为null
if (command == null)
throw new NullPointerException();
// 获取核心属性ctl,用于后面的判断
int c = ctl.get();
// 线程池中使用的都是懒加载
// 工作线程数小于核心线程数,那么就添加核心线程
if (workerCountOf(c) < corePoolSize) {
// 添加核心线程 addWorker(提交的任务,是核心线程吗)
// addWorker返回true,代表添加核心线程成功
// addWorker中会基于线程池状态,以及工作线程个数做判断,查看是否能添加工作线程
if (addWorker(command, true))
// 添加工作线程成功就直接返回
return;
// 到这说明线程池状态或者工作线程个数发生改变,导致添加工作线程失败,重新获取一次ctl
c = ctl.get();
}
// 走到这代表添加核心线程失败
// 判断工作线程是否为RUNNING,如果是,那就基于阻塞队列的offer方法添加到阻塞队列
// 如果添加到阻塞队列成功,走if内部
if (isRunning(c) && workQueue.offer(command)) {
// 这一步的目的是,如果任务在扔到阻塞队列前状态改变了
// 那么就需要重新获取线程池状态
int recheck = ctl.get();
// 如果线程池的状态不为RUNNING,那么就将任务从阻塞队列中移除,并且直接执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 到这,说明阻塞队列中有刚刚放进去的任务
// 一种是线程池的状态为RUNNING,另一种是线程池的状态不为RUNNING,并且移除失败
// 查看一下工作线程数是不是0个,防止有饥饿问题,也就是队列中有任务,但是没有线程去处理
// 发生饥饿问题有两种:
// 1.构建线程池时,核心线程数是0个
// 2.即便有核心线程,也可以设置核心线程超时(allowsCoreThreadTimeOut方法)
else if (workerCountOf(recheck) == 0)
// 添加一个非核心的工作线程去处理阻塞队列中的任务
addWorker(null, false);
// 到这代表线程池不为RUNNING或者添加到阻塞队列失败
// 构建一个非核心工作线程
} else if (!addWorker(command, false))
// 如果添加非核心线程失败,执行拒绝策略
reject(command);
}
execute方法流程图:
5.4.4 线程池的addWorker方法
addWorker方法主要分为两大块:
- 一:校验线程池的状态以及工作线程的个数
- 二:添加工作线程并启动工作线程
addWorker方法:
// addWorker有三种情况
// addWorker(command, true)
// addWorker(null, false)
// addWorker(command, false)
private boolean addWorker(Runnable firstTask, boolean core) {
// retry是给外层for循环添加一个标记位,是为了方便在内层for循环跳出外层for循环
retry:
// 外层for循环是校验线程池的状态
// 内存for循环是校验工作线程的个数
// ===============================校验线程池状态======================================
for (;;) {
// 获取ctl
int c = ctl.get();
// 拿到ctl的高三位值
int rs = runStateOf(c);
// 判断线程池的状态是否大于等于SHUTDOWN
// 因为只有RUNNING是-1,也就是小于SHUTDOWN,其他四种状态都是大于等于的
// 如果满足这个条件,也就是线程池不是RUNNING状态的
if (rs >= SHUTDOWN &&
// 下面这个判断是为了阻塞队列中有数据,但是没有工作线程去处理
// 如果线程池状态是shutdown,并且阻塞队列不为null,并且参数为null
// 那么就需要添加一个非核心工作线程去处理任务
// 下面这个条件就是满足addWorker(null, false)
! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
// 不需要添加工作线程
return false;
// ===============================校验工作线程个数======================================
for (;;) {
// 基于ctl拿到低29位的值,代表当前工作线程个数
int wc = workerCountOf(c);
// 如果工作线程个数大于CAPACITY,就是低29位全为1
if (wc >= CAPACITY ||
// 如果是核心线程,就基于corePoolSize去判断
// 如果是非核心线程,就基于maximumPoolSize去判断
wc >= (core ? corePoolSize : maximumPoolSize))
// 代表不能添加,工作线程个数不满足要求
return false;
// 采用CAS的方式,对ctl进行+1
if (compareAndIncrementWorkerCount(c))
// CAS成功后,直接退出外层循环,代表可以添加工作线程
break retry;
// 防止有并发的情况,重新获取ctl的值
c = ctl.get();
// 获取线程池的状态,看是否与之前的ctl状态相同
// 相同说明没有改变,不同说明状态改变了
if (runStateOf(c) != rs)
// 状态改变了,说明要重新走一次外层for循环,进行状态改变
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// ===============================添加工作线程并启动工作线程======================================
// 声明了三个变量
// workerStarted:工作线程是否已启动,false代表默认没启动
boolean workerStarted = false;
// workerAdded:工作线程是否已添加,false代表默认没添加
boolean workerAdded = false;
// w:代表worker,工作线程
Worker w = null;
try {
// 构建工作线程,构建工作线程首先会先执行带来的任务,执行完才看阻塞队列中是否有任务执行
w = new Worker(firstTask);
// 获取worker中的Thread对象
final Thread t = w.thread;
// 判断Thread是否不等于null
// 在new Worker的时候,内部会通过ThreadFactory去构建Thread交给Worker
// 如果为null,代表ThreadFactory有问题
if (t != null) {
// 加锁,因为下面的workers成员变量是hashSet,线程不安全的
// 保证下面的workers和largestPoolSize赋值时的线程安全
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 保持锁定时重新检查
// 防止退出ThreadFactory失败
// 防止在锁获取之前,状态改变为shutdown
// 再次获取线程池状态,再次判断
int rs = runStateOf(ctl.get());
// rs < SHUTDOWN:代表状态为RUNNING
if (rs < SHUTDOWN ||
// 如果状态为shutdown并且任务为null,也就是饥饿问题
(rs == SHUTDOWN && firstTask == null)) {
// t.isAlive()方法时判断线程是否正在启动,如果线程正在运行,返回true,否则返回false
// 这里如果为true的话,说明线程已经启动了,但是worker并没有启动,说明就有问题
// 校验ThreadFactory构建线程后,不能自己启动线程,如果启动了,就抛出异常
if (t.isAlive())
// 抛出线程异常
throw new IllegalThreadStateException();
// workers就是一个HashSet,所有的工作线程都在里面存着
// private final HashSet<Worker> workers = new HashSet<Worker>();
// 将new好的Worker添加到HashSet中
workers.add(w);
// 拿到工作线程的个数
int s = workers.size();
// largestPoolSize:记录最大线程个数
// 如果当前工作线程个数大于最大线程个数
if (s > largestPoolSize)
// 进行赋值
largestPoolSize = s;
// 添加工作线程成功,状态改变
workerAdded = true;
}
} finally {
// 释放锁
mainLock.unlock();
}
// 如果工作线程添加成功
if (workerAdded) {
// 直接启动worker中的线程
t.start();
// 启动工作线程成功
workerStarted = true;
}
}
} finally {
// 如果工作线程启动失败
if (! workerStarted)
// 将添加的工作线程直接移除掉
addWorkerFailed(w);
}
// 返回工作线程是否成功启动
return workerStarted;
}
addWorkerFailed方法:
// 添加工作线程失败后操作
private void addWorkerFailed(Worker w) {
// 上锁,因为操作workers,是HashSet类型的
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 如果添加的worker不为null
if (w != null)
// 从HashSet中移除
workers.remove(w);
// 工作线程-1
decrementWorkerCount();
// 因为工作线程启动失败,判断一下线程池状态
// 是不是可以走TIDYING状态,最终走TERMINATED状态
tryTerminate();
} finally {
// 释放锁
mainLock.unlock();
}
}
5.4.5 Worker工作线程
Worker对象主要包含了两部分内容:
- 工作线程要执行的任务
- 工作线程可能会被中断
// Worker继承了AQS,目的就是为了控制工作线程的中断
// Worder实现了Runnable,内部的Thread对象,在执行start方法时,必定要执行Worker中断的一些操作
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
// 序列化
private static final long serialVersionUID = 6138294804551838833L;
// =============================Worker管理任务=========================================
// Worker中线程工场构建的线程
final Thread thread;
// 当前Worker要执行的任务
Runnable firstTask;
// 记录当前线程工场处理了多少个任务
volatile long completedTasks;
// Worker有参构造
Worker(Runnable firstTask) {
// 这个和中断状态有关,将state设置为-1,代表不允许中断
setState(-1);
// 赋值要执行的任务
this.firstTask = firstTask;
// 基于ThreadFactory构建Thread,this是Worker,将Worker当作Runnable传递进去
this.thread = getThreadFactory().newThread(this);
}
// 当Thread执行start方法时,执行的是Worker的run方法
public void run() {
// 任务执行时,执行的时runWorker方法
runWorker(this);
}
// =============================Worker管理中断=========================================
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 将state设置为0,每个worker都有自己的AQS,因此不需要加锁
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
// 当前方法是中断工作线程时,执行的方法
void interruptIfStarted() {
Thread t;
// 只有Worker中的state >=0的时候,才可以中断工作线程
// 线程不为null,中断标志位为false
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
// 中断线程,只是将中断标记位设置为true
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
5.4.6 runWorker方法
因为工作线程启动时,thread会启动start方法。然后走run方法,而run方法调用的是Worker中的run方法,Worker中的run方法调用的是runWorker方法。
runWorker方法就是让工作线程拿到任务去执行。
// 工作线程启动后执行的任务
final void runWorker(Worker w) {
// 拿到当前线程
Thread wt = Thread.currentThread();
// 由于线程池是懒加载,因此启动工作线程的时候,线程中自带有一个任务的
// 拿到worker中的任务
Runnable task = w.firstTask;
// 将线程中的firstTask置为null,要用任务就用上面的task
w.firstTask = null;
// Worker是基于AQS的,由于在有参构造中,将state设为-1,而只有当state>=0的时候,才能被中断
// 因此这里unlock会调用release方法,release方法会将state置为0,这样就可以被中断了
w.unlock(); // allow interrupts
// 判断工作线程是否是异常结束,默认就是异常结束
boolean completedAbruptly = true;
try {
// 获取任务
// 直接拿到第一个任务去执行
// 如果第一个任务为null的话,去阻塞队列中拿任务去执行
// getTask()就是去阻塞队列中拿任务
while (task != null || (task = getTask()) != null) {
// 执行worker的lock方法,上锁
// 在lock时,其他的操作不能中断当前线程,不可重入的锁,将state从0置为1
// shutdown操作不能中断处理任务操作,因为shutdown会尝试获取锁,是得不到的
w.lock();
// runStateAtLeast:比较第一个参数是否大于等于第二个参数
// 如果满足,说明线程池已经到了STOP状态以及之后的状态
if (
// 第一种:线程池到了STOP状态,并且当前线程还没有中断,就要确保当前线程是中断的,进到if内部执行中断方法
// 第二种:线程池没到STOP状态,也就是RUNNING或者SHUTDOWN状态
// 如果发现线程标记位是true,那么再次查看线程池状态是否大于STOP,那么就再次中断线程
// 总结来说,如果线程池状态是STOP及其之后的,那么就必须保证线程是中断的
(runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&
// 如果为true,代表当前线程还没有中断
!wt.isInterrupted())
// 将当前线程中断
wt.interrupt();
try {
// 前置钩子函数,是空的,由自己自定义
// 如果需要在任务执行之前进行一些操作,就需要自定义实现
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行任务
task.run();
// 捕获一堆异常
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 后置钩子函数,在任务执行完之后的一些操作进行自定义
// 重写方法
afterExecute(task, thrown);
}
} finally {
// 任务执行完就丢掉任务
task = null;
// 当前工作线程处理完的任务+1
w.completedTasks++;
// 执行unlock方法,此时shutdown方法才可以中断当前线程
w.unlock();
}
}
// 到这代表工作线程正常结束了
// 正常结束的话,在getTask中会做一些操作,将ctl-1,代表工作线程-1
completedAbruptly = false;
} finally {
// 考虑干掉工作线程
processWorkerExit(w, completedAbruptly);
}
}
5.4.7 processWorkerExit方法
工作线程结束时做的额外处理。
// 工作线程结束前要执行的方法
// w就是工作线程对象
// completedAbruptly代表工作线程是否是异常结束,true代表异常结束
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果工作线程是异常结束
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
// 将工作线程总数-1
decrementWorkerCount();
// 获取锁,操作workers,也就是工作线程的HashSet,为了安全,进行加锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 当前工作线程处理的任务,累加到线程池处理任务的个数中
// 做一个总数统计
completedTaskCount += w.completedTasks;
// 将工作线程从HashSet中移除
workers.remove(w);
} finally {
// 释放锁
mainLock.unlock();
}
// 只要工作线程凉了,就要查看线程池状态是否改变,往下面的状态进行改变
tryTerminate();
// 获取线程池状态
int c = ctl.get();
// runStateLessThan:判断c是否小于STOP
// 这里判断是否是RUNNING或是SHUTDOWN,是否处理阻塞队列中的任务
if (runStateLessThan(c, STOP)) {
// 如果是正常结束工作线程
if (!completedAbruptly) {
// 如果核心线程允许超时,那么min就是0,否则就是核心线程数
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 如果min==0,可能会出现没有工作线程,并且阻塞队列不为空
if (min == 0 && ! workQueue.isEmpty())
// min设置为1,代表至少有一个工作线程去处理任务
min = 1;
// 如果工作线程数大于min,那么就代表有工作线程去处理任务
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 如果是异常结束,那么就添加一个null任务的非核心线程去填补刚刚异常结束的工作
addWorker(null, false);
}
}
5.4.8 getTask方法
getTask()方法是获取阻塞队列中的任务方法。
在runWorker方法中执行。
工作线程在去阻塞队列获取任务前,要先查看线程池状态。如果线程池状态没问题,去阻塞队列poll或者take拿取任务。第二次循环时,不但要判断线程池状态,还要判断当前线程是否可以被干掉。
// 前半部分就是判断当前线程是否可以返回null,然后结束
// 后半部分就是从阻塞队列中拿取任务
private Runnable getTask() {
// timeOut默认值是false
boolean timedOut = false; // Did the last poll() time out?
// 剩下的所有都在死循环中
for (;;) {
// 拿到ctl
int c = ctl.get();
// 拿到线程池的状态
int rs = runStateOf(c);
// 如果线程池的状态是大于等于SHUTDOWN并且阻塞队列为空
// 或者线程池的状态是大于等于STOP
// 那么显然不需要处理阻塞队列任务
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// 先将工作线程的个数-1,正常处理的情况下在这进行处理
// 如果是异常,就在runWorker中的processWorkerExit方法中扣除工作线程数量
decrementWorkerCount();
// 直接返回null
return null;
}
// 拿到工作线程个数
int wc = workerCountOf(c);
// 允许核心线程超时,那么timed为true
// 工作线程个数大于核心线程数,timed也为true
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 如果工作线程数大于最大线程数(这种情况一般到不了)
// 第二个判断代表,只要工作线程数小于等于核心线程数,必然为false
// 即便工作线程数大于核心线程数,此时第一次循环也不会进入,因为timedOut第一次为false
if ((wc > maximumPoolSize || (timed && timedOut))
// 要么工作线程还有,要么阻塞队列为空,并且满足上面条件,才会走进if内部
&& (wc > 1 || workQueue.isEmpty())) {
// 正常结束,线程-1,因为是CAS操作,如果失败,重新走for循环
if (compareAndDecrementWorkerCount(c))
// 成功就return
return null;
// 失败就重新走for循环
continue;
}
// 到这代表工作线程从阻塞队列中拿取任务了
try {
// 如果是核心线程,timed是false
// 如果是非核心线程,timed是true方法
Runnable r = timed ?
// 非核心线程使用poll方法拿取任务
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
// 核心线程直接take拿取任务,没有就死等
workQueue.take();
// 如果任务不为null,直接返回这个任务去执行
if (r != null)
return r;
// 如果任务是null,没有拿到任务,将timedOut设置为true
// 在上面就可以返回null退出了
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
5.4.9 shutdownNow方法
shutdownNow方法可以将线程池状态从RUNNING状态转换为STOP状态。
STOP状态是不会处理阻塞队列中的任务以及不会接收新的任务。会将正在处理中的任务进行中断。
shutdownNow方法:
// 因为shutdownNow不会处理阻塞队列中的任务,因此将全部任务全部返回
public List<Runnable> shutdownNow() {
// 声明返回结果
List<Runnable> tasks;
// 加锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 不关注这个方法,是系统操作访问资源策略使用的
checkShutdownAccess();
// 将线程池状态修改为STOP
advanceRunState(STOP);
// 中断工作线程,无论怎么样,强行直接中断
interruptWorkers();
// 将阻塞队列的任务全部扔到List集合中
tasks = drainQueue();
} finally {
// 释放锁
mainLock.unlock();
}
// 尝试修改线程池状态
tryTerminate();
// 返回任务
return tasks;
}
advanceRunState方法:修改线程池状态
// 修改线程池状态
private void advanceRunState(int targetState) {
// 死循环
for (;;) {
// 获取ctl的值
int c = ctl.get();
// 如果c大于等于targetState,返回true
// 如果当前线程池状态已经大于STOP,那就不需要修改了
if (runStateAtLeast(c, targetState) ||
// ctlOf:将两个参数进行或运算
// 不修改工作线程个数,但是将状态修改为STOP
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
interruptWorkers方法:中断工作线程
// 中断线程
private void interruptWorkers() {
// 因为是ReentrantLock,是可重入锁,因此直接中断所有线程
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 遍历HashSet,拿到所有的工作线程,直接中断
for (Worker w : workers)
w.interruptIfStarted();
} finally {
// 释放锁
mainLock.unlock();
}
}
drainQueue方法:移除阻塞队列中所有方法
// 移除阻塞队列任务,内容全部扔到List集合中
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
ArrayList<Runnable> taskList = new ArrayList<Runnable>();
// 阻塞队列自带的,直接清空阻塞队列,内容扔到List中
q.drainTo(taskList);
// 可能会出现任务丢失,需要重新遍历阻塞队列,重新扔
if (!q.isEmpty()) {
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}
tryTerminate方法:查看当前线程池是否可以变为TIDYING或TERMINATED状态
// 查看当前线程池是否可以变为TIDYING或TERMINATED状态
final void tryTerminate() {
// 死循环
for (;;) {
// 拿到ctl
int c = ctl.get();
// 如果是RUNNING状态,直接告辞
if (isRunning(c) ||
// 如果状态是大于等于TIDYING,马上就要变成TERMINATED,也不需要管
runStateAtLeast(c, TIDYING) ||
// 如果是SHUTDOWN,并且阻塞队列不为空,说明还有任务要处理,也不需要管
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
// 出现上面三种状态都不需要干掉线程池
return;
// 如果还有工作线程
if (workerCountOf(c) != 0) {
// 再次中断线程
interruptIdleWorkers(ONLY_ONE);
// 等到工作线程全部结束,在尝试进入到TERMINATED状态
return;
}
// 到这说明工作线程为0,加锁,为了执行condition的释放操作
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 将线程池状态修改为TIDYING,工作线程数为0
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 这个方法是空的,需要在线程池关闭后做一些额外操作,需要重写这些方法
terminated();
} finally {
// 执行terminated方法后,将状态设置为TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
// 线程池提供了一个方法,主线程在提交一个任务到线程池之后,是可以继续做其他操作的
// 也可以让主线程提交任务后,等待线程池处理任务完毕后,在做后续操作
// 也就是让主线程执行awaitTermination进行等待,下面这个就是唤醒主线程
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
5.4.10 shutdown方法
shutdown方法可以将线程池从RUNNING状态修改为SHUTDOWN状态。
shutdown状态下,不会中断正在工作的线程,而且会继续处理阻塞队列中的任务。
shutdown方法:
// shutdown方法
public void shutdown() {
// 加锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 系统资源访问策略,不需要关注
checkShutdownAccess();
// 将线程池状态修改为SHUTDOWN
advanceRunState(SHUTDOWN);
// 中断空闲线程
interruptIdleWorkers();
// 这个是专门为ScheduledThreadPoolExecutor准备的钩子
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
// 释放锁
mainLock.unlock();
}
// 尝试结束线程池
tryTerminate();
}
interruptIdleWorkers:中断空闲线程(和shutdownNow不同之处)
// onlyOne:false
// 中断空闲线程
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// 如果线程没有中断,就去尝试获取Worker的锁
// Worker锁是不可重入的,不会中断正在干活的线程,会中断空闲线程
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
// 释放锁
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
5.5 线程池的核心参数设置
线程池的使用难度不大,难度在于线程池的参数配置。
主要难点在于任务是CPU密集型还是IO密集型,甚至还有混合型。
想调试出一个符合当前任务情况的核心参数,最好的方式就是测试。
因为每次修改项目都要重新部署太麻烦,可以实现一个动态监控以及修改线程池的方法。
线程池中提供了获取核心信息的get方法,同时也提供了动态修改核心属性的set方法。
或者采用一些开源项目比如hippo4j
https://github.com/opengoofy/hippo4j,并且可以和springboot进行整合。
仅供参考:
- CPU密集型:核心线程数 = CPU核数 + 1
- IO密集型:核心线程数 = CPU核数 * 2
5.6 线程池处理任务的核心流程
基于addWorker添加工作线程的流程切入到整体处理任务的位置。
5.7 ScheduleThreadPoolExecutor
5.7.1 ScheduleThreadPoolExecutor介绍
ScheduleThreadPoolExecutor是用来执行定时任务的线程池。能够实现延迟执行以及周期性执行的功能。
早期的定时任务是Timer类。但是Timer类是串行的等一些问题。而现在一般使用springboot的Schedule的方式或者采用QuartZ实现定时任务的功能。
每次新来一个任务,先丢到延迟队列中,然后再查看是否有工作线程是否小于核心线程数,如果是就创建一个核心线程处理任务。如果工作线程和核心线程都为0,那就创建一个非核心线程去处理任务。
5.7.2 ScheduleThreadPoolExecutor应用
ScheduledThreadPoolExecutor在构建时,直接调用了父类的构造方法。ScheduledThreadPoolExecutor的父类其实就是ThreadPoolExecutor。
ScheduledThreadPoolExecutor最多允许设置三个参数:
- 核心线程数
- 线程工场
- 拒绝策略
没有设置最大线程数、空闲时间以及单位、阻塞队列。默认的阻塞队列是DelayedWorkQueue,本质上其实就是延迟队列。DelayQueue其实就是一个无界队列,因此不需要设置最大线程数。
// 定时任务线程池的有参构造
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
应用:
package com.xqm.juc.threadPool;
import java.util.concurrent.*;
public class Test08_scheduledThreadPoolExecutor {
public static void main(String[] args) {
// 1.构建定时任务线程池
ScheduledThreadPoolExecutor pool=new ScheduledThreadPoolExecutor(4, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t=new Thread(r);
return t;
}
},new ThreadPoolExecutor.AbortPolicy());
// 2.应用
pool.execute(()->{
System.out.println("普通执行");
System.out.println("和其他执行线程池的execute没什么区别");
});
pool.schedule(()->{
System.out.println("延迟两秒执行");
},2, TimeUnit.SECONDS);
pool.scheduleAtFixedRate(()->{
System.out.println("第一次执行为3s后,之后每隔2秒执行一次");
},3,2,TimeUnit.SECONDS);
pool.scheduleWithFixedDelay(()->{
System.out.println("和scheduleAtFixedRate方法一样");
System.out.println("第一次执行为3s后,之后每隔2秒执行一次");
},3,2,TimeUnit.SECONDS);
// scheduleAtFixedRate和scheduleWithFixedDelay的区别
pool.scheduleAtFixedRate(()->{
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("第一次执行为3s后,之后每隔5秒执行一次");
System.out.println("方法延迟时间计算,是从任务刚开始就进行计算,因为5s是大于2s的,因此之后都是5s执行一次");
},3,2,TimeUnit.SECONDS);
pool.scheduleWithFixedDelay(()->{
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("第一次执行为3s后,之后每隔7秒执行一次");
System.out.println("方法延迟时间计算,是从任务结束进行计算,因此是5+2,之后都是7s执行一次");
},3,2,TimeUnit.SECONDS);
}
}
5.7.3 核心属性和类
// 针对业务取消时的一些业务判断用到的标记
private volatile boolean continueExistingPeriodicTasksAfterShutdown;
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
private volatile boolean removeOnCancel = false;
// 计数器,如果两个任务的执行时间节点一模一样,可以根据这个序列来判断谁先执行
private static final AtomicLong sequencer = new AtomicLong();
// 获取当前系统时间的毫秒值
final long now() {
return System.nanoTime();
}
// 内部类,核心类之一
private class ScheduledFutureTask<V>
extends FutureTask<V> implements RunnableScheduledFuture<V> {
// 生成全局唯一序列,如果两个任务的时间一致,基于这个属性判断
private final long sequenceNumber;
// 任务执行的时间,单位为纳秒
private long time;、
// 1.period==0 这个任务是执行一次的任务
// 2.period > 0 代表是At
// 3.period < 0 代表是with
private final long period;
// 周期性执行,需要将任务重新扔回阻塞队列,基于当前属性拿到任务
RunnableScheduledFuture<V> outerTask = this;
int heapIndex;
// 构建schedule方法的任务
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
// 构建At和With方法的任务
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}
ScheduledFutureTask(Callable<V> callable, long ns) {
super(callable);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
....省略一堆代码
}
// 静态内部类,核心类之一
// 这个类就是DelayQueue,优先级队列和延迟队列
static class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable> {}
5.7.4 schedule方法
execute方法实际上还是调用的schedule方法,只不过设置的延迟时间是0秒,也就是放进堆中就是直接执行。
// execute方法,延迟时间是0秒,本质还是调用的schedule方法
// 也就是刚放进去就是在堆顶,直接被执行
public void execute(Runnable command) {
schedule(command, 0, NANOSECONDS);
}
5.7.4.1 schedule方法
// 延迟任务执行的时间,注意不是周期性执行,只执行一次
// command:任务
// delay:延迟时间
// unit:单位
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
// 非空的健壮性校验
if (command == null || unit == null)
throw new NullPointerException();
// 将任务和延迟时间封装到一起,最终组装成ScheduledFutureTask对象
// 第一个方法:triggerTime,计算延迟时间,返回的是当前系统时间+延迟时间
// 第二个方法:ScheduledFutureTask,有参构造,封装成一个对象
// 第三个方法:decorateTask,直接返回任务,是让用户基于自身业务直接动态修改
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
// 执行当前的任务
delayedExecute(t);
// 返回FutureTask
return t;
}
5.7.4.2 triggerTime方法
// 对延迟时间做校验,如果小于0,那就设置为0,否则就是延迟时间
// 并且将时间转换为纳秒
private long triggerTime(long delay, TimeUnit unit) {
return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}
5.7.4.3 triggerTime的重载方法
// 将延迟时间+当前系统时间
// 后面的校验是为了避免延迟时间超过long的取值范围
long triggerTime(long delay) {
return now() +
// 小于最大值的两倍,防止超过long的范围
((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
5.7.4.4 overflowFree方法
private long overflowFree(long delay) {
// 拿到堆顶数据
Delayed head = (Delayed) super.getQueue().peek();
// 堆不为null
if (head != null) {
// 获取堆顶数据的延迟时间
long headDelay = head.getDelay(NANOSECONDS);
// 如果堆顶延迟时间小于0,并且延迟时间小于堆顶的时间
if (headDelay < 0 && (delay - headDelay < 0))
// 获取新的延迟时间
delay = Long.MAX_VALUE + headDelay;
}
return delay;
}
5.7.4.5 ScheduledFutureTask的有参构造方法
// 有参构造函数
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
// time就是任务要执行的时间
this.time = ns;
// 代表任务是延迟执行,不是周期执行
this.period = 0;
// 给唯一的一个序列
this.sequenceNumber = sequencer.getAndIncrement();
}
5.7.4.6 delayedExecute方法
// 执行任务
private void delayedExecute(RunnableScheduledFuture<?> task) {
// 查看当前线程池是否还是RUNNING状态,如果不是RUNNING,那么新任务一概不处理
if (isShutdown())
// 直接拒绝新的任务
reject(task);
else {
// 直接将任务丢到延迟的阻塞队列中
super.getQueue().add(task);
// DCL的操作,再次查看线程池状态
// 如果线程池在添加任务到阻塞队列后,不是RUNNING状态了
if (isShutdown() &&
// task.isPeriodic():判断是否不为0,是0就是延迟执行。在execute中返回的是false
// canRunInCurrentRunState:如果状态是RUNNING,返回true,如果是SHUTDOWN,返回shutdownOK
!canRunInCurrentRunState(task.isPeriodic()) &&
// 如果任务不能执行,移除这个任务
remove(task))
task.cancel(false);
else
// 线程池状态正常,任务可以执行
ensurePrestart();
}
}
5.7.4.7 ensurePrestart方法
// 任务可以正常执行后做的操作
void ensurePrestart() {
// 拿到工作线程个数
int wc = workerCountOf(ctl.get());
// 如果工作线程数小于核心线程数
if (wc < corePoolSize)
// 添加空的核心线程
addWorker(null, true);
// 如果工作线程数为0
else if (wc == 0)
// 添加一个非核心线程
addWorker(null, false);
}
5.7.4.8 canRunInCurrentRunState方法
// 线程池状态不为RUNNING,查看任务是否能执行
// 延迟执行:periodic==false
// 周期执行:periodic==true
// continueExistingPeriodicTasksAfterShutdown:周期执行任务,默认为false
// executeExistingDelayedTasksAfterShutdown:延迟执行的任务,默认是true,可以执行的
boolean canRunInCurrentRunState(boolean periodic) {
return isRunningOrShutdown(periodic ?
continueExistingPeriodicTasksAfterShutdown :
executeExistingDelayedTasksAfterShutdown);
}
5.7.4.9 isRunningOrShutdown方法
// 如果是延迟执行,shutdownOK默认为true
final boolean isRunningOrShutdown(boolean shutdownOK) {
// 获取线程池状态
int rs = runStateOf(ctl.get());
// 如果状态是RUNNING,返回true,正常可以执行
// 如果状态是SHUTDOWN,根据SHUTDOWN来决定
return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
}
5.7.5 At和With方法
这两个方法在源码层面的第一个区别,就是在计算周期时间时,需要将这个值传递给period。基于正负数区别At和With。因此只需要学习其中一个就行。
5.7.5.1 At方法
// At方法
// command:任务
// initialDelay:第一次执行的延迟时间
// period:任务的周期执行时间
// unit:上面两个时间的单位
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
// 健壮性校验
if (command == null || unit == null)
throw new NullPointerException();
// 周期不能小于等于0
if (period <= 0)
throw new IllegalArgumentException();
// 将任务第一次执行时间和后续的时间封装好
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
// 扩展口,可以对任务做修改
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
// 周期性任务,需要在任务执行完之后,重新仍回到阻塞队列
// 为了方便拿任务,将任务设置到outerTask成员变量中
sft.outerTask = t;
// 和schedule方法一样的方式
// 如果任务刚扔到阻塞队列,线程池状态变为SHUTDOWN,默认情况下,当前任务不执行
delayedExecute(t);
return t;
}
5.7.5.2 run方法
// 延迟任务以及周期任务在执行时,都会调用当前任务的run方法
public void run() {
// isPeriodic:返回不等于0
// periodic==false:延迟任务
// periodic==true:周期任务
boolean periodic = isPeriodic();
// 任务执行前,会再次判断状态,能否执行任务
if (!canRunInCurrentRunState(periodic))
// 取消任务
cancel(false);
// 判断是周期任务还是延迟任务,进入if中代表是延迟任务,那么就直接执行
else if (!periodic)
// 直接执行任务
ScheduledFutureTask.super.run();
// 到这代表是周期任务
// runAndReset是futureTask中方法,放在异步编程中介绍
else if (ScheduledFutureTask.super.runAndReset()) {
// 设置下次任务执行时间
setNextRunTime();
// 将任务重新扔回线程池
reExecutePeriodic(outerTask);
}
}
5.7.5.3 setNextRunTime方法
// 设置下次任务执行时间
private void setNextRunTime() {
long p = period;
// 大于0代表是At
if (p > 0)
// 拿着之前的执行时间,直接追加上周期时间,不关注任务内部的时间
// 比如之前10:00:00执行第一次任务,周期为3s,那么第二次执行的时间为10:00:03,
// 但是如果任务内部执行时间为4s,当任务结束的时候,时间为10:00:04,再次判断执行
// 时间的话,由于10:00:04超过10:00:03,就直接执行
time += p;
// 否则就是With
else
// 如果是with,就是重新计算延迟时间
// 就是拿着系统时间,追加上延迟时间
// 比如之前10:00:00执行第一次任务,周期为3s,任务内部执行时间为4s,
// 那么任务执行完之后时间为10:00:04,计算第二次执行时间为10:00:07
time = triggerTime(-p);
}
5.7.5.4 reExecutePeriodic方法
// 将任务重新扔回线程池
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
// 如果任务状态校验后可以执行
if (canRunInCurrentRunState(true)) {
// 将任务添加到延迟队列
super.getQueue().add(task);
// dcl,判断线程池状态
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
// 添加工作线程
ensurePrestart();
}
}