创建一个快速开始的Maven项目,导入Netty4.0版本的依赖(我的JDK是1.8,官方建议1.6以上)。Netty依赖如下:
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.6.Final</version> </dependency> 复制代码
这里使用了门面日志框架Slf4j,所以需要引入slf4j和日志实现框架logback。
<dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.25</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.2.3</version> </dependency> 复制代码
先上代码,代码里已有详细注释,具体的介绍后面再说。
public class NettyDiscardServer { private static final Logger logger = LoggerFactory.getLogger(NettyDiscardServer.class); final int port; // 创建一个服务器端的启动器 ServerBootstrap sb = new ServerBootstrap(); NettyDiscardServer(int port) { this.port = port; } public void runServer() { // 创建反应器线程组 // 开启一个线程(其实就是创建一个反应器类的对象)用于父通道,负责新连接的监听 EventLoopGroup boss = new NioEventLoopGroup(1); // 使用不带参数的构造函数,默认线程数是CPU核数 * 2 // 开启一个线程池用于子通道,负责通道的IO事件的处理 EventLoopGroup workers = new NioEventLoopGroup(); try { // 设置反应器的线程组,一个父线程组,一个子线程组 sb.group(boss, workers) // 设置NIO类型的通道 .channel(NioServerSocketChannel.class) // 设置服务端监听的端口 .localAddress("127.0.0.1", port) // 设置通道的参数 // SO_KEEPALIVE指打开TCP心跳检测 .option(ChannelOption.SO_KEEPALIVE, true) // Netty的内存在堆外直接内存上分配,可避免字节缓冲区的二次拷贝 .option(ChannelOption.ALLOCATOR,PooledByteBufAllocator.DEFAULT) // 装配子通道流水线 .childHandler(new ChannelInitializer<SocketChannel>() { // 当有新连接到达时会创建一个通道 protected void initChannel(SocketChannel ch) { // 向子通道流水线添加第一个Handler处理器 ch.pipeline().addLast(new NettyDiscardHandler()); // 向子通道流水线添加第二个Handler处理器 ch.pipeline().addLast(new NettyDiscardHandler1()); } }); // 开始绑定服务器 // 调用sync同步方法阻塞直到绑定成功 ChannelFuture channel = sb.bind().sync(); logger.info("服务器启动成功,监听端口:" + channel.channel().localAddress()); // 服务器监听通道会一直等待 通道关闭的异步任务结束 ChannelFuture close = channel.channel().closeFuture(); // 同步关闭 close.sync(); } catch (Exception e) { e.printStackTrace(); } finally { // 释放所有使用的线程,释放所有资源(包括关闭通道和选择器) boss.shutdownGracefully(); workers.shutdownGracefully(); } } public static void main(String[] args) { new NettyDiscardServer(8888).runServer(); } } 复制代码
在Netty的反应器模式中,所有的业务处理都在Handler处理器中完成。这里写一个简单的入站处理(读取消息不回复),以及演示简单的通道流水线处理。这里两个类我都是做为上面服务器类的内部类实现的。
// 继承netty默认实现的入站处理器,在内部实现自己的业务逻辑 static class NettyDiscardHandler extends ChannelInboundHandlerAdapter { // 读取缓冲区中的数据 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 将数据转为ByteBuf(类似Java NIO中的ByteBuffer)类来处理 ByteBuf buf = (ByteBuf) msg; try { logger.info("收到消息 : "); // 循环读取缓冲区中的字符 while (buf.isReadable()) { System.out.print((char) buf.readByte()); } System.out.println(); } finally { // 通过 通道处理器上下文 将消息传播到下一个处理器节点 ctx.fireChannelRead(msg); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // 遇到异常时关闭连接 cause.printStackTrace(); ctx.close(); } } static class NettyDiscardHandler1 extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { logger.info("我是第二个处理器"); // 抛弃消息 ReferenceCountUtil.release(msg); } } 复制代码
实现一个简单的想服务器发送消息的客户端。
public class NettyDiscardClient { public static void main(String[] args) throws IOException { SocketChannel channel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 8888)); channel.configureBlocking(false); // 自旋,等待连接完成 while (!channel.finishConnect()); // 向服务端发送一个消息 ByteBuffer buf = ByteBuffer.allocate(1024); buf.put("hello world!".getBytes()); buf.flip(); channel.write(buf); channel.close(); } } 复制代码
运行服务器和客户端。其中可以看到消息被两个处理器处理了,这就是通道的流水线处理。
[main] INFO discardServer.NettyDiscardServer - 服务器启动成功,监听端口:/127.0.0.1:8888 [nioEventLoopGroup-3-1] INFO discardServer.NettyDiscardServer - 收到消息 : hello world! [nioEventLoopGroup-3-1] INFO discardServer.NettyDiscardServer - 我是第二个处理器 复制代码
Netty提供了一个非常便利的工厂类Bootstrap,可以用它来方便的完成netty组件的组装和配置。
在Netty中,将有接收关系的NioServerSocketChannel和NioSocketChannel,叫做父子通道。
在Netty中,一个EventLoop就是一个反应器。由于Netty是多线程版本的反应器模式,所以使用EventLoopGroup来实现多线程版本的反应器。
EventLoopGroup一般常用两个构造函数,一个是带参数的(用于指定线程数),一个是不带参数的。下面来看一下内部源码。
// 带参数,直接指定线程数 public NioEventLoopGroup(int nThreads) { this(nThreads, (Executor) null); } // 不带参数,内部调用带参数的构造函数,传入的参数为0 public NioEventLoopGroup() { this(0); } 复制代码
对于无参的构造函数,传入的线程数居然是0,带着疑问继续往源码里钻。经过层层调用,最后找到了,是调用了父类MultithreadEventLoopGroup的构造函数。
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) { super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args); } 复制代码
可以发现,如果传入的线程数为0,则把默认的DEFAULT_EVENT_LOOP_THREADS赋值给线程数nThreads。这个DEFAULT_EVENT_LOOP_THREADS是多少呢,在父类中找找看。
static { // 将CPU核数 * 2 作为默认线程数 DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2)); // 如果日志开启了debug模式,则打印出开启的线程数 if (logger.isDebugEnabled()) { logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS); } } 复制代码
通过源码可以发现,如果传入一个无参的构造函数,则默认开始CPU核数 * 2个EventLoop线程。
另外,开篇的服务器端代码中,创建了两个线程组实例。其中一个用于负责新连接的监听和连接,查询父通道的IO事件(这里用带参数的构造函数创建了一个线程)。还有一个线程组用于负责查询所有子通道的IO事件,并执行Handler处理器中的业务处理。
Bootstrap可以方便的组装和配置Netty的组件。下面介绍一下流程。
ServerBootstrap sb = new ServerBootstrap(); 复制代码
EventLoopGroup boss = new NioEventLoopGroup(1); EventLoopGroup workers = new NioEventLoopGroup(); sb.group(boss, workers); // 对应的方法原型 ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) 复制代码
sb.channel(NioServerSocketChannel.class) // 对应的方法原型 B channel(Class<? extends C> channelClass) 复制代码
sb.localAddress("127.0.0.1", port) // 对应的方法原型 B localAddress(String inetHost, int inetPort) 复制代码
sb.option(ChannelOption.SO_KEEPALIVE, true) sb.option(ChannelOption.ALLOCATOR,PooledByteBufAllocator.DEFAULT) // 对应的方法原型 B option(ChannelOption<T> option, T value) 复制代码
sb.childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel ch) { ch.pipeline().addLast(new NettyDiscardHandler()); ch.pipeline().addLast(new NettyDiscardHandler1()); } }); 复制代码
ChannelFuture channel = sb.bind().sync(); 复制代码
// 获取一个closeFuture实例 ChannelFuture close = channel.channel().closeFuture(); // 自我阻塞,直到通道关闭,就返回。 close.sync(); 复制代码
boss.shutdownGracefully(); workers.shutdownGracefully(); 复制代码
在Reactor模式中的IO处理流程图如下: [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-XWq76zbM-1575900038358)( i.loli.net/2019/12/09/… )]
Netty中没有使用Java NIO的Channel,而是实现了自己的Channel通道组件。由于Netty还能处理OIO,所以也封装了对应OIO通道组件。
但是在Netty中使用最多的还是NioSocketChannel(异步非阻塞TCP Socket传输通道),NioServerSocketChannel(异步非阻塞TCP Socket服务器端监听通道)。
其中有个好玩的通道就是EmbeddedChannel(嵌入式通道),这个通道主要是用于单元测试的(它可以让开发人员在开发中方便的模拟入站与出站的操作,而不进行底层实际的传输)。因为Netty实际开发中主要就是开发业务处理器,但是对于业务处理器的测试需要用到客户端和服务器的操作,每次测试都要开启两个程序测试很麻烦,所以就有了EmbeddedChannel。除了不进行传输之外,其它处理流程和真实的传输通道的是一样的。
其中主要的API如下:
名称 | 说明 |
---|---|
writeInbound (Object... msgs) | 向通道中写入inbound入站数据,模拟通道收到客户端的数据 |
readInbound() | 从EmbeddedChannel通道中读取入站数据,也就是返回入站处理器处理完的数据,如果没有数据,返回NULL |
writeOutbound (Object... msgs) | 向通道中写入outbound出站数据,模拟通道发送数据到客户端。 |
readOutbound() | 从EmbeddedChannel通道中读取出站数据,类似readInbound。 |
flush() | 结束EmbeddedChannel,它会调用通道的close方法,并释放所有数据的底层资源。 |
Netty中的反应器有多个实现类,对应于Channel通道。NioSocketChannel对应的反应器类是NioEventLoop。
NioEventLoop有两个重要成员,一个是Thread线程类的成员,一个是Java NIO选择器的成员。
一个NioEventLoop拥有一个Thread线程,负责一个Java NIO选择器的IO事件轮询。但是一个NioEventLoop反应器可以对应多个NioSocketChannel通道。
Netty中有两大类的Handler处理器。他们都继承与ChannelHandler接口。
Handler处理器的生命周期:添加处理器到流水线 -> 注册到流水线中 -> 通道激活后回调 -> 入站方法回调 -> 底层连接关闭 -> 移除注册 -> 从流水线中删除。
在Netty中通道和处理器实例的关系为一对多。这里就用到了通道流水线(ChannelPipeline)。一个通道(有一个pipeline成员)拥有一个通道流水线。
通道流水线ChannelPipeline基于责任链模式设计的,内部是一个双向链表结构,每一个节点对应一个ChannelHandlerContext处理器上下文对象(处理器添加到流水线时,会创建一个通道处理器上下文,里面包裹了一个ChannelHandler处理器对象,并关联着ChannelPipeline流水线。用于控制流水线上处理器的传播),可以支持动态的增加和删除处理器上下文对象。
在通道流水线中,IO事件按照既定的顺序处理。入站处理器的执行次序为从前往后,而出站处理器的次序为从后往前。