一个懂Akka、RxJava,看得懂《七周七并发》的人,和普通程序员完全是两个世界的人。那作为一个羞涩的普通程序员,怎么在自己的服务化体系里,满足自己的异步化需求呢 ?我的思路是这样的:
1. 先认清自己的需求,不要一开始就站到某个技术上,再倒叙自己的需求。
2. 然后从浅入深,分析可以用什么样的技术满足该层次的需求,依然避免一下就冲到某个技术上,比如Akka入门到放弃。
3. 最后找出一条相对平稳的技术路线,能够贯穿各个层次的需求,而且最好不那么考验使用者的智商。
服务处理的过程中,总会包含如下几类的远程调用:
服务层:RESTful,自家的RPC框架
数据层:JDBC,Memcached,Redis,ZooKeeper...
消息层:Kafka,RabbitMQ...
有些客户端已经支持异步,比如大部分的RPC框架、RESTful Client、SpyMemcached、ZooKeeper、Kafka 等。 在异步接口之中,又分返回Future 与 设置用户自定义CallBack类两种风格。
但有些客户端还是同步的,比如JDBC,Jedis,在讨论方案时需要把它们也考虑进去。
并行执行的机会有两种:
一是服务提供者没有提供批量接口,比如商品查询,只能以不同的ID分次调用(题外话,始终服务方提供批量接口会更好些)。
一是两个调用之间没有依赖关系,一个调用的参数并不依赖另一个调用的返回结果,比如查询商品信息与查询库存。
并行调用又分两个层次:
一是 简单并行 ,把所有可并行的服务一下子都撒出去,然后等待所有的异步调用返回,简单的Future.get()就够用。
还有一种特例的场景是,先调一个异步接口,然后做一些同步的事情,再回头Future.get()拿之前异步调用的结果,这也达到了节省总响应时间的效果。
二是 调用编排 ,比如一开始并行A、B、C、D服务,一旦A与B返回则根据结果调用E,一旦C返回则调用F,最后组装D、E、F的结果返回给客户端。
如果各个服务返回的时间不固定,如果还是简单的并行,没办法做到最高效的调度,必须有一种机制,定义A与B返回立刻调用D,C返回则立刻调用E。此时就需要RxJava,或JDK8的CompleteableFuture,或Guava的ListenenableFuture,基于Callback机制来编排了。
这个概念并不陌生,NIO就是这个思路。
但是即使你用了Netty的NIO 或 Servlet3.0的异步Servlet 或,也只是解决了传输层面用少量传输线程处理海量并发的传输,并在入口层的编程模式上提供了异步化的可能性。
如果你的服务线程里存在阻塞的远程调用,那线程还是会等待在远程调用上而无法处理海量请求,即使异步化如Future.get(),也依然是等待。
所以,你必须有办法在阻塞等待时把线程给交回去,比如服务线程里采用全异步化的Callback模式,或引入Akka的Actor模式,或基于Quasar引入纤程的概念。
在我看来 , 客户端简单并行->客户端并行编排->服务端少量线程处理海量请求。对于大部分普通项目,是由浅入深三个层次的需求。
最简单就是并发的调用一堆返回Future的异步接口,再一个个Get回来,最慢的那个会阻塞住其他:
FutureproductFuture = productService.query(id); Future stockFuture = stockService.query(id); ....... Product product = productFuture.get(); Long stock = stockFuture.get();
HttpClient有一个 HttpAsyncClient 的子项目提供Http的异步访问:
HttpGet request1 = new HttpGet("http://www.apache.org/"); Futurefuture = httpclient.execute(request1, null); HttpResponse response1 = future.get();
Spring的RestTemplate同样有AsyncRestTemplate:
Future> futureEntity = template.getForEntity("http://www.apache.org/" , String.class); ResponseEntity entity = futureEntity.get();
但如果异步接口只能设置自定义Callback函数,不返回Future呢? 比如Thrift就是这样。
Callback函数在这种并行调用场景里并不好用,因此建议还是要转换回Future接口使用。
转换的方法很简单,自己实现一个默认的Callback类,里面包含一个Future,然后实现Callback接口所定义的onSucess()和onFail()函数,将结果赋值到Future中。
DefaultFutureCallbackcallback=new DefaultFutureCallback (); myService.getPrice(1L, callback); String result = callback.getFuture().get();
至于Future类的实现,随便抄个HttpClient的BasicFuture就好了。
同步接口如JDBC与Jedis,只能在调用它们的地方实现Callable接口,异步的跑在另一个线程池里来完成并行调度。但这其实引入了线程调度的消耗,不得而为之,不可滥用。
Futurefuture = executor.submit(new Callable (){ @Override public Product call() throws Exception { return productDao.query(id); } });
题外话,executor中返回的Future的实现类叫FutureTask,它能以Callable或Runnable + 预设Result作为参数构建,自己本身又是个Runnable,然后executor.submit() 又接受Callable 或 Runnable+预设Result做参数。所以尽量避免自己构建这个FutureTask类传给executor,由executor内部构建就好。
如果是JDK8,用Lambda就可以写得短一些。
Future<Product> future = executor.submit(()->productDao.query(id));
《浅入浅出,谈谈服务化体系中的异步(上)》 ,转载请保留链接