0%

Java - 线程池概述

阿里巴巴的Java代码规范中,对于线程池的使用有这么一条强制规范:

为什么使用Executors创建的线程池会导致OOM?怎样才是正确使用ThreadPoolExecutor创建线程池的姿势?

为什么要有线程池

从更广义的角度来说,我们常用的线程池,连接池等等,都可以被称之为资源池,他有如下优点:

  • 减少资源创建的开销

    如果没有资源池,那么各个模块会频繁的创建资源,并且使用过后就回收了,需要用的时候需要再次创建。而资源的创建,比如创建线程、新建一个连接等等,往往都是有开销的。

    资源池创建资源之后,可以供各个模块使用,用完了之后暂时不会释放,会放回资源池供其他模块使用,大大减少了资源创建的次数;

  • 避免影响应用稳定性

    如果对应用内的资源创建不加一约束,各自模块可以随意创建资源,那么有可能会导致过度使用系统给应用分配的资源,影响引用稳定性。

    资源池可以控制资源的最大数量,避免影响应用稳定性。

ThreadPoolExecutor

Executors.java文件中,不管是返回代码规范中的哪种ThreadPool,都会涉及到ThreadPoolExecutor对象,只是构造参数不同,因此我们先来看看ThreadPoolExecutor的构造函数:

1
2
3
4
5
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, 
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handle);

其中:

  • corePoolSize:核心线程数量;

  • maximumPoolSize:线程的最大数量;

  • keepAliveTime:非核心线程存活时间,设置为0则表示超出核心线程数量的空闲线程会被立即释放;

    还有一个成员变量,控制核心线程是否回收:allowCoreThreadTimeOut,默认为false,表示核心线程不会被释放,但是如果设置了此属性为true,则核心线程空闲超过keepAliveTime之后,也会被释放。

    如果allowCoreThreadTimeOut为true,keepAliveTime为0,则会抛异常提示Core threads must have nonzero keep alive times

  • unit:时间单位;

  • workQueue:线程池所使用的缓冲队列;

  • threadFactory:线程池创建线程使用的工厂;

  • handler:线程池对拒绝任务的处理策略;

参数的作用如下:

2

workQueue

workQueue是一个实现BlockingQueue接口的对象,作为线程数量达到核心线程数之后,缓存任务的队列。一些常用的队列有:

  • ArrayBlockingQueue

    一个以数组方式实现的先进先出的有界队列,需要在构造对象的时候指定队列容量,并且不支持扩容。

  • LinkedBlockingQueue

    一个以链表方式实现的队列,在构造对象的时候如果不传入队列容量的话,则队列默认容量为Integer.MAX_VALUE,设置了容量大小之后,不支持扩容。

  • SynchronousQueue

    SynchronousQueue是一个不存储元素的队列,插入元素的线程会被阻塞,直到有另外一个线程读取了数据;同理,读取数据的线程也会被阻塞,直到有另外一个线程插入了一个元素。

  • LinkedTransferQueue

    这是一个结合了LinkedBlockingQueue和SynchronousQueue特点的队列,除了有正常的Queue的接口之外,还实现了TransferQueue接口,使用接口中transfer相关的函数时,如果有消费者在阻塞等待,则元素不会存入队列,而是直接传递给消费者;反之如果没有消费者在阻塞,则元素会入队列,并且阻塞线程,直到元素被消费者获取之后才会返回。

  • PriorityBlockingQueue

    一个支持按照元素优先级排序的无界队列,默认按照元素的自然顺序排列,也可以自定义Comparator来定义排序规则。由于元素会按照优先级排序,所以队列的出队顺序与元素的入队顺序没有关联。

  • DelayQueue

    一个利用优先级队列实现的无界队列,队列中的元素需要实现Delayed接口,定义元素的延迟时间,以及元素之间的Comparator逻辑。

    一般来说,Comparator会与元素的延迟时间逻辑保持一致,最快要执行的任务,应该排到队列头。

在选择队列的时候,除了根据业务需求进行选型之外,还要注意队列的有界/无界性,一个无界队列意味着队列中可以一直增加元素,这样的行为可能会耗尽应用程序所申请的内存大小,从而影响应用的稳定性。

threadFactory

一般情况下,建议自定义一个实现了ThreadFactory的类,重写Thread newThread(Runnable r);方法,并且自定义线程的name,会极大的利于业务调试。实现方式可以参考Executors中的DefaultThreadFactory类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;

DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}

public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}

handler

handler定义了当线程数达到maximumPoolSize,并且队列已满的情况下,再提交任务时的拒绝策略。我们先看看接口定义:

1
2
3
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

其中:

  • r:提交的任务;
  • executor:提交的线程池。

