在上篇文章中我介绍了Finagle中的Future/Service/Filter. 这篇文章里, 我们将构建一个基于Http协议的echo服务端和客户端, 下篇文章将构建一个基于thrift协议的客户端和服务端. 这两篇文章对应的源代码地址在 Github 上. 代码中有Java和Scala版本两套版本的实现, 但是这里我们只会介绍Java版本.
首先来看echo应用的Server端代码, 打开 java-finagle-example/src/main/java/com/akkafun/finagle/Server.java
:
public class Server extends Service<Request, Response> { //1 @Override public Future<Response> apply(Request request) { //2 System.out.println("request: " + request.getContentString()); Response response = Response.apply(Version.Http11$.MODULE$, Status.Ok()); response.setContentString(request.getContentString()); return Future.value(response); } public static void main(String[] args) throws Exception { Server service = new Server(); ListeningServer server = Http.server(). //3 withLabel("echo-server"). withTracer(ZipkinTracer.mk("192.168.99.100", 9410, DefaultStatsReceiver$.MODULE$, 1.0f)). serve(new InetSocketAddress(8081), service); Await.result(server); } }
在Finagle中, 实现一个RPC服务非常简单. 只需要继承Service抽象类, 实现它的apply方法. Service抽象类有两个类型参数, 第一个类型参数代表的是请求对象, 第二个类型参数代表的是返回对象. 这两个对象的具体类型与Service实现类使用的具体协议有关. 例如我们在echo服务中使用Http协议, 对应的Request类就是 com.twitter.finagle.http.Request
, 对应的Response类是 com.twitter.finagle.http.Response
. 如果是thrift协议, 则这两个类型参数在Service实现类中都是 scala.Array<scala.Byte>
(Array和Byte都是scala中的类, 对应Java中的数组与byte).
apply方法中, 我们首先使用Response的工厂方法构造一个Response对象. 然后将Request中的请求内容原封不动的设置到Response中, 再将Response设置到Future中返回. 需要最后一步的原因是apply方法的返回值类型是 Future<Response>
, 但是我们在这个方法中不需要进行异步操作, 所以可以直接使用 Future.value(response)
将对象包装成Future返回. 另外, 细心的你应该发现了一行比较碍眼的代码: Response.apply(Version.Http11$.MODULE$, Status.Ok())
, 其中Version的用法很古怪. 这是Java调用Scala伴生对象的副作用, Scala有一些语法和特性在Java中没有对应的概念, 这种情况下Java调用Scala的代码就会比较晦涩.
为了启动Service实例, 我们需要构造一个 com.twitter.finagle.ListeningServer
. withLabel
设置服务名称, withTracer
设置监控信息, 这个等后面介绍zipkin的时候在解释. 最后指定端口启动服务.
现在来看echo应用的Client端代码, 打开 java-finagle-example/src/main/java/com/akkafun/finagle/Client.java
:
import static scala.compat.java8.JFunction.*; public class Client { public static void main(String[] args) throws TimeoutException, InterruptedException { Service<Request, Response> service = Http.client(). //1 withLabel("echo-client"). withTracer(ZipkinTracer.mk("192.168.99.100", 9410, DefaultStatsReceiver$.MODULE$, 1.0f)). newService("127.0.0.1:8081"); //create a "Greetings!" request. Reader data = Reader$.MODULE$.fromStream( //2 new ByteArrayInputStream("Greetings!".getBytes(StandardCharsets.UTF_8))); Request request = Request.apply(Version.Http11$.MODULE$, Method.Post$.MODULE$, "/", data); Future<Response> responseFuture = Await.ready(service.apply(request)); //3 responseFuture.onSuccess(func(response -> { //4 System.out.println(String.format("response status: %s, response string: %s", response.status().toString(), response.contentString())); return BoxedUnit.UNIT; })); responseFuture.onFailure(func(e -> { System.out.println("error: " + e.toString()); return BoxedUnit.UNIT; })); responseFuture.ensure(func(() -> { service.close(); //IDE may complain here, just ignore return BoxedUnit.UNIT; })); } }
这部分代码和我们之前的Server类代码很像. 在Server类中, 我们创建了一个Service实例并监听了8081端口, 现在客户端通过newService创建了一个Service的stub.
这部分代码用来构造一个消息内容为Greetings的Http请求.
service.apply(request)
就是一次客户端到服务端的RPC调用. 这个调用的返回值是 Future<Response>
.
而 service.apply(request)
是一个异步操作, 主线程调用这个方法并不会阻塞, 有可能主线程退出了实际调用还没有完成. 所以这里就要用到 Await.ready
了. Await.ready
的作用是等待一个Future执行完成再返回, 是一个同步操作. 通过调用 Await.ready
我们就能将一个异步操作转化成一个同步操作.
接下来我们在Future上注册请求成功与失败的回调函数. 请求成功的回调函数中只是简单的打印出响应的消息内容.
这里有个细节需要说明一下. Future的onSuccess方法需要传入一个Scala的函数特质: scala.Function1[Response, BoxedUnit]
. 如果是Java6或7, 我们可以这样实现这个特质:
responseFuture.onSuccess(new AbstractFunction1<Response, BoxedUnit>(){ @Override public BoxedUnit apply(Response response) { System.out.println(String.format("response status: %s, response string: %s", response.status().toString(), response.contentString())); return BoxedUnit.UNIT; } });
在Java8中, 这种匿名类我们一般会使用Lambda代替, 理想情况下写法是这样:
responseFuture.onSuccess(response -> { System.out.println(String.format("response status: %s, response string: %s", response.status().toString(), response.contentString())); return BoxedUnit.UNIT; });
可惜的是这种写法编译不会通过, 因为只有符合 FunctionalInterface
定义的接口才能使用Lambda表达式(什么是 FunctionalInterface
, 请参考 Javadoc ), 而在Scala2.11中, scala.Function1
不是一个 FunctionalInterface
(Scala2.12会兼容Java8). 为了在这里使用Lambda, 我们使用了 scala-java8-compat 这个库, 调用 scala.compat.java8.JFunction.func
方法将一个 FunctionalInterface
转化成 scala.Function1
.
可以看出, 在Java中调用Finagle的API不是很方便. 所以Finagle适合以Scala为主, Java为辅的项目. 如果项目全是Java, 则值得为Finagle主要的API写一层Java的适配层, 来屏蔽Java调用Scala代码会出现的一些晦涩代码.
现在我们启动服务端和客户端来看看运行结果. 首先启动Server类, 然后启动Client. Client运行完毕自动结束, 你应该能在Client的控制台看到如下输出:
response status: Status(200), response string: Greetings!
Server控制台的输出:
request: Greetings!
Http协议比较适合用于对外提供服务, 并且一般会使用REST. 在Finagle中使用REST可以使用 Finch 库. 这个库轻量小巧, API简单, 提供了一套很方便的对Http消息进行操作的DSL. 如果是内网服务调用, 一般推荐使用结构紧凑, 传输效率高的协议. 比如protocol buffer, thrift或Avro. Finagle对thrift有很好的支持, 下篇文章我将介绍在Finagle中如何开发thrift应用.