注:文章中使用的 Dubbo 源码版本为2.5.4
零、文章目录
- Dubbo的网络分层抽象
- Dubbo如何保证Client端与Server端的连通性
- Dubbo编解码协议--解决TCP粘包拆包问题
- Dubbo的请求响应模式,如何将异步IO变为同步RPC
- Dubbo线程模型总结
一、Dubbo的网络分层抽象
上图为Dubbo整体设计的分层抽象。
网络通信位于Remoting模块:
- Remoting 实现是 Dubbo 协议的实现,如果你选择 RMI 协议,整个 Remoting 都不会用上;
- Remoting 内部再划为 Transport 传输层 和 Exchange 信息交换层;
- Transport 层只负责单向消息传输,是对 Mina, Netty, Grizzly 的抽象,它也可以扩展 UDP 传输;
- Exchange 层是在传输层之上封装了 Request-Response 语义;
Transport 网络传输层:
- 抽象 mina 和 netty 为统一接口,以 Message 为中心,扩展接口为 Channel, Transporter, Client, Server, Codec;
- Channel 为网络层通道的抽象,其具备单向消息传输的能力,且可以绑定和获取属性;
- Codec 为编解码协议的抽象,其定义了encode()和decode()方法,做网络byte流和通信消息体的转换;
- Client 为网络层客户端的抽象,其继承了Channel接口,同时定义了重连方法reconnect;
- Server 为网络层服务端的抽象,其具备单向消息传输消息的能力,且可以获取绑定在其上的所有网络通道Channel;
- Transporter 为网络传输层入口,提供了创建Server和Client的两个核心接口;
Exchange信息交换层:
- 封装请求响应模式,同步转异步。以 Request, Response 为中心,扩展接口为 Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer;
- Request,Response 为请求响应模式消息体的封装,RPC请求响应信息的承载者;
- ExchangeChannel 为信息交换层通道的抽象,其继承了Channel接口,并添加了具有同步请求响应语义的request()方法;
- ExchangeClient 为信息交换层客户端的抽象,继承了ExchangeChannel接口,具有了发送同步RPC请求并处理响应的能力;
- ExchangeServer 为信息交换层服务端的抽象;
- Exchanger 为信息交换层入口,提供了创建ExchangeServer和ExchangeClient两个核心接口;
二、Dubbo如何保证Client端与Server端的连通性
Dubbo采用双向心跳的方式检测Client端与Server端的连通性。
2.1 心跳请求的发送
在信息交换层服务端实现类HeaderExchangeServer和客户端实现类HeaderExchangeClient中包含一个 心跳定时任务HeartBeatTask:
- HeaderExchangeServer和HeaderExchangeClient启动时会创建一个定时线程池执行心跳定时任务,关闭时会同时关闭该心跳定时任务;
- HeartBeatTask会循环检测Client或Server中绑定的网络通道Channel,将通道中 最近一次接收或发送消息 的时间与当前时刻做比较,如果两者相隔超过了一个 心跳周期 ,则主动构建并通过Channel 向对端发送一个 心跳请求消息;
2.2 心跳请求的接收及处理
-
在请求接收的Handler处理链路中,包含有一个 心跳消息处理器HeartbeatHandler:
-
对于所有类型的请求消息,该处理器都会更新对应通道中 最近一次接收消息 的时间;
-
对于心跳请求消息,该处理器接收心跳请求并构建对应的心跳响应通过通道Channel发送回去;
2.3 心跳超时的检测及处理
在信息交换层服务端实现类HeaderExchangeServer和客户端实现类HeaderExchangeClient中包含一个 心跳定时任务HeartBeatTask:
- HeartBeatTask会循环检测Client或Server中绑定的网络通道Channel,当发现通道中 最近一次接收消息 的时间与当前检测时间间隔超过 心跳超时时间 时,会触发心跳超时逻辑的执行;
- 对于服务端,心跳超时发生时,会调用channel.close()主动断连对应通道;
- 对于客户端,心跳超时发生时,会调用client.reconnect()执行网络通道重连逻辑。如果重连失败则不做处理等待下次心跳超时检测时再次触发重连逻辑;
三、Dubbo编解码协议,解决TCP粘包拆包问题
- Dubbo在TCP协议的基础上添加了自己的消息协议头,以进行Request、Response消息的编解码,解决TCP的粘包拆包问题。
- 粘包拆包问题的说明请见 netty学习系列八:拆包器
3.1 请求消息协议头说明
- 0-1byte:一个魔数数字MAGIC,就是一个固定的数字
- 2byte:请求的双向或单向标记
- 3byte:无
- 4-11byte:请求ID,long类型。异步变同步的全局唯一ID,用来做consumer和provider的来回通信标记
- 12-15byte:消息体长度,int类型。也就是消息头+请求数据的长度
3.2 响应消息协议头说明
- 0-1byte:一个魔数数字MAGIC,就是一个固定的数字
- 2byte:序列化组件类型,它用于和客户端约定序列化编码号
- 3byte:它是Response的结果响应码,例如OK=20
- 4-11byte:请求ID,long类型。异步变同步的全局唯一ID,用来做consumer和provider的来回通信标记
- 12-15byte:消息体长度,int类型。也就是消息头+请求数据的长度
3.3 粘包拆包的解决
基本原理就是不断从TCP缓冲区中读取数据,并将新读取到的数据向后追加到 本地消息缓存 中,然后进行解码处理:
- 如果当前本地消息缓存中不足以拼接成一个业务数据包,那就保留数据,继续从tcp缓冲区中读取数据;
- 如果当前本地消息缓存中能够拼接成一个业务数据包,那就将对应数据解码成一个完整的业务数据包并传递给业务逻辑处理,本地消息缓存中剩余的多余数据仍然保留,以便和下次读到的数据尝试拼接;
- TCP协议保证了TCP数据报的 不丢不重不乱序 ,这是Dubbo编解码方式的前提;
四、Dubbo的请求响应模式,如何将异步IO变为同步RPC
3.1 发起RPC调用请求的业务线程,是如何同步阻塞等待直到RPC响应返回的?
- 业务请求线程调用HeaderExchangeClient.request()方法发送RPC请求消息到网络,然后直接调用DefaultFuture.get()方法阻塞等待RPC执行结果;
- get()阻塞等待的本质:循环检测Response结果是否被设置成功,如果不成功使用Condition.await()阻塞直到结果返回;
- NettyClient接收到RPC响应消息时,会调用DefaultFuture.received()方法,该方法中触发了Condition.signal()通知业务请求线程解除阻塞等待状态;
3.2 对于全双工的网络通信,在多线程并发请求响应的情况下,如果找到RPC响应Response对应的RPC请求Request?
- 对于不同的服务消费者客户端,请求响应自然与其网络通道Channel绑定,不会存在消费者A接收到消费者B的RPC响应的情况;
- 对于同一服务消费者客户端,在RPC请求Request构建时生成并携带全局唯一自增ID,RPC响应Response会携带该ID返回。消费者客户端只需维护 “唯一ID与RPC请求的关系Map<Long, DefaultFuture> FUTURES”即可定位RPC响应对应的RPC调用上下文;
五、Dubbo线程模型总结
-
蓝色代表“发送RPC请求”过程,由 业务请求线程 执行,通过 NettyChannel
将请求数据放入 Netty
的IO任务队列后,构建 ResponseFuture
并返回。此时RPC请求发送及响应接收并未真正完成;
-
紫色是基于Netty的网络消息收发过程,通过当前网络通道绑定的 NioEventLoop
线程轮询完成;
-
橙色代表“接收RPC响应”过程,该过程在 Dubbo业务线程池 中执行,处理RPC响应消息并交由 ResponseFuture
触发接收响应的逻辑;
-
绿色代表“获取RPC调用结果”过程,由 业务请求线程 执行,阻塞直到从 ResponseFuture
中获取到RPC响应结果;
-
红色代表“接收并处理RPC请求”过程,在 Dubbo业务线程池 中执行。RPC请求消息由 HeaderExchangeHandler
处理,通过 DubboInvoker
反射执行 实际接口实现类 得到执行结果,并封装成 Response
交由网络通道发送RPC响应。
原文
https://xiaozhuanlan.com/topic/6238415790