目标:介绍之后解读远程通讯模块的内容如何编排、介绍dubbo-remoting-api中的包结构设计以及最外层的的源码解析。
服务治理框架中可以大致分为服务通信和服务管理两个部分,前面我先讲到有关注册中心的内容,也就是服务管理,当然dubbo的服务管理还包括监控中心、 telnet 命令,它们起到的是人工的服务管理作用,这个后续再介绍。接下来我要讲解的就是跟服务通信有关的部分,也就是远程通讯模块。我在 《dubbo源码解析(一)Hello,Dubbo》 的"(六)dubbo-remoting——远程通信模块“中提到过一些内容。该模块中提供了多种客户端和服务端通信的功能,而在对NIO框架选型上,dubbo交由用户选择,它集成了mina、netty、grizzly等各类NIO框架来搭建NIO服务器和客户端,并且利用dubbo的SPI扩展机制可以让用户自定义选择。如果对SPI不太了解的朋友可以查看 《dubbo源码解析(二)Dubbo扩展机制SPI》 。
接下来我们先来看看dubbo-remoting的包结构:
我接下来解读远程通讯模块的内容并不是按照一个包一篇文章的编排,先来看看dubbo-remoting-api的包结构:
可以看到,大篇幅的逻辑在dubbo-remoting-api中,所以我对于dubbo-remoting-api的解读会分为下面五个部分来说明,其中第五点会在本文介绍,其他四点会分别用四篇文章来介绍:
为什么我要把一个api分成这么多文章来讲解,我们先来看看下面的图:
我们可以看到红框内的是远程通讯的框架,序列化我会在后面的主题中介绍,而Exchange层和Transport层在框架设计中起到了很重要的作用,也是支撑Remoting的核心,所以我要分开来介绍。
除了上述的五点外,根据惯例,我还是会分别介绍dubbo支持的实现客户端和服务端通信的七种方案,也就是说该远程通讯模块我会用12篇文章详细的讲解。
dubbo抽象出一个端的概念,也就是Endpoint接口,这个端就是一个点,而点对点之间是可以双向传输。在端的基础上在衍生出通道、客户端以及服务端的概念,也就是下面要介绍的Channel、Client、Server三个接口。在传输层,其实Client和Server的区别只是在语义上区别,并不区分请求和应答职责,在交换层客户端和服务端也是一个点,但是已经是有方向的点,所以区分了明确的请求和应答职责。两者都具备发送的能力,只是客户端和服务端所关注的事情不一样,这个在后面会分开介绍,而Endpoint接口抽象的方法就是它们共同拥有的方法。这也就是它们都能被抽象成端的原因。
来看一下它的源码:
public interface Endpoint { // 获得该端的url URL getUrl(); // 获得该端的通道处理器 ChannelHandler getChannelHandler(); // 获得该端的本地地址 InetSocketAddress getLocalAddress(); // 发送消息 void send(Object message) throws RemotingException; // 发送消息,sent是是否已经发送的标记 void send(Object message, boolean sent) throws RemotingException; // 关闭 void close(); // 优雅的关闭,也就是加入了等待时间 void close(int timeout); // 开始关闭 void startClose(); // 判断是否已经关闭 boolean isClosed(); } 复制代码
该接口是通道接口,通道是通讯的载体。还是用自动贩卖机的例子,自动贩卖机就好比是一个通道,消息发送端会往通道输入消息,而接收端会从通道读消息。并且接收端发现通道没有消息,就去做其他事情了,不会造成阻塞。所以channel可以读也可以写,并且可以异步读写。channel是client和server的传输桥梁。channel和client是一一对应的,也就是一个client对应一个channel,但是channel和server是多对一对关系,也就是一个server可以对应多个channel。
public interface Channel extends Endpoint { // 获得远程地址 InetSocketAddress getRemoteAddress(); // 判断通道是否连接 boolean isConnected(); // 判断是否有该key的值 boolean hasAttribute(String key); // 获得该key对应的值 Object getAttribute(String key); // 添加属性 void setAttribute(String key, Object value); // 移除属性 void removeAttribute(String key); } 复制代码
可以看到Channel继承了Endpoint,也就是端抽象出来的方法也同样是channel所需要的。上面的几个方法很好理解,我就不多介绍了。
@SPI public interface ChannelHandler { // 连接该通道 void connected(Channel channel) throws RemotingException; // 断开该通道 void disconnected(Channel channel) throws RemotingException; // 发送给这个通道消息 void sent(Channel channel, Object message) throws RemotingException; // 从这个通道内接收消息 void received(Channel channel, Object message) throws RemotingException; // 从这个通道内捕获异常 void caught(Channel channel, Throwable exception) throws RemotingException; } 复制代码
该接口是负责channel中的逻辑处理,并且可以看到这个接口有注解@SPI,是个可扩展接口,到时候都会在下面介绍各类NIO框架的时候会具体讲到它的实现类。
public interface Client extends Endpoint, Channel, Resetable { // 重连 void reconnect() throws RemotingException; // 重置,不推荐使用 @Deprecated void reset(com.alibaba.dubbo.common.Parameters parameters); } 复制代码
客户端接口,可以看到它继承了Endpoint、Channel和Resetable接口,继承Endpoint的原因上面我已经提到过了,客户端和服务端其实只是语义上的不同,客户端就是一个点。继承Channel是因为客户端跟通道是一一对应的,所以做了这样的设计,还继承了Resetable接口是为了实现reset方法,该方法,不过已经打上@Deprecated注解,不推荐使用。除了这些客户端就只需要关注一个重连的操作。
这里插播一个公共模块下的接口Resetable:
public interface Resetable { // 用于根据新传入的 url 属性,重置自己内部的一些属性 void reset(URL url); } 复制代码
该方法就是根据新的url来重置内部的属性。
public interface Server extends Endpoint, Resetable { // 判断是否绑定到本地端口,也就是该服务器是否启动成功,能够连接、接收消息,提供服务。 boolean isBound(); // 获得连接该服务器的通道 Collection<Channel> getChannels(); // 通过远程地址获得该地址对应的通道 Channel getChannel(InetSocketAddress remoteAddress); @Deprecated void reset(com.alibaba.dubbo.common.Parameters parameters); } 复制代码
该接口是服务端接口,继承了Endpoint和Resetable,继承Endpoint是因为服务端也是一个点,继承Resetable接口是为了继承reset方法。除了这些以外,服务端独有的是检测是否启动成功,还有事获得连接该服务器上所有通道,这里获得所有通道其实就意味着获得了所有连接该服务器的客户端,因为客户端和通道是一一对应的。
这两个都是编解码器,那么什么叫做编解码器,在网络中只是讲数据看成是原始的字节序列,但是我们的应用程序会把这些字节组织成有意义的信息,那么网络字节流和数据间的转化就是很常见的任务。而编码器是讲应用程序的数据转化为网络格式,解码器则是讲网络格式转化为应用程序,同时具备这两种功能的单一组件就叫编解码器。在dubbo中Codec是老的编解码器接口,而Codec2是新的编解码器接口,并且dubbo已经用CodecAdapter把Codec适配成Codec2了。所以在这里我就介绍Codec2接口,毕竟人总要往前看。
@SPI public interface Codec2 { //编码 @Adaptive({Constants.CODEC_KEY}) void encode(Channel channel, ChannelBuffer buffer, Object message) throws IOException; //解码 @Adaptive({Constants.CODEC_KEY}) Object decode(Channel channel, ChannelBuffer buffer) throws IOException; enum DecodeResult { // 需要更多输入和忽略一些输入 NEED_MORE_INPUT, SKIP_SOME_INPUT } } 复制代码
因为是编解码器,所以有两个方法分别是编码和解码,上述有以下几个关注的:
public interface Decodeable { //解码 public void decode() throws Exception; } 复制代码
该接口是可解码的接口,该接口有两个作用,第一个是在调用真正的decode方法实现的时候会有一些校验,判断是否可以解码,并且对解码失败会有一些消息设置;第二个是被用来message核对用的。后面看具体的实现会更了解该接口的作用。
@SPI(AllDispatcher.NAME) public interface Dispatcher { // 调度 @Adaptive({Constants.DISPATCHER_KEY, "dispather", "channel.handler"}) // The last two parameters are reserved for compatibility with the old configuration ChannelHandler dispatch(ChannelHandler handler, URL url); } 复制代码
该接口是调度器接口,dispatch是线程池的调度方法,这边有几个注意点:
@SPI("netty") public interface Transporter { // 绑定一个服务器 @Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY}) Server bind(URL url, ChannelHandler handler) throws RemotingException; // 连接一个服务器,即创建一个客户端 @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY}) Client connect(URL url, ChannelHandler handler) throws RemotingException; } 复制代码
该接口是网络传输接口,有以下几个注意点:
public class Transporters { static { // check duplicate jar package // 检查重复的 jar 包 Version.checkDuplicate(Transporters.class); Version.checkDuplicate(RemotingException.class); } private Transporters() { } public static Server bind(String url, ChannelHandler... handler) throws RemotingException { return bind(URL.valueOf(url), handler); } public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handlers == null || handlers.length == 0) { throw new IllegalArgumentException("handlers == null"); } ChannelHandler handler; // 创建handler if (handlers.length == 1) { handler = handlers[0]; } else { handler = new ChannelHandlerDispatcher(handlers); } // 调用Transporter的实现类对象的bind方法。 // 例如实现NettyTransporter,则调用NettyTransporter的connect,并且返回相应的server return getTransporter().bind(url, handler); } public static Client connect(String url, ChannelHandler... handler) throws RemotingException { return connect(URL.valueOf(url), handler); } public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } ChannelHandler handler; if (handlers == null || handlers.length == 0) { handler = new ChannelHandlerAdapter(); } else if (handlers.length == 1) { handler = handlers[0]; } else { handler = new ChannelHandlerDispatcher(handlers); } // 调用Transporter的实现类对象的connect方法。 // 例如实现NettyTransporter,则调用NettyTransporter的connect,并且返回相应的client return getTransporter().connect(url, handler); } public static Transporter getTransporter() { return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension(); } } 复制代码
这三个类是远程通信的异常类:
为了不影响篇幅,这三个类源码我就不介绍了,因为比较简单。