ByteBufAllocator接口为ByteBuf分配器,用于分配新的ByteBuf存储IO数据。
接口定义了 ByteBuf ioBuffer(int initialCapacity) 方法,用于分配一个ByteBuf
实现了ByteBuf ioBuffer(int initialCapacity)方法,根据系统配置实际调用自己的抽象方法
ByteBuf newDirectBuffer(int initialCapacity,int maxCapacity); ByteBuf newHeapBuffer(int initialCapacity,int maxCapacity);
其子类UnpooledByteBufAllocator和PooledByteBufAllocator实现了上述抽象方法。
非池化的ByteBuf分配器,具体实现了抽象方法。
根据平台是否支持Unsafe方式,将实例化出一个UnpooledDirectByteBuf/UnpooledUnsafeDirectByteBuf。
UnpooledDirectByteBuf是一个基于NIO的Buffer,其内部持有一个NIO ByteBuffer buffer,并通过ByteBuffer.allocateDirect(initcapacity)方法进行实例化。
根据平台是否支持Unsafe方式,将实例化出一个UnpooledHeapByteBuf/UnpooledUnsafeHeapByteBuf。
UnpooledHeapByteBuf是一个基于java heap的Buffer,其内部直接在java heap中申请byte[] array空间进行IO数据存储。
RecvByteBufAllocator接口用于分配一块大小合理的buffer空间,存储Channel读入的IO数据。具体功能交由内部接口Handle定义。
interface Handle { /** * Creates a new receive buffer whose capacity is probably large enough to read all inbound data and small * enough not to waste its space. */ ByteBuf allocate(ByteBufAllocator alloc); /** * Increment the number of messages that have been read for the current read loop. * numMessages The amount to increment by. */ void incMessagesRead(int numMessages); /** * Set the bytes that have been read for the last read operation. * This may be used to increment the number of bytes that have been read. * bytes The number of bytes from the previous read operation. This may be negative if an read error * occurs. If a negative value is seen it is expected to be return on the next call to * #lastBytesRead()}. A negative value will signal a termination condition enforced externally * to this class and is not required to be enforced in #continueReading()}. */ void lastBytesRead(int bytes); /** * Determine if the current read loop should should continue. * true} if the read loop should continue reading. false} if the read loop is complete. */ boolean continueReading(); /** * The read has completed. */ void readComplete(); }
allocate(ByteBufAllocator alloc)方法用于创建存放读入IO数据的ByteBuf。
readComplete()在读操作完成后调用,在实现类HandleImpl中执行record(int actualReadBytes)做了调整分配空间大小的逻辑。
接口RecvByteBufAllocator的实现类,能根据前一次实际读取的字节数量,自适应调整当前缓存分配的大小。
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { //上面省略... if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); if (!ch.isOpen()) { // Connection already closed - no need to handle write. return; } } //下面省略... }
NioEventLoop在处理其selector监听到的OP_READ事件时,会执行上面的代码逻辑,将OP_READ事件最终交由Unsafe处理,即执行NioByteUnsafe.read()方法。
public final void read() { final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null; boolean close = false; try { do { byteBuf = allocHandle.allocate(allocator); allocHandle.lastBytesRead(doReadBytes(byteBuf)); if (allocHandle.lastBytesRead() <= 0) { // nothing was read. release the buffer. byteBuf.release(); byteBuf = null; close = allocHandle.lastBytesRead() < 0; break; } allocHandle.incMessagesRead(1); readPending = false; pipeline.fireChannelRead(byteBuf); byteBuf = null; } while (allocHandle.continueReading()); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); if (close) { closeOnRead(pipeline); } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); } finally { // Check if there is a readPending which was not processed yet. // This could be for two reasons: // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method // // See https://github.com/netty/netty/issues/2254 if (!readPending && !config.isAutoRead()) { removeReadOp(); } } }
final ByteBufAllocator allocator = config.getAllocator(); final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
循环执行以下逻辑直到跳出--->
ByteBuf byteBuf = allocHandle.allocate(allocator);
doReadBytes(byteBuf)
实际调用了NioSocketChannel.doReadBytes(ByteBuf byteBuf)方法。
进一步调用ByteBuf.writeBytes(ScatteringByteChannel in, int length)方法。
最终底层调用nio的ReadableByteChannel.read(ByteBuffer dst)方法。
if (allocHandle.lastBytesRead() <= 0) { // nothing was read. release the buffer. byteBuf.release(); byteBuf = null; close = allocHandle.lastBytesRead() < 0; break; }
若本次读取到的数据长度==0,表示本次OP_READ事件的数据已读取完毕,退出循环。
若本次读取到的数据长度<0,表示对端已断开socket连接,退出循环,执行NioByteUnsafe.closeOnRead()方法关闭Channel,关闭Channel的过程中执行了pipeline.fireChannelInactive()和pipeline.fireChannelUnregistered()。
pipeline.fireChannelRead(byteBuf); byteBuf = null;
<----循环结束。结束条件:a、localReadAmount == 0或-1,b、循环读取到ByteBuf个数超过指定阈值
allocHandle.readComplete();
pipeline.fireChannelReadComplete();