转载

微服务框架Finagle介绍 Part2: 在Finagle中开发基于Http协议的应用

在上篇文章中我介绍了Finagle中的Future/Service/Filter. 这篇文章里, 我们将构建一个基于Http协议的echo服务端和客户端, 下篇文章将构建一个基于thrift协议的客户端和服务端. 这两篇文章对应的源代码地址在 Github 上. 代码中有Java和Scala版本两套版本的实现, 但是这里我们只会介绍Java版本.

首先来看echo应用的Server端代码, 打开 java-finagle-example/src/main/java/com/akkafun/finagle/Server.java :

public class Server extends Service<Request, Response> {                             //1      @Override     public Future<Response> apply(Request request) {                                 //2         System.out.println("request: " + request.getContentString());         Response response = Response.apply(Version.Http11$.MODULE$, Status.Ok());         response.setContentString(request.getContentString());         return Future.value(response);     }      public static void main(String[] args) throws Exception {         Server service = new Server();          ListeningServer server = Http.server().                                      //3                 withLabel("echo-server").                 withTracer(ZipkinTracer.mk("192.168.99.100",                     9410, DefaultStatsReceiver$.MODULE$, 1.0f)).                 serve(new InetSocketAddress(8081), service);          Await.result(server);     } } 
  1. 在Finagle中, 实现一个RPC服务非常简单. 只需要继承Service抽象类, 实现它的apply方法. Service抽象类有两个类型参数, 第一个类型参数代表的是请求对象, 第二个类型参数代表的是返回对象. 这两个对象的具体类型与Service实现类使用的具体协议有关. 例如我们在echo服务中使用Http协议, 对应的Request类就是 com.twitter.finagle.http.Request , 对应的Response类是 com.twitter.finagle.http.Response . 如果是thrift协议, 则这两个类型参数在Service实现类中都是 scala.Array<scala.Byte> (Array和Byte都是scala中的类, 对应Java中的数组与byte).

  2. apply方法中, 我们首先使用Response的工厂方法构造一个Response对象. 然后将Request中的请求内容原封不动的设置到Response中, 再将Response设置到Future中返回. 需要最后一步的原因是apply方法的返回值类型是 Future<Response> , 但是我们在这个方法中不需要进行异步操作, 所以可以直接使用 Future.value(response) 将对象包装成Future返回. 另外, 细心的你应该发现了一行比较碍眼的代码: Response.apply(Version.Http11$.MODULE$, Status.Ok()) , 其中Version的用法很古怪. 这是Java调用Scala伴生对象的副作用, Scala有一些语法和特性在Java中没有对应的概念, 这种情况下Java调用Scala的代码就会比较晦涩.

  3. 为了启动Service实例, 我们需要构造一个 com.twitter.finagle.ListeningServer . withLabel 设置服务名称, withTracer 设置监控信息, 这个等后面介绍zipkin的时候在解释. 最后指定端口启动服务.

现在来看echo应用的Client端代码, 打开 java-finagle-example/src/main/java/com/akkafun/finagle/Client.java :

import static scala.compat.java8.JFunction.*;  public class Client {      public static void main(String[] args) throws TimeoutException, InterruptedException {         Service<Request, Response> service = Http.client().                             //1                 withLabel("echo-client").                 withTracer(ZipkinTracer.mk("192.168.99.100",                     9410, DefaultStatsReceiver$.MODULE$, 1.0f)).                 newService("127.0.0.1:8081");          //create a "Greetings!" request.         Reader data = Reader$.MODULE$.fromStream(                                       //2             new ByteArrayInputStream("Greetings!".getBytes(StandardCharsets.UTF_8)));         Request request = Request.apply(Version.Http11$.MODULE$,             Method.Post$.MODULE$, "/", data);          Future<Response> responseFuture = Await.ready(service.apply(request));          //3         responseFuture.onSuccess(func(response -> {                                     //4             System.out.println(String.format("response status: %s, response string: %s",                     response.status().toString(), response.contentString()));             return BoxedUnit.UNIT;         }));         responseFuture.onFailure(func(e -> {             System.out.println("error: " + e.toString());             return BoxedUnit.UNIT;         }));         responseFuture.ensure(func(() -> {             service.close();             //IDE may complain here, just ignore             return BoxedUnit.UNIT;         }));      } } 
  1. 这部分代码和我们之前的Server类代码很像. 在Server类中, 我们创建了一个Service实例并监听了8081端口, 现在客户端通过newService创建了一个Service的stub.

  2. 这部分代码用来构造一个消息内容为Greetings的Http请求.

  3. service.apply(request) 就是一次客户端到服务端的RPC调用. 这个调用的返回值是 Future<Response> .

    service.apply(request) 是一个异步操作, 主线程调用这个方法并不会阻塞, 有可能主线程退出了实际调用还没有完成. 所以这里就要用到 Await.ready 了. Await.ready 的作用是等待一个Future执行完成再返回, 是一个同步操作. 通过调用 Await.ready 我们就能将一个异步操作转化成一个同步操作.

  4. 接下来我们在Future上注册请求成功与失败的回调函数. 请求成功的回调函数中只是简单的打印出响应的消息内容.

    这里有个细节需要说明一下. Future的onSuccess方法需要传入一个Scala的函数特质: scala.Function1[Response, BoxedUnit] . 如果是Java6或7, 我们可以这样实现这个特质:

responseFuture.onSuccess(new AbstractFunction1<Response, BoxedUnit>(){     @Override     public BoxedUnit apply(Response response) {         System.out.println(String.format("response status: %s, response string: %s",                 response.status().toString(), response.contentString()));         return BoxedUnit.UNIT;     } }); 

在Java8中, 这种匿名类我们一般会使用Lambda代替, 理想情况下写法是这样:

responseFuture.onSuccess(response -> {     System.out.println(String.format("response status: %s, response string: %s",             response.status().toString(), response.contentString()));     return BoxedUnit.UNIT; }); 

可惜的是这种写法编译不会通过, 因为只有符合 FunctionalInterface 定义的接口才能使用Lambda表达式(什么是 FunctionalInterface , 请参考 Javadoc ), 而在Scala2.11中, scala.Function1 不是一个 FunctionalInterface (Scala2.12会兼容Java8). 为了在这里使用Lambda, 我们使用了 scala-java8-compat 这个库, 调用 scala.compat.java8.JFunction.func 方法将一个 FunctionalInterface 转化成 scala.Function1 .

可以看出, 在Java中调用Finagle的API不是很方便. 所以Finagle适合以Scala为主, Java为辅的项目. 如果项目全是Java, 则值得为Finagle主要的API写一层Java的适配层, 来屏蔽Java调用Scala代码会出现的一些晦涩代码.

现在我们启动服务端和客户端来看看运行结果. 首先启动Server类, 然后启动Client. Client运行完毕自动结束, 你应该能在Client的控制台看到如下输出:

response status: Status(200), response string: Greetings! 

Server控制台的输出:

request: Greetings! 

Http协议比较适合用于对外提供服务, 并且一般会使用REST. 在Finagle中使用REST可以使用 Finch 库. 这个库轻量小巧, API简单, 提供了一套很方便的对Http消息进行操作的DSL. 如果是内网服务调用, 一般推荐使用结构紧凑, 传输效率高的协议. 比如protocol buffer, thrift或Avro. Finagle对thrift有很好的支持, 下篇文章我将介绍在Finagle中如何开发thrift应用.

原文  http://skaka.me/blog/2016/05/01/finagle2/
正文到此结束
Loading...