问题描述
需求信息
最近在工作中有个需求,先在A服务页面增加一条数据,然后去B服务查询此数据的详细信息
解决方案
为了使A服务的新增数据接口快速响应,在查询B服务数据详情的地方使用了线程池异步查询与更新。
问题现象
在验证时发现数据库中的数据字段不全,经分析缺少的都是需要从B服务查询并更新的字段
初步定位
猜测应该是查询B服务时出了一些异常,而由于不规范使用线程池导致异常没有抛出,直接打到了控制台,故A服务的日志系统并看不到错误日志。
问题解决
- 查询B服务数据详情时暂时去掉使用线程池,改为同步调用
- 增加容错定时任务,定时查询需要从B服务获取缺失字段的数据进行更新
上线后观察,新增的数据不再有部分字段缺失的情况;问题解决。
问题复盘
A服务线程池的使用
线程池定义
1 2 3 4 5
|
private static final ExecutorService executorService = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MICROSECONDS, new LinkedBlockingDeque<>(), new ThreadFactoryBuilder().setNameFormat("queryDataDetail-%d").build());
|
线程池使用
1 2 3 4 5 6 7 8 9 10
| @Override @Transactional(rollbackFor = Exception.class) public void add(String contract) { checkIfExist(contract); Entity entity = saveData(contract); executorService.execute(() -> queryDataDetail(entity)); }
|
如此使用有何问题
我重新写了一个测试方法如下:
1、定义一个会一直抛异常的方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14
|
@Rule public SystemOutRule systemOutRule = new SystemOutRule().enableLog();
private String runWithException() { Thread thread = Thread.currentThread(); log.info("thread is {}", thread); log.info("eh={}", thread.getUncaughtExceptionHandler()); throw new NicaiException("出错啦!"); }
|
2、使用线程池调用上面的方法
1 2 3 4 5 6 7 8
| @Test public void run() throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); executorService.execute(this::runWithException); TimeUnit.MILLISECONDS.sleep(100L); Assert.assertFalse(systemOutRule.getLog().contains("出错啦!")); }
|
3、上面的单测断言是成功的,那么异常跑哪里去了?上面的单测在控制台的输出如下:
4、可以看出上面的异常信息是直接输出到了控制台,而不是由程序输出到控制台,主要原因是主程序没有捕获到此异常导致的。(具体原因还没有深入)
如何解决线程池的异常捕获问题
上面的测试可以说明到为什么日志里面查不到错误日志,那么如何捕获线程里的异常呢?
方法1:使用UncaughtExceptionHandler
1、在创建线程池的时候,设置传入的ThreadFactory的UncaughtExceptionHandler属性,此UncaughtExceptionHandler会处理线程中的异常;下面的例子我直接打印了出来异常原因和异常栈。
1 2 3 4 5 6 7 8 9 10
| @Test public void runWithUncaughtExceptionHandler() throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool( new ThreadFactoryBuilder() .setUncaughtExceptionHandler((t, e) -> log.info("UncaughtExceptionHandler caught, error_message={}", e.getMessage(), e)) .build()); executorService.execute(this::runWithException); TimeUnit.MILLISECONDS.sleep(100L); Assert.assertTrue(systemOutRule.getLog().contains("出错啦!")); }
|
2、上面的单测运行结果如下:(可以和上面的运行结果进行比对)
3、从上面的运行结果可以看出异常信息是由程序捕获后再输出出来,这样就不会导致查不到异常日志了。
方法2:使用guava扩展的FutureCallback
1、guava对jdk的线程做了一些扩展,其中一个就是FutureCallback,使用方法如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @Test public void runWithGuavaThreadPool() throws InterruptedException { ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool()); ListenableFuture<String> listenableFuture = executorService.submit(this::runWithException); Futures.addCallback(listenableFuture, new FutureCallback<String>() { @Override public void onSuccess(String result) { log.info("success! result = {}", result); }
@Override public void onFailure(Throwable t) { log.error("guava FutureCallback caught, error_message={}", t.getMessage(), t); } }, executorService); TimeUnit.MILLISECONDS.sleep(100L); Assert.assertTrue(systemOutRule.getLog().contains("出错啦!")); }
|
2、上面的单测运行结果如下:
问题总结
1、通过上面的测试,优化A服务的线程池定义,使之在遇到异常时能够正常被捕获,能输出,方便问题定位;补偿定时任务也能对第一次查询异常进行容错,保证数据能够同步过来。
1 2 3 4 5 6 7
|
private static final ExecutorService executorService = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MICROSECONDS, new LinkedBlockingDeque<>(), new ThreadFactoryBuilder() .setUncaughtExceptionHandler((t, e) -> log.error("查询数据详情的线程池异常,error_message={}", e.getMessage(), e)) .setNameFormat("queryDataDetail-%d").build());
|
2、当然此问题更深层的问题还没有完全解答
- 为什么线程里的异常不会被捕获?
- UncaughtExceptionHandler的运行原理是什么?
- Guava的FutureCallback是如何运行的?
3、测试代码源码地址