Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients.
Netty是一个异步事件驱动的网络应用程序框架 用于快速开发可维护的高性能协议服务器和客户端。
Dubbo RocketMQ Zookeeper Hadoop Spring-Cloud-Gateway 大数据、微服务、游戏、直播、IM、物联网等
Selector 对应多个 SelectableChannel SelectableChannel 对应一个 Selector SelectionKey 与 SelectableChannel 一一对应
SelectionKey是一个用来记录SelectableChannel和Selector之间关系的对象,它由SelectableChannel的register()方法返回,并存储在Selector的多个集合中。它不仅记录了两个对象的引用,还包含了SelectableChannel感兴趣的操作,即OP_READ,OP_WRITE,OP_ACCEPT和OP_CONNECT。
ChannelPipeline是ChannelHandler的编排管理容器,它内部维护了一个ChannelHandler的链表和迭代器,可以方便地实现ChannelHandler的查找、添加、替换和删除。消息经过ChannelPipeline在ChannelHandler中的处理流程
ChannelPipeline p = ...; p.addLast("1", new InboundHandlerA()); p.addLast("2", new InboundHandlerB()); p.addLast("3", new OutboundHandlerA()); p.addLast("4", new OutboundHandlerB()); p.addLast("5", new InboundOutboundHandlerX()); 复制代码
inbound event order : 1, 2, 5 outbound action order : 5, 4, 3
在实际业务场景中,ChannelPipeline通常需要添加如下几类ChannelHandler。
对于耗时的业务逻辑执行,例如访问数据库、中间件、第三方系统等,则需要切换到业务线程池中,避免阻塞Netty的NioEventLoop线程,导致消息无法接收和发送。
业务调用write方法后,经过ChannelPipeline职责链处理,消息被投递到发送缓冲区待发送,调用flush之后会执行真正的发送操作,底层通过调用Java NIO的SocketChannel进行非阻塞write操作,将消息发送到网络上
为了尽可能地提升性能,Netty采用了串行无锁化设计,在I/O线程内部进行串行操作,避免多线程竞争导致性能下降
ChannelOutboundBuffer是Netty的发送缓冲队列,它基于链表来管理待发送的消息
发送次数限制当SocketChannel无法一次将所有待发送的ByteBuf/ByteBuffer写入网络时,需要决定是注册SelectionKey.OP_WRITE在下一次Selector轮询时继续发送,还是在当前位置循环发送,等到所有消息都发送完成再返回。
消息发送高低水位控制当消息队列中积压的待发送消息总字节数到达高水位时,修改Channel的状态为不可写,调用ChannelPipeline发送通知事件fireChannelWritabilityChanged,业务可以监听该事件及时获取链路可写状态
利用Netty的高低水位机制,可以防止在发送队列处于高水位时继续发送消息,导致积压更严重,甚至发生内存泄漏。在业务中合理利用Netty的高低水位机制,可以提升系统的可靠性
假设客户端向服务端连续发送了两个数据包,用packet1和packet2来表示,那么服务端收到的数据可以分为三种: 第一种情况,接收端正常收到两个数据包,即没有发生拆包和粘包的现象。
第二种情况,接收端只收到一个数据包,但是包含了发送端发送的两个数据包的信息,这种现象即为粘包。这种情况由于接收端不知道这两个数据包的界限,所以对于接收端来说很难处理。
第三种情况,这种情况有两种表现形式,如下图。接收端收到了两个数据包,但是这两个数据包要么是不完整的,要么就是多出来一块,这种情况即发生了拆包和粘包。这两种情况如果不加特殊处理,对于接收端同样是不好处理的。
发生TCP粘包、拆包主要是由于下面一些原因:
@Override protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { Object decoded = null; if (in.readableBytes() >= frameLength) { decoded = in.readRetainedSlice(frameLength); } if (decoded != null) { out.add(decoded); } } 复制代码
使用定界符来分隔消息,比如/r/n
使用消息中表示长度字段来定义消息的长度
计算framePayloadLength示例:
if (framePayloadLen1 == 126) { if (in.readableBytes() < 2) { return; } framePayloadLength = in.readUnsignedShort(); if (framePayloadLength < 126) { protocolViolation(ctx, in, "invalid data frame length (not using minimal length encoding)"); return; } } else if (framePayloadLen1 == 127) { if (in.readableBytes() < 8) { return; } framePayloadLength = in.readLong(); if (framePayloadLength < 65536) { protocolViolation(ctx, in, "invalid data frame length (not using minimal length encoding)"); return; } } else { framePayloadLength = framePayloadLen1; } 复制代码
Netty提供了两个指针变量用于支持顺序读取read()和写入write()操作:readerIndex用于标识读取索引,writerIndex用于标识写入索引。两个位置指针将ByteBuf缓冲区划分为三部分.
顺序读写 read write调用ByteBuf的read操作时,从readerIndex处开始读取。从readerIndex到writerIndex的空间为可读的字节缓冲区;从writerIndex到capacity的空间为可写的字节缓冲区;从0到readerIndex的空间是已经读取的缓冲区,可以调用discard操作来重用这部分空间,以节约内存,防止ByteBuf不断扩张。
随机读写 get set可以随机指定读写的索引位置, ByteBuf会对其索引和长度等进行合法性校验,内部不会更改readerIndex和writerIndex,需要使用者自己控制。另外,set操作与write操作不同的是,它不支持动态扩展缓冲区,所以使用者必须保证当前的缓冲区可写的字节数大于需要写入的字节数,否则会抛出数组或者缓冲区越界异常。
Netty4开始引入了引用计数的特性,缓冲区的生命周期可由引用计数管理,当缓冲区不再有用时,可快速返回给对象池或者分配器用于再次分配,从而大大提高性能,进而保证请求的实时处理。
在代码中,可以使用retain()使引用计数增加1,使用release()使引用计数减少1,这两个方法都可以指定参数表示引用计数的增加值和减少值。当我们使用引用计数为0的对象时,将抛出异常。
谁负责释放对象通用的原则是:谁最后使用含有引用计数的对象,谁负责释放或销毁该对象。一般来说,有以下两种情况:
派生缓冲区通过duplicate(),slice()等等生成的派生缓冲区ByteBuf会共享原生缓冲区的内部存储区域。此外,派生缓冲区并没有自己独立的引用计数而需要共享原生缓冲区的引用计数。也就是说,当我们需要将派生缓冲区传入下一个组件时,一定要注意先调用retain()方法。
另外,实现ByteBufHolder接口的对象与派生缓冲区有类似的地方:共享所Hold缓冲区的引用计数,所以要注意对象的释放。在Netty,这样的对象包括DatagramPacket,HttpContent和WebSocketframe。
在API Gateway、RPC和流式处理框架中,请求和响应消息往往是“朝生熄灭”的,特别是频繁地申请和释放大块byte数组,会加重GC的负担及加大CPU资源占用率,通过内存池技术重用这些临时对象,可以降低GC频次和减少耗时,同时提升系统的吞吐量。
Netty的内存池整体上参照jemalloc实现
PooledArena:代表内存中一大块连续的区域,PoolArena由多个Chunk组成,每个Chunk由多个Page组成。为了提升并发性能,内存池中包含一组PooledArena。
一个PoolArena内存块是由两个SubPagePools(用来存储零碎内存)和多个ChunkList组成,两个SubpagePools数组分别为tinySubpagePools和smallSubpagePools。每个ChunkList里包含多个Chunk按照双向链表排列,每个Chunk里包含多个Page(默认2048个),每个Page(默认大小为8k字节)由多个Subpage组成。
每个ChunkList里包含的Chunk数量会动态变化,比如当该chunk的内存利用率变化时会向其它ChunkList里移动。
内存分配的入口是PooledByteBufAllocator.buffer,返回PooledByteBuf,总体分为两步
最终内存分配工作被委托给PoolArena(为了提高多线程并发,PoolArena是跟线程绑定的,不是一一对应关系)
该二叉树将PoolChunk分11层, 第一层为1个16M, 第二层为2个8MB,第三层为4个4MB的内存块, 直到第11层为2048个8KB的内存块, 8kb的内存块称之为page。
PoolChunk用memoryMap和depthMap来表示二叉树,其中memoryMap存放的是PoolSubpage的分配信息,depthMap存放的是二叉树的深度。depthMap初始化之后就不再变化,而memoryMap则随着PoolSubpage的分配而改变。初始化时,memoryMap和depthMap的取值相同,都是节点所在层高。
节点的分配情况有如下三种可能:
分配一个内存大小为 chunkSize/2^k时,则先在第K层找第一个未使用的节点(从左到右),举例:
节点被标记为被占用之后,依次向上遍历更新父节点,直到根节点。将父节点的memoryMap[id]位置信息修改为两个子节点中的较小值
对于不足一个pagesize大小的内存分配,则由PoolSubpage进行处理,每一个PoolSubpage都会与PoolChunk里面的一个叶子节点映射起来, 然后将PoolSubpage根据用户申请的ElementSize化成几等分, 之后只要再次申请ElementSize大小的内存, 将直接从这个PoolSubpage中分配 (添加到PoolArena对应的SubPagePools中)。
PoolThreadCache的内存来自于当前线程释放的
当PooledByteBuf引用计数减为0时,会触发调用deallocate Recycler.recycle 归还 ByteBuf PoolArena.free 归还 内存 (byte[])
由于JVM并没有意识到Netty实现的引用计数对象,它仍会将这些引用计数对象当做常规对象处理,也就意味着,当不为0的引用计数对象变得不可达时仍然会被GC自动回收。一旦被GC回收,引用计数对象将不再返回给创建它的对象池,这样便会造成内存泄露。
为了便于用户发现内存泄露,Netty提供了相应的检测机制并定义了四个检测级别:
堆内存是在JVM堆上分配的,由JVM负责进行垃圾回收 直接内存是直接在Native堆上分配,并不由JVM负责进行垃圾回收
创建和释放Direct Buffer的代价比Heap Buffer要高 Direct Buffer减少用户态和内核态之间的数据copy
对于涉及大量I/O的数据读写,建议使用Direct Buffer;而对于用于后端的业务消息编解码模块建议使用Heap Buffer
netty中的Zero-copy与传统意义的zero-copy不太一样,注意不要混淆:
在协议传输过程中,通常需要拆包、合并包,常见的做法就是通过System.arrayCopy来复制需要的数据,但这样需要付出内容复制的开销. Netty通过ByteBuf.slice和Unpooled.wrappedBuffer等方法拆分、合并Buffer,做到无需拷贝数据。
JDK 的 Future 对象,该接口的方法如下:
Netty 扩展了 JDK 的 Future 接口,扩展的方法如下:
Future 对象有两种状态尚未完成和已完成,其中已完成又有三种状态:成功、失败、用户取消。
Future 接口中的方法都是 getter 方法而没有 setter 方法,也就是说这样实现的 Future 子类的状态是不可变的,如果我们想要变化,Netty 提供的解决方法是:使用可写的 Future 即 Promise。
Promise 接口继承自 Future 接口,它提供的 setter 方法与常见的 setter 方法大为不同。Promise 从 Uncompleted 到 Completed 的状态转变有且只能有一次,也就是说 setSuccess 和 setFailure 方法最多只会成功一个,此外,在 setSuccess 和 setFailure 方法中会通知注册到其上的监听者。
Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }); 复制代码
shutdownGracefully仅仅通过自旋CAS更改内部state NioEventLoop的run方法内部在执行一轮I/O操作以及Task之后,会判断state的值,如果需要关闭则执行closeAll和confirmShutdown方法,执行完毕则退出循环(即I/O线程)
closeAll处理逻辑
confirmShutdown处理逻辑
最后NioEventLoop线程退出执行并关闭selector
名称 | 描述 | 参考值 |
---|---|---|
fs.file-max | 系统级别所有进程可以打开的文件描述符的数量限制 | 1000000 |
soft nofile | 可打开的文件描述符的最大数(超过会警告) | 1000000 |
hard nofile | 可打开的文件描述符的最大数(超过会报错) | 1000000 |
net.ipv4.tcp_tw_reuse | 允许将TIME-WAIT sockets重新用于新的TCP连接,默认为0,表示关闭 (只对客户端起作用,开启后客户端在1s内回收) | 1 |
net.ipv4.tcp_tw_recycle | 表示开启TCP连接中TIME-WAIT sockets的快速回收,默认为0,表示关闭 (挂在公网NAT后面的Server不建议开启,且仅在 timestamps 开启时有效) | 0 |
net.ipv4.tcp_timestamps | 开启TCP时间戳 (默认开启) | 1 |
net.ipv4.ip_local_port_range | 本地可用端口范围 (默认:32768到61000) | 1024 65000 |
net.ipv4.tcp_max_tw_buckets | 表示系统同时保持TIME_WAIT套接字的最大数量 | 262144 |
net.ipv4.tcp_fin_timeout | 停留在FIN_WAIT_2状态的最大时长 | 60 |
net.ipv4.tcp_max_syn_backlog | SYN backlog 队列的最大长度,默认1024 | 8192 |
net.core.somaxconn | listen backlog 队列的最大长度,默认128 | 262144 |
net.core.netdev_max_backlog | 网络接口接收数据包的速率比内核处理这些包的速率快时,允许发送到队列的数据包的最大数目,一般默认值为128 | 262144 |
net.ipv4.tcp_rmem | 每个TCP连接分配的读缓冲区内存大小 | 4096 87380 4194304 |
net.ipv4.tcp_wmem | 每个TCP连接分配的写缓冲区内存大小 | 4096 87380 4194304 |
net.ipv4.tcp_mem | 内核分配给TCP连接的内存,单位是page | 64608 1048576 2097152 |
net.ipv4.tcp_syncookies | 当出现SYN等待队列溢出时,启用cookies来处理,可防范少量SYN攻击,默认为0,表示关闭 | 1 |
net.ipv4.tcp_sack | TCP的选择性确认SCK机制,数据接收方通知发送方成功接收的数据包段,发送就可以 | 1 |
net.ipv4.tcp_windows_scaling | 是否要支持超过64KB的窗口 | 1 |
设置合理的线程数I/O工作线程数 默认值(CPU核数 * 2) 业务线程池
设置合理的心跳周期Netty通过IdleStateHandler来提供链路空闲检测机制