NettyServer整个类图如下:
首先从全貌上大概看一下NettyServer对象所持有的属性:
public NettyServer(URL url, ChannelHandler handler) throws RemotingException { super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))); } 复制代码
直接调用父类的public AbstractServer(URL url, ChannelHandler handler)方法,从前面的文章中得知, ChannelHandlers.wrap方法会对ChannelHandler handler进行封装,主要是加入事件分发模式(Dispatch)。
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException { super(url, handler); // @1 localAddress = getUrl().toInetSocketAddress(); // @2 String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost()); int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort()); if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) { bindIp = NetUtils.ANYHOST; } bindAddress = new InetSocketAddress(bindIp, bindPort); // @3 this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS); this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT); // @4 try { doOpen(); // @5 if (logger.isInfoEnabled()) { logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress()); } } catch (Throwable t) { throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName() + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t); } //fixme replace this with better method DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension(); executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort())); } 复制代码
代码@1:调用父类的构造方法,主要初始化AbstractPeer(channelHandler、url)和AbstractEndpoint(codec2、timeout、idleTimeout )
代码@2:根据URL中的host与端口,创建localAddress。
代码@3:如果配置了< dubbo:parameter key = "bind.ip" value = ""/> 与 < dubbo:parameter key = "bind.port" />,则用该IP与端口创建bindAddress,通常用于多网卡,如果未配置,bindAddress与 localAddress绑定的IP与端口一样。
代码@4:初始化accepts与idleTimeout ,这两个参数未被其他地方使用。
代码@5,调用doOpen方法,正式在相应端口建立网络监听。
protected void doOpen() throws Throwable { NettyHelper.setNettyLoggerFactory(); bootstrap = new ServerBootstrap(); // @1 bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true)); // @2 workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS), new DefaultThreadFactory("NettyServerWorker", true)); // @3 final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this); // @4 channels = nettyServerHandler.getChannels(); bootstrap.group(bossGroup, workerGroup) // @5 .channel(NioServerSocketChannel.class) .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE) .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE) .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug .addLast("decoder", adapter.getDecoder()) .addLast("encoder", adapter.getEncoder()) .addLast("handler", nettyServerHandler); } }); // bind ChannelFuture channelFuture = bootstrap.bind(getBindAddress()); // @6 channelFuture.syncUninterruptibly(); channel = channelFuture.channel(); } 复制代码
代码@1:创建Netty服务端启动帮助类ServerBootstrap.
代码@2:创建服务端Boss线程,线程名:.NettyServerBoss,主要负责客户端的连接事件,主从多Reactor线程模型中的主线程(连接事件)。
代码@3:创建服务端Work线程组,线程名:NettyServerWorker-序号,线程个数取自参数:iothreads,默认为(CPU核数+1)与32取小值,顾名思义,IO线程数,主要处理读写事件,编码、解码都在IO线程中完成。
代码@4:创建用户Handler,这里是NettyServerHandler。 代码@5:Netty启动的常规写法,关注如下内容:
addLast("decoder", adapter.getDecoder()) : 添加解码器 addLast("encoder", adapter.getEncoder()) :添加编码器 addLast("handler", nettyServerHandler) :添加业务Handler。 复制代码
这里简单介绍一下流程:
如果对Netty想深入学习的话,请移步到作者的 《源码分析Netty系列》
根据 Dubbo 服务端初始化流程,我们可知,Dubbo 为了封装各种不同的网络实现客户端(netty、mina)等,引入了 Exchangers 层,存在 ExchangeServer,其实现 Server 并内部持有具体的 Server 实现端,例如 NettyServer。
接下来,我们重点来关注一下 HeaderExchangeServer. 核心属性如下:
public HeaderExchangeServer(Server server) { if (server == null) { throw new IllegalArgumentException("server == null"); } this.server = server; this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0); this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3); if (heartbeatTimeout < heartbeat * 2) { throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2"); } startHeartbeatTimer(); } 复制代码
说明,主要是通过heartbeat参数设置心跳间隔,如果不配置,则不启动心跳检测。从上面看来HeaderExchangeServer内部持有Server,并封装了心跳的功能,在这里就不细细分析了。
作者介绍:丁威,《RocketMQ技术内幕》作者,RocketMQ 社区优秀布道师、CSDN2019博客之星TOP10,维护公众号: 中间件兴趣圈 目前已陆续发表源码分析Java集合、Java 并发包(JUC)、Netty、Mycat、Dubbo、RocketMQ、Mybatis等源码专栏。可以点击链接加入 中间件知识星球 ,一起探讨高并发、分布式服务架构,交流源码。