经过前面三篇 Protobuf 相关文章的学习,相信大家已经对 Protobuf 有所掌握。前文说过, ProtoBuf 很适合做数据存储或 RPC 数据交换格式。可以用于通讯协议、数据存储等领域的语言无关、平台无关、可扩展的序列化结构数据格式。
本节将介绍在 Java 中如何使用 gRPC 和 Protouf。gRpc 也是 Google 推出的 RPC 框架,由于师出同门,Protobuf 和 gRPC 可以非常容易的结合在一起,甚至于使用 Protobuf Maven 插件就可以自动生成 gRPC 代码。
如果你还没有看过前序文章,点击这里查看:
源码地址: https://github.com/jitwxs/blog_sample
本文给大家演示的例子是:根据年龄查询用户列表。
首先在 src/main
目录下(和 java 目录同级)创建 proto
文件夹,用于存放 .proto 文件,该目录是 Protobuf Maven 插件默认扫描的文件夹。
创建 sex_enum.proto
文件,用于表示性别枚举。
syntax = "proto3"; option java_package = "jit.wxs.demo.enums"; option java_outer_classname = "SexEnumProto"; enum SexEnum { INVALID = 0; MALE = 1; FEMALE = 2; }
创建 user.proto
文件,用于表示用户实体。
syntax = "proto3"; option java_package = "jit.wxs.demo.dto"; option java_outer_classname = "UserProto"; import "sex_enum.proto"; message User { int32 age = 1; // 年龄 string name = 2; // 姓名 SexEnum sex = 3; // 性别枚举 }
这两个文件内容都比较简单,简单介绍下:
syntax = "proto3"; option java_package option java_outer_classname import "sex_enum.proto";
下面创建 user_rpc.proto
文件,用于定义根据年龄查询用户列表的 RPC 接口。
syntax = "proto3"; option java_package = "jit.wxs.demo.rpc"; option java_outer_classname = "UserRPCProto"; import "user.proto"; message Request { int32 age = 1; } message Response { int32 code = 1; string msg = 2; repeated User user = 3; } service UserRPCService { rpc listByAge(Request) returns(Response); }
定义一个名为 UserRPCService
的 Service 服务,用于存放所有 RPC 接口。在其中定义一个名为 listByAge
的 RPC 接口,该接口的入参为 Request 和 Response。这里为了简单省事,直接将 Request 和 Response 定义在这个文件中了。
编辑 POM 文件如下:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>jit.wxs</groupId> <artifactId>grpc</artifactId> <version>1.0-SNAPSHOT</version> <properties> <protobuf.version>3.11.0</protobuf.version> <grpc.version>1.26.0</grpc.version> </properties> <dependencies> <!-- Protobuf Dependency --> <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>${protobuf.version}</version> </dependency> <!-- gRPC Dependency Start --> <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-netty-shaded</artifactId> <version>${grpc.version}</version> </dependency> <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-protobuf</artifactId> <version>${grpc.version}</version> </dependency> <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-stub</artifactId> <version>${grpc.version}</version> </dependency> <!-- gRPC Dependency End --> </dependencies> <build> <extensions> <extension> <groupId>kr.motd.maven</groupId> <artifactId>os-maven-plugin</artifactId> <version>1.6.2</version> </extension> </extensions> <plugins> <plugin> <groupId>org.xolstice.maven.plugins</groupId> <artifactId>protobuf-maven-plugin</artifactId> <version>0.6.1</version> <configuration> <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact> <pluginId>grpc-java</pluginId> <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact> </configuration> <executions> <execution> <goals> <!-- for protobuf --> <goal>compile</goal> <!-- for grpc --> <goal>compile-custom</goal> </goals> </execution> </executions> </plugin> </plugins> </build> </project>
os-maven-plugin
插件,用于获取当前运行系统信息。 protobuf-maven-plugin
插件,用于生成 Protobuf Java 类和 gRPC Java 类。
<protocArtifact>
:Protobuf 执行文件路径。 <pluginId>
+ <pluginArtifact>
:指定 gRPC 插件执行文件路径。 <goal>compile</goal>
:生成 Protobuf Java 类命令。 <goal>compile-custom</goal>
:生成 gRPC Java 类命令。 同时选中 protobuf:compile
和 protobuf:compile-custom
命令,右击 Run Maven Build 运行,即可得到生成的 Java 类,如下图所示。
将这些生成类从 target 目录 移动 到项目的 java 目录下即可。
注意我们之前写的 user_rpc.proto
文件,经过编译后得到了 UserRPCProto.java 和 UserRPCServuceGrpc.java 两个类。前者就是普通的 Protobuf 实体类,而后者就是我们定义的 gRPC 接口类。
首先我们创建 UserRPCServuceGrpc 这个接口类的实现类:
package jit.wxs.demo.rpc.impl; import io.grpc.stub.StreamObserver; import jit.wxs.demo.dto.UserProto; import jit.wxs.demo.enums.SexEnumProto; import jit.wxs.demo.rpc.UserRPCProto; import jit.wxs.demo.rpc.UserRPCServiceGrpc; /** * @author jitwxs * @date 2019年12月20日 0:53 */ public class UserRPCServiceImpl extends UserRPCServiceGrpc.UserRPCServiceImplBase { @Override public void listByAge(UserRPCProto.Request request, StreamObserver<UserRPCProto.Response> responseObserver) { System.out.println("Server receive request."); // 模拟业务逻辑 UserRPCProto.Response response = UserRPCProto.Response.newBuilder() .setCode(0) .setMsg("success") .addUser(UserProto.User.newBuilder() .setName("wangwu") .setAge(request.getAge()) .setSex(SexEnumProto.SexEnum.MALE)) .addUser(UserProto.User.newBuilder() .setName("limei") .setAge(request.getAge()) .setSex(SexEnumProto.SexEnum.FEMALE)) .build(); responseObserver.onNext(response); responseObserver.onCompleted(); } }
在 listByAge()
方法中编写当我们接收到 request 请求时的行为,并将结果放入 response 中。这里我模拟了下业务逻辑,返回了两个 age 等于请求参数的 age 的用户。
既然是 RPC 框架,那么就会有服务端和客户端。首先创建服务端:
package jit.wxs.demo.test; import io.grpc.Server; import io.grpc.ServerBuilder; import jit.wxs.demo.rpc.impl.UserRPCServiceImpl; import java.io.IOException; import java.util.concurrent.TimeUnit; import java.util.logging.Logger; /** * Grpc 服务端 * @author jitwxs * @date 2019年12月20日 1:03 */ public class UserGRPCServer { private static final Logger logger = Logger.getLogger(UserGRPCServer.class.getName()); private Server server; static final int PORT = 3333; public static void main(String[] args) throws IOException, InterruptedException { final UserGRPCServer server = new UserGRPCServer(); server.start(); server.blockUntilShutdown(); } private void start() throws IOException { server = ServerBuilder.forPort(PORT) .addService(new UserRPCServiceImpl()) .build() .start(); logger.info("Server started, listening on " + PORT); Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { // Use stderr here since the logger may have been reset by its JVM shutdown hook. System.err.println("*** shutting down gRPC server since JVM is shutting down"); try { UserGRPCServer.this.stop(); } catch (InterruptedException e) { e.printStackTrace(System.err); } System.err.println("*** server shut down"); } }); } private void stop() throws InterruptedException { if (server != null) { server.shutdown().awaitTermination(30, TimeUnit.SECONDS); } } /** * Await termination on the main thread since the grpc library uses daemon threads. */ private void blockUntilShutdown() throws InterruptedException { if (server != null) { server.awaitTermination(); } } }
start()
方法中,Server 服务绑定了 3333 端口,并将 UserRPCServiceImpl 这个 Service 添加到了 Server 中。
添加一个 Shutdown 的回调方法,在回调方法中,调用 stop()
方法停止 server 服务。
最后编写客户端去请求服务端。
package jit.wxs.demo.test; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.StatusRuntimeException; import jit.wxs.demo.rpc.UserRPCProto; import jit.wxs.demo.rpc.UserRPCServiceGrpc; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; /** * Grpc 客户端 * @author jitwxs * @date 2019年12月20日 1:06 */ public class UserGRPCClient { private static final Logger logger = Logger.getLogger(UserGRPCClient.class.getName()); private final ManagedChannel channel; private final UserRPCServiceGrpc.UserRPCServiceBlockingStub blockingStub; public static void main(String[] args) throws Exception { UserGRPCClient client = new UserGRPCClient("localhost", UserGRPCServer.PORT); try { client.request(26); } finally { client.shutdown(); } } /** Construct client connecting to HelloWorld server at {@code host:port}. */ public UserGRPCClient(String host, int port) { this(ManagedChannelBuilder.forAddress(host, port) // Channels are secure by default (via SSL/TLS). For the example we disable TLS to avoid needing certificates. .usePlaintext() .build()); } /** Construct client for accessing HelloWorld server using the existing channel. */ public UserGRPCClient(ManagedChannel channel) { this.channel = channel; blockingStub = UserRPCServiceGrpc.newBlockingStub(channel); } public void shutdown() throws InterruptedException { channel.shutdown().awaitTermination(5, TimeUnit.SECONDS); } /** Send Request */ public void request(int age) { logger.info("Will try to query age = " + age + " ..."); UserRPCProto.Request request = UserRPCProto.Request.newBuilder().setAge(age).build(); UserRPCProto.Response response; try { response = blockingStub.listByAge(request); } catch (StatusRuntimeException e) { logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus()); return; } logger.info("Response: " + response.toString()); } }
ManagedChannel request()
整个项目代码目录结构如下:
└─src └─main ├─java │ └─jit │ └─wxs │ └─demo │ ├─dto │ │ UserProto.java │ │ │ ├─enums │ │ SexEnumProto.java │ │ │ ├─rpc │ │ │ UserRPCProto.java │ │ │ UserRPCServiceGrpc.java │ │ │ │ │ └─impl │ │ UserRPCServiceImpl.java │ │ │ └─test │ UserGRPCClient.java │ UserGRPCServer.java │ ├─proto │ sex_enum.proto │ user.proto │ user_rpc.proto │ └─resources
首先启动 UserGRPCServer
服务端,然后启动 UserGRPCClient
客户端。
服务端首先接收到客户端请求,输出:
Server receive request.
随后客户端接收到服务端响应,输出:
Response: msg: "success" user { age: 26 name: "wangwu" sex: MALE } user { age: 26 name: "limei" sex: FEMALE }