Java 线程池
就像数据库连接可以使用连接池管理一样,Java 中的线程也可以使用线程池来管理。本文介绍在 Java 中如何使用线程池,以及有哪些线程池。
为什么需要线程池
每个线程的创建和销毁,都会消耗一定的系统资源,尤其在高并发的系统中,频繁创建和销毁线程会造成大量的资源浪费。
那么,为了避免频繁的创建和销毁线程,就可以在系统启动时,预先创建好一定数量的线程,并将其交由线程调度器管理,这就是线程池。
怎么使用线程池
依旧是用一个示例来演示。
1 | public class MyRunnable implements Runnable { |
1 | public class Main { |
运行后得到如下结果:
1 | Thread pool created |
几种线程池的简介
Single thread pool
是用 Executors.newSingleThreadExecutor()
创建。
该线程池仅有一个线程,并拥有一个无边界的队列。提交到队列中的任务将会按顺序执行。如果当前线程在执行过程中出现错误而被终止,那么线程池会创建一个新的线程并继续执行队列中后续的任务。
与使用 Executors.newFixedThreadPool(1)
创建的线程池不同的是,我们不能为 single thread pool 分配更多的线程数。
Fixed thread pool
使用 Executors.newFixedThreadPool(int nThreads)
创建。
该线程池维护着固定数量的线程 (nThreads 个),在任何时间只允许最多 nThreads 个线程执行任务,多出来的任务将会在队列中等待,直到有空闲的线程出现。如果其中一个线程在执行过程中因为错误而异常退出,则线程池会立刻创建一个新的线程并执行后续的任务。
该线程池在显式关闭 (ExecutorService#shutdown
) 前将一直存在。
Work stealing pool
使用 Executors.newWorkStealingPool(int parallelism)
或 Executors.newWorkStealingPool()
创建。
该线程池无法保证各个被提交的任务将会以何种顺序执行。
newWorkStealingPool(int parallelism)
该方法将根据给定的 “并行量 (parallelism)”,来创建一个包含足够数量线程的线程池,并会使用多个队列来减少线程与队列的争抢。
“并行量” 的值对应于最多允许参与执行任务的线程数量。但实际存在的线程数可能会动态的增减。
Executors.newWorkStealingPool()
将所有的 “可用的处理器” 的数目作为 “并行量” 来创建线程池。
可用的处理器数量使用 Runtime.getRuntime().availableProcessors()
获取,其值等同于 CPU 中逻辑处理器的数量。
Cached thread pool
使用 Executors.newCachedThreadPool()
创建。
当接收到新的任务后,线程池会根据有无可用线程,来决定使用线程池中的空闲线程,或者在线程池中创建新的线程。
如果线程池中有线程空置超过 60 秒,则该线程就会被终止并从线程池中移除。
Scheduled thread pool
使用 Executors.newScheduledThreadPool(int corePoolSize)
创建。corePoolSize
为线程池中保持的线程数。
该线程池可以指定一个延迟,或指定一个周期,并按照这个计划执行任务。
线程池是如何工作的
那么,在调用 ExecutorService#execute(Runnable)
之后,线程池究竟做了些什么呢?
在 ThreadPoolExecutor#execute(Runnable)
方法中,有这样一段注释:
Proceed in 3 steps:
- If fewer than corePoolSize threads are running, try to
start a new thread with the given command as its first
task. The call to addWorker atomically checks runState and
workerCount, and so prevents false alarms that would add
threads when it shouldn’t, by returning false.- If a task can be successfully queued, then we still need
to double-check whether we should have added a thread
(because existing ones died since last checking) or that
the pool shut down since entry into this method. So we
recheck state and if necessary roll back the enqueuing if
stopped, or start a new thread if there are none.- If we cannot queue task, then we try to add a new
thread. If it fails, we know we are shut down or saturated
and so reject the task.
也就是说:
flowchart TD check_running_threads_number{检查正在运行的线程数是否小于corePoolSize}; start_new_thread("调用addWorker尝试开启一个新的线程,并将这个runnable作为第一个task交给这个线程"); try_enqueue_the_task{"尝试将任务加入队列,并重新调用addWorker再次检查能否加入新的worker"}; reject_the_task("此时线程池已经饱和,或者正在被关闭,所以拒绝掉这个task"); 开始 --> check_running_threads_number check_running_threads_number -- 是 --> start_new_thread start_new_thread -- 成功 --> 结束 start_new_thread -- 失败 --> try_enqueue_the_task check_running_threads_number -- 否 --> try_enqueue_the_task try_enqueue_the_task -- 成功 --> 结束 try_enqueue_the_task -- 失败 --> reject_the_task reject_the_task --> 结束
这里要注意一个比较容易引起误解的点,就是在 core pool size < 当前正在运行的线程数量 < max pool size
时,如果有新的任务进来,那么这个任务首先会被放入队列。仅当队列满了的时候,线程池才会为这个新来的任务创建新的线程。
参考文章
Thread pools and work queues
Core pool size vs maximum pool size in ThreadPoolExecutor - StackOverflow