打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
线程池踩坑

本文目录:

1. 概述

我们这里的队列都指线程池使用的阻塞队列 BlockingQueue 的实现。

什么是有界队列?就是有固定大小的队列。比如设定了固定大小的 LinkedBlockingQueue,又或者大小为 0,只是在生产者和消费者中做中转用的 SynchronousQueue。

什么是无界队列?指的是没有设置固定大小的队列。这些队列的特点是可以直接入列,直到溢出。当然现实几乎不会有到这么大的容量(超过 Integer.MAX_VALUE),所以从使用者的体验上,就相当于 “无界”。比如没有设定固定大小的 LinkedBlockingQueue。

所以无界队列的特点就是可以一直入列,不存在队列满负荷的现象。

这个特性,在我们自定义线程池的使用中非常容易出错。而出错的根本原因是对线程池内部原理的不了解。

比如有这么一个案例,我们使用了无界队列创建了这样一个线程池:

ExecutorService executor =  new ThreadPoolExecutor(2, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());

配置的参数如下:

  • 核心线程数 2
  • 最大线程数 4
  • 空闲线程保活时间 60s
  • 使用无界队列 LinkedBlockingQueue

然后对这个线程池我们提出一个问题:使用过程中,是否会达到最大线程数 4?

2. 验证

我们写了个 Demo 验证一下,设定有 10 个任务,每个任务执行 10s。

任务的执行代码如下,用 Thread.sleep 操作模拟执行任务的阻塞耗时。

/** * @author lidiqing * @since 17/9/17. */public class BlockRunnable implements Runnable {    private final String mName;    public BlockRunnable(String name) {        mName = name;    }    public void run() {        System.out.println(String.format("[%s] %s 执行", Thread.currentThread().getName(), mName));        try {            Thread.sleep(1000);        } catch (InterruptedException e) {            e.printStackTrace();        }    }}

然后在 main 方法中把这 10 个任务扔进刚刚设计好的线程池中:

 public static void main(String[] args) {        ExecutorService executor =  new ThreadPoolExecutor(2, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());        for (int i = 0; i < 10; i++) {            executor.execute(new BlockRunnable(String.valueOf(i)));        }    }

结果输出如下:

[pool-1-thread-2] 1 执行[pool-1-thread-1] 0 执行[pool-1-thread-2] 2 执行[pool-1-thread-1] 3 执行[pool-1-thread-1] 5 执行[pool-1-thread-2] 4 执行[pool-1-thread-2] 7 执行[pool-1-thread-1] 6 执行[pool-1-thread-1] 8 执行[pool-1-thread-2] 9 执行

发现了什么问题?这里最多出现两个线程。当放开到更多的任务时,也依然是这样。

3. 剖析

我们回到线程池 ThreadPoolExecutor 的 execute 方法来找原因。

public void execute(Runnable command) {        if (command == null)            throw new NullPointerException();        int c = ctl.get();        if (workerCountOf(c) < corePoolSize) {            if (addWorker(command, true))                return;            c = ctl.get();        }        if (isRunning(c) && workQueue.offer(command)) {            int recheck = ctl.get();            if (! isRunning(recheck) && remove(command))                reject(command);            else if (workerCountOf(recheck) == 0)                addWorker(null, false);        }        else if (!addWorker(command, false))            reject(command);    }

上面代码的核心就是任务进入等待队列 workQueue 的时机。答案就是,执行 execute 方法时,如果发现核心线程数已满,是会先执行 workQueue.offer(command) 来入列。

也就是 当核心线程数满了后,任务优先进入等待队列。如果等待队列也满了后,才会去创建新的非核心线程

所以我们上面设计的线程池,使用了无界队列,会直接导致最大线程数的配置失效。

可以用一张图来展示整个 execute 阶段的过程:

所以上面的线程池,实际使用的线程数的最大值始终是 corePoolSize ,即便设置了 maximumPoolSize 也没有生效。 要用上 maximumPoolSize ,允许在核心线程满负荷下,继续创建新线程来工作 ,就需要选用有界任务队列。可以给 LinkedBlockingQueue 设置容量,比如 new LinkedBlockingQueue(128) ,也可以换成 SynchronousQueue。

举个例子,用来做异步任务的 AsyncTask 的内置并发执行器的线程池设计如下:

public abstract class AsyncTask<Params, Progress, Result> {         private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();    // We want at least 2 threads and at most 4 threads in the core pool,    // preferring to have 1 less than the CPU count to avoid saturating    // the CPU with background work    private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));    private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;    private static final int KEEP_ALIVE_SECONDS = 30;    private static final ThreadFactory sThreadFactory = new ThreadFactory() {        private final AtomicInteger mCount = new AtomicInteger(1);        public Thread newThread(Runnable r) {            return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());        }    };    private static final BlockingQueue<Runnable> sPoolWorkQueue =            new LinkedBlockingQueue<Runnable>(128);    /**     * An {@link Executor} that can be used to execute tasks in parallel.     */    public static final Executor THREAD_POOL_EXECUTOR;    static {        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(                CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,                sPoolWorkQueue, sThreadFactory);        threadPoolExecutor.allowCoreThreadTimeOut(true);        THREAD_POOL_EXECUTOR = threadPoolExecutor;    }    ...}

我们可以看到,AsyncTask 的这个线程池设计,是希望在达到核心线程数之后,能够继续增加工作线程,最大达到 CPU_COUNT * 2 + 1 个线程,所以使用了有界队列,限制了任务队列最大数量为 128 个。

所以使用 AsyncTask 的并发线程池的时候要注意,不适宜短时间同时大量触发大量任务的场景。

因为当核心线程、任务队列、非核心线程全部满负荷工作的情况下,下一个进来的任务会触发 ThreaPoolExecutor 的 reject 操作,默认会使用 AbortPolicy 策略,抛出 RejectedExecutionException 异常。

本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
AsyncTask源码分析
java自带线程池和队列详细讲解
Android 并发二三事之Java线程池
Java线程池实现原理与技术II
基于优先级队列java线程池
Java多线程
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服