为什么需要线程池?
计算机资源是有限的,重复创建和销毁线程会浪费系统资源,并且无限制创建线程会导致系统资源耗尽
什么是线程池?
基于池化技术来统一创建、销毁、复用、管理线程的工具
使用线程池的优点
降低资源消耗、提高响应速度、提升线程的可管理性
创建线程次主要有两种方式
使用 Executors 类提供的静态方法
使用 ThreadPoolExecutor 构造方法
ThreadPoolExecutor 构造方法参数
总共有七个参数:
int corePoolSize 核心线程数
int maximumPoolSize 最大线程数
long keepAliveTime 线程空闲时间
TimeUnit unit 线程空闲时间单位
BlockingQueue<Runnable> workQueue 阻塞队列
ThreadFactory threadFactory 线程工厂
RejectedExecutionHandler handler 拒绝策略
ThreadFactory 线程工厂
ThreadFactory 不是必须参数,自定义 ThreadFactory 可以定制线程名称
RejectedExecutionHandler 拒绝策略
RejectedExecutionHandler 不是必须参数,默认为 AbortPolicy
DiscardPolicy 直接丢弃任务
DiscardOldestPolicy 丢弃队列头的任务
CallerRunsPolicy 直接在调用线程执行任务
AbortPolicy 抛出 RejectedExecutionHandler 异常
Executors 类提供的静态方法要慎用
Executors.newFixedThreadPool() 和 Executors.newSingleThreadExecutor() 没有规范线程池阻塞队列大小
Executors.newCachedThreadPool() 允许的最大线程数是 Integer.MAX_VALUE
这些方式都有 OOM 风险,在生产代码中应当谨慎使用
线程池执行流程
当线程池创建完毕后,池内会始终保持核心线程
当向线程池提交任务,如果核心线程都在使用中,就存放到阻塞队列等待
如果阻塞队列满了,就按照最大线程数创建新的非核心线程去执行任务
如果阻塞队列满了,并且也达到了最大线程数的限制,就会使用拒绝策略处理任务
非核心线程空闲下来后会按照设置的空闲时间进行销毁
提交任务
execute Runnable
// 创建 threadPool
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10, 15, 10, TimeUnit.SECONDS,
new LinkedBlockingDeque<>(100));
// execute
threadPool.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
});
submit Runnable
Future<?> submitRunnable = threadPool.submit(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
});
submit Runnable 并指定返回内容
Future<Integer> submitRunnableWithResult = threadPool.submit(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
}, 100);
submit Callable
Future<String> submitCallable = threadPool.submit(new Callable<String>() {
@Override
public String call() {
System.out.println(Thread.currentThread().getName());
return "submitCallable";
}
});
批量提交任务
invokeAll:提交 Callable 集合,返回 Future 集合,可以指定超时时间
ArrayList<Callable<Integer>> callables = new ArrayList<>();
for (int i = 0; i < 5; i++) {
int res = i;
callables.add(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return res;
}
});
}
try {
List<Future<Integer>> futures = threadPool.invokeAll(callables);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
invokeAny:提交 Callable 集合,返回第一个完成的 Future 结果,可以指定超时时间
try {
Integer res = threadPool.invokeAny(callables, 100, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new RuntimeException(e);
}
获取任务执行结果
try {
String str = future.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
get 方法会阻塞当前线程,等待结果的返回
get 方法也可以加入超时时间
boolean done = future.isDone();
isDone 可以判断 任务是否执行完成
Future cancel()
cancel 方法传入一个布尔值,代表是否中断正在执行的任务
Future isCancelled()
返回是否被取消
cancel 可能产生的几种结果
取消未执行的任务:返回取消成功,之后 get 会抛出 CancellationException
取消已完成的任务:返回取消失败
取消正在执行的任务:返回取消成功,之后 get 会抛出 CancellationException,但是线程是否真的停止,需要看有没有响应中断
ThreadPoolExecutor shutdown()
拒绝新任务
正在执行的任务和队列中的任务还会继续执行
ThreadPoolExecutor shutdownNow()
拒绝新任务
尝试中断正在执行的任务
不会执行队列中的任务,直接返回任务 Runnable 集合
ThreadPoolExecutor awaitTermination()
awaitTermination 方法需要设置一个超时时间
在超时时间内,阻塞等待线程池关闭,到了超时时间就检查线程池是否停止并返回布尔值
ThreadPoolExecutor 中定义了线程池的 6 种状态
RUNNING 正常运行
SHUTDOWN 不接收新任务,继续执行队列任务
STOP 不接收新任务,不执行队列任务,中断正在执行的任务
TIDYING 开始清理,所有任务已停止,活跃线程为 0
TERMINATED 线程池彻底关闭
线程池状态的转化
shutdown 方法 RUNNING -> SHUTDOWN
shutdownNow 方法 RUNNING -> STOP
SHUTDOWN 或 STOP 工作线程为空时,进入 TIDYING 状态,做清理回收工作
TIDYING 完成后调用 terminated 方法,彻底关闭线程池
线程池状态监控常用方法
isShutDown 调用 shutdown/shutdownNow 方法后返回 true
isTerminated 线程池彻底关闭后返回 true
getActiveCount 返回正在工作的线程数
getPoolSize 返回当前存在的线程数
getlargestPoolSize 返回历史最大的线程数
getQueue 返回阻塞队列
getTaskCount 返回已提交任务数
getCompletedTaskCount 返回已完成任务数
创建定时任务线程池 ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor 继承 ThreadPoolExecutor,实现了 ScheduledExecutorService
// 1
ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(10);
// 2
ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(10);
执行延迟任务
延迟执行 Runnable
ScheduledFuture<?> scheduledRunnableFuture = scheduled.schedule(new Runnable() {
@Override
public void run() {
System.out.println("延迟执行 Runnable");
}
}, 2, TimeUnit.SECONDS);
延迟执行 Callable
ScheduledFuture<?> scheduledCallableFuture = scheduled.schedule(new Callable<String>() {
@Override
public String call() {
System.out.println("延迟执行 Callable");
return "scheduledCallableFuture";
}
}, 2, TimeUnit.SECONDS);
执行周期任务
scheduleAtFixedRate 以指定频率执行任务
接收 Runnable,指定第一次执行的延迟时间,以及任务开始执行的间间隔
ScheduledFuture<?> fixedRateFuture = scheduled.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
}, 1,2, TimeUnit.SECONDS);
scheduleWithFixedDelay 以指定任务间隔执行任务
接收 Runnable,指定第一次执行的延迟时间,以及上一个任务执行完成和下一个任务开始的时间间隔
ScheduledFuture<?> fixedDelayFuture = scheduled.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
}, 1,2, TimeUnit.SECONDS);