打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
快速搞定线程池源码

回复“面试”获取最新资料

回复“加群”邀您进技术交流群

何为线程池?

顾名思义,线程池就是存放一定量线程的容器,当有待执行任务的时候直接从线程池中取出线程执行任务,任务执行完成之后将线程回放至线程池中。

线程池的优点:降低了线程频繁创建、销毁的开销,提高系统的响应速度,方便统一管理创建的线程。

java.util.concurrent.ThreadPoolExecutor

线程池(ThreadPoolExecutor)提供 4 个默认的构造方法,固定参数 5 个。如下图:

核心参数解释如下:

  • 核心线程数量:corePoolSize

  • 最大线程数量:maximumPoolSize

  • 非核心线程空闲等待时间:keepAliveTime

  • 等待时间单位:timeUnit

  • 等待阻塞队列:blockingQueue<Runnable>

  • (可选)线程工厂创建线程:threadFactory

  • (可选)线程池拒绝策略:rejectedExecutionHandler

线程池原理

corePoolSize

线程池中默认存活的线程数量。不同的线程池对于核心线程数量有不同的要求,也与 allowCoreThreadTimeout 参数有关。

当 allowCoreThreadTimeout= true 时,核心线程没有任务且存活时间超过空闲等待时间后终止。

maximumPoolSize

线程池中允许的最大线程数量。

当 currentThreadNumber >= corePoolSize,且任务队列已满时,线程池会创建新线程来处理任务;当 currentThreadNumber =maxPoolSize,且任务队列已满时,线程池会拒绝处理任务而抛出异常。

keepAliveTime

线程池中空闲线程允许存活的时间,超过配置时间将会被终止。
  • blockingQueue:线程池缓存队列存放待处理的线程任务。

  • ArrayBlockingQueue:指定大小的等待队列(FIFO);有界队列,创建时需要指定队列的大小。

  • LinkedBlockingQueue:基于链表的等待队列(FIFO);无界队列,创建时不指定队列大小,默认为 Integer.MAX_VALUE。

  • PriorityBlockingQueue:带有优先级的等待队列

  • SynchronizedQueue:不存放对象的等待队列;同步移交队列,直接新建一个线程来执行新来的任务。

threadFactory

线程工厂,创建线程使用。

rejectedExecutionHandler

当线程池的任务缓存队列已满并且线程池中的线程数目达到 maximumPoolSize 时,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:
  • AbortPolicy(默认策略):丢弃任务并抛出 RejectedExecutionException

  • CallerRunsPolicy:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。显然这样做不会真的丢弃任务,但是,任务提交线程的性能极有可能会急剧下降。

  • DiscardOldestPolicy:丢弃队列中最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务(舍弃最老的请求,即将队列头部任务舍弃)。

  • DiscardPolicy:不做任何处理,直接丢弃任务。

怎么创建线程池?

线程池的创建主要有 2 种方式

  • 基于 ThreadPoolExecutor 的构造方法创建

  • Executors 执行器创建


