Symptoms
<task:annotation-driven executor="executor"
scheduler="scheduler" />
<task:executor id="executor" pool-size="16-128"
keep-alive="60" rejection-policy="CALLER_RUNS"
queue-capacity="1000" />
@Async()
public Future<Result4Calculate> calculateByLendId(int lendrequestId) {
// Marker 1
// Call the REST service and record the call duration.
}
// After obtaining the Future, it is handled as follows
try {
keplerOverdue = summay4Overdue.get(5, TimeUnit.SECONDS);
// Subsequent processing
} catch (Exception e) {
// Marker 2
// Raise an alert for the exception
}
Root cause analysis
Task scheduling logic
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
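1. If fewer than corePoolSize (the minimum number of threads) threads are running, try to start a new thread with the argument command as its first task. Otherwise, go to step 2.
2. If step 1 did not apply, try to put command into workQueue. If it is enqueued successfully, perform the follow-up checks; otherwise, go to step 3.
3. If step 2 did not succeed, try to create a new thread, up to maximumPoolSize. If that also fails, the task is rejected and handed to the RejectedExecutionHandler.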
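The three steps can be observed directly on a plain JDK pool. The following is a minimal sketch, not the code from the incident: the class name QueueFirstDemo and the method submitAndMeasure are made up for illustration, and the tasks simply block on a latch so that the pool state can be read while every worker is busy.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: shows that a bounded queue is filled before the pool grows.
class QueueFirstDemo {

    /** Submits blocking tasks and returns {poolSize, queueSize} while they are all pending. */
    static int[] submitAndMeasure(int core, int max, int queueCapacity, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(queueCapacity));
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until we have measured
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return new int[] {pool.getPoolSize(), pool.getQueue().size()};
        } finally {
            release.countDown(); // let all tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] large = submitAndMeasure(2, 128, 1000, 10);
        System.out.println("queue 1000 -> pool size " + large[0] + ", queued " + large[1]);
        int[] small = submitAndMeasure(2, 128, 3, 10);
        System.out.println("queue 3    -> pool size " + small[0] + ", queued " + small[1]);
    }
}
```

With a queue capacity of 1000, ten tasks leave the pool at 2 threads with 8 tasks queued (steps 1 and 2). Only when the queue is as small as 3 does step 3 apply, and the pool grows to 7 threads.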
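Summary analysis
According to the THREAD configuration, the ThreadPoolExecutor has corePoolSize = 16.
During asynchronous scheduling, the number of threads in the pool never grew (it stayed at 16 at most).
This was confirmed from the thread names in the logs: the async threads run from executor-1 and executor-2 up to executor-16, but 17 and above never appear.
When concurrency exceeds 16, ThreadPoolExecutor schedules tasks according to step 2: it puts the task into the queue instead of creating a new thread to run it. Threads beyond corePoolSize are only created once the queue (capacity 1000) is full.
This is a hypothesis; the test below verifies it.
Tasks back up in the queue and their waiting time accumulates, so once one task times out, a large number of subsequent tasks time out as well. The timeout does not stop a task from executing, however: the task still calls the REST server through the REST client, and the monitoring code still records its duration.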
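The last point, that a timed-out get() does not stop the task, can be checked in isolation. This is a small sketch under stated assumptions: the class TimeoutDoesNotCancelDemo and its run method are invented for illustration, and a single-thread executor stands in for the saturated pool.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class: Future.get(timeout) gives up waiting but leaves the task queued.
class TimeoutDoesNotCancelDemo {

    /** Returns {callerTimedOut, taskStillRan}. */
    static boolean[] run() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);
        AtomicBoolean secondRan = new AtomicBoolean(false);
        boolean timedOut = false;
        try {
            // The first task occupies the only worker, so the second one waits in the queue.
            Callable<Void> blocker = () -> { release.await(); return null; };
            Callable<String> queued = () -> { secondRan.set(true); return "done"; };
            pool.submit(blocker);
            Future<String> second = pool.submit(queued);
            try {
                second.get(100, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                timedOut = true; // the caller stopped waiting; the task is still queued
            }
            release.countDown();             // free the worker
            second.get(5, TimeUnit.SECONDS); // the "timed out" task now runs to completion
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
        return new boolean[] {timedOut, secondRan.get()};
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("caller timed out: " + r[0] + ", task still ran: " + r[1]);
    }
}
```

If a late result is of no use, calling cancel(true) on the Future in the TimeoutException handler would keep a still-queued task from running at all. Whether that is appropriate for the REST call in question is a design decision this sketch does not settle.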
Reproducing the problem
<!-- Use this async executor when calling the REST interface, to avoid occupying the global thread pool -->
<task:executor id="keplerRestExecutor" pool-size="2-128"
keep-alive="60" rejection-policy="CALLER_RUNS"
queue-capacity="1000" />
@Test
public void test_multi_thread() {
System.out.println("start");
for (int i = 0; i < 10; i++) {
new Thread(
() -> {
long start = System.currentTimeMillis();
System.out.println("committed.");
Future<Result4Calculate> result =
BizOverdueCalculateServiceTest.
this.bizOverdueCalculateService.
calculateByLendId(1231);
System.out.println("to get. cost:"
+ (System.currentTimeMillis() - start));
start = System.currentTimeMillis();
try {
result.get(5, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException
| TimeoutException e) {
e.printStackTrace();
}
System.out.println("getted. cost:"
+ (System.currentTimeMillis() - start));
}, "thread_" + i).start();
}
System.out.println("all started");
try {
Thread.sleep(10001);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
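/**
* Trial calculation by loan request ID.
* <p>
* The settlement date defaults to the current system date.
*
* @param lendrequestId the loan request ID
* @return a Future holding the calculation result
*/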
@Async("keplerRestExecutor")
public Future<Result4Calculate> calculateByLendId(int lendrequestId) {
long start = System.currentTimeMillis();
Future<Result4Calculate> f =
this.calculateByLendId(lendrequestId,
new Date());
System.out.println(Thread.currentThread().getName() + ", active count:"
+ this.keplerRestExecutor.getActiveCount() + ", queue size :"
+ this.keplerRestExecutor.getThreadPoolExecutor().getQueue().size()
+ " rest cost: " + (System.currentTimeMillis() - start));
return f;
}
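After the test class concurrently submits 10 REST call tasks, only two of them are executed immediately by worker threads in the pool; the other eight go into the queue.
The pool has only two worker threads throughout.
No individual task's execution time exceeds the timeout, yet after several tasks have run, all subsequent tasks time out.
Once everything has been submitted, only two threads are executing and the other 8 tasks are all in the queue: active count:2, queue size :8.
The pool only ever contains the two threads keplerRestExecutor-1 and keplerRestExecutor-2, and the active count stays at 2.
The actual execution time of each task (rest cost) is about 1 s. But starting from the 9th task (the exact number varies slightly between runs), result.get(5, TimeUnit.SECONDS) times out.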
start
committed.
committed.
committed.
committed.
committed.
all started
committed.
committed.
committed.
committed.
committed.
to get. cost:37
to get. cost:33
to get. cost:33
to get. cost:37
to get. cost:37
to get. cost:33
to get. cost:37
to get. cost:33
to get. cost:37
to get. cost:35
keplerRestExecutor-1, active count:2, queue size :8 rest cost: 1437
getted. cost:1444
keplerRestExecutor-2, active count:2, queue size :7 rest cost: 1437
getted. cost:1444
keplerRestExecutor-1, active count:2, queue size :6 rest cost: 1155
getted. cost:2599
keplerRestExecutor-2, active count:2, queue size :5 rest cost: 1155
getted. cost:2600
keplerRestExecutor-1, active count:2, queue size :4 rest cost: 1140
getted. cost:3739
keplerRestExecutor-2, active count:2, queue size :3 rest cost: 1140
getted. cost:3740
keplerRestExecutor-1, active count:2, queue size :2 rest cost: 1176
getted. cost:4915
keplerRestExecutor-2, active count:2, queue size :1 rest cost: 1176
getted. cost:4916
java.util.concurrent.TimeoutException
getted. cost:5001
getted. cost:5001
at java.util.concurrent.FutureTask.get(FutureTask.java:205)
at cn.youcredit.thread.common.service.portfolio.BizOverdueCalculateServiceTest.lambda$0(BizOverdueCalculateServiceTest.java:99)
at cn.youcredit.thread.common.service.portfolio.BizOverdueCalculateServiceTest$$Lambda$21/648122621.run(Unknown Source)
at java.lang.Thread.run(Thread.java:745)
java.util.concurrent.TimeoutException
at java.util.concurrent.FutureTask.get(FutureTask.java:205)
at cn.youcredit.thread.common.service.portfolio.BizOverdueCalculateServiceTest.lambda$0(BizOverdueCalculateServiceTest.java:99)
at cn.youcredit.thread.common.service.portfolio.BizOverdueCalculateServiceTest$$Lambda$21/648122621.run(Unknown Source)
at java.lang.Thread.run(Thread.java:745)
keplerRestExecutor-1, active count:2, queue size :0 rest cost: 1175
keplerRestExecutor-2, active count:1, queue size :0 rest cost: 1175
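The timings in the log above follow a simple pattern: with 2 workers, tasks complete in waves of two, and each wave adds one task duration to the waiting time. The sketch below is only a back-of-the-envelope model, not a measurement. The class QueueWaitModel is hypothetical, and the 1200 ms task cost is an assumed round figure for the observed rest cost of 1140 to 1437 ms.

```java
// Hypothetical model: all tasks are submitted at once, every task takes the same
// time, and `threads` workers drain the FIFO queue in waves.
class QueueWaitModel {

    /** Time in ms, counted from submission, at which the k-th task (1-based) completes. */
    static long finishMs(int k, int threads, long costMs) {
        long wave = (k + threads - 1) / threads; // ceil(k / threads)
        return wave * costMs;
    }

    /** 1-based index of the first task whose completion exceeds the timeout, or -1 if none does. */
    static int firstTimedOutTask(int tasks, int threads, long costMs, long timeoutMs) {
        for (int k = 1; k <= tasks; k++) {
            if (finishMs(k, threads, costMs) > timeoutMs) {
                return k;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        long costMs = 1200; // assumed average task duration
        for (int k = 1; k <= 10; k++) {
            System.out.println("task " + k + " finishes at ~" + finishMs(k, 2, costMs) + " ms");
        }
        System.out.println("first timeout with 2 threads: task "
                + firstTimedOutTask(10, 2, costMs, 5000));
        System.out.println("first timeout with 10 threads: "
                + firstTimedOutTask(10, 10, costMs, 5000));
    }
}
```

Under these assumptions the fifth wave would finish at about 6000 ms, so tasks 9 and 10 exceed the 5-second limit even though no single REST call is slow, which is consistent with the two TimeoutExceptions in the log.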
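Options for rejection-policy
Solutions
Increase the initial (core) thread count.
Disable the queue.
Switch to a different thread pool.
Set rejection-policy to CALLER_RUNS.
Set rejection-policy to DISCARD.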
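To make the difference between the two rejection policies concrete, here is a minimal sketch on a plain JDK pool, using the JDK handlers ThreadPoolExecutor.CallerRunsPolicy and ThreadPoolExecutor.DiscardPolicy, which the names CALLER_RUNS and DISCARD presumably map to. The class RejectionPolicyDemo and its methods are invented for illustration; the pool is deliberately saturated (1 thread, no queue) so that the next submission is always rejected.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class comparing two rejection handlers on a saturated pool.
class RejectionPolicyDemo {

    /** Builds a 1-thread pool without a queue whose only worker is blocked on the latch. */
    static ThreadPoolExecutor saturatedPool(RejectedExecutionHandler handler, CountDownLatch release) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 60, TimeUnit.SECONDS, new SynchronousQueue<>(), handler);
        pool.execute(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        return pool;
    }

    /** Name of the thread that runs a rejected task under CallerRunsPolicy. */
    static String threadUsedByCallerRuns() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = saturatedPool(new ThreadPoolExecutor.CallerRunsPolicy(), release);
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            return pool.submit(task).get(); // already complete: it ran inside submit()
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    /** True if the Future of a task dropped by DiscardPolicy never completes. */
    static boolean discardedFutureTimesOut() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = saturatedPool(new ThreadPoolExecutor.DiscardPolicy(), release);
        try {
            Callable<String> task = () -> "never computed";
            Future<String> future = pool.submit(task); // silently dropped
            try {
                future.get(200, TimeUnit.MILLISECONDS);
                return false;
            } catch (TimeoutException e) {
                return true;
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("CALLER_RUNS ran the task on: " + threadUsedByCallerRuns());
        System.out.println("DISCARD future timed out: " + discardedFutureTimesOut());
    }
}
```

Note that a rejection handler only comes into play once both the queue and the pool are full. With queue-capacity="1000", as in the original configuration, ten queued tasks never reach that point, so the configured CALLER_RUNS policy had no effect on the timeouts.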
Test code for the disabled-queue solution
<!-- Use this async executor when calling kepler's REST interface, to avoid occupying the global thread pool -->
<task:executor id="keplerRestExecutor" pool-size="2-128"
keep-alive="60" rejection-policy="CALLER_RUNS"
queue-capacity="0" />
<task:scheduler id="scheduler" pool-size="32" />
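As soon as the tasks are submitted, 10 worker threads are created, one for each task.
The queue size is always 0.
No timeouts occur.
For later tasks, the reported active count may be lower than 10.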
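The same behaviour can be reproduced without Spring. Spring's ThreadPoolTaskExecutor uses a SynchronousQueue when the queue capacity is not positive, so the sketch below builds a JDK pool with that queue directly. The class DirectHandoffDemo and its method are hypothetical names, and the tasks block on a latch only so that the pool size can be read while all of them are running.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: with a SynchronousQueue nothing can wait in the queue,
// so every task beyond corePoolSize gets its own new thread (up to the maximum).
class DirectHandoffDemo {

    /** Submits blocking tasks and returns {poolSize, queueSize} while they are all running. */
    static int[] submitAndMeasure(int core, int max, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 60, TimeUnit.SECONDS, new SynchronousQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // hold the thread until we have measured
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return new int[] {pool.getPoolSize(), pool.getQueue().size()};
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] r = submitAndMeasure(2, 128, 10);
        System.out.println("pool size " + r[0] + ", queued " + r[1]);
    }
}
```

The trade-off is that the maximum pool size becomes the only limit: once 128 threads are busy, further tasks go to the rejection handler instead of waiting.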
Log output after disabling the queue
start
committed.
committed.
committed.
committed.
committed.
all started
committed.
committed.
committed.
committed.
committed.
to get. cost:3
to get. cost:7
to get. cost:7
to get. cost:3
to get. cost:7
to get. cost:7
to get. cost:7
to get. cost:7
to get. cost:4
to get. cost:4
keplerRestExecutor-7, active count:10, queue size :0 rest cost: 2177
getted. cost:2182
keplerRestExecutor-4, active count:9, queue size :0 rest cost: 2182
getted. cost:2187
keplerRestExecutor-9, active count:8, queue size :0 rest cost: 2185
getted. cost:2190
keplerRestExecutor-1, active count:7, queue size :0 rest cost: 2190
getted. cost:2196
keplerRestExecutor-3, active count:6, queue size :0 rest cost: 2191
getted. cost:2196
keplerRestExecutor-2, active count:5, queue size :0 rest cost: 2191
keplerRestExecutor-5, active count:4, queue size :0 rest cost: 2191
getted. cost:2196
getted. cost:2196
keplerRestExecutor-10, active count:3, queue size :0 rest cost: 2192
getted. cost:2197
keplerRestExecutor-8, active count:2, queue size :0 rest cost: 2192
getted. cost:2197
keplerRestExecutor-6, active count:1, queue size :0 rest cost: 2193
getted. cost:2198