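Earlier we saw how a Java object is converted into a ByteBuf by an Encoder. The next question is how that ByteBuf gets written to the remote node, which is what this article analyzes.
In the earlier ChannelPipeline analysis we mentioned two special ChannelHandlerContext implementations, HeadContext and TailContext. HeadContext is the ChannelOutboundHandler at the very front of the pipeline. Because outbound events travel from the tail toward the head, it is also the handler that finally processes outbound events such as write and flush.
As the write and flush implementations in HeadContext show, both calls are delegated to the unsafe of the channel that owns the ChannelPipeline.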

    final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {

        private final Unsafe unsafe;

        HeadContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, HEAD_NAME, false, true);
            unsafe = pipeline.channel().unsafe();
            setAddComplete();
        }

        ...

        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            unsafe.write(msg, promise);
        }

        @Override
        public void flush(ChannelHandlerContext ctx) throws Exception {
            unsafe.flush();
        }

        ...

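The AbstractNioUnsafe used by AbstractNioChannel differs little from AbstractUnsafe, so we focus on the AbstractUnsafe class. It first declares a ChannelOutboundBuffer field. This buffer holds ByteBufs that have been written but not yet flushed, as well as data that has been flushed but not yet written to the remote peer.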

    protected abstract class AbstractUnsafe implements Unsafe {

        private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
        private RecvByteBufAllocator.Handle recvHandle;
        private boolean inFlush0;

        ...

ChannelOutboundBuffer is built on a linked list. Each Entry in the list holds the message it represents, a next pointer, and so on.
Its key fields are:

    public final class ChannelOutboundBuffer {

        private final Channel channel;

        // Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry)
        //
        // The Entry that is the first in the linked-list structure that was flushed
        private Entry flushedEntry;
        // The Entry which is the first unflushed in the linked-list structure
        private Entry unflushedEntry;
        // The Entry which represents the tail of the buffer
        private Entry tailEntry;
        // The number of flushed entries that are not written yet
        private int flushed;

        ...

Now look at the write implementation in AbstractUnsafe. The key step is the call to outboundBuffer.addMessage(msg, size, promise), which appends the written message to the outboundBuffer linked list.

    @Override
    public final void write(Object msg, ChannelPromise promise) {
        assertEventLoop();

        ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        if (outboundBuffer == null) {
            // If the outboundBuffer is null we know the channel was closed and so
            // need to fail the future right away. If it is not null the handling of the rest
            // will be done in flush0()
            // See https://github.com/netty/netty/issues/2362
            safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
            // release message now to prevent resource-leak
            ReferenceCountUtil.release(msg);
            return;
        }

        int size;
        try {
            msg = filterOutboundMessage(msg);
            size = pipeline.estimatorHandle().size(msg);
            if (size < 0) {
                size = 0;
            }
        } catch (Throwable t) {
            safeSetFailure(promise, t);
            ReferenceCountUtil.release(msg);
            return;
        }

        outboundBuffer.addMessage(msg, size, promise);
    }

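If the channel's outbound buffer holds no entries yet, or everything added earlier has already been written out, the list contains only the newly added Entry after the first write. That single Entry is both unflushedEntry and tailEntry.
After one more message Entry is added, unflushedEntry still points to the first Entry and tailEntry points to the new one: unflushedEntry --> tailEntry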
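The pointer movement described above can be sketched with a stripped-down model of the list. This is not Netty's code: the class name MiniOutboundList is made up for illustration, and message sizes, promises and Entry recycling are left out on purpose. It only assumes that addMessage links the new Entry behind the tail and sets unflushedEntry when nothing is pending.

```java
// Simplified, hypothetical model of the linked list inside ChannelOutboundBuffer.
// Sizes, promises and object recycling are intentionally omitted.
final class MiniOutboundList {

    static final class Entry {
        final Object msg;
        Entry next;

        Entry(Object msg) {
            this.msg = msg;
        }
    }

    Entry flushedEntry;   // first entry already marked for flushing
    Entry unflushedEntry; // first entry written but not yet flushed
    Entry tailEntry;      // last entry in the list

    void addMessage(Object msg) {
        Entry entry = new Entry(msg);
        if (tailEntry == null) {
            // Empty list, so there cannot be any flushed entry either.
            flushedEntry = null;
        } else {
            // Link the new entry behind the current tail.
            tailEntry.next = entry;
        }
        tailEntry = entry;
        if (unflushedEntry == null) {
            // Nothing was pending, so the new entry is the first unflushed one.
            unflushedEntry = entry;
        }
    }

    public static void main(String[] args) {
        MiniOutboundList list = new MiniOutboundList();
        list.addMessage("a");
        // After the first write the single entry plays both roles.
        System.out.println(list.unflushedEntry == list.tailEntry); // prints "true"
        list.addMessage("b");
        System.out.println(list.unflushedEntry.msg + " --> " + list.tailEntry.msg); // prints "a --> b"
    }
}
```

Note that addMessage never touches flushedEntry unless the list was empty, which is why writes alone never cause any data to leave the buffer.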
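Calling write at the application level does not necessarily trigger an immediate flush. A flush performs a system call and is relatively expensive, so a common optimization is to flush in channelReadComplete instead.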
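To see why deferring the flush pays off, here is a toy model in plain Java with no Netty dependency. BatchingSink, flushCalls and wire are hypothetical names: write only queues a message, and flushCalls stands in for the number of expensive socket writes. In a real handler the same pattern would be ctx.write(...) in channelRead and a single ctx.flush() in channelReadComplete.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a channel: write() only queues, flush() "hits the wire".
final class BatchingSink {

    private final List<String> pending = new ArrayList<>();
    int flushCalls;                                  // models the number of socket writes
    final StringBuilder wire = new StringBuilder();  // models what the remote peer receives

    void write(String msg) {
        pending.add(msg);
    }

    void flush() {
        if (pending.isEmpty()) {
            return; // nothing queued, no system call needed
        }
        flushCalls++;
        for (String m : pending) {
            wire.append(m);
        }
        pending.clear();
    }

    public static void main(String[] args) {
        BatchingSink eager = new BatchingSink();
        for (String m : new String[] {"a", "b", "c"}) {
            eager.write(m);
            eager.flush(); // flush after every write
        }

        BatchingSink batched = new BatchingSink();
        for (String m : new String[] {"a", "b", "c"}) {
            batched.write(m);
        }
        batched.flush(); // one flush for the whole batch

        System.out.println(eager.flushCalls + " vs " + batched.flushCalls); // prints "3 vs 1"
    }
}
```

Both variants deliver the same bytes; only the number of flushes differs.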
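flush takes two steps: addFlush, which marks the pending entries as flushed, followed by flush0, which performs the actual write.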

    @Override
    public final void flush() {
        assertEventLoop();

        ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        if (outboundBuffer == null) {
            return;
        }

        outboundBuffer.addFlush();
        flush0();
    }

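addFlush starts from unflushedEntry. If flushedEntry is currently null, it points flushedEntry at unflushedEntry, then walks the list to tailEntry, counting the flushed entries along the way.
Finally it sets unflushedEntry to null.
If addMessage is called again at this point, the structure looks like this:
flushedEntry --> entry --> entry --> unflushedEntry --> entry --> tailEntry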

    public void addFlush() {
        // There is no need to process all entries if there was already a flush before and no new messages
        // were added in the meantime.
        //
        // See https://github.com/netty/netty/issues/2577
        Entry entry = unflushedEntry;
        if (entry != null) {
            if (flushedEntry == null) {
                // there is no flushedEntry yet, so start with the entry
                flushedEntry = entry;
            }
            do {
                flushed++;
                if (!entry.promise.setUncancellable()) {
                    // Was cancelled so make sure we free up memory and notify about the freed bytes
                    int pending = entry.cancel();
                    decrementPendingOutboundBytes(pending, false, true);
                }
                entry = entry.next;
            } while (entry != null);

            // All flushed so reset unflushedEntry
            unflushedEntry = null;
        }
    }

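The effect of addFlush on the three pointers is easier to follow in a reduced model. The sketch below is an illustration only: MiniFlushList is a made-up class, and promise cancellation and pending-byte accounting are dropped. It keeps just the pointer and counter updates from the listing above.

```java
// Hypothetical, reduced model of ChannelOutboundBuffer's addMessage/addFlush.
// Promise cancellation and pending-byte accounting are intentionally left out.
final class MiniFlushList {

    static final class Entry {
        final Object msg;
        Entry next;

        Entry(Object msg) {
            this.msg = msg;
        }
    }

    Entry flushedEntry;
    Entry unflushedEntry;
    Entry tailEntry;
    int flushed; // number of flushed entries not yet written

    void addMessage(Object msg) {
        Entry entry = new Entry(msg);
        if (tailEntry == null) {
            flushedEntry = null;
        } else {
            tailEntry.next = entry;
        }
        tailEntry = entry;
        if (unflushedEntry == null) {
            unflushedEntry = entry;
        }
    }

    void addFlush() {
        Entry entry = unflushedEntry;
        if (entry != null) {
            if (flushedEntry == null) {
                // No flushed entries pending, so the flushed range starts here.
                flushedEntry = entry;
            }
            do {
                flushed++;
                entry = entry.next;
            } while (entry != null);
            // Everything up to the tail is now marked as flushed.
            unflushedEntry = null;
        }
    }

    public static void main(String[] args) {
        MiniFlushList list = new MiniFlushList();
        list.addMessage("a");
        list.addMessage("b");
        list.addFlush();
        list.addMessage("c");
        // flushedEntry(a) --> b --> unflushedEntry(c), which is also tailEntry
        System.out.println(list.flushedEntry.msg + " " + list.unflushedEntry.msg + " " + list.flushed); // prints "a c 2"
    }
}
```

A second addFlush call would raise flushed to 3 without moving flushedEntry, because the flushed range already has a start.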

    @SuppressWarnings("deprecation")
    protected void flush0() {
        if (inFlush0) {
            // Avoid re-entrance
            return;
        }

        final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        if (outboundBuffer == null || outboundBuffer.isEmpty()) {
            return;
        }

        inFlush0 = true;

        // Mark all pending write requests as failure if the channel is inactive.
        if (!isActive()) {
            try {
                if (isOpen()) {
                    outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
                } else {
                    // Do not trigger channelWritabilityChanged because the channel is closed already.
                    outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
                }
            } finally {
                inFlush0 = false;
            }
            return;
        }

        try {
            doWrite(outboundBuffer);
        } catch (Throwable t) {
            if (t instanceof IOException && config().isAutoClose()) {
                /**
                 * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
                 * failing all flushed messages and also ensure the actual close of the underlying transport
                 * will happen before the promises are notified.
                 *
                 * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
                 * may still return {@code true} even if the channel should be closed as result of the exception.
                 */
                close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
            } else {
                try {
                    shutdownOutput(voidPromise(), t);
                } catch (Throwable t2) {
                    close(voidPromise(), t2, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
                }
            }
        } finally {
            inFlush0 = false;
        }
    }

doWrite is abstract and must be implemented by each concrete AbstractChannel subclass.
The NioSocketChannel implementation is as follows:

    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        SocketChannel ch = javaChannel();
        int writeSpinCount = config().getWriteSpinCount();
        do {
            if (in.isEmpty()) {
                // All written so clear OP_WRITE
                clearOpWrite();
                // Directly return here so incompleteWrite(...) is not called.
                return;
            }

            // Ensure the pending writes are made of ByteBufs only.
            int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
            ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
            int nioBufferCnt = in.nioBufferCount();

            // Always use nioBuffers() to workaround data-corruption.
            // See https://github.com/netty/netty/issues/2761
            switch (nioBufferCnt) {
                case 0:
                    // We have something else beside ByteBuffers to write so fallback to normal writes.
                    writeSpinCount -= doWrite0(in);
                    break;
                case 1: {
                    // Only one ByteBuf so use non-gathering write
                    // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
                    // to check if the total size of all the buffers is non-zero.
                    ByteBuffer buffer = nioBuffers[0];
                    int attemptedBytes = buffer.remaining();
                    final int localWrittenBytes = ch.write(buffer);
                    if (localWrittenBytes <= 0) {
                        incompleteWrite(true);
                        return;
                    }
                    adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
                    in.removeBytes(localWrittenBytes);
                    --writeSpinCount;
                    break;
                }
                default: {
                    // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
                    // to check if the total size of all the buffers is non-zero.
                    // We limit the max amount to int above so cast is safe
                    long attemptedBytes = in.nioBufferSize();
                    final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                    if (localWrittenBytes <= 0) {
                        incompleteWrite(true);
                        return;
                    }
                    // Casting to int is safe because we limit the total amount of data in the nioBuffers to int above.
                    adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
                            maxBytesPerGatheringWrite);
                    in.removeBytes(localWrittenBytes);
                    --writeSpinCount;
                    break;
                }
            }
        } while (writeSpinCount > 0);

        incompleteWrite(writeSpinCount < 0);
    }

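The core of doWrite is a bounded retry loop: keep writing until the data is gone, the channel refuses more bytes, or the spin budget runs out. The following plain-Java sketch isolates that loop. SpinWriteDemo, ThrottledChannel and writeWithSpin are hypothetical names, and the throttled channel is an assumption used to simulate a socket that accepts only a few bytes per call.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

final class SpinWriteDemo {

    /** Hypothetical channel that accepts at most {@code limit} bytes per write call. */
    static final class ThrottledChannel implements WritableByteChannel {
        final ByteArrayOutputStream sink = new ByteArrayOutputStream();
        final int limit;

        ThrottledChannel(int limit) {
            this.limit = limit;
        }

        @Override
        public int write(ByteBuffer src) {
            int n = Math.min(limit, src.remaining());
            for (int i = 0; i < n; i++) {
                sink.write(src.get());
            }
            return n;
        }

        @Override
        public boolean isOpen() {
            return true;
        }

        @Override
        public void close() {
        }
    }

    /** Returns true if buf was written completely within spinCount write attempts. */
    static boolean writeWithSpin(WritableByteChannel ch, ByteBuffer buf, int spinCount) {
        try {
            do {
                if (!buf.hasRemaining()) {
                    return true; // everything written
                }
                int written = ch.write(buf);
                if (written <= 0) {
                    // The channel accepted nothing; doWrite calls incompleteWrite(true) here.
                    return false;
                }
                --spinCount;
            } while (spinCount > 0);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Spin budget used up; report whether any data is still pending.
        return !buf.hasRemaining();
    }

    public static void main(String[] args) {
        ThrottledChannel ch = new ThrottledChannel(4);
        ByteBuffer buf = ByteBuffer.wrap(new byte[10]);
        // With a budget of 2 attempts only 8 of the 10 bytes go out.
        System.out.println(writeWithSpin(ch, buf, 2) + " remaining=" + buf.remaining()); // prints "false remaining=2"
    }
}
```

Capping the number of attempts keeps one busy channel from monopolizing the event loop thread; whatever is left over stays in the buffer for a later flush.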
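removeBytes removes from the ChannelOutboundBuffer linked list every Entry that has actually been written in full, and advances the readerIndex of an Entry that was only partially written.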

    public void removeBytes(long writtenBytes) {
        for (;;) {
            Object msg = current();
            if (!(msg instanceof ByteBuf)) {
                assert writtenBytes == 0;
                break;
            }

            final ByteBuf buf = (ByteBuf) msg;
            final int readerIndex = buf.readerIndex();
            final int readableBytes = buf.writerIndex() - readerIndex;

            if (readableBytes <= writtenBytes) {
                if (writtenBytes != 0) {
                    progress(readableBytes);
                    writtenBytes -= readableBytes;
                }
                remove();
            } else { // readableBytes > writtenBytes
                if (writtenBytes != 0) {
                    buf.readerIndex(readerIndex + (int) writtenBytes);
                    progress(writtenBytes);
                }
                break;
            }
        }
        clearNioBuffers();
    }


    public boolean remove() {
        Entry e = flushedEntry;
        if (e == null) {
            clearNioBuffers();
            return false;
        }
        Object msg = e.msg;

        ChannelPromise promise = e.promise;
        int size = e.pendingSize;

        removeEntry(e);

        if (!e.cancelled) {
            // only release message, notify and decrement if it was not canceled before.
            ReferenceCountUtil.safeRelease(msg);
            safeSuccess(promise);
            decrementPendingOutboundBytes(size, false, true);
        }

        // recycle the entry
        e.recycle();

        return true;
    }

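removeEntry checks whether all flushed Entries have now been removed. If so, it sets flushedEntry to null, and if the removed Entry was also tailEntry,
it sets tailEntry and unflushedEntry to null as well.
Otherwise it moves flushedEntry to the next Entry.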

    private void removeEntry(Entry e) {
        if (-- flushed == 0) {
            // processed everything
            flushedEntry = null;
            if (e == tailEntry) {
                tailEntry = null;
                unflushedEntry = null;
            }
        } else {
            flushedEntry = e.next;
        }
    }

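The bookkeeping after a partial write can be tried out in isolation. The sketch below is a simplified stand-in, not Netty's implementation: RemoveBytesDemo is a made-up class, a Deque of java.nio.ByteBuffer replaces the Entry list, and ByteBuffer.position plays the role of ByteBuf.readerIndex. Promises, progress notification and reference counting are omitted.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

final class RemoveBytesDemo {

    /**
     * Drops every buffer that was written completely and advances the position
     * of the first buffer that was written only in part.
     */
    static void removeBytes(Deque<ByteBuffer> flushed, long writtenBytes) {
        while (!flushed.isEmpty()) {
            ByteBuffer buf = flushed.peekFirst();
            int readableBytes = buf.remaining();
            if (readableBytes <= writtenBytes) {
                // Fully written: account for its bytes and unlink it.
                writtenBytes -= readableBytes;
                flushed.pollFirst();
            } else {
                // Partially written (or untouched): skip the bytes already sent and stop.
                buf.position(buf.position() + (int) writtenBytes);
                break;
            }
        }
    }

    public static void main(String[] args) {
        Deque<ByteBuffer> flushed = new ArrayDeque<>();
        flushed.add(ByteBuffer.wrap(new byte[4]));
        flushed.add(ByteBuffer.wrap(new byte[4]));
        flushed.add(ByteBuffer.wrap(new byte[4]));

        // Pretend the socket accepted 6 of the 12 pending bytes.
        removeBytes(flushed, 6);
        System.out.println(flushed.size() + " buffers left, first has "
                + flushed.peekFirst().remaining() + " bytes"); // prints "2 buffers left, first has 2 bytes"
    }
}
```

As in the original method, the cast to int in the partial-write branch is safe because writtenBytes is smaller than the int-sized readableBytes at that point.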
At this point the whole path is clear: application data is converted into a ByteBuf by an Encoder, stored in the ChannelOutboundBuffer, and then written to the SocketChannel.