Executors 执行器创建线程池是在 ThreadPoolExecutor 构造方法上进行简单的封装,特殊场景根据需要自行创建。可以把Executors理解成一个工厂类 
阿里开发规范中是建议使用ThreadPoolExecutor 来创建线程池。
// ThreadPoolExecutor 最基础的构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
{
// 核心线程、最大线程数量必须大于 0,且 最大线程数量 大于等于 核心线程数量,空闲等待时间大于 0
if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ? null : AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

常见线程池有以下 4 种:

  • newFixedThreadPool:固定大小的线程池

  • newSingleThreadExecutor:单个线程线程池

  • newCachedThreadPool:缓存线程池

  • newScheduledThreadPool:调度线程池

newFixedThreadPool

固定大小的线程池线程数量不存在变化,待处理的任务过多时会存放到缓存队列中。线程池会维护一定数量的线程,当创建的线程过多时阻塞队列中等待执行的线程数量会大量堆积,系统资源不足时容易发生 OOM。
// 创建线程池
ExecutorService threadPool = Executors.newFixedThreadPool(5);
public static ExecutorService newFixedThreadPool(int nThreads) {
// 固定大小是指: 核心线程数量 = 最大线程数量
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

直接使用线程池构造方法创建固定线程池,线程池中有且仅有固定数量的线程去执行待执行的任务。当“待执行的任务数量 > maximumPoolSize + blockingQueue.size()”时,会抛出异常 java.util.concurrent.RejectedExecutionException。

// 阻塞队列大小为 5
LinkedBlockingQueue<Runnable> blockingQueue = new LinkedBlockingQueue<>(5);
// 创建线程数量为 3 的固定大小线程池
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(3, 3, 0, TimeUnit.SECONDS, blockingQueue);
// 待提交执行任务数量为 10,允许执行任务的数量为 8
for (int i = 0; i< 10; i++) {
poolExecutor.submit(
() -> {
System.out.println(Thread.currentThread().getName());
});
}

newSingleThreadExecutor

单线程线程池有且仅有一个线程,若有多余的任务提交到线程池中则会被暂存到阻塞队列,待线程空闲时再去执行,当线程在执行过程中失败而终止时,会创建个新的线程去执行缓存队列中的任务。
ExecutorService threadPool = Executors.newSingleThreadExecutor();
public static ExecutorService newSingleThreadExecutor() {
// 核心线程数、最大线程数都为 1,阻塞队列大小为 Integer.MAX_VALUE
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

newCachedThreadPool

缓存线程池初始时不存在线程,根据需要创建线程,当线程池中不存在空闲可用线程时会创建新的线程,线程池中超过 60s 且未使用的线程将被终止并删除。因此,合理的空闲等待时间,线程池可以维护一定数量的线程有利于提高性能。
ExecutorService threadPool = Executors.newCachedThreadPool();
public static ExecutorService newCachedThreadPool() {
// 线程池核心线程数为 0,最大为 Integer.MAX_VALUE,空闲等待时间未 60s
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

newScheduledThreadPool

调度线程池,可以以固定的频率执行任务或者固定的延时执行任务。
ExecutorService threadPool = Executors.newScheduledThreadPool(3);
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
// 设置核心线程数量,默认最大线程数量为 Integer.MAX_VALUE
return new ScheduledThreadPoolExecutor(corePoolSize);
}


public ScheduledThreadPoolExecutor(int corePoolSize) {
// ThreadPoolExecutor 父类构造方法
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}

scheduleAtFixedRate:固定频率执行任务,即任务执行的频率保持不变。

// command:需要执行的任务;initialDelay:初始执行延迟时间;period:后续任务执行延迟时间;unit:延迟时间单位
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);

// 创建核心线程为 1 的调度线程池,初始延迟 3s,后续任务执行延迟 1s【周期性的操作,period = 0,所有的任务都将在初始延迟后执行,没有周期性可言】
ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(1);
threadPool.scheduleAtFixedRate(
()-> {
try {
// 假设任务执行时间为 3s,当 workTime <= period 时,延迟 period 时间后执行;当 workTime > period ,延迟 workTime 时间后执行
Thread.sleep(3000);
System.out.println("scheduleAtFixedRate " + Thread.currentThread().getName() + " " + DateFormat.getTimeInstance().format(new Date()) );
} catch (InterruptedException e) {
e.printStackTrace();
}
},
3,
1,
TimeUnit.SECONDS);

scheduleWithFixedDelay:固定的延时执行任务,指上一次执行成功之后和下一次开始执行的之前的时间。

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);

// 创建核心线程为 1 的调度线程池,初始延迟 3s,后续任务将在上一个任务执行成功 1 是后执行;
ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(1);
threadPool.scheduleWithFixedDelay(
()-> {
try {
Thread.sleep(1000);
System.out.println("scheduleWithFixedDelay " + Thread.currentThread().getName() + " " + DateFormat.getTimeInstance().format(new Date()) );
} catch (InterruptedException e) {
e.printStackTrace();
}
},
3,
1,
TimeUnit.SECONDS);

线程池有哪些状态

通过获取线程池状态,可以判断线程池是否是运行状态、可否添加新的任务以及优雅地关闭线程池等。

  • RUNNING: 线程池的初始化状态,可以添加待执行的任务。

  • SHUTDOWN:线程池处于待关闭状态,不接收新任务仅处理已经接收的任务。

  • STOP:线程池立即关闭,不接收新的任务,放弃缓存队列中的任务并且中断正在处理的任务。

  • TIDYING:线程池自主整理状态,调用 terminated() 方法进行线程池整理。

  • TERMINATED:线程池终止状态。

判断线程池状态常用方法总结如下。

shutdown 和 shutdownNow 的联系和区别:

  • shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,不接收新的任务。

  • shutdownNow():立即终止线程池,尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务。

isShutdown、isTerminated、isTerminating 的区别:

  • isShutDown():调用 shutdown() 或 shutdownNow() 方法后返回 true。

  • isTerminated():线程池中不存在任何需要执行的任务时,返回 true。

  • isTerminating() 线程池调用 shutdown() 或 shutdownNow() 后但是没有完全终止返回 true。

常用方法

线程池任务执行 execute()和 submit() 方法。

  • execute():ThreadPoolExecutor 类实现 Executor 接口中的方法,无返回值。

  • submit():ExecutorService 接口中的方法,有返回值,可以利用这一特性对任务的 Future.get() 抛出异常进行处理。

Executor 框架成员及关系图如下:

线程池监控

使用线程池可以提高系统并发时的吞吐量,提高系统性能。但是使用不当会造成系统资源占用过高,线程池缓存队列堆积大量待执行任务、缓存线程池中存在大量的耗时任务等会造成内存溢出、系统访问缓慢等问题。因此监控线程池就显得极为重要,下面根据源码解读常用方法进行线程池的监控。

核心线程数量:

// 线程池维持线程的最小存活数量与 allowCoreThreadTimeOut 参数有关
public int getCorePoolSize() {
return corePoolSize;
}

线程池最大的线程数量:

public int getMaximumPoolSize() {
return maximumPoolSize;
}

线程池最多创建的线程数量:

public int getLargestPoolSize() {
// 获取主线程锁,获取调用此方法时,线程池中曾创建的最大的线程数量
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
return largestPoolSize;
} finally {
mainLock.unlock();
}
}

