前言
在并发很大的情况下,频繁创建线程与销毁线程会带来很大的消耗,线程池就是在解决这个问题的。
线程构造函数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
各参数的意义
- corePoolSize :核心线程的数量
- maximumPoolSize :最大线程数
- keepAliveTime :闲置线程被回收的时间限制
- unit :时间单位(ms、us什么的)
- workQueue :消息队列(存储没有线程的任务,存储的任务需等待其他任务完成释放的线程)
- threadFactory :线程工厂,用来给线程取个有意义的名字
- handler :拒绝策略,即当加入线程失败,采用该 handler 来处理
corePoolSize 与 maximumPoolSize
当我们看到 corePoolSize 与 maximumPoolSize 的时候,可能会感到疑惑,那么当前线程数 poolSize 与 corePoolSize 和 maximumPoolSize 有什么关系呢?
当新提交一个任务时:
- 如果 poolSize < corePoolSize,新增加一个线程处理新的任务。
- 如果 poolSize = corePoolSize,新任务会被放入阻塞队列等待。
- 如果阻塞队列的容量达到上限,且这时 poolSize < maximumPoolSize,新增线程来处理任务。
- 如果阻塞队列满了,且 poolSize = maximumPoolSize,那么线程池已经达到极限,会根据饱和策略 RejectedExecutionHandler 拒绝新的任务。
线程池状态
常量与状态的源码
//高3位保存线程池状态,低29位保存线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//位移29位,即29 = Integer.Size - 3
private static final int COUNT_BITS = Integer.SIZE - 3;
//0001 1111 1111 1111 1111 1111 1111 1111
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
//111 后29位全为0
private static final int RUNNING = -1 << COUNT_BITS;
//000 后29位全为0
private static final int SHUTDOWN = 0 << COUNT_BITS;
//001 后29位全为0
private static final int STOP = 1 << COUNT_BITS;
//010 后29位全为0
private static final int TIDYING = 2 << COUNT_BITS;
//011 后29位全为0
private static final int TERMINATED = 3 << COUNT_BITS;
//c:开头三位为000,后29位为线程数。~COUNT_MASK:111 后29位为0。
//c & ~COUNT_MASK得一个高3位为状态,低29位线程数的值,主要是根据c获取线程池状态
private static int runStateOf(int c) { return c & ~COUNT_MASK; }
//c:同上。
private static int workerCountOf(int c) { return c & COUNT_MASK; }
//一般用于设置状态并转移线程数,用于advanceRunState方法中。
private static int ctlOf(int rs, int wc) { return rs | wc; }
线程池的5个状态
上面的代码是在jdk11里的,线程的池的状态有 RUNNING
、 SHUTDOWN
、 STOP
、 TIDYING
、 TERMINATED
这五种状态,他们都是使用int的高3位来储存线程池的状态,以低29位数来储存线程数,即最大线程数位2的29次方-1。
- RUNNING :接受新的任务并且处理队列的任务。
- SHUTDOWN :不接受任务,但处理已经进入队列的任务。
- STOP :不接受新任务,也不处理队列里的任务,并且中断正在执行的任务。
- TIDYING :所有任务执行完成。
- TERMINATED : terminated() 已经执行完成。
状态间的相互转换
RUNNING
->SHUTDOWN
:可以调用 shutdown 方法RUNNING
或者SHURDOWN
-> STOP:调用 shutdownNowSHUTDOWN
->TIDYING
:当队列和线程池为空STOP
->TIDYING
:当线程池为空TIDYING
->TERMINATED
:当terminated()钩子方法执行完成
public void shutdown() {
//上锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//设置线程状态,即ctl
advanceRunState(SHUTDOWN);
//给所有工作线程设置中断
interruptIdleWorkers();
//空方法
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
//尝试终止
tryTerminate();
}
}
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//设置线程状态,即ctl
advanceRunState(STOP);
//给所有工作线程设置中断
interruptWorkers();
//移除队列里的所有任务,tasks装的是所有未完成的任务
tasks = drainQueue();
} finally {
mainLock.unlock();
}
//尝试终止
tryTerminate();
return tasks;
}
final void tryTerminate() {
for (;;) {
//获取当前线程池,线程数
int c = ctl.get();
//检查状态,c<RUNNING || c>=TIDYING || (c>=STOP && 队列不为空) 不进行处理
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateLessThan(c, STOP) && ! workQueue.isEmpty()))
return;
//当前线程数不为零
if (workerCountOf(c) != 0) { // Eligible to terminate
//只中断一次worker线程
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//如果上面取得的线程数c与当前线程数ctl相等,将ctl设置为TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
//terminated()方法完成,设置状态为TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
execute源码分析
execute()方法主要分为以下四种情况:
- 情况1: 如果线程池内的有效线程数少于核心线程数 corePoolSize, 那么就创建并启动一个线程来 执行新提交的任务.
- 情况2: 如果线程池内的有效线程数达到了核心线程数 corePoolSize,并且线程池内的 阻塞队列未满, 那么就将新提交的任务加入到该阻塞队列中.
- 情况3: 如果线程池内的有效线程数达到了核心线程数 corePoolSize 但却小于最大线 程数 maximumPoolSize, 并且线程池内的阻塞队列已满, 那么就创建并启动 一个线程来执行新提交的任务.
- 情况4: 如果线程池内的有效线程数达到了最大线程数 maximumPoolSize, 并且线程 池内的阻塞队列已满, 那么就让 RejectedExecutionHandler 根据它的拒 绝策略来处理该任务, 默认的处理方式是直接抛异常.
execute源码
public void execute(Runnable command) {
int c = ctl.get();
//当前线程数与核心线程比较,小于核心线程
if (workerCountOf(c) < corePoolSize) {
//添加工作线程
if (addWorker(command, true))
return;
c = ctl.get();
}
//判断线程池状态是否为RUNNING与向队列添加任务
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//双重检查,如果状态不为RUNNING且移除任务成功
if (! isRunning(recheck) && remove(command))
//使用拒绝策略
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
//添加工作线程失败,使用拒绝策略
reject(command);
}
addWorker源码分析
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (int c = ctl.get();;) {
// Check if queue empty only if necessary.
/**
* 1.状态为STOP以上的
* 2.状态为SHUTDOWN,任务不为null的
* 3.状态为SHUTDOWN,队列为空的
*/
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;
for (;;) {
//core为true,即当前线程数比较corePoolSize,用于前面的情况1,2
//core为false,即当前线程数比较maximumPoolSize,用于前面的情况3,4
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
//ctl加1,不成功一直循环
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//线程开启状态
boolean workerStarted = false;
//任务添加状态
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
//加锁
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
//判断是否运行状态,或者
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//将work加入HashSet里
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//启动线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//添加线程失败,ctl减1,移除
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
线程消费队列里任务
ThreadPoolExecutor
里Worker
是线程的承载者,它继承与AQS
,实现了Runnable
的 run()方法.
public void run() {
runWorker(this);
}
runWorker源码分析
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
//获取任务
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//循环从队列获取任务
// 由前边可知, task 就是 w.firstTask
// 如果 task为 null, 那么就不进入该 while循环, 也就不运行该 task. 如果
// task不为 null, 那么就执行 getTask()方法. 而getTask()方法是个无限
// 循环, 会从阻塞队列 workQueue中不断取出任务来执行. 当阻塞队列 workQueue
// 中所有的任务都被取完之后, 就结束下面的while循环.
while (task != null || (task = getTask()) != null) {
//锁住该线程
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
try {
//执行任务
task.run();
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
// 将 task 置为 null, 这样使得 while循环是否继续执行的判断, 就只能依赖于判断
// 第二个条件, 也就是 (task = getTask()) != null 这个条件, 是否满足.
task = null;
//任务运行完成后,在完成任务数上+1
w.completedTasks++;
//释放锁
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
getTask源码分析
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
//获取线程状态与线程数
int c = ctl.get();
// Check if queue empty only if necessary.
//1.线程状态为STOP以上,这时不处理队列里的任务
//2.线程状态为SHUTDOWN,且队列为空,无任务可处理
//上面两种情况,直接线程
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
//线程数减一
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
拒绝策略
CallerRunsPolicy
:直接在本线程运行AbortPolicy
:直接抛异常DiscardPolicy
:不处理DiscardOldestPolicy
:去除队列中的一个,执行execute
方法- 其他:可以实现
RejectedExecutionHandler
接口,自定义拒绝策略
不同线程池的优劣
Executors类则扮演着线程池工厂的角色,通过 Executors可以取得一个拥特定功能的线程池。
- newFixedThreadPool::创建一个指定工作线程数量的线程池。每当提交一个任务就创建一个工作线程,如果工作线程 数量达到线程池初始的最大数,则将提交的任务存入到池队列中。
- newCachedThreadPool:创建一个可缓存的线程池。可创建的线程池基本没有限制,但如果线程空闲超过一定的时间,该 线程会自动终止。
- newSingleThreadExecutor:创建一个单线程化的Executor,即只创建唯一的工作者线程来执行任务,如果这个线程异 常结束,会有另一个取代它,保证顺序执行(我觉得这点是它的特色)。单工作线程最大的特点是可保证顺序地执行各个任务, 并且在任意给定的时间不会有多个线程是活动的 。
- newScheduleThreadPool:newScheduleThreadPool创建一个定长的线程池,而且支持定时的以及周期性的任务执行,类似于Timer。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
newSingleThreadExecutor 与 newScheduleThreadPool 分别被 FinalizableDelegatedExecutorService 与 ScheduledThreadPoolExecutor 包装。
关于线程池状态的思考
- 2019/03/06:为什么不把状态与线程数分开?为什么不用byte或者String来存储状态,用int来存储?
- 2019/03/09:addWorker的if (isRunning(c) ||(runStateLessThan(c, STOP) && firstTask == null)) 为什么允许SHOTDOWN状态时添加任务?
参考
链接 : https://blog.csdn.net/cleverGump/article/details/50688008