经过前面三篇 Protobuf 相关文章的学习,相信大家已经对 Protobuf 有所掌握。前文说过, ProtoBuf 很适合做数据存储或 RPC 数据交换格式。可以用于通讯协议、数据存储等领域的语言无关、平台无关、可扩展的序列化结构数据格式。
本节将介绍在 Java 中如何使用 gRPC 和 Protouf。gRpc 也是 Google 推出的 RPC 框架,由于师出同门,Protobuf 和 gRPC 可以非常容易的结合在一起,甚至于使用 Protobuf Maven 插件就可以自动生成 gRPC 代码。
如果你还没有看过前序文章,点击这里查看:
源码地址: https://github.com/jitwxs/blog_sample
本文给大家演示的例子是:根据年龄查询用户列表。
首先在 src/main 目录下(和 java 目录同级)创建 proto 文件夹,用于存放 .proto 文件,该目录是 Protobuf Maven 插件默认扫描的文件夹。
创建 sex_enum.proto 文件,用于表示性别枚举。
syntax = "proto3";
option java_package = "jit.wxs.demo.enums";
option java_outer_classname = "SexEnumProto";
enum SexEnum {
INVALID = 0;
MALE = 1;
FEMALE = 2;
}
创建 user.proto 文件,用于表示用户实体。
syntax = "proto3";
option java_package = "jit.wxs.demo.dto";
option java_outer_classname = "UserProto";
import "sex_enum.proto";
message User {
int32 age = 1; // 年龄
string name = 2; // 姓名
SexEnum sex = 3; // 性别枚举
}
这两个文件内容都比较简单,简单介绍下:
syntax = "proto3"; option java_package option java_outer_classname import "sex_enum.proto";
下面创建 user_rpc.proto 文件,用于定义根据年龄查询用户列表的 RPC 接口。
syntax = "proto3";
option java_package = "jit.wxs.demo.rpc";
option java_outer_classname = "UserRPCProto";
import "user.proto";
message Request {
int32 age = 1;
}
message Response {
int32 code = 1;
string msg = 2;
repeated User user = 3;
}
service UserRPCService {
rpc listByAge(Request) returns(Response);
}
定义一个名为 UserRPCService 的 Service 服务,用于存放所有 RPC 接口。在其中定义一个名为 listByAge 的 RPC 接口,该接口的入参为 Request 和 Response。这里为了简单省事,直接将 Request 和 Response 定义在这个文件中了。
编辑 POM 文件如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>jit.wxs</groupId>
<artifactId>grpc</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<protobuf.version>3.11.0</protobuf.version>
<grpc.version>1.26.0</grpc.version>
</properties>
<dependencies>
<!-- Protobuf Dependency -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
</dependency>
<!-- gRPC Dependency Start -->
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>${grpc.version}</version>
</dependency>
<!-- gRPC Dependency End -->
</dependencies>
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.6.2</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<!-- for protobuf -->
<goal>compile</goal>
<!-- for grpc -->
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
os-maven-plugin 插件,用于获取当前运行系统信息。 protobuf-maven-plugin 插件,用于生成 Protobuf Java 类和 gRPC Java 类。
<protocArtifact> :Protobuf 执行文件路径。 <pluginId> + <pluginArtifact> :指定 gRPC 插件执行文件路径。 <goal>compile</goal> :生成 Protobuf Java 类命令。 <goal>compile-custom</goal> :生成 gRPC Java 类命令。 同时选中 protobuf:compile 和 protobuf:compile-custom 命令,右击 Run Maven Build 运行,即可得到生成的 Java 类,如下图所示。
将这些生成类从 target 目录 移动 到项目的 java 目录下即可。
注意我们之前写的 user_rpc.proto 文件,经过编译后得到了 UserRPCProto.java 和 UserRPCServuceGrpc.java 两个类。前者就是普通的 Protobuf 实体类,而后者就是我们定义的 gRPC 接口类。
首先我们创建 UserRPCServuceGrpc 这个接口类的实现类:
package jit.wxs.demo.rpc.impl;
import io.grpc.stub.StreamObserver;
import jit.wxs.demo.dto.UserProto;
import jit.wxs.demo.enums.SexEnumProto;
import jit.wxs.demo.rpc.UserRPCProto;
import jit.wxs.demo.rpc.UserRPCServiceGrpc;
/**
* @author jitwxs
* @date 2019年12月20日 0:53
*/
public class UserRPCServiceImpl extends UserRPCServiceGrpc.UserRPCServiceImplBase {
@Override
public void listByAge(UserRPCProto.Request request, StreamObserver<UserRPCProto.Response> responseObserver) {
System.out.println("Server receive request.");
// 模拟业务逻辑
UserRPCProto.Response response = UserRPCProto.Response.newBuilder()
.setCode(0)
.setMsg("success")
.addUser(UserProto.User.newBuilder()
.setName("wangwu")
.setAge(request.getAge())
.setSex(SexEnumProto.SexEnum.MALE))
.addUser(UserProto.User.newBuilder()
.setName("limei")
.setAge(request.getAge())
.setSex(SexEnumProto.SexEnum.FEMALE))
.build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
}
在 listByAge() 方法中编写当我们接收到 request 请求时的行为,并将结果放入 response 中。这里我模拟了下业务逻辑,返回了两个 age 等于请求参数的 age 的用户。
既然是 RPC 框架,那么就会有服务端和客户端。首先创建服务端:
package jit.wxs.demo.test;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import jit.wxs.demo.rpc.impl.UserRPCServiceImpl;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
/**
* Grpc 服务端
* @author jitwxs
* @date 2019年12月20日 1:03
*/
public class UserGRPCServer {
private static final Logger logger = Logger.getLogger(UserGRPCServer.class.getName());
private Server server;
static final int PORT = 3333;
public static void main(String[] args) throws IOException, InterruptedException {
final UserGRPCServer server = new UserGRPCServer();
server.start();
server.blockUntilShutdown();
}
private void start() throws IOException {
server = ServerBuilder.forPort(PORT)
.addService(new UserRPCServiceImpl())
.build()
.start();
logger.info("Server started, listening on " + PORT);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println("*** shutting down gRPC server since JVM is shutting down");
try {
UserGRPCServer.this.stop();
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
System.err.println("*** server shut down");
}
});
}
private void stop() throws InterruptedException {
if (server != null) {
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
}
}
/**
* Await termination on the main thread since the grpc library uses daemon threads.
*/
private void blockUntilShutdown() throws InterruptedException {
if (server != null) {
server.awaitTermination();
}
}
}
start() 方法中,Server 服务绑定了 3333 端口,并将 UserRPCServiceImpl 这个 Service 添加到了 Server 中。
添加一个 Shutdown 的回调方法,在回调方法中,调用 stop() 方法停止 server 服务。
最后编写客户端去请求服务端。
package jit.wxs.demo.test;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import jit.wxs.demo.rpc.UserRPCProto;
import jit.wxs.demo.rpc.UserRPCServiceGrpc;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* Grpc 客户端
* @author jitwxs
* @date 2019年12月20日 1:06
*/
public class UserGRPCClient {
private static final Logger logger = Logger.getLogger(UserGRPCClient.class.getName());
private final ManagedChannel channel;
private final UserRPCServiceGrpc.UserRPCServiceBlockingStub blockingStub;
public static void main(String[] args) throws Exception {
UserGRPCClient client = new UserGRPCClient("localhost", UserGRPCServer.PORT);
try {
client.request(26);
} finally {
client.shutdown();
}
}
/** Construct client connecting to HelloWorld server at {@code host:port}. */
public UserGRPCClient(String host, int port) {
this(ManagedChannelBuilder.forAddress(host, port)
// Channels are secure by default (via SSL/TLS). For the example we disable TLS to avoid needing certificates.
.usePlaintext()
.build());
}
/** Construct client for accessing HelloWorld server using the existing channel. */
public UserGRPCClient(ManagedChannel channel) {
this.channel = channel;
blockingStub = UserRPCServiceGrpc.newBlockingStub(channel);
}
public void shutdown() throws InterruptedException {
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
}
/** Send Request */
public void request(int age) {
logger.info("Will try to query age = " + age + " ...");
UserRPCProto.Request request = UserRPCProto.Request.newBuilder().setAge(age).build();
UserRPCProto.Response response;
try {
response = blockingStub.listByAge(request);
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
return;
}
logger.info("Response: " + response.toString());
}
}
ManagedChannel request()
整个项目代码目录结构如下:
└─src
└─main
├─java
│ └─jit
│ └─wxs
│ └─demo
│ ├─dto
│ │ UserProto.java
│ │
│ ├─enums
│ │ SexEnumProto.java
│ │
│ ├─rpc
│ │ │ UserRPCProto.java
│ │ │ UserRPCServiceGrpc.java
│ │ │
│ │ └─impl
│ │ UserRPCServiceImpl.java
│ │
│ └─test
│ UserGRPCClient.java
│ UserGRPCServer.java
│
├─proto
│ sex_enum.proto
│ user.proto
│ user_rpc.proto
│
└─resources
首先启动 UserGRPCServer 服务端,然后启动 UserGRPCClient 客户端。
服务端首先接收到客户端请求,输出:
Server receive request.
随后客户端接收到服务端响应,输出:
Response: msg: "success"
user {
age: 26
name: "wangwu"
sex: MALE
}
user {
age: 26
name: "limei"
sex: FEMALE
}