// 添加待执行任务方法
private boolean addWorker(Runnable firstTask, boolean core){
...
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 添加待执行的任务
workers.add(w);
// 获取线程池中存在的工作线程的大小
int s = workers.size();
if (s > largestPoolSize)
// 赋值
largestPoolSize = s;
workerAdded = true;
}
...
}

线程池当前存在的线程数量:

public int getPoolSize() {
// 获取主线程锁,获取调用此方法时,线程池中存在线程数量
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Remove rare and surprising possibility of
// isTerminated() && getPoolSize() > 0
return runStateAtLeast(ctl.get(), TIDYING) ? 0 : workers.size();
} finally {
mainLock.unlock();
}
}

// 最近运行状态
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}

线程池已完成任务数量:

// Worker 类属性获取线程任务计数器
volatile long completedTasks;

public long getCompletedTaskCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
long n = completedTaskCount;
// 遍历线程池中的线程
for (Worker w : workers)
n += w.completedTasks;
return n;
} finally {
mainLock.unlock();
}
}

线程池存在的任务总量:

public long getTaskCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
long n = completedTaskCount;
for (Worker w : workers) {
n += w.completedTasks;
// w.islocked() 获取当前任务的状态,调用 isHeldExclusively() 判断
if (w.isLocked())
++n;
}
return n + workQueue.size();
} finally {
mainLock.unlock();
}
}

// Work 内部类
public void lock() { acquire(1); } // 加锁
public boolean tryLock() { return tryAcquire(1); }// 尝试获取锁
public void unlock() { release(1); }// 释放锁

public boolean isLocked() { return isHeldExclusively(); }
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}

好了,以上便是今天对线程池源码相关的分享。

码字不易。老田期待你来个:点赞...转发...

本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
Java线程池核心ThreadPoolExecutor的使用和原理分析
万字长文爆肝线程池
深入分析线程池的实现原理(文末送书)
ThreadPoolExecutor使用和思考(上)-AA
Android 并发二三事之Java线程池
java自带线程池和队列详细讲解
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服