在源码分析 Dubbo 通讯篇之网络核心类一文中已给出 Dubbo netty client 的启动流程,如下图:
以 Dubbo 协议为例,DubboProtocol#refer 中,在创建 Invoker 时,通过 getClient 方法,开始 Client(连接的)创建过程,先重点看一下:
private ExchangeClient[] getClients(URL url) { // @1 // whether to share connection boolean service_share_connect = false; int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0); // @2 // if not configured, connection is shared, otherwise, one connection for one service if (connections == 0) { service_share_connect = true; connections = 1; } ExchangeClient[] clients = new ExchangeClient[connections]; // @3 for (int i = 0; i < clients.length; i++) { if (service_share_connect) { clients[i] = getSharedClient(url); // @4 } else { clients[i] = initClient(url); // @5 } } return clients; } 复制代码
代码@1:参数 URL ,服务提供者 URL。
代码@2:获取 <dubbo:reference connections = "" />,默认0表示客户端对同一个服务提供者的所有服务,使用共享一个连接,如果该值有设置,则使用非共享的客户端,所谓的共享客户端,以 Netty 为例,也即客户端对同一服务提供者发起的不同服务,使用同一个客户端 ( NettyClient)进行请求的发送与接收。
代码@3,根据 connections,创建 ExchangeClients 数组。
代码@4:如果使用共享连接,则调用 getSharedClient 获取共享连接,如果客户端未建立,则创建客户端。
代码@5,如果不使用共享连接,调用 initClient 创建客户端,其创建时序图如上图所示。 接下来,还是以 Netty4 为例,探究一下 Dubbo NettyClient 的创建细节。
首先从全貌上大概看一下 NettyClient 对象所持有的属性:
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException { // @1 super(url, wrapChannelHandler(url, handler)); // @2 } 复制代码
代码@1:url:服务提供者URL;ChannelHandler handler:事件处理器。
代码@2:wrapChannelHandler在讲解NettyServer时已重点分析,构造其事件转发模型(Dispatch)。
接下来重点分析其父类的构造方法:
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException { super(url, handler); // @1 send_reconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false); shutdown_timeout = url.getParameter(Constants.SHUTDOWN_TIMEOUT_KEY, Constants.DEFAULT_SHUTDOWN_TIMEOUT); // The default reconnection interval is 2s, 1800 means warning interval is 1 hour. reconnect_warning_period = url.getParameter("reconnect.waring.period", 1800); // @2 try { doOpen(); // @3 } catch (Throwable t) { close(); throw new RemotingException(url.toInetSocketAddress(), null, "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t); } try { // connect. connect(); // @4 if (logger.isInfoEnabled()) { logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress()); } } catch (RemotingException t) { if (url.getParameter(Constants.CHECK_KEY, true)) { close(); throw t; } else { logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t); } } catch (Throwable t) { close(); throw new RemotingException(url.toInetSocketAddress(), null, "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t); } executor = (ExecutorService) ExtensionLoader.getExtensionLoader(DataStore.class) // @5 .getDefaultExtension().get(Constants.CONSUMER_SIDE, Integer.toString(url.getPort())); ExtensionLoader.getExtensionLoader(DataStore.class) .getDefaultExtension().remove(Constants.CONSUMER_SIDE, Integer.toString(url.getPort())); } 复制代码
代码@1:调用父类的构造其,初始化url、ChannelHandler。 代码@2:初始化send_reconnect 、shutdown_timeout、reconnect_warning_period(默认1小时打印一次日志) 代码@3:调用doOpen初始化客户端调用模型,后续重点分析。 代码@4:调用connect方法,向服务端发起TCP连接。 代码@5:获取线程池,并从缓存中移除。 1.2 doOpen
protected void doOpen() throws Throwable { NettyHelper.setNettyLoggerFactory(); final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this); // @1 bootstrap = new Bootstrap(); // @2 bootstrap.group(nioEventLoopGroup) // @3 .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout()) .channel(NioSocketChannel.class); if (getTimeout() < 3000) { // @4 bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000); } else { bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout()); } bootstrap.handler(new ChannelInitializer() { @Override protected void initChannel(Channel ch) throws Exception { NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this); ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug .addLast("decoder", adapter.getDecoder()) .addLast("encoder", adapter.getEncoder()) .addLast("handler", nettyClientHandler); // @5 } }); } 复制代码
代码@1:创建 NettyClientHandler。
代码@2:创建 Netty 客户端启动实例 bootstrap.
代码@3:客户端绑定IO线程组(池),注意,一个 JVM 中所有的 NettyClient 共享其 IO 线程。
代码@4:设置连接超时时间,最小连接超时时间为3s。
代码@5:设置编码器、事件连接器,当触发事件后,将调用nettyClientHandler中相关的方法。
protected void doConnect() throws Throwable { long start = System.currentTimeMillis(); ChannelFuture future = bootstrap.connect(getConnectAddress()); // @1 try { boolean ret = future.awaitUninterruptibly(3000, TimeUnit.MILLISECONDS); // @2 if (ret && future.isSuccess()) { Channel newChannel = future.channel(); try { // Close old channel Channel oldChannel = NettyClient.this.channel; // copy reference if (oldChannel != null) { try { if (logger.isInfoEnabled()) { logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel); } oldChannel.close(); } finally { NettyChannel.removeChannelIfDisconnected(oldChannel); } } } finally { if (NettyClient.this.isClosed()) { try { if (logger.isInfoEnabled()) { logger.info("Close new netty channel " + newChannel + ", because the client closed."); } newChannel.close(); } finally { NettyClient.this.channel = null; NettyChannel.removeChannelIfDisconnected(newChannel); } } else { NettyClient.this.channel = newChannel; } } } else if (future.cause() != null) { throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server " + getRemoteAddress() + ", error message is:" + future.cause().getMessage(), future.cause()); } else { throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server " + getRemoteAddress() + " client-side timeout " + getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client " + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()); } } finally { if (!isConnected()) { //future.cancel(true); } } } 复制代码
代码@1:调用bootstrap.connect方法发起TCP连接。
代码@2:future.awaitUninterruptibly,连接事件只等待3S,这里写成固定了,显然没有与doOpen方法中ChannelOption.CONNECT_TIMEOUT_MILLIS保持一致。
关于 NettyClient 的介绍就将到这里了,下一篇将会分析编码解码。
作者介绍:丁威,《RocketMQ技术内幕》作者,RocketMQ 社区优秀布道师、CSDN2019博客之星TOP10,维护公众号: 中间件兴趣圈 目前已陆续发表源码分析Java集合、Java 并发包(JUC)、Netty、Mycat、Dubbo、RocketMQ、Mybatis等源码专栏。可以点击链接加入 中间件知识星球 ,一起探讨高并发、分布式服务架构,交流源码。