随着Java 9、Spring 5以及Spring Boot 2对于响应式编程(Reactive Stack)的支持,Reactor模式也日渐趋于流行,虽然当前并未在业界全面落地应用,但随着Java 8函数式编程、Lambda表达式、Stream数据流式处理的深入人心,加之Spring框架对于事件驱动模型的有力支持,日后响应式编程的全面落地当只是时间问题。而响应式编程的一个很重要的特性就是它对异步化的完美封装使得开发人员只需要专注于数据流的处理和业务逻辑的实现而不用过于关心异步化背后复杂的线程管理,但这既是响应式编程模式的优点也是它的缺点,导致它的学习曲线较高,代码也并不是那么好理解,它是一种可以说是完全不同于我们已经习惯了的同步编程思维的新的编程模式,如果不能完全深刻地理解便不敢轻易应用于成熟的业务中,这也是导致它虽然已经出现了有一些年头了但至今尚未能够大规模落地实践的一个很重要的原因。
而对于Java语言,响应式框架中最为流行的当是来源于微软的LINQ扩展函数库的RxJava了。它在微服务的服务治理组件来源于Netflix公司的服务容错框架Hystrix的实现中得到了全面的使用,从而使其在微服务的全面落地过程中声名鹊起,如果想要研究Hystrix的具体实现便不得不首先学习RxJava的一些基本概念、操作符和API使用。由于RxJava和响应式编程尚未在公司的业务中全面应用,本篇并不对它们做过多深究,这篇文章主要是从实践的角度谈谈在当前的同步编程模式下如何结合现有在使用的技术应用异步业务处理。
同步:线程阻塞等待结果的响应再进行下一步的处理。这个我们已经非常熟悉了,它在我们的业务处理中占据了主流,也符合我们的思维流程。
异步:主线程不需要阻塞等待响应而能够继续进行后续的处理,而把这一部分需要一定耗时的业务逻辑交予子线程处理,在子线程处理完毕后再通知主线程或另外的线程结果进行后续的处理或无需任何后续处理。
从上面的描述中可以看出,同步与异步最大的区别便在于会不会阻塞住当前线程,使其处于无谓的等待中。
那么,我们为什么要使用异步,异步相比于同步有什么好处?这个问题可以使用我们在中学时代学过的著名数学家华罗庚关于烧水泡茶的两个算法来说明:
算法一:
第一步、烧水;第二步、水烧开后,洗刷茶具;第三步、沏茶
算法二:
第一步、烧水;第二步、烧水过程中,洗刷茶具;第三步、水烧开后沏茶
这两个算法我们一眼便能看出哪个更节省时间。其实我们的日常编码基本也都是这样的类似工序的安排,如何更好地安排好业务处理步骤在更快的时间内做出响应便是我们需要做的优化。上面的算法一便是完全同步的算法,上一步完成了才能进行下一步,这样虽然非常有利于我们的理解和对于工序的控制,但却不能充分利用资源浪费了很多无谓的时间在等待中什么也没做。而算法二则是结合了同步和异步,充分利用了烧水过程中的等待时间来洗刷茶具,有效地利用了时间资源,从而比算法一要节省不少时间,能够更快地做出响应(沏好茶)。对于有依赖关系的步骤同步不可避免,但是对于没有任何依赖关系的步骤完全可以使用异步进行并行处理以提高处理效率在更快的时间内做出响应。因此异步的好处便是可以充分利用资源并行或并发处理提高处理效率节省响应时间,对应于计算机来说,便是充分利用多核CPU的处理能力在同一时间内做更多的计算,提高CPU的资源利用率,提高系统的吞吐量和降低系统的响应时间。因为我们的业务处理中,大部分的逻辑都是IO密集型的,比如存取数据库、远程服务调用(RPC或HTTP等)、读写文件等,而非CPU密集型的(对于大量的数据需要CPU计算的可使用ForkJoinPool/ForkJoinTask进行任务分解来并行计算),IO密集型的任务正如上面的烧水过程一样,需要一定的耗时,会使线程阻塞等待,此时它并不占用CPU资源,因此在这个等待的过程中是浪费了CPU宝贵的资源的。如果对这样的IO调用进行异步化并行或并发处理,此时主线程便不会阻塞而会继续处理后面的逻辑,从而很好地利用了CPU。而这个在互联网环境下应对高并发请求的场景中尤为重要,是我们一切优化所努力的方向。缓存、消息队列(MQ)等中间件的作用也在于此。
从第二部分的描述中也可以看出,异步化的前提是要正确梳理出业务处理步骤中的依赖关系,哪些步骤是有依赖关系的,哪些步骤是没有依赖关系的,对于有依赖关系的步骤使用同步的方式处理,对于没有依赖关系的步骤则可以使用异步进行并行或并发处理。当然,有依赖关系的几个步骤组合起来在大的处理时序中可能与其他的步骤之间又是没有依赖关系的,此时在更高一层级中大的步骤之间也可以使用异步的方式,需要我们根据具体的业务逻辑具体问题具体分析灵活处理。
那么,在Java服务端的开发中,都有哪些异步化的手段呢?
1、多线程线程池(ThreadPoolExecutor)的方式
代码如下:
ExecutorService executor = Executors.newFixedThreadPool(5); // 不需要关心处理结果 executor.execute(() -> System.out.println("业务处理步骤...")); // 需要处理结果进行后续的逻辑处理 Future<String> future = executor.submit(() -> { System.out.println("业务处理步骤..."); return "业务处理结果"; }); // 中间的其他处理步骤 // 对异步任务的结果进行处理 try { System.out.println("异步任务的结果:" + future.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } 复制代码
这种方式虽然简单明了也很常用,但是它的主要问题在于把业务逻辑处理和线程处理代码耦合在了一起,使得线程的代码侵入到了我们的业务中,不利于线程的统一管理和维护,没有一个统一的线程集中管理收口很容易造成线程的无限制滥用,从而引起系统问题。而且,Future的使用非常不灵活,它的获取结果的get()方法还是阻塞式的,有点鸡肋,因此并不建议大量使用。当然,也可以结合Google的Guava库提供的SettableFuture和ListenableFuture进行改造,但这毕竟需要另外的工作量,还有很多问题需要处理。
下面重点来说说在Java服务端开发异步化实践中个人觉得比较好的两种方式:
2、Spring的事件机制和@Async注解异步任务支持
结合Spring的事件机制和@Async注解异步方法调用可以实现异步化的处理,主要代码如下:
首先声明开启异步功能支持和异步任务执行线程池配置,还可以配置异步任务异常处理器
@Configuration @EnableAsync public class AsyncConfig implements AsyncConfigurer { @Bean @Override public Executor getAsyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10); executor.setMaxPoolSize(100); executor.setQueueCapacity(100); executor.setThreadNamePrefix("EventExecutor-"); return executor; } @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { //return new EventAsyncUncaughtExceptionHandler(); return null; } static class EventAsyncUncaughtExceptionHandler implements AsyncUncaughtExceptionHandler { @Override public void handleUncaughtException(Throwable ex, Method method, Object... params) { } } }复制代码
基础事件定义
public class BaseEvent extends ApplicationEvent { private static final long serialVersionUID = 5570783330749390897L; public BaseEvent(Object source) { super(source); } }复制代码
事件发布器
@Component public class EventPublisher { @Autowired private ApplicationContext context; public void publishEvent(BaseEvent event){ context.publishEvent(event); } }复制代码
具体的业务事件
@Getter @Setter @ToString public class MisRecognitionEvent extends BaseEvent { private static final long serialVersionUID = 8645189838824356680L; private MisRecognitionInfo record; public MisRecognitionEvent(Object source, MisRecognitionInfo record) { super(source); this.record = record; } } 复制代码
具体的业务代码中发布可异步处理的事件消息
MisRecognitionInfo info = new MisRecognitionInfo(); MisRecognitionEvent misRecognitionEvent = new MisRecognitionEvent(this, info); eventPublisher.publishEvent(misRecognitionEvent);复制代码
事件处理器
@Component @Slf4j public class FaceMonitorEventListener { @EventListener @Async public void handleMisRecognitionEvent(MisRecognitionEvent event) { MisRecognitionInfo info = event.getRecord(); // 事件处理逻辑 } }复制代码
上面的模式适合主流程不关心异步流程的结果(比如记录日志,报警通知等),如果需要在主流程中对异步流程的结果进行处理,可以直接使用@Async异步方法,而不需要使用事件的机制
@Component @Slf4j public class AsyncRpcService { @Async AsyncResult<String> getData(String dataId) { return new AsyncResult<String>("data"); } }复制代码
具体的业务代码中获取异步任务的结果并做处理
AsyncResult<String> asyncRpcResult = asyncRpcService.getData("dataId"); asyncRpcResult.addCallback((result) -> System.out.println("异步任务结果:" + result), (ex) -> System.out.println("异步任务异常:" + ex.getMessage()));复制代码
3、Java 8的CompletableFuture的异步模式
CompletableFuture的API介绍请参考这篇文章( Java 8 CompletableFuture 教程 )。
Java 8的CompletableFuture扩展了Future的功能,提供了对异步任务执行完成后的回调和对多个Future的链式编排,从而可以实现非常丰富的异步任务处理场景。
场景一:
单个异步任务的处理
(1)调用线程不需要关心异步任务的处理结果
// 使用默认线程池 CompletableFuture.runAsync(() -> System.out.println("没有返回结果的异步任务...")); // 使用指定线程池 CompletableFuture.runAsync(() -> System.out.println("没有返回结果的异步任务..."), executor);复制代码
(2)对异步任务的结果进行处理
CompletableFuture.supplyAsync(() -> { System.out.println("异步任务处理..."); return "异步任务的返回结果"; }, executor).whenComplete((result, exception) -> { if (result != null) { System.out.println("处理异步任务结果"); } if (exception != null) { System.out.println("处理异步任务异常"); } });复制代码
场景二:
(1)多个异步任务都完成后的处理
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { System.out.println("异步任务一..."); return "异步任务一的结果"; }, executor); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { System.out.println("异步任务二..."); return "异步任务二的结果"; }, executor); CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> { System.out.println("异步任务三..."); return "异步任务三的结果"; }, executor); List<CompletableFuture<String>> futureList = new ArrayList<>(); futureList.add(future1); futureList.add(future2); futureList.add(future3); CompletableFuture<Void> allFutures = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[futureList.size()])); // 所有异步任务都完成后的处理 allFutures.whenComplete((result, exception) -> { if (exception != null) { System.out.println("其中的一个异步任务发生了异常:" + exception.getMessage()); } // 每个异步任务的结果需要单独获取 List<String> futureResults = futureList.stream().map(future -> future.join()).collect(Collectors.toList()); System.out.println("所有异步任务的结果集合:" + futureResults); });复制代码
需要注意的是,如果其中任何一个异步任务发生了异常,则allFutures也会以该异常作为返回的结果,后面也就获取不到每个异步任务的结果了,因此这种情况下最好在每个异步任务里处理好异常。
(2)多个异步任务中的任何一个完成后的处理
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { System.out.println("异步任务一..."); try { TimeUnit.SECONDS.sleep(2); System.out.println("异步任务一耗时2秒钟..."); } catch (InterruptedException e) { e.printStackTrace(); } return "异步任务一的结果"; }, executor); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { System.out.println("异步任务二..."); try { TimeUnit.SECONDS.sleep(1); System.out.println("异步任务二耗时1秒钟..."); } catch (InterruptedException e) { e.printStackTrace(); } return "异步任务二的结果"; }, executor); CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> { System.out.println("异步任务三..."); try { TimeUnit.SECONDS.sleep(3); System.out.println("异步任务三耗时3秒钟..."); } catch (InterruptedException e) { e.printStackTrace(); } return "异步任务三的结果"; }, executor); List<CompletableFuture<String>> futureList = new ArrayList<>(); futureList.add(future1); futureList.add(future2); futureList.add(future3); CompletableFuture<Object> allFutures = CompletableFuture.anyOf(futureList.toArray(new CompletableFuture[futureList.size()])); // 所有异步任务都完成后的处理 allFutures.whenComplete((result, exception) -> { if (exception != null) { System.out.println("其中的一个异步任务发生了异常:" + exception.getMessage()); } if (result != null) { System.out.println("多个异步任务最快完成的那个任务返回的结果:" + result); } });复制代码
CompletableFuture.supplyAsync(() -> { System.out.println("异步任务一..."); return "异步任务一的返回结果"; }, executor).thenApplyAsync(result -> { System.out.println("异步任务二依赖于异步任务一的结果..."); return result + ":" + "异步任务二的返回结果"; }, executor).thenCombineAsync(CompletableFuture.supplyAsync(() -> { System.out.println("异步任务三..."); return "异步任务三的返回结果"; }, executor), (result1, result2) -> { System.out.println("异步任务四依赖于异步任务二和异步任务三的结果..."); return result1 + ":" + result2 + ":" + "异步任务四的返回结果"; }, executor).whenComplete((result, exception) -> { if (result != null) { System.out.println("最终结果:" + result); } if (exception != null) { System.out.println("处理异步任务异常"); } });复制代码
thenApplyAsync:接收前一个任务的返回结果并重新返回一个结果
thenAcceptAsync:只接收前一个任务的返回结果
thenRunAsync:不接收前一个任务的返回结果,只在前一个任务完成后做一些处理
需要注意的是,链式调用后面一个任务执行的前提是前一个任务没有发生异常,否则便终止了。
thenCombineAsync
thenAcceptBothAsync
runAfterBothAsync
applyToEitherAsync
acceptEitherAsync
runAfterEitherAsync
CompletableFuture的异步模式(***Async方法)分别提供了使用默认线程池和指定线程池两种方式。默认线程池使用的是ForkJoinPool的commonPool:
private static final boolean useCommonPool = (ForkJoinPool.getCommonPoolParallelism() > 1); /** * Default executor -- ForkJoinPool.commonPool() unless it cannot * support parallelism. */ private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor(); /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */ static final class ThreadPerTaskExecutor implements Executor { public void execute(Runnable r) { new Thread(r).start(); } }复制代码
如果当前系统不支持并行计算(多核处理器),则会为每个任务创造一个线程。commonPool的线程数则是CPU核数-1:
/** * Creates and returns the common pool, respecting user settings * specified via system properties. */ private static ForkJoinPool makeCommonPool() { int parallelism = -1; ForkJoinWorkerThreadFactory factory = null; UncaughtExceptionHandler handler = null; try { // ignore exceptions in accessing/parsing properties String pp = System.getProperty ("java.util.concurrent.ForkJoinPool.common.parallelism"); String fp = System.getProperty ("java.util.concurrent.ForkJoinPool.common.threadFactory"); String hp = System.getProperty ("java.util.concurrent.ForkJoinPool.common.exceptionHandler"); if (pp != null) parallelism = Integer.parseInt(pp); if (fp != null) factory = ((ForkJoinWorkerThreadFactory)ClassLoader. getSystemClassLoader().loadClass(fp).newInstance()); if (hp != null) handler = ((UncaughtExceptionHandler)ClassLoader. getSystemClassLoader().loadClass(hp).newInstance()); } catch (Exception ignore) { } if (factory == null) { if (System.getSecurityManager() == null) factory = defaultForkJoinWorkerThreadFactory; else // use security-managed default factory = new InnocuousForkJoinWorkerThreadFactory(); } if (parallelism < 0 && // default 1 less than #cores (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0) parallelism = 1; if (parallelism > MAX_CAP) parallelism = MAX_CAP; return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE, "ForkJoinPool.commonPool-worker-"); }复制代码
由此可见,默认线程池的方式适合CPU密集型的任务,而我们大多数的业务处理都是IO密集型的,因此最好使用显式指定线程池的方式。
总结一下,事件和@Async注解的方式更适合异步处理逻辑比较复杂需要与主逻辑解耦的场景,而CompletableFuture的方式则更适合简单轻量的异步处理逻辑或与主逻辑有较强的关联关系的场景。
当然,在实践中还发现有依赖于消息中间件的基于消息的发布订阅模式的异步化处理方式,但由于这种方式比较重,通常用在一些特殊的跨系统的异步解耦的场景中,并不适合在单系统应用中大量使用,在此不再详述。