如:newFixedThreadPool
[java] view plain copy
public
static
ExecutorService newFixedThreadPool(int
nThreads) {return
new
ThreadPoolExecutor(nThreads, nThreads,new
LinkedBlockingQueue如:newCachedThreadPool[java] view plain copy
public
static
ExecutorService newCachedThreadPool() {return
new
ThreadPoolExecutor(0, Integer.MAX_VALUE,new
SynchronousQueue如:newSingleThreadExecutor[java] view plain copy
public
static
ExecutorService newSingleThreadExecutor() {return
new
FinalizableDelegatedExecutorServicenew
ThreadPoolExecutor(1, 1,new
LinkedBlockingQueue可以发现,其实它们调用的都是同一个接口ThreadPoolExecutor方法,只不过传入参数不一样而已。下面就来看看这个神秘的ThreadPoolExecutor。
首先来看看它的一些基本参数:
[java] view plain copy
public
class
ThreadPoolExecutorextends
AbstractExecutorService {volatile
int
runState;static
final
int
RUNNING = 0;static
final
int
SHUTDOWN = 1;static
final
int
STOP = 2;static
final
int
TERMINATED = 3;private
final
BlockingQueueprivate
final
ReentrantLock mainLock =new
ReentrantLock();private
final
Condition termination = mainLock.newCondition();private
final
HashSetnew
HashSetprivate
volatile
long
keepAliveTime;private
volatile
int
corePoolSize;private
volatile
int
maximumPoolSize;private
volatile
int
poolSize;private
volatile
RejectedExecutionHandler handler;private
volatile
ThreadFactory threadFactory;private
int
largestPoolSize;private
long
completedTaskCount;初始化线程池大小 有以下四种方法:
从源码中可以看到其实最终都是调用了以下的方法:
[java] view plain copy
public
ThreadPoolExecutor(int
corePoolSize,int
maximumPoolSize,long
keepAliveTime,if
(corePoolSize < 0="">throw
new
IllegalArgumentException();if
(workQueue ==null
|| threadFactory ==null
|| handler ==null
)throw
new
NullPointerException();this
.corePoolSize = corePoolSize;this
.maximumPoolSize = maximumPoolSize;this
.workQueue = workQueue;this
.keepAliveTime = unit.toNanos(keepAliveTime);this
.threadFactory = threadFactory;this
.handler = handler;这里很简单,就是设置一下各个参数,并校验参数是否正确,然后抛出对应的异常。接下来我们来看看最重要的方法execute,其源码如下:
[java] view plain copy
public
void
execute(Runnable command) {if
(command ==null
)throw
new
NullPointerException();if
(poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { // 判断1if
(runState == RUNNING && workQueue.offer(command)) { // 判断2if
(runState != RUNNING || poolSize == 0) // 判断3else
if
(!addIfUnderMaximumPoolSize(command)) // 判断4笔者在上面加了点注释。下面我们一个一个判断来看首先判断1
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))
(1)当poolSize >= corePoolSize 不成立时,表明当前线程数小于核心线程数目,左边返回fasle.接着执行右边判断!addIfUnderCorePoolSize(command)
它做了如下操作
[java] view plain copy
private
boolean
addIfUnderCorePoolSize(Runnable firstTask) {null
;final
ReentrantLock mainLock =this
.mainLock;//加锁try
{if
(poolSize < corepoolsize="" &&="" runstate="=">finally
{return
t !=null
;发现它又调用addTread[java] view plain copy
private
Thread addThread(Runnable firstTask) { //调用这个方法之前加锁new
Worker(firstTask);//线程包装成一个workboolean
workerStarted =false
;if
(t !=null
) {if
(t.isAlive()) // 线程应该是未激活状态throw
new
IllegalThreadStateException();int
nt = ++poolSize;//当前运行线程数目加1if
(nt > largestPoolSize)try
{true
;finally
{if
(!workerStarted)return
t;其实Work是真实去调用线程方法的地方,它是对Thread类的一个包装,每次Thread类调用其start方法时,就会调用到work的run方法。其代码如下,[java] view plain copy
private
void
runTask(Runnable task) { //真正发起线程方法的地方final
ReentrantLock runLock =this
.runLock;try
{[java] view plain copy
if
((runState >= STOP || //判断的判断boolean
ran =false
;try
{true
;null
);//处理后catch
(RuntimeException ex) {if
(!ran)throw
ex;finally
{public
void
run() {try
{true
;null
;while
(task !=null
|| (task = getTask()) !=null
) {null
;finally
{this
);发现要执行一个线程真的很不容易,如果addIfUnderCorePoolSize返回true,刚表明成功添加一条线程,并调用了其start方法,那么整个调用到此结束。如果返回fasle.那么就进入判断2.(2)当poolSize >= corePoolSize成立时,整个判断返回true。接着执行判断2
判断2
[java] view plain copy
if
(runState == RUNNING && workQueue.offer(command)) { // 判断2如果当前线程池在运行状态,并且将当前线程加入到缓冲队列中。workQueue的offer是一个非阻塞方法。如查缓冲队列满了的话,返回为false.否则返回true;如果上面两个都 为true,表明线程被成功添加到缓冲队列中,并且当前线程池在运行。进入判断3
判断3
[java] view plain copy
if
(runState != RUNNING || poolSize == 0)当线程被加入到线程池中,进入判断3.如果这时线程池没有在运行或者运行的线程为为0。那么就调用ensureQueuedTaskHandled,它做的其实是判断下是否在拒绝这个线程的执行。[java] view plain copy
private
void
ensureQueuedTaskHandled(Runnable command) {final
ReentrantLock mainLock =this
.mainLock;boolean
reject =false
;null
;try
{int
state = runState;if
(state != RUNNING && workQueue.remove(command)) //线程池没有在运行,且缓冲队列中有这个线程true
;else
if
(state < stop="">null
);finally
{if
(reject)判断4[java] view plain copy
else
if
(!addIfUnderMaximumPoolSize(command))在判断2为false时执行,表明当前线程池没有在运行或者该线程加入缓冲队列中失败,那么就会尝试再启动下该线程,如果还是失败,那就根据拒绝策略来处理这个线程。其源码如下:[java] view plain copy
private
boolean
addIfUnderMaximumPoolSize(Runnable firstTask) {null
;final
ReentrantLock mainLock =this
.mainLock;try
{if
(poolSize < maximumpoolsize="" &&="" runstate="=" running)="" 如果当前运行线程数目="" 小于最大线程池大小="" 并且="">finally
{return
t !=null
;一般调用这个方法是发生在缓冲队列已满了,那么线程池会尝试直接启动该线程。当然,它要保存当前运行的poolSize一定要小于maximumPoolSize。否则,最后。还是会拒绝这个线程!以上大概就是整个线程池启动一条线程的整体过程。
总结:
ThreadPoolExecutor中,包含了一个任务缓存队列和若干个执行线程,任务缓存队列是一个大小固定的缓冲区队列,用来缓存待执行的任务,执行线程用来处理待执行的任务。每个待执行的任务,都必须实现Runnable接口,执行线程调用其run()方法,完成相应任务。
ThreadPoolExecutor对象初始化时,不创建任何执行线程,当有新任务进来时,才会创建执行线程。
构造ThreadPoolExecutor对象时,需要配置该对象的核心线程池大小和最大线程池大小:
当目前执行线程的总数小于核心线程大小时,所有新加入的任务,都在新线程中处理
当目前执行线程的总数大于或等于核心线程时,所有新加入的任务,都放入任务缓存队列中
当目前执行线程的总数大于或等于核心线程,并且缓存队列已满,同时此时线程总数小于线程池的最大大小,那么创建新线程,加入线程池中,协助处理新的任务。
当所有线程都在执行,线程池大小已经达到上限,并且缓存队列已满时,就rejectHandler拒绝新的任务
联系客服