当触发拒绝策略时,会以提交任务的线程来执行handler,JDK自带的有4种拒绝策略:

  • CallerRunsPolicy(调用者运行策略)

    1
    2
    3
    4
    5
    6
    7
    public static class CallerRunsPolicy implements RejectedExecutionHandler {
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
    r.run();
    }
    }
    }

    此策略下,会判断线程池是否已经关闭,如果没有关闭,则使用提交任务的线程来执行任务。由于是用提交任务的线程来执行任务,所以当触发拒绝策略时,对提交任务的线程而言,相当于是一个同步调用操作。

  • AbortPolicy(中止策略)

    1
    2
    3
    4
    5
    6
    7
    public static class AbortPolicy implements RejectedExecutionHandler {
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    throw new RejectedExecutionException("Task " + r.toString() +
    " rejected from " +
    e.toString());
    }
    }

    此策略下,会抛出一个非受检异常,这也是ThreadPoolExecutor的默认拒绝策略。在使用这种策略时,一定要注意捕获异常,否则会打断当前线程。

  • DiscardOldestPolicy(丢弃最老任务策略)

    1
    2
    3
    4
    5
    6
    7
    8
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
    e.getQueue().poll();
    e.execute(r);
    }
    }
    }

    从名字就不难看出,当选择这种策略时,会弹出队列头部的元素,然后尝试执行。

    这里两步操作不存在并发的问题,如果第一步弹出之后,被其他提交的任务入队列了,那么e.execute(r)又会触发一次拒绝策略,而入队操作本身有CAS来保证多线程同步的问题。

  • DiscardPolicy(丢弃策略)

    1
    2
    3
    4
    public static class DiscardPolicy implements RejectedExecutionHandler {
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
    }

    啥也没有,即表示丢弃当前提交的任务。

一般来说,是要杜绝触发拒绝策略的,所以当发生拒绝策略时,最好是弄清楚触发的原因,从而优化线程池,Dubbo框架中自定义的拒绝策略,有很高的参考价值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {
protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class);

private final String threadName;
private final URL url;
private static volatile long lastPrintTime = 0;
private static Semaphore guard = new Semaphore(1);

public AbortPolicyWithReport(String threadName, URL url) {
this.threadName = threadName;
this.url = url;
}

@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
String msg = String.format("Thread pool is EXHAUSTED!" +
" Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
" Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
url.getProtocol(), url.getIp(), url.getPort());
logger.warn(msg);
dumpJStack();
throw new RejectedExecutionException(msg);
}

private void dumpJStack() {
//省略实现
}
}

当触发拒绝策略时,AbortPolicyWithReport最终还是会抛出一个非受检异常,但是在抛出之前,会尽量将现场的信息都打印出来,供开发者定位触发拒绝策略的原因。

核心源码分析

讲逻辑代码之前,先来看一个重要的成员变量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class ThreadPoolExecutor extends AbstractExecutorService {
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
}

ctl,是一个32位的整型,其中高三位用来标识线程池的状态,低29位用来标识当前线程的数量。

先来看看提交一个任务时的入口逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();

int c = ctl.get();
// 判断线程数量是否小于核心线程数量
if (workerCountOf(c) < corePoolSize) {
// 如果是,则调用addWorker添加线程执行任务
if (addWorker(command, true))
return;
c = ctl.get();
}
// 走到这一步说明线程数量大于等于核心线程,或者addWorker失败
// 则向队列中追加任务
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 二次检查,如果线程池为非RUNNING状态,则移除任务并触发拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 执行到这里,说明队列已满,添加失败,则调用addWorker添加线程执行任务
else if (!addWorker(command, false))
// 添加失败,则触发拒绝策略
reject(command);
}

所以新建线程并且执行任务的逻辑在addWorker函数里:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
// Worker,内部类,继承自AQS,并且实现了Runnable接口
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
final Thread thread;
Runnable firstTask;

Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}

// run函数最终调用到了ThreadPoolExecutor的runWorker函数
public void run() {
runWorker(this);
}
}

// core为true表示往核心线程中添加任务
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
// 获取线程池状态
int rs = runStateOf(c);

// 当满足如下情况是,添加新任务失败:
// 1. rs为STOP、TIDYING、TERMINATED;
// 2. rs>=SHUTDOWN,并且firstTask!=null;
// 3. rs>=SHUTDOWN,并且缓存队列为空
if (rs >= SHUTDOWN &&
!(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
return false;

for (;;) {
int wc = workerCountOf(c);
// 如果满足一下条件,则添加新任务失败:
// 1. 如果线程数量大于29位能表达的最大刷领
// 2. 或者线程数量大于最大数量
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 使用CAS将线程数量增加1
if (compareAndIncrementWorkerCount(c))
// 添加成功之后,跳出retry;失败则继续for (;;)循环代码
break retry;
c = ctl.get();
// 如果CAS失败,并且线程池状态发生变化,则continue外层循环
// 否则则只是CAS失败,可能是CAS冲突,继续执行内层循环
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

// 执行到这里,表示线程数已经添加,可以新建一个worker了。
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 新建一个Worker对象
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// 并发访问线程池的workers变量,因此需要加锁
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());

// 判断线程池状态
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 将Worker添加到workers中
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 启动新添加的Worker,首先执行firstTask,然后不停的从队列中取任务执行
// start()方法会调用到Worker的run函数中,最终调用到外层的runWorker函数
t.start();
workerStarted = true;
}
}
} finally {
// 线程启动失败,则从wokers中移除刚添加的worker并递减wokerCount
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

addWorker函数负责创建一个线程,并启动线程执行任务,执行任务的代码最终会调用到ThreadPoolExecutor的runWorker函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// 创建Worker时,执行的setState(-1)禁止了中断,所以这里需要unlock,从而允许中断
w.unlock();
// 表示任务是否异常退出,默认为true,如果正常执行完毕,则会被设置为false
boolean completedAbruptly = true;
try {
// 优先执行构造Worker时的task;执行完毕之后使用getTask取任务
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 调用任务执行前的钩子
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 没有任务可执行,执行清理工作
processWorkerExit(w, completedAbruptly);
}
}

线程会优先执行初始化Worker时传入的task,执行完毕之后,会不断取出新的task并执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// 满足一下情况,则不会取任务
// 1. rs >= STOP
// 2. RS = SHUTDOWN,并且队列为空
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// 将线程数减1,然后在外面会调用processWorkerExit来清理线程
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

// 如果核心线程也会被回收,或者线程数量大于核心线程数,则表示后面的逻辑需要有超时机制
// 不能让线程在没有任务处理的时候一直阻塞
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
// 使用超时或者阻塞的方式获取任务,取决于timed变量
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

为啥禁止用Executors

理解了ThreadPoolExecutor的工作方式之后,我们再来看看为啥强制不允许Executors来创建线程池。

Executors将ThreadPoolExecutor的构造函数进行封装,然后提供了一些参数更少的静态函数,用以返回几种适用于不同场景的线程池。

  • FixedThreadPool

    1
    2
    3
    4
    5
    public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>());
    }

    FixedThreadPool的核心线程数等于总线程数,即是所有线程都是核心线程,因此keepAliveTime参数也并不会生效。同时线程池使用了一个无界队列,当线程都在运行时,任务会堆积在队列中。这种线程池适合任务量比较固定的场景。

  • SingleThreadPool

    1
    2
    3
    4
    5
    6
    public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
    (new ThreadPoolExecutor(1, 1,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>()));
    }

    SingleThreadPool只有一个永不被释放的核心线程来执行任务,并且使用了一个无界队列。这种线程池适合需要顺序执行任务的场景。

  • CachedThreadPool

    1
    2
    3
    4
    5
    public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    60L, TimeUnit.SECONDS,
    new SynchronousQueue<Runnable>());
    }

    CachedThreadPool的核心线程数为0,线程总数为Integer.MAX_VALUE,并且由于SynchronousQueue是无法存储任务的,因此提交任务时,便会立即执行。如果线程池中有空闲线程,则会使用空闲线程来执行,否则会新创建一个线程来执行。而空闲线程过了60秒之后则会被释放掉。这种线程池适合任务量大,但是耗时较少的场景。

  • ScheduledThreadPool

    1
    2
    3
    public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
    }

    ScheduledThreadPool的线程总数是Integer.MAX_VALUE,并且使用了一个可扩容的无解延迟队列。

上面的几种不同的线程池,要么是使用的无界队列,导致可能堆积大量的请求;要么线程池的最大线程数为Integer.MAX_VALUE,可能创建大量线程。如果使用不当,两种情况都有可能会导致OOM异常,影响应用的稳定性。

execute()和submit()

提交任务到线程池,有两种方式:

  • 调用ThreadPoolExecutor类的public void execute(Runnable command)方法,提交一个Runnable任务;
  • 调用ThreadPoolExecutor父类AbstractExecutorService的public Future<?> submit(Runnable task)方法,提交一个Runnable任务。

从函数定义上可以看出,execute()方法没有返回值,而submit()会返回一个Future对象,可以使用返回的Future对象,调用get()方法,来阻塞式的获取任务执行结果。

除了返回值不一样之外,还有一点需要注意:异常。使用execute()方法提交的任务,其内部是安排一个线程直接来执行任务,因此如果遇到异常,则会打印出来;而使用submit()方法,内部会封装成一个RunnableFuture类型(此处即是封装成了一个FutureTask)的ftask,然后将这个ftask提交执行:

1
2
3
4
5
6
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}

可以理解为线程池执行的是FutureTask的run,而在FetureTask的run方法内部,再执行的任务,而run方法内部有这么一段异常捕获代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void run() {
// ...
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
// ...
}

当任务抛出异常之后,异常并没有继续往外抛,而是被捕获,并且记录了下来,当调用FetureTask的get()方法时,才会抛出来:

1
2
3
4
5
6
7
8
9
10
11
12
13
public V get() throws InterruptedException, ExecutionException {
// ...
return report(s);
}

private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}

参考



-=全文完=-