Reposted

Netty Source Code Analysis - 8 - ChannelOutboundBuffer

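Earlier in this series we saw how an Encoder converts Java objects into ByteBuf instances. The next question is how a ByteBuf actually gets written to the remote peer, which is what this article analyzes.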

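In the earlier ChannelPipeline analysis we mentioned two special ChannelHandlerContext implementations, HeadContext and TailContext. HeadContext sits at the head of the pipeline and is a ChannelOutboundHandler. Because outbound events travel from the tail toward the head, it is the handler that finally processes outbound events such as write and flush.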

As the write and flush implementations in HeadContext show, it delegates both operations to the unsafe of the channel that owns the pipeline.

final class HeadContext extends AbstractChannelHandlerContext
        implements ChannelOutboundHandler, ChannelInboundHandler {

        private final Unsafe unsafe;

        HeadContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, HEAD_NAME, false, true);
            unsafe = pipeline.channel().unsafe();
            setAddComplete();
        }
        ...
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            unsafe.write(msg, promise);
        }

        @Override
        public void flush(ChannelHandlerContext ctx) throws Exception {
            unsafe.flush();
        }

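The AbstractNioUnsafe used by AbstractNioChannel differs little from AbstractUnsafe, so we focus on AbstractUnsafe. It first declares a ChannelOutboundBuffer field, which holds ByteBufs that have been written but not yet flushed, as well as data that has been flushed but not yet written to the remote peer.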

protected abstract class AbstractUnsafe implements Unsafe {

        private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
        private RecvByteBufAllocator.Handle recvHandle;
        private boolean inFlush0;

ChannelOutboundBuffer is built on a linked list. Each Entry in the list stores the message it represents, a next pointer, and so on.

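Several of its fields are key:

  1. flushedEntry: the entries from this node up to (but not including) unflushedEntry have been marked as flushed but have not yet been written to the channel
  2. unflushedEntry: the first node that has not been flushed yet
  3. tailEntry: the last element of the list
  4. flushed: the number of entries that have been flushed but not yet written to the channel

    public final class ChannelOutboundBuffer {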
    
        private final Channel channel;
    
        // Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry)
        //
        // The Entry that is the first in the linked-list structure that was flushed
        private Entry flushedEntry;
        // The Entry which is the first unflushed in the linked-list structure
        private Entry unflushedEntry;
        // The Entry which represents the tail of the buffer
        private Entry tailEntry;
        // The number of flushed entries that are not written yet
        private int flushed;
        ...
    

addMessage

Now look at AbstractUnsafe's write implementation. The key step is the call to outboundBuffer.addMessage(msg, size, promise), which appends the written message to the outboundBuffer linked list.

  @Override
  public final void write(Object msg, ChannelPromise promise) {
      assertEventLoop();

      ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
      if (outboundBuffer == null) {
          // If the outboundBuffer is null we know the channel was closed and so
          // need to fail the future right away. If it is not null the handling of the rest
          // will be done in flush0()
          // See https://github.com/netty/netty/issues/2362
          safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
          // release message now to prevent resource-leak
          ReferenceCountUtil.release(msg);
          return;
      }

      int size;
      try {
          msg = filterOutboundMessage(msg);
          size = pipeline.estimatorHandle().size(msg);
          if (size < 0) {
              size = 0;
          }
      } catch (Throwable t) {
          safeSetFailure(promise, t);
          ReferenceCountUtil.release(msg);
          return;
      }

      outboundBuffer.addMessage(msg, size, promise);
  }

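If the channel's outbound buffer is empty, either because nothing has been added yet or because everything added earlier has already been written out, the buffer ends up in the state shown in the figure below. The new Entry is both unflushedEntry and tailEntry.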

[Figure: a single Entry, referenced by both unflushedEntry and tailEntry]

After one more message Entry is added:

[Figure: two Entries; unflushedEntry references the first and tailEntry references the second]
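The two states described above can be reproduced with a small stand-alone model. This is a sketch, not Netty's code: the class name OutboundListSketch and the use of String messages are assumptions made for illustration, and size accounting and promises are left out. Only the pointer handling is modeled.

```java
// Simplified model of how addMessage maintains the list pointers.
// OutboundListSketch is a hypothetical name; this is not Netty's implementation.
final class OutboundListSketch {
    static final class Entry {
        final String msg;
        Entry next;
        Entry(String msg) { this.msg = msg; }
    }

    Entry flushedEntry;
    Entry unflushedEntry;
    Entry tailEntry;

    // Append to the tail; the first pending entry also becomes unflushedEntry.
    void addMessage(String msg) {
        Entry entry = new Entry(msg);
        if (tailEntry == null) {
            flushedEntry = null;
        } else {
            tailEntry.next = entry;
        }
        tailEntry = entry;
        if (unflushedEntry == null) {
            unflushedEntry = entry;
        }
    }

    public static void main(String[] args) {
        OutboundListSketch buf = new OutboundListSketch();
        buf.addMessage("a");
        // One entry: it is both unflushedEntry and tailEntry.
        System.out.println(buf.unflushedEntry == buf.tailEntry); // true
        buf.addMessage("b");
        // Two entries: unflushedEntry stays on the first, tailEntry moves to the second.
        System.out.println(buf.unflushedEntry.msg + " " + buf.tailEntry.msg); // a b
    }
}
```

Note that flushedEntry stays null throughout: nothing becomes eligible for writing until a flush is requested.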

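After the application calls write, a flush does not necessarily follow right away. flush triggers a system call, which is relatively expensive, so a common optimization is to flush in channelReadComplete instead.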

flush

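flush takes two steps:

  1. Call outboundBuffer.addFlush, which sets flushedEntry and marks every Entry from flushedEntry onward as ready to be written to the channel
  2. Call flush0, which performs the actual write to the channel. Some implementations decide here whether to flush at all. For example, AbstractNioUnsafe in AbstractNioChannel checks whether a flush is already pending; if so, a flush is guaranteed to happen later and flush0 does not need to be called again

    @Override
    public final void flush() {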
        assertEventLoop();
    
        ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        if (outboundBuffer == null) {
            return;
        }
    
        outboundBuffer.addFlush();
        flush0();
    }
    

addFlush

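addFlush starts from unflushedEntry. If flushedEntry was previously null, it points flushedEntry at unflushedEntry, then walks the list through tailEntry, incrementing flushed for each Entry.

Finally it sets unflushedEntry to null.

If addMessage is called again at this point, the structure looks like this: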

flushedEntry --> entry --> entry --> unflushedEntry --> entry --> tailEntry


public void addFlush() {
        // There is no need to process all entries if there was already a flush before and no new messages
        // where added in the meantime.
        //
        // See https://github.com/netty/netty/issues/2577
        Entry entry = unflushedEntry;
        if (entry != null) {
            if (flushedEntry == null) {
                // there is no flushedEntry yet, so start with the entry
                flushedEntry = entry;
            }
            do {
                flushed ++;
                if (!entry.promise.setUncancellable()) {
                    // Was cancelled so make sure we free up memory and notify about the freed bytes
                    int pending = entry.cancel();
                    decrementPendingOutboundBytes(pending, false, true);
                }
                entry = entry.next;
            } while (entry != null);

            // All flushed so reset unflushedEntry
            unflushedEntry = null;
        }
    }
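To see how the pointers and the flushed counter move, here is a minimal model of addFlush followed by further addMessage calls. It is a sketch under simplifying assumptions: AddFlushSketch, describe, and name are hypothetical names, messages are plain Strings, and the cancellation handling (setUncancellable, decrementPendingOutboundBytes) from the real method is omitted.

```java
// Simplified model of addFlush; cancellation and byte accounting are left out.
// AddFlushSketch is a hypothetical name, not part of Netty.
final class AddFlushSketch {
    static final class Entry {
        final String msg;
        Entry next;
        Entry(String msg) { this.msg = msg; }
    }

    Entry flushedEntry;
    Entry unflushedEntry;
    Entry tailEntry;
    int flushed;

    void addMessage(String msg) {
        Entry entry = new Entry(msg);
        if (tailEntry == null) {
            flushedEntry = null;
        } else {
            tailEntry.next = entry;
        }
        tailEntry = entry;
        if (unflushedEntry == null) {
            unflushedEntry = entry;
        }
    }

    // Mark everything from unflushedEntry to the tail as flushed.
    void addFlush() {
        Entry entry = unflushedEntry;
        if (entry != null) {
            if (flushedEntry == null) {
                flushedEntry = entry;
            }
            do {
                flushed++;
                entry = entry.next;
            } while (entry != null);
            unflushedEntry = null;
        }
    }

    private static String name(Entry e) {
        return e == null ? "null" : e.msg;
    }

    String describe() {
        return "flushed=" + flushed + " flushedEntry=" + name(flushedEntry)
                + " unflushedEntry=" + name(unflushedEntry) + " tailEntry=" + name(tailEntry);
    }

    public static void main(String[] args) {
        AddFlushSketch buf = new AddFlushSketch();
        buf.addMessage("a");
        buf.addMessage("b");
        buf.addFlush();
        buf.addMessage("c");
        buf.addMessage("d");
        // prints: flushed=2 flushedEntry=a unflushedEntry=c tailEntry=d
        System.out.println(buf.describe());
        buf.addFlush();
        // prints: flushed=4 flushedEntry=a unflushedEntry=null tailEntry=d
        System.out.println(buf.describe());
    }
}
```

The second addFlush leaves flushedEntry where it was and only extends the flushed range, which is why the method checks flushedEntry == null before assigning it.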
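
  • flush0 first checks whether the channel is already flushing. If so it returns immediately; otherwise it sets inFlush0 and continues
  • After a few auxiliary checks it calls doWrite(outboundBuffer)
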
    @SuppressWarnings("deprecation")
    protected void flush0() {
        if (inFlush0) {
            // Avoid re-entrance
            return;
        }
    
        final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        if (outboundBuffer == null || outboundBuffer.isEmpty()) {
            return;
        }
    
        inFlush0 = true;
    
        // Mark all pending write requests as failure if the channel is inactive.
        if (!isActive()) {
            try {
                if (isOpen()) {
                    outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
                } else {
                    // Do not trigger channelWritabilityChanged because the channel is closed already.
                    outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
                }
            } finally {
                inFlush0 = false;
            }
            return;
        }
    
        try {
            doWrite(outboundBuffer);
        } catch (Throwable t) {
            if (t instanceof IOException && config().isAutoClose()) {
                /**
                 * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
                 * failing all flushed messages and also ensure the actual close of the underlying transport
                 * will happen before the promises are notified.
                 *
                 * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
                 * may still return {@code true} even if the channel should be closed as result of the exception.
                 */
                close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
            } else {
                try {
                    shutdownOutput(voidPromise(), t);
                } catch (Throwable t2) {
                    close(voidPromise(), t2, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
                }
            }
        } finally {
            inFlush0 = false;
        }
    }
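The inFlush0 flag is a plain re-entrance guard. The sketch below shows why such a guard matters when a completion callback triggers another flush from inside the write loop. Everything here is an assumption for illustration: ReentrantFlushSketch, onWritten, and skippedReentrantCalls are hypothetical, a simple queue stands in for the outbound buffer, and the flushed/unflushed distinction is ignored.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Model of the inFlush0 re-entrance guard. Hypothetical names; not Netty's code.
final class ReentrantFlushSketch {
    private final Deque<String> pending = new ArrayDeque<>();
    final List<String> written = new ArrayList<>();
    private boolean inFlush0;
    int skippedReentrantCalls;

    void write(String msg) {
        pending.add(msg);
    }

    void flush0() {
        if (inFlush0) {
            // Already flushing further up the call stack: let the outer loop do the work.
            skippedReentrantCalls++;
            return;
        }
        if (pending.isEmpty()) {
            return;
        }
        inFlush0 = true;
        try {
            String msg;
            while ((msg = pending.poll()) != null) {
                written.add(msg);
                onWritten(msg);
            }
        } finally {
            // Always reset the flag, even if the write loop throws.
            inFlush0 = false;
        }
    }

    // Hypothetical completion callback that writes a follow-up and flushes again.
    private void onWritten(String msg) {
        if (msg.equals("first")) {
            write("follow-up");
            flush0(); // re-entrant call, returns immediately
        }
    }

    public static void main(String[] args) {
        ReentrantFlushSketch ch = new ReentrantFlushSketch();
        ch.write("first");
        ch.flush0();
        System.out.println(ch.written);               // [first, follow-up]
        System.out.println(ch.skippedReentrantCalls); // 1
    }
}
```

Without the guard, the nested call would start a second write loop on the same buffer while the first one is still iterating.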
    

NioSocketChannel.doWrite

doWrite is abstract and must be implemented by each concrete AbstractChannel subclass.

NioSocketChannel implements it as follows:

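  1. Obtain the underlying JDK SocketChannel
  2. Run the following steps in a loop
  3. If there is no more data to write, clear OP_WRITE on the SelectionKey and return
  4. Take the ByteBuffer array from the ChannelOutboundBuffer and call SocketChannel.write to write it to the channel
  5. If the number of bytes written is less than or equal to 0, set OP_WRITE on the SelectionKey
  6. Otherwise call ChannelOutboundBuffer.removeBytes to remove the data that was written

    protected void doWrite(ChannelOutboundBuffer in) throws Exception {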
            SocketChannel ch = javaChannel();
            int writeSpinCount = config().getWriteSpinCount();
            do {
                if (in.isEmpty()) {
                    // All written so clear OP_WRITE
                    clearOpWrite();
                    // Directly return here so incompleteWrite(...) is not called.
                    return;
                }
    
                // Ensure the pending writes are made of ByteBufs only.
                int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
                ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
                int nioBufferCnt = in.nioBufferCount();
    
                // Always use nioBuffers() to workaround data-corruption.
                // See https://github.com/netty/netty/issues/2761
                switch (nioBufferCnt) {
                    case 0:
                        // We have something else beside ByteBuffers to write so fallback to normal writes.
                        writeSpinCount -= doWrite0(in);
                        break;
                    case 1: {
                        // Only one ByteBuf so use non-gathering write
                        // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
                        // to check if the total size of all the buffers is non-zero.
                        ByteBuffer buffer = nioBuffers[0];
                        int attemptedBytes = buffer.remaining();
                        final int localWrittenBytes = ch.write(buffer);
                        if (localWrittenBytes <= 0) {
                            incompleteWrite(true);
                            return;
                        }
                        adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
                        in.removeBytes(localWrittenBytes);
                        --writeSpinCount;
                        break;
                    }
                    default: {
                        // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
                        // to check if the total size of all the buffers is non-zero.
                        // We limit the max amount to int above so cast is safe
                        long attemptedBytes = in.nioBufferSize();
                        final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                        if (localWrittenBytes <= 0) {
                            incompleteWrite(true);
                            return;
                        }
                        // Casting to int is safe because we limit the total amount of data in the nioBuffers to int above.
                        adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
                                maxBytesPerGatheringWrite);
                        in.removeBytes(localWrittenBytes);
                        --writeSpinCount;
                        break;
                    }
                }
            } while (writeSpinCount > 0);
    
            incompleteWrite(writeSpinCount < 0);
        }
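The control flow of this loop has three exits: everything written, the socket refusing more data, and the spin count running out. The sketch below models just that, using a fake sink instead of a real SocketChannel. WriteSpinSketch, LimitedSink, and Outcome are hypothetical names introduced for illustration, and a single ByteBuffer stands in for the whole outbound buffer.

```java
import java.nio.ByteBuffer;

// Model of the bounded write loop in doWrite. Hypothetical names; not Netty's code.
final class WriteSpinSketch {
    // Stand-in for a non-blocking socket: accepts at most perCall bytes per write
    // and nothing at all once its remaining space is used up (a full send buffer).
    static final class LimitedSink {
        private final int perCall;
        private int space;

        LimitedSink(int perCall, int space) {
            this.perCall = perCall;
            this.space = space;
        }

        int write(ByteBuffer src) {
            int n = Math.min(Math.min(perCall, space), src.remaining());
            src.position(src.position() + n);
            space -= n;
            return n;
        }
    }

    enum Outcome { ALL_WRITTEN, SOCKET_FULL, SPIN_LIMIT }

    static Outcome doWrite(ByteBuffer data, LimitedSink sink, int writeSpinCount) {
        do {
            if (!data.hasRemaining()) {
                return Outcome.ALL_WRITTEN;   // the real code clears OP_WRITE here
            }
            int written = sink.write(data);
            if (written <= 0) {
                return Outcome.SOCKET_FULL;   // the real code calls incompleteWrite(true)
            }
            --writeSpinCount;
        } while (writeSpinCount > 0);
        // Spin budget used up; stop so other work on the same thread is not starved.
        return data.hasRemaining() ? Outcome.SPIN_LIMIT : Outcome.ALL_WRITTEN;
    }

    public static void main(String[] args) {
        ByteBuffer data = ByteBuffer.allocate(10);
        Outcome outcome = doWrite(data, new LimitedSink(4, 6), 16);
        System.out.println(outcome + ", remaining=" + data.remaining()); // SOCKET_FULL, remaining=4
    }
}
```

Bounding the loop with writeSpinCount keeps one channel with a large backlog from monopolizing the thread that also serves other channels.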
    

ChannelOutboundBuffer.removeBytes

removeBytes removes from the ChannelOutboundBuffer linked list the Entries whose data has actually been fully written.

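  1. Take the current flushedEntry. If the readable bytes in its ByteBuf are less than or equal to writtenBytes, at least the whole Entry has been written, so call remove to delete it
  2. Otherwise advance the readerIndex of the current Entry's ByteBuf
  3. Set every element of the nioBuffers array to null

    public void removeBytes(long writtenBytes) {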
        for (;;) {
            Object msg = current();
            if (!(msg instanceof ByteBuf)) {
                assert writtenBytes == 0;
                break;
            }
    
            final ByteBuf buf = (ByteBuf) msg;
            final int readerIndex = buf.readerIndex();
            final int readableBytes = buf.writerIndex() - readerIndex;
    
            if (readableBytes <= writtenBytes) {
                if (writtenBytes != 0) {
                    progress(readableBytes);
                    writtenBytes -= readableBytes;
                }
                remove();
            } else { // readableBytes > writtenBytes
                if (writtenBytes != 0) {
                    buf.readerIndex(readerIndex + (int) writtenBytes);
                    progress(writtenBytes);
                }
                break;
            }
        }
        clearNioBuffers();
    }
    
public boolean remove() {
    Entry e = flushedEntry;
    if (e == null) {
        clearNioBuffers();
        return false;
    }
    Object msg = e.msg;

    ChannelPromise promise = e.promise;
    int size = e.pendingSize;

    removeEntry(e);

    if (!e.cancelled) {
        // only release message, notify and decrement if it was not canceled before.
        ReferenceCountUtil.safeRelease(msg);
        safeSuccess(promise);
        decrementPendingOutboundBytes(size, false, true);
    }

    // recycle the entry
    e.recycle();

    return true;
}
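The accounting in removeBytes is easiest to follow with concrete numbers. The sketch below replays it on a queue of JDK ByteBuffers. It is a model under stated assumptions: RemoveBytesSketch is a hypothetical name, a Deque replaces the Entry list, ByteBuffer.position plays the role of ByteBuf.readerIndex, and promise notification, progress reporting, and reference counting are omitted.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Model of removeBytes on a queue of ByteBuffers. Hypothetical names; not Netty's code.
final class RemoveBytesSketch {
    // Consumes writtenBytes from the head of the queue and returns how many
    // buffers were removed because they were written completely.
    static int removeBytes(Deque<ByteBuffer> flushed, long writtenBytes) {
        int removed = 0;
        for (;;) {
            ByteBuffer buf = flushed.peek();
            if (buf == null) {
                break;
            }
            int readableBytes = buf.remaining();
            if (readableBytes <= writtenBytes) {
                // The whole buffer went out: drop it and keep going.
                writtenBytes -= readableBytes;
                flushed.poll();
                removed++;
            } else {
                // Partially written: skip the bytes that were sent and stop.
                buf.position(buf.position() + (int) writtenBytes);
                break;
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        Deque<ByteBuffer> flushed = new ArrayDeque<>();
        flushed.add(ByteBuffer.allocate(4));
        flushed.add(ByteBuffer.allocate(6));
        flushed.add(ByteBuffer.allocate(5));

        // 7 bytes written: the 4-byte buffer is removed, the 6-byte buffer advances by 3.
        System.out.println(removeBytes(flushed, 7));    // 1
        System.out.println(flushed.peek().remaining()); // 3

        // 8 more bytes written: the remaining 3 and the full 5 are both consumed.
        System.out.println(removeBytes(flushed, 8));    // 2
        System.out.println(flushed.isEmpty());          // true
    }
}
```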

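removeEntry checks whether all flushed Entries have been removed. If so, it sets flushedEntry to null, and if the removed Entry was also tailEntry, it sets tailEntry and unflushedEntry to null as well. Otherwise it advances flushedEntry to the next Entry.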

private void removeEntry(Entry e) {
    if (-- flushed == 0) {
        // processed everything
        flushedEntry = null;
        if (e == tailEntry) {
            tailEntry = null;
            unflushedEntry = null;
        }
    } else {
        flushedEntry = e.next;
    }
}

At this point, the path by which application data is converted into a ByteBuf by an Encoder, stored in the ChannelOutboundBuffer, and then written to the SocketChannel should be clear.

Original article: https://liuzhengyang.github.io/2018/08/03/netty-8-channeloutboundbuffer/