是AbstractUnsafe使用的数据结构,用来存储待发送的数据。
在channel.unsafe实例化时,ChannelOutboundBuffer一起被初始化。每个channel都有一个自己的ChannelOutboundBuffer。
Channel channel --> 所绑定的Channel
Entry flushedEntry --> 表示下一个要被flush的Entry
Entry unflushedEntry --> 表示下一次要flush截止的Entry
Entry tailEntry
int flushed
int nioBufferCount
int nioBufferSize
long totalPendingSize --> 已存储的需要被write到socket发送缓存中的byte大小
int unwritable --> 表示当前channel的待发送缓存是否可以继续写入数据
ChannelOutboundBuffer中维护了节点元素为Entry的单向链表。
Entry为待发送数据的抽象,实际待发送数据保存在Entry的Object msg中。
AbstractChannel、DefaultChannelPipeline或AbstractChannelHandlerContext提供的write(Object msg)方法,最终都会由HeadContext.write方法执行,最终交由AbstractUnsafe.write(Object msg,ChannelPromise promise)实现。
public final void write(Object msg, ChannelPromise promise) { assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { // If the outboundBuffer is null we know the channel was closed and so // need to fail the future right away. If it is not null the handling of the rest // will be done in flush0() // See https://github.com/netty/netty/issues/2362 safeSetFailure(promise, CLOSED_CHANNEL_EXCEPTION); // release message now to prevent resource-leak ReferenceCountUtil.release(msg); return; } int size; try { msg = filterOutboundMessage(msg); size = pipeline.estimatorHandle().size(msg); if (size < 0) { size = 0; } } catch (Throwable t) { safeSetFailure(promise, t); ReferenceCountUtil.release(msg); return; } outboundBuffer.addMessage(msg, size, promise); }
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
msg = filterOutboundMessage(msg);
内部调用AbstractNioByteChannel.filterOutboundMessage(Object msg)方法,如果msg是ByteBuf类型,则将其转换为DirectByteBuf的实现。(调用ByteBufAllocator.directBuffer(initialCapacity)分配一块直接缓存空间并将原msg中的字节流放入)
int size = pipeline.estimatorHandle().size(msg);
对于ByteBuf类型的msg,直接调用readableBytes()方法。
outboundBuffer.addMessage(msg, size, promise);
Entry entry = Entry.newInstance(msg, size, total(msg), promise); tailEntry = entry;
netty使用基于thread-local的轻量级对象池Recycler对Entry进行回收,避免多次实例化的垃圾回收和开销。
incrementPendingOutboundBytes(size, false);
若totalPendingSize超过了channel的高水位线:
-将unwritable状态更新为不可写;
-执行pipeline.fileChannelWritabilityChanged();
AbstractChannel、DefaultChannelPipeline或AbstractChannelHandlerContext提供的flush()方法,都会由HeadContext.flush(ChannelHandlerContext ctx)方法执行,最终交由AbstractUnsafe.flush()实现。
public final void flush() { assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { return; } outboundBuffer.addFlush(); flush0(); }
outboundBuffer.addFlush();
将ChannelOutboundBuffer的unflushedEntry向后不断移动到tailEntry,操作结束后本次要flush的链表区间就是flushedEntry->unflushedEntry。
NioByteUnsafe.doWrite(ChannelOutboundBuffer in)
对Entry链表区间中的每个Entry.msg(即ByteBuf)执行以下逻辑--->
1)如果当前flushedEntry为空,则将OP_WRITE事件从对应Channel的interestOp中移除,跳出遍历直接到步骤5)
if (msg == null) { // Wrote all messages. clearOpWrite(); // Directly return here so incompleteWrite(...) is not called. return; }
2)采用类似自旋锁的逻辑不断调用NioSocketChannel.doWriteBytes(ByteBuf buf),将Entry.msg(即ByteBuf)中的数据写入套接字的发送缓冲区。
in.progress(flushedAmount);
in.remove();
<---对Entry链表区间中的每个Entry.msg(即ByteBuf)执行以上逻辑
incompleteWrite(setOpWrite);
若当前TCP发送缓冲区已满,则将OP_WRITE添加到ch.selectionKey.interestOps中,等待TCP发送缓冲队列可写时重新触发write操作;
若当前TCP发送缓冲区未满,构造一个flush()事件,等待EventLoop的下一个循环重新检测ChannelOutboundBuffer中有无待flush的数据。