前置文章:
前一篇文章简单介绍了通过动态代理完成了 Client
端契约接口调用转换为发送 RPC
协议请求的功能。这篇文章主要解决一个遗留的技术难题:请求-响应同步化处理。
需要的依赖如下:
JDK1.8+ Netty:4.1.44.Final SpringBoot:2.2.2.RELEASE
图中已经忽略了编码解码器和其他入站出站处理器,不同颜色的线程代表完全不相同的线程,不同线程之间的处理逻辑是完全异步,也就是 Netty IO
线程( n-l-g-1
)接收到 Server
端的消息并且解析完成的时候,用户调用线程( u-t-1
)无法感知到解析完毕的消息包,那么这里要做的事情就是让用户调用线程( u-t-1
)获取到 Netty IO
线程( n-l-g-1
)接收并且解析完成的消息包。
这里可以用一个简单的例子来说明模拟 Client
端调用线程等待 Netty IO
线程的处理结果再同步返回的过程。
@Slf4j public class NettyThreadSyncTest { @ToString private static class ResponseFuture { private final long beginTimestamp = System.currentTimeMillis(); @Getter private final long timeoutMilliseconds; @Getter private final String requestId; @Setter @Getter private volatile boolean sendRequestSucceed = false; @Setter @Getter private volatile Throwable cause; @Getter private volatile Object response; private final CountDownLatch latch = new CountDownLatch(1); public ResponseFuture(String requestId, long timeoutMilliseconds) { this.requestId = requestId; this.timeoutMilliseconds = timeoutMilliseconds; } public boolean timeout() { return System.currentTimeMillis() - beginTimestamp > timeoutMilliseconds; } public Object waitResponse(final long timeoutMilliseconds) throws InterruptedException { latch.await(timeoutMilliseconds, TimeUnit.MILLISECONDS); return response; } public void putResponse(Object response) throws InterruptedException { this.response = response; latch.countDown(); } } static ExecutorService REQUEST_THREAD; static ExecutorService NETTY_IO_THREAD; static Callable<Object> REQUEST_TASK; static Runnable RESPONSE_TASK; static String processBusiness(String name) { return String.format("%s say hello!", name); } private static final Map<String /* request id */, ResponseFuture> RESPONSE_FUTURE_TABLE = Maps.newConcurrentMap(); @BeforeClass public static void beforeClass() throws Exception { String requestId = UUID.randomUUID().toString(); String requestContent = "throwable"; REQUEST_TASK = () -> { try { // 3秒没有得到响应认为超时 ResponseFuture responseFuture = new ResponseFuture(requestId, 3000); RESPONSE_FUTURE_TABLE.put(requestId, responseFuture); // 这里忽略发送请求的操作,只打印日志和模拟耗时1秒 Thread.sleep(1000); log.info("发送请求成功,请求ID:{},请求内容:{}", requestId, requestContent); // 更新标记属性 responseFuture.setSendRequestSucceed(true); // 剩余2秒等待时间 - 这里只是粗略计算 return responseFuture.waitResponse(3000 - 1000); } catch (Exception e) { log.info("发送请求失败,请求ID:{},请求内容:{}", requestId, requestContent); throw new RuntimeException(e); } }; RESPONSE_TASK = () -> { String responseContent = processBusiness(requestContent); try { ResponseFuture responseFuture = RESPONSE_FUTURE_TABLE.get(requestId); if (null != responseFuture) { log.warn("处理响应成功,请求ID:{},响应内容:{}", requestId, responseContent); responseFuture.putResponse(responseContent); } else { log.warn("请求ID[{}]对应的ResponseFuture不存在,忽略处理", requestId); } } catch (Exception e) { log.info("处理响应失败,请求ID:{},响应内容:{}", requestId, responseContent); throw new RuntimeException(e); } }; REQUEST_THREAD = Executors.newSingleThreadExecutor(runnable -> { Thread thread = new Thread(runnable, "REQUEST_THREAD"); thread.setDaemon(true); return thread; }); NETTY_IO_THREAD = Executors.newSingleThreadExecutor(runnable -> { Thread thread = new Thread(runnable, "NETTY_IO_THREAD"); thread.setDaemon(true); return thread; }); } @Test public void testProcessSync() throws Exception { log.info("异步提交请求处理任务......"); Future<Object> future = REQUEST_THREAD.submit(REQUEST_TASK); // 模拟请求耗时 Thread.sleep(1500); log.info("异步提交响应处理任务......"); NETTY_IO_THREAD.execute(RESPONSE_TASK); // 这里可以设置超时 log.info("同步获取请求结果:{}", future.get()); Thread.sleep(Long.MAX_VALUE); } }
执行 testProcessSync()
方法,控制台输出如下:
2020-01-18 13:17:07 [main] INFO c.t.client.NettyThreadSyncTest - 异步提交请求处理任务...... 2020-01-18 13:17:08 [REQUEST_THREAD] INFO c.t.client.NettyThreadSyncTest - 发送请求成功,请求ID:71f47e27-c17c-458d-b271-4e74fad33a7b,请求内容:throwable 2020-01-18 13:17:09 [main] INFO c.t.client.NettyThreadSyncTest - 异步提交响应处理任务...... 2020-01-18 13:17:09 [NETTY_IO_THREAD] WARN c.t.client.NettyThreadSyncTest - 处理响应成功,请求ID:71f47e27-c17c-458d-b271-4e74fad33a7b,响应内容:throwable say hello! 2020-01-18 13:17:09 [main] INFO c.t.client.NettyThreadSyncTest - 同步获取请求结果:throwable say hello!
上面这个例子里面的线程同步处理主要参考主流的 Netty
框架客户端部分的实现逻辑: RocketMQ
(具体是 NettyRemotingClient
类)以及 Redisson
(具体是 RedisExecutor
类),它们就是用这种方式使得异步线程处理转化为同步处理。
按照前面的例子,首先新增一个 ResponseFuture
用于承载已发送但未响应的请求:
@ToString public class ResponseFuture { private final long beginTimestamp = System.currentTimeMillis(); @Getter private final long timeoutMilliseconds; @Getter private final String requestId; @Setter @Getter private volatile boolean sendRequestSucceed = false; @Setter @Getter private volatile Throwable cause; @Getter private volatile ResponseMessagePacket response; private final CountDownLatch latch = new CountDownLatch(1); public ResponseFuture(String requestId, long timeoutMilliseconds) { this.requestId = requestId; this.timeoutMilliseconds = timeoutMilliseconds; } public boolean timeout() { return System.currentTimeMillis() - beginTimestamp > timeoutMilliseconds; } public ResponseMessagePacket waitResponse(final long timeoutMilliseconds) throws InterruptedException { latch.await(timeoutMilliseconds, TimeUnit.MILLISECONDS); return response; } public void putResponse(ResponseMessagePacket response) throws InterruptedException { this.response = response; latch.countDown(); } }
接着需要新增一个 HashMap
去缓存这些返送成功但是未得到响应处理的 ResponseFuture
:
Map<String /* request id */, ResponseFuture> RESPONSE_FUTURE_TABLE = Maps.newConcurrentMap();
这里的 KEY
选用 requestId
,而 requestId
之前已经定义为 UUID
,确保每个请求不会重复。为了简单起见,目前所有的逻辑都编写在契约代理工厂 ContractProxyFactory
,添加下面的功能:
sendRequestSync()
处理消息包的发送和同步响应, RequestMessagePacket
转换为调用代理目标方法返回值类型的逻辑暂时也编写在此方法中。 ResponseFuture
,清理方法为 scanResponseFutureTable()
。 修改后的 ContractProxyFactory
如下:
@Slf4j public class ContractProxyFactory { private static final RequestArgumentExtractor EXTRACTOR = new DefaultRequestArgumentExtractor(); private static final ConcurrentMap<Class<?>, Object> CACHE = Maps.newConcurrentMap(); static final ConcurrentMap<String /* request id */, ResponseFuture> RESPONSE_FUTURE_TABLE = Maps.newConcurrentMap(); // 定义请求的最大超时时间为3秒 private static final long REQUEST_TIMEOUT_MS = 3000; private static final ExecutorService EXECUTOR; private static final ScheduledExecutorService CLIENT_HOUSE_KEEPER; private static final Serializer SERIALIZER = FastJsonSerializer.X; @SuppressWarnings("unchecked") public static <T> T ofProxy(Class<T> interfaceKlass) { // 缓存契约接口的代理类实例 return (T) CACHE.computeIfAbsent(interfaceKlass, x -> Proxy.newProxyInstance(interfaceKlass.getClassLoader(), new Class[]{interfaceKlass}, (target, method, args) -> { RequestArgumentExtractInput input = new RequestArgumentExtractInput(); input.setInterfaceKlass(interfaceKlass); input.setMethod(method); RequestArgumentExtractOutput output = EXTRACTOR.extract(input); // 封装请求参数 RequestMessagePacket packet = new RequestMessagePacket(); packet.setMagicNumber(ProtocolConstant.MAGIC_NUMBER); packet.setVersion(ProtocolConstant.VERSION); packet.setSerialNumber(SerialNumberUtils.X.generateSerialNumber()); packet.setMessageType(MessageType.REQUEST); packet.setInterfaceName(output.getInterfaceName()); packet.setMethodName(output.getMethodName()); packet.setMethodArgumentSignatures(output.getMethodArgumentSignatures().toArray(new String[0])); packet.setMethodArguments(args); Channel channel = ClientChannelHolder.CHANNEL_REFERENCE.get(); return sendRequestSync(channel, packet, method.getReturnType()); })); } /** * 同步发送请求 * * @param channel channel * @param packet packet * @return Object */ static Object sendRequestSync(Channel channel, RequestMessagePacket packet, Class<?> returnType) { long beginTimestamp = System.currentTimeMillis(); ResponseFuture responseFuture = new ResponseFuture(packet.getSerialNumber(), REQUEST_TIMEOUT_MS); RESPONSE_FUTURE_TABLE.put(packet.getSerialNumber(), responseFuture); try { // 获取到承载响应Packet的Future Future<ResponseMessagePacket> packetFuture = EXECUTOR.submit(() -> { channel.writeAndFlush(packet).addListener((ChannelFutureListener) future -> responseFuture.setSendRequestSucceed(true)); return responseFuture.waitResponse(REQUEST_TIMEOUT_MS - (System.currentTimeMillis() - beginTimestamp)); }); ResponseMessagePacket responsePacket = packetFuture.get( REQUEST_TIMEOUT_MS - (System.currentTimeMillis() - beginTimestamp), TimeUnit.MILLISECONDS); if (null == responsePacket) { // 超时导致响应包获取失败 throw new SendRequestException(String.format("ResponseMessagePacket获取超时,请求ID:%s", packet.getSerialNumber())); } else { ByteBuf payload = (ByteBuf) responsePacket.getPayload(); byte[] bytes = ByteBufferUtils.X.readBytes(payload); return SERIALIZER.decode(bytes, returnType); } } catch (Exception e) { log.error("同步发送请求异常,请求包:{}", JSON.toJSONString(packet), e); if (e instanceof RuntimeException) { throw (RuntimeException) e; } else { throw new SendRequestException(e); } } } static void scanResponseFutureTable() { log.info("开始执行ResponseFutureTable清理任务......"); Iterator<Map.Entry<String, ResponseFuture>> iterator = RESPONSE_FUTURE_TABLE.entrySet().iterator(); while (iterator.hasNext()) { Map.Entry<String, ResponseFuture> entry = iterator.next(); ResponseFuture responseFuture = entry.getValue(); if (responseFuture.timeout()) { iterator.remove(); log.warn("移除过期的请求ResponseFuture,请求ID:{}", entry.getKey()); } } log.info("执行ResponseFutureTable清理任务结束......"); } static { int n = Runtime.getRuntime().availableProcessors(); EXECUTOR = new ThreadPoolExecutor(n * 2, n * 2, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(50), runnable -> { Thread thread = new Thread(runnable); thread.setDaemon(true); thread.setName("CLIENT_REQUEST_EXECUTOR"); return thread; }); CLIENT_HOUSE_KEEPER = new ScheduledThreadPoolExecutor(1, runnable -> { Thread thread = new Thread(runnable); thread.setDaemon(true); thread.setName("CLIENT_HOUSE_KEEPER"); return thread; }); CLIENT_HOUSE_KEEPER.scheduleWithFixedDelay(ContractProxyFactory::scanResponseFutureTable, 5, 5, TimeUnit.SECONDS); } }
接着添加一个客户端入站处理器,用于通过 reuqestId
匹配目标 ResponseFuture
实例,同时设置 ResponseFuture
实例中的 response
属性为响应包,同时释放闭锁:
@Slf4j public class ClientHandler extends SimpleChannelInboundHandler<ResponseMessagePacket> { @Override protected void channelRead0(ChannelHandlerContext ctx, ResponseMessagePacket packet) throws Exception { log.info("接收到响应包,内容:{}", JSON.toJSONString(packet)); ResponseFuture responseFuture = ContractProxyFactory.RESPONSE_FUTURE_TABLE.get(packet.getSerialNumber()); if (null != responseFuture) { responseFuture.putResponse(packet); } else { log.warn("接收响应包查询ResponseFuture不存在,请求ID:{}", packet.getSerialNumber()); } } }
最后,客户端启动类 ClientApplication
中添加 ClientHandler
到 Netty
的处理器流水线中即可:
bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4)); ch.pipeline().addLast(new LengthFieldPrepender(4)); ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)); ch.pipeline().addLast(new RequestMessagePacketEncoder(FastJsonSerializer.X)); ch.pipeline().addLast(new ResponseMessagePacketDecoder()); ch.pipeline().addLast(new ClientHandler()); } });
先运行之前- 《基于Netty和SpringBoot实现一个轻量级RPC框架-Server篇》 中编写好的 ServerApplication
,再启动 ClientApplication
,日志输出如下:
// 服务端 2020-01-18 14:32:59 [nioEventLoopGroup-3-2] INFO club.throwable.server.ServerHandler - 服务端接收到:RequestMessagePacket(interfaceName=club.throwable.contract.HelloService, methodName=sayHello, methodArgumentSignatures=[java.lang.String], methodArguments=[PooledUnsafeDirectByteBuf(ridx: 0, widx: 11, cap: 11/144)]) 2020-01-18 14:32:59 [nioEventLoopGroup-3-2] INFO club.throwable.server.ServerHandler - 查找目标实现方法成功,目标类:club.throwable.server.contract.DefaultHelloService,宿主类:club.throwable.server.contract.DefaultHelloService,宿主方法:sayHello 2020-01-18 14:32:59 [nioEventLoopGroup-3-2] INFO club.throwable.server.ServerHandler - 服务端输出:{"attachments":{},"errorCode":200,"magicNumber":10086,"message":"Success","messageType":"RESPONSE","payload":"/"throwable say hello!/"","serialNumber":"21d131d26fc74f91b4691e0207826b90","version":1} // 客户端 2020-01-18 14:32:59 [nioEventLoopGroup-2-1] INFO club.throwable.client.ClientHandler - 接收到响应包,内容:{"attachments":{},"errorCode":200,"magicNumber":10086,"message":"Success","messageType":"RESPONSE","payload":{"contiguous":true,"direct":true,"readOnly":false,"readable":true,"writable":false},"serialNumber":"21d131d26fc74f91b4691e0207826b90","version":1} 2020-01-18 14:32:59 [main] INFO c.throwable.client.ClientApplication - HelloService[throwable]调用结果:"throwable say hello!" 2020-01-18 14:33:04 [CLIENT_HOUSE_KEEPER] INFO c.t.client.ContractProxyFactory - 开始执行ResponseFutureTable清理任务...... 2020-01-18 14:33:04 [CLIENT_HOUSE_KEEPER] WARN c.t.client.ContractProxyFactory - 移除过期的请求ResponseFuture,请求ID:21d131d26fc74f91b4691e0207826b90
可见异步线程模型已经被改造为同步化,现在可以通过契约接口通过 RPC
同步调用服务端。
Client
端的请求-响应同步化处理基本改造完毕,到此为止,一个 RPC
框架大致已经完成,接下来会对 Client
端和 Server
端进行一些改造,让契约相关组件托管到 IOC
容器,实现契约接口自动注入等等功能。
Demo
项目地址:
(本文完e-a-20200118 c-2-d)