private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) { String key = getCacheKey(originInvoker); //首先尝试从缓存中获取 ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { synchronized (bounds) { exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { //从export中拿到之前的url 即为dubbo协议的url //创建 Invoker 为委托类对象 final Invoker<?> invokerDelegete = new InvokerDelegete<T>( originInvoker, getProviderUrl(originInvoker)); exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker); //写入缓存 bounds.put(key, exporter); } } } return exporter; } 复制代码
如上代码,它先尝试从缓存中获取,如果没有则调用 protocol.export
去暴露。
在这里的 protocol
对象其实是一个自适应扩展类对象 Protocol$Adaptive
,我们调用它的 export
方法,它会根据协议名称获取对应的扩展实现类,在这里它是 DubboProtocol
。
不知诸位是否还有印象,我们在第二章节已经说过。通过 ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName);
这句代码获取到的其实是Wrapper包装类的对象, ProtocolListenerWrapper
ProtocolListenerWrapper.export
方法主要是获取服务暴露监听器,在服务暴露和取消服务暴露时可以获得通知。
public class ProtocolListenerWrapper implements Protocol { public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { if ("registry".equals(invoker.getUrl().getProtocol())) { return protocol.export(invoker); } //获取ExporterListener类型的扩展点加载器 ExtensionLoader<ExporterListener> extensionLoader = ExtensionLoader.getExtensionLoader(ExporterListener.class); //获取监听器 List<ExporterListener> activateExtension = extensionLoader. getActivateExtension(invoker.getUrl(), "exporter.listener"); //调用ProtocolFilterWrapper.export继续暴露 Exporter<T> export = protocol.export(invoker); List<ExporterListener> exporterListeners = Collections.unmodifiableList(activateExtension); //循环监听器 通知方法。返回ListenerExporterWrapper对象 ListenerExporterWrapper<T> listenerExporterWrapper = new ListenerExporterWrapper<>(export, exporterListeners); return listenerExporterWrapper; } } 复制代码
比如,我们可以创建一个自定义的监听器。
public class MyExporterListener1 implements ExporterListener { public void exported(Exporter<?> exporter) throws RpcException { System.out.println("111111111111111-------服务暴露"); } public void unexported(Exporter<?> exporter) { System.out.println("111111111111111-------取消服务暴露"); } } 复制代码
然后创建扩展点配置文件,文件名称为: org.apache.dubbo.rpc.ExporterListener
内容为: listener1=org.apache.dubbo.demo.provider.MyExporterListener1
然后在Dubbo配置文件中,这样定义: <dubbo:provider listener="listener1" />
那么,当服务暴露完成后,你将会获得通知。
上一步在 ProtocolListenerWrapper.export
方法中,返回之前还调用了 ProtocolFilterWrapper.export
。它主要是为了创建包含各种Filter的调用链。
public class ProtocolFilterWrapper implements Protocol { public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { return protocol.export(invoker); } //创建Filter 过滤链的 Invoker Invoker<T> tInvoker = buildInvokerChain(invoker, "service.filter","provider"); //调用DubboProtocol继续暴露 Exporter<T> export = protocol.export(tInvoker); //返回 return export; } } 复制代码
这里的重点是buildInvokerChain方法,它来创建调用链拦截器。每次远程方法执行,该拦截都会被执行,在Dubbo中已知的Filter有
org.apache.dubbo.rpc.filter.EchoFilter org.apache.dubbo.rpc.filter.GenericFilter org.apache.dubbo.rpc.filter.GenericImplFilter org.apache.dubbo.rpc.filter.TokenFilter org.apache.dubbo.rpc.filter.AccessLogFilter org.apache.dubbo.rpc.filter.CountFilter org.apache.dubbo.rpc.filter.ActiveLimitFilter org.apache.dubbo.rpc.filter.ClassLoaderFilter org.apache.dubbo.rpc.filter.ContextFilter org.apache.dubbo.rpc.filter.ConsumerContextFilter org.apache.dubbo.rpc.filter.ExceptionFilter org.apache.dubbo.rpc.filter.ExecuteLimitFilter org.apache.dubbo.rpc.filter.DeprecatedFilter 复制代码
此时的invoker经过各种Filter的包装,就变成了下面这个样子:
当然了,我们也可以自定义Filter。比如像下面这样:
public class MyFilter1 implements Filter { public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { System.out.println("调用之前:"+invoker.getUrl().toFullString()); Result result = invoker.invoke(invocation); System.out.println("调用之后:"+invoker.getUrl().toFullString()); return result; } } 复制代码
然后创建扩展点配置文件,文件名称为: resources/META-INF/dubbo/com.alibaba.dubbo.rpc.Filter
内容为: myfilter1=org.apache.dubbo.demo.provider.MyFilter1
然后在Dubbo配置文件中,这样定义: <dubbo:provider filter="myfilter1"/>
需要注意的是,这样配置之后,myfilter1会在默认的Filter之后。如果你希望在默认的Filter前面,那么你可以这样配置 <dubbo:provider filter="myfilter1,default"/>
经过上面各种的搞来搞去,终于可以真正的暴露服务了。调用 DubboProtocol.export
,我们重点两部分:创建DubboExporter和启动服务器。
public class DubboProtocol extends AbstractProtocol { public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { URL url = invoker.getUrl(); //服务标识 //例如:com.viewscenes.netsupervisor.service.InfoUserService:20880 String key = serviceKey(url); //创建 DubboExporter DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); //将 <key, exporter> 键值对放入缓存中 exporterMap.put(key, exporter); //省略无关代码... // 启动通信服务器 openServer(url); //优化序列化 optimizeSerialization(url); return exporter; } } 复制代码
事实上,创建DubboExporter的过程非常简单,就是调用构造函数赋值而已。
public class DubboExporter<T> extends AbstractExporter<T> { public DubboExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) { super(invoker); this.key = key; this.exporterMap = exporterMap; } } 复制代码
private void openServer(URL url) { //获取IP:端口 ,并将它作为服务器实例的key String key = url.getAddress(); boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true); if (isServer) { //先从缓存中获取 ExchangeServer server = serverMap.get(key); if (server == null) { //创建服务器实例 serverMap.put(key, createServer(url)); } else { //重置服务器 server.reset(url); } } } 复制代码
如上代码,Dubbo先从缓存中获取已启动的服务器实例,未命中的话就去创建。如果已经存在服务器实例,就根据url的内容重置服务器。我们重点分析创建的过程。
private ExchangeServer createServer(URL url) { //服务器关闭时 发送readonly事件 url = url.addParameterIfAbsent("channel.readonly.sent","true"); //设置心跳检测 url = url.addParameterIfAbsent("heartbeat", "60000"); //获取服务器参数 默认为netty String str = url.getParameter("server","netty"); //通过 SPI 检测是否存在 server 参数所代表的 Transporter 拓展,不存在则抛出异常 if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)){ throw new RpcException("Unsupported server type: " + str + ", url: " + url); } //设置服务器编解码器为dubbo url = url.addParameter("codec", "dubbo"); ExchangeServer server; try { //创建ExchangeServer server = Exchangers.bind(url, requestHandler); } catch (RemotingException e) { throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); } str = url.getParameter(Constants.CLIENT_KEY); if (str != null && str.length() > 0) { Set<String> supportedTypes = ExtensionLoader. getExtensionLoader(Transporter.class).getSupportedExtensions(); if (!supportedTypes.contains(str)) { throw new RpcException("Unsupported client type: " + str); } } return server; } 复制代码
上面的代码主要分为两部分:设置默认参数和创建服务器实例。设置参数没什么好说的,下面调用到 HeaderExchanger.bind
方法,它只是设置封装Handler处理器。
public class HeaderExchanger implements Exchanger { public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { //封装Handler处理器 HeaderExchangeHandler headerExchangeHandler = new HeaderExchangeHandler(handler); DecodeHandler decodeHandler = new DecodeHandler(headerExchangeHandler); //创建服务器 Server bind = Transporters.bind(url, decodeHandler); //封装为HeaderExchangeServer对象返回 HeaderExchangeServer server = new HeaderExchangeServer(bind); return server; } } 复制代码
我们只需关注 Transporters.bind
,它负责启动服务器。
public class Transporters { public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException { ChannelHandler handler; if (handlers.length == 1) { handler = handlers[0]; } else { handler = new ChannelHandlerDispatcher(handlers); } //获取自适应 Transporter 实例 Transporter adaptiveExtension = ExtensionLoader. getExtensionLoader(Transporter.class).getAdaptiveExtension(); //调用NettyServer.bind return adaptiveExtension.bind(url, handler); } } 复制代码
如上代码,它首先获取自适应 Transporter 实例,即 TransporterAdaptive
。然后根据传入的url参数来加载哪个Transporter,在Dubbo中默认是NettyTransporter。需要注意的是,根据Dubbo版本的不同,有可能使用Netty的版本也不一样。
比如,笔者在Dubbo2.7快照版本中(还未发行),看到的Netty配置文件是这样,说明它默认使用的就是Netty4:
netty4=org.apache.dubbo.remoting.transport.netty4.NettyTransporter netty= org.apache.dubbo.remoting.transport.netty4.NettyTransporter 复制代码
在Dubbo2.6版本中,看到的Netty配置文件是这样,说明你只要不指定Netty4,那就使用Netty3
netty=com.alibaba.dubbo.remoting.transport.netty.NettyTransporter netty4=com.alibaba.dubbo.remoting.transport.netty4.NettyTransporter 复制代码
不过这些都无伤大雅,我们以Netty3接着看....
public class NettyTransporter implements Transporter { public Server bind(URL url, ChannelHandler listener){ //创建 NettyServer return new NettyServer(url, listener); } } public class NettyServer extends AbstractServer implements Server { public NettyServer(URL url, ChannelHandler handler) { super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, "DubboServerHandler"))); } } 复制代码
我们看到, 在 NettyTransporter.bind
方法里,它调用的是 NettyServer
构造函数,紧接着又调用父类的构造函数。
public abstract class AbstractServer extends AbstractEndpoint implements Server { public AbstractServer(URL url, ChannelHandler handler) throws RemotingException { super(url, handler); localAddress = getUrl().toInetSocketAddress(); //获取 ip 和端口 String bindIp = getUrl().getParameter("bind.ip", getUrl().getHost()); int bindPort = getUrl().getParameter("bind.port", getUrl().getPort()); if (url.getParameter("anyhost", false) || NetUtils.isInvalidLocalHost(bindIp)) { // 设置 ip 为 0.0.0.0 bindIp = NetUtils.ANYHOST; } bindAddress = new InetSocketAddress(bindIp, bindPort); this.accepts = url.getParameter("accepts", 0); this.idleTimeout = url.getParameter("idle.timeout", 600000); try { //调用子类方法 开启服务器 doOpen(); } } } 复制代码
如上代码,在父类的构造函数里面主要是设置了一些参数,无需多说。接着我们再看子类的doOpen实现。
protected void doOpen() throws Throwable { NettyHelper.setNettyLoggerFactory(); // 创建 boss 和 worker 线程池 // 设置线程的名称 ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true)); ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true)); ChannelFactory channelFactory = new NioServerSocketChannelFactory( boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS)); //创建 ServerBootstrap bootstrap = new ServerBootstrap(channelFactory); final NettyHandler nettyHandler = new NettyHandler(getUrl(), this); channels = nettyHandler.getChannels(); // 设置 PipelineFactory bootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() { NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); ChannelPipeline pipeline = Channels.pipeline(); /*int idleTimeout = getIdleTimeout(); if (idleTimeout > 10000) { pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0)); }*/ pipeline.addLast("decoder", adapter.getDecoder()); pipeline.addLast("encoder", adapter.getEncoder()); pipeline.addLast("handler", nettyHandler); return pipeline; } }); // 绑定到指定的 ip 和端口上 channel = bootstrap.bind(getBindAddress()); } 复制代码
以上方法就通过Netty启动了通信服务器。熟悉Netty的朋友对这段代码一定不陌生,如果想了解更多,我们需要关注一下它的处理器。
ChannelHandler是Netty中的核心组件之一。在这里,Dubbo使用 NettyHandler
作为消息处理器。它继承自 SimpleChannelHandler
,这说明Netty接收到的事件都会由此类来处理。比如: 客户端连接、客户端断开连接、数据读取、网络异常...
我们重点来看数据读取方法。
@Sharable public class NettyHandler extends SimpleChannelHandler { public NettyHandler(URL url, ChannelHandler handler) { this.url = url; this.handler = handler; } //接收到消息 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler); try { handler.received(channel, e.getMessage()); } finally { NettyChannel.removeChannelIfDisconnected(ctx.getChannel()); } } } 复制代码
当Netty的Selector轮询到数据读取事件后,将调用 messageReceived
方法。在这里,它调用的是 handler.received
,由构造函数可得知,此处的 handler
对象其实是 NettyServer
对象的实例。
中间它会经过 AllChannelHandler
,在这里会在线程池中分配一个线程去处理。
public class AllChannelHandler extends WrappedChannelHandler { public void received(Channel channel, Object message) throws RemotingException { ExecutorService cexecutor = getExecutorService(); cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } } 复制代码
ChannelEventRunnable实现Runnable接口,我们看它的run方法。其实也很简单,就是根据事件状态,继续往下调用。
public class ChannelEventRunnable implements Runnable { public void run() { switch (state) { case CONNECTED: try { handler.connected(channel); } break; case DISCONNECTED: try { handler.disconnected(channel); } break; case SENT: try { handler.sent(channel, message); } break; case RECEIVED: try { handler.received(channel, message); } break; case CAUGHT: try { handler.caught(channel, exception); } break; default: logger.warn("unknown state: " + state + ", message is " + message); } } } 复制代码
再深入的过程我想不必再深究了,无非是业务逻辑处理。不过还有另外一个问题,这个线程池是什么样的?大小多少呢? 通过跟踪,我们发现它是在其父类中被初始化的。它也是通过ExtensionLoader加载的
public class WrappedChannelHandler implements ChannelHandlerDelegate { protected final ExecutorService executor; protected final ChannelHandler handler; protected final URL url; public WrappedChannelHandler(ChannelHandler handler, URL url) { this.handler = handler; this.url = url; ExtensionLoader<ThreadPool> extensionLoader = ExtensionLoader.getExtensionLoader(ThreadPool.class); ThreadPool adaptiveExtension = extensionLoader.getAdaptiveExtension(); executor = (ExecutorService) adaptiveExtension.getExecutor(url); } } 复制代码
然后我们看 ThreadPool
接口标注了默认实现 @SPI("fixed")
,它是一个固定数量的线程池。
public class FixedThreadPool implements ThreadPool { public Executor getExecutor(URL url) { //设置线程池参数 String name = url.getParameter("threadname", "Dubbo"); int threads = url.getParameter("threads", 200); int queues = url.getParameter("queues",0); return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS, queues == 0 ? new SynchronousQueue<Runnable>() : (queues < 0 ? new LinkedBlockingQueue<Runnable>() : new LinkedBlockingQueue<Runnable>(queues)), new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url)); } } 复制代码
由此我们可以回答上面的问题了,Dubbo中的线程池是固定线程数量大小为200的线程池。如果线程池满了怎么办?我们再看下它的拒绝策略。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { String msg = String.format("Thread pool is EXHAUSTED!" + " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," + " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!", threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(), e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(), url.getProtocol(), url.getIp(), url.getPort()); logger.warn(msg); dumpJStack(); throw new RejectedExecutionException(msg); } 复制代码
学到了吗?
到此,关于服务暴露的过程就分析完了。整个过程比较复杂,大家在分析的过程中耐心一些。并且多写 Demo 进行断点调试,以便能够更好的理解代码逻辑。
服务注册就是把已经暴露的服务信息注册到第三方平台,以供消费者使用。我们把目光回到 RegistryProtocol.export
方法,我们以zookeeper注册中心为例。
首先,需要根据配置文件的信息获取到注册中心的url,比如以zookeeper为例: zookeeper://192.168.139.131:2181/com.alibaba.dubbo.registry.RegistryService?application=dubbo_producer1&client=zkclient&dubbo=2.6.2......
我们直接来到 ZookeeperRegistry
,这里的重点是调用 connect
方法创建Zookeeper 客户端。
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) { //省略部分代码... //创建zookeeper客户端 zkClient = zookeeperTransporter.connect(url); zkClient.addStateListener(new StateListener() { public void stateChanged(int state) { if (state == RECONNECTED) { try { //重新连接事件 recover(); } catch (Exception e) { logger.error(e.getMessage(), e); } } } }); } 复制代码
在这里有一点需要注意,Dubbo官网说,连接zookeeper缺省使用zkclient。
从 2.2.0
版本开始缺省为 zkclient 实现,以提升 zookeeper 客户端的健状性。
但从代码上看,它默认使用的是curator客户端。 @SPI("curator")
这一点比较费解,所以如果想使用zkclient,要在配置文件中指定: <dubbo:registry address="zookeeper://192.168.139.131:2181?client=zkclient"/>
然后我们接着往下继续看,最终调用zkclient的方法完成zookeeper客户端的创建。
public ZkclientZookeeperClient(URL url) { //异步调用ZkClient创建客户端 client = new ZkClientWrapper(url.getBackupAddress(), 30000); //监听zookeeper状态 client.addListener(new IZkStateListener() { @Override public void handleStateChanged(KeeperState state) throws Exception { ZkclientZookeeperClient.this.state = state; if (state == KeeperState.Disconnected) { stateChanged(StateListener.DISCONNECTED); } else if (state == KeeperState.SyncConnected) { stateChanged(StateListener.CONNECTED); } } @Override public void handleNewSession() throws Exception { stateChanged(StateListener.RECONNECTED); } }); client.start(); } 复制代码
创建节点很简单,就是将服务配置数据写入到 Zookeeper 的某个路径的节点下。
protected void doRegister(URL url) { try { zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true)); } catch (Throwable e) { throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } } 复制代码
我们看下zookeeper中已经创建好的节点信息:
至此,Dubbo中的服务暴露全过程我们已经分析完了。由于篇幅问题,笔者将它们分为了上下两篇。字数比较多,逻辑也较为复杂,如果文章有不妥错误之处,希望大家提出宝贵意见。
我们再回忆一下整个流程: