转载

从netty-example分析Netty组件续

上文我们从netty-example的Discard服务器端示例分析了netty的组件,今天我们从另一个简单的示例Echo客户端分析一下上个示例中没有出现的netty组件。

1. 服务端的连接处理,读写处理

echo客户端代码:

 /**  * Sends one message when a connection is open and echoes back any received  * data to the server.  Simply put, the echo client initiates the ping-pong  * traffic between the echo client and server by sending the first message to  * the server.  */ public final class EchoClient {      static final boolean SSL = System.getProperty("ssl") != null;     static final String HOST = System.getProperty("host", "127.0.0.1");     static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));     static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));      public static void main(String[] args) throws Exception {         // Configure SSL.git         final SslContext sslCtx;         if (SSL) {             sslCtx = SslContextBuilder.forClient()                 .trustManager(InsecureTrustManagerFactory.INSTANCE).build();         } else {             sslCtx = null;         }          // Configure the client.         EventLoopGroup group = new NioEventLoopGroup();         try {             Bootstrap b = new Bootstrap();             b.group(group)              .channel(NioSocketChannel.class)              .option(ChannelOption.TCP_NODELAY, true)              .handler(new ChannelInitializer<SocketChannel>() {                  @Override                  public void initChannel(SocketChannel ch) throws Exception {                      ChannelPipeline p = ch.pipeline();                      if (sslCtx != null) {                          p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));                      }                      //p.addLast(new LoggingHandler(LogLevel.INFO));                      p.addLast(new EchoClientHandler());                  }              });              // Start the client.             ChannelFuture f = b.connect(HOST, PORT).sync();              // Wait until the connection is closed.             f.channel().closeFuture().sync();         } finally {             // Shut down the event loop to terminate all threads.             group.shutdownGracefully();         }     } } 

从上面的代码可以看出,discard的服务端代码和echo的客户端代码基本相似,不同的是一个使用ServerBootStrap,另一个使用BootStrap而已。先看一下连接过程

NioEventLoop处理key的过程,

  private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {         final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();         if (!k.isValid()) {             // close the channel if the key is not valid anymore             unsafe.close(unsafe.voidPromise());             return;         }          try {             int readyOps = k.readyOps();             // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead             // to a spin loop             if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {                 unsafe.read();                 if (!ch.isOpen()) {                     // Connection already closed - no need to handle write.                     return;                 }             }             if ((readyOps & SelectionKey.OP_WRITE) != 0) {                 // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write                 ch.unsafe().forceFlush();             }             if ((readyOps & SelectionKey.OP_CONNECT) != 0) {                 // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking                 // See https://github.com/netty/netty/issues/924                 int ops = k.interestOps();                 ops &= ~SelectionKey.OP_CONNECT;                 k.interestOps(ops);                  unsafe.finishConnect();             }         } catch (CancelledKeyException ignored) {             unsafe.close(unsafe.voidPromise());         }     } 

2.1 连接流程

调用AbstractNioByteChannel的finishConnect()方法

         @Override         public final void finishConnect() {             // Note this method is invoked by the event loop only if the connection attempt was             // neither cancelled nor timed out.              assert eventLoop().inEventLoop();              try {                 boolean wasActive = isActive();                 doFinishConnect();                 fulfillConnectPromise(connectPromise, wasActive);             } catch (Throwable t) {                 fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));             } finally {                 // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used                 // See https://github.com/netty/netty/issues/1770                 if (connectTimeoutFuture != null) {                     connectTimeoutFuture.cancel(false);                 }                 connectPromise = null;             }         } 

触发channelActive操作:

         private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {             if (promise == null) {                 // Closed via cancellation and the promise has been notified already.                 return;             }              // trySuccess() will return false if a user cancelled the connection attempt.             boolean promiseSet = promise.trySuccess();              // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,             // because what happened is what happened.             if (!wasActive && isActive()) {                 pipeline().fireChannelActive();             }              // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().             if (!promiseSet) {                 close(voidPromise());             }         } 

2.2 读操作流程

调用AbstractNioByteChannel的read()方法,

典型的autoRead流程如下:

1. 当socket建立连接时,Netty触发一个inbound事件channelActive,然后提交一个read()请求给本身(参考DefaultChannelPipeline.fireChannelActive())

2. 接收到read()请求后,Netty从socket读取消息。

3. 当读取到消息时,Netty触发channelRead()。

4. 当读取不到消息后,Netty触发ChannelReadCompleted().

5. Netty提交另外一个read()请求来继续从socket中读取消息。

 @Override         public final void read() {             final ChannelConfig config = config();             if (!config.isAutoRead() && !isReadPending()) {                 // ChannelConfig.setAutoRead(false) was called in the meantime                 removeReadOp();                 return;             }              final ChannelPipeline pipeline = pipeline();             final ByteBufAllocator allocator = config.getAllocator();             final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();             allocHandle.reset(config);              ByteBuf byteBuf = null;             try {                 boolean needReadPendingReset = true;                 do {                     byteBuf = allocHandle.allocate(allocator);                     allocHandle.lastBytesRead(doReadBytes(byteBuf));                     if (allocHandle.lastBytesRead() <= 0) {                         // nothing was read. release the buffer.                         byteBuf.release();                         byteBuf = null;                         break;                     }                      allocHandle.incMessagesRead(1);                     if (needReadPendingReset) {                         needReadPendingReset = false;                         setReadPending(false);                     }                     pipeline.fireChannelRead(byteBuf);                     byteBuf = null;                 } while (allocHandle.continueReading());                  allocHandle.readComplete();                 pipeline.fireChannelReadComplete();                  if (allocHandle.lastBytesRead() < 0) {                     closeOnRead(pipeline);                 }             } catch (Throwable t) {                 handleReadException(pipeline, byteBuf, t, allocHandle.lastBytesRead() < 0, allocHandle);             } finally {                 // Check if there is a readPending which was not processed yet.                 // This could be for two reasons:                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method                 //                 // See https://github.com/netty/netty/issues/2254                 if (!config.isAutoRead() && !isReadPending()) {                     removeReadOp();                 }             }         }     } 

触发读操作

     @Override     public ChannelHandlerContext fireChannelRead(Object msg) {         AbstractChannelHandlerContext next = findContextInbound();         next.invoker().invokeChannelRead(next, pipeline.touch(msg, next));         return this;     } 

读完触发完成事件

     @Override     public ChannelPipeline fireChannelReadComplete() {         head.fireChannelReadComplete();         if (channel.config().isAutoRead()) {             read();         }         return this;     }     @Override     public ChannelHandlerContext fireChannelReadComplete() {         AbstractChannelHandlerContext next = findContextInbound();         next.invoker().invokeChannelReadComplete(next);         return this;     } 

2.3 写操作流程

写操作

  @SuppressWarnings("deprecation")         protected void flush0() {             if (inFlush0) {                 // Avoid re-entrance                 return;             }              final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;             if (outboundBuffer == null || outboundBuffer.isEmpty()) {                 return;             }              inFlush0 = true;              // Mark all pending write requests as failure if the channel is inactive.             if (!isActive()) {                 try {                     if (isOpen()) {                         outboundBuffer.failFlushed(NOT_YET_CONNECTED_EXCEPTION, true);                     } else {                         // Do not trigger channelWritabilityChanged because the channel is closed already.                         outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION, false);                     }                 } finally {                     inFlush0 = false;                 }                 return;             }              try {                 doWrite(outboundBuffer);             } catch (Throwable t) {                 if (t instanceof IOException && config().isAutoClose()) {                     /**                      * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of                      * failing all flushed messages and also ensure the actual close of the underlying transport                      * will happen before the promises are notified.                      *                      * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}                      * may still return {@code true} even if the channel should be closed as result of the exception.                      */                     close(voidPromise(), t, false);                 } else {                     outboundBuffer.failFlushed(t, true);                 }             } finally {                 inFlush0 = false;             }         } 

写操作具体实现(以NioSocketChannel为例):

  @Override     protected void doWrite(ChannelOutboundBuffer in) throws Exception {         for (;;) {             int size = in.size();             if (size == 0) {                 // All written so clear OP_WRITE                 clearOpWrite();                 break;             }             long writtenBytes = 0;             boolean done = false;             boolean setOpWrite = false;              // Ensure the pending writes are made of ByteBufs only.             ByteBuffer[] nioBuffers = in.nioBuffers();             int nioBufferCnt = in.nioBufferCount();             long expectedWrittenBytes = in.nioBufferSize();             SocketChannel ch = javaChannel();              // Always us nioBuffers() to workaround data-corruption.             // See https://github.com/netty/netty/issues/2761             switch (nioBufferCnt) {                 case 0:                     // We have something else beside ByteBuffers to write so fallback to normal writes.                     super.doWrite(in);                     return;                 case 1:                     // Only one ByteBuf so use non-gathering write                     ByteBuffer nioBuffer = nioBuffers[0];                     for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {                         final int localWrittenBytes = ch.write(nioBuffer);                         if (localWrittenBytes == 0) {                             setOpWrite = true;                             break;                         }                         expectedWrittenBytes -= localWrittenBytes;                         writtenBytes += localWrittenBytes;                         if (expectedWrittenBytes == 0) {                             done = true;                             break;                         }                     }                     break;                 default:                     for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {                         final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);                         if (localWrittenBytes == 0) {                             setOpWrite = true;                             break;                         }                         expectedWrittenBytes -= localWrittenBytes;                         writtenBytes += localWrittenBytes;                         if (expectedWrittenBytes == 0) {                             done = true;                             break;                         }                     }                     break;             }              // Release the fully written buffers, and update the indexes of the partially written buffer.             in.removeBytes(writtenBytes);              if (!done) {                 // Did not write all buffers completely.                 incompleteWrite(setOpWrite);                 break;             }         }     }   

2. ChannelInboundHandler和ChannelInboundHandler

Echo的handler代码如下:

 /**  * Handler implementation for the echo client.  It initiates the ping-pong  * traffic between the echo client and server by sending the first message to  * the server.  */ public class EchoClientHandler extends ChannelInboundHandlerAdapter {      private final ByteBuf firstMessage;      /**      * Creates a client-side handler.      */     public EchoClientHandler() {         firstMessage = Unpooled.buffer(EchoClient.SIZE);         for (int i = 0; i < firstMessage.capacity(); i ++) {             firstMessage.writeByte((byte) i);         }     }      @Override     public void channelActive(ChannelHandlerContext ctx) {         ctx.writeAndFlush(firstMessage);     }      @Override     public void channelRead(ChannelHandlerContext ctx, Object msg) {         ctx.write(msg);     }      @Override     public void channelReadComplete(ChannelHandlerContext ctx) {        ctx.flush();     }      @Override     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {         // Close the connection when an exception is raised.         cause.printStackTrace();         ctx.close();     } 

上面的代码出现了两个重要的netty组件:ChannelInboundHandlerAdapter和ByteBuf。其中ByteBuf在另一篇文章已经讲到。我们这次重点分析一下 ChannelInboundHandlerAdapter及其相关类。

ChannelInboundHandlerAdapter继承了ChannelInboundHandler,它的作用是将operation转到ChannelPipeline中的下一个ChannelHandler。子类可以重写一个方法的实现来改变。注意:在方法#channelRead(ChannelHandlerContext, Object)自动返回前,message不会释放。若需要一个可以自动释放接收消息的ChannelInboundHandler实现时,请考虑SimpleChannelInboundHandler。

ChannelOutboundHandlerAdapter继承了ChannelOutboundHandler,它仅通过调用ChannelHandlerContext跳转到每个方法。

ChannelInboundHandler处理输入的事件,事件由外部事件源产生,例如从一个socket接收到数据。

ChannelOutboundHandler解析你自己应用提交的操作。

2.1 ChannelInboundHandler.channelActive()

从源码角度看一下,Netty触发一个inbound事件channelActive(以LoggingHandler为例):

    @Override     public void channelActive(ChannelHandlerContext ctx) throws Exception {         if (logger.isEnabled(internalLevel)) {             logger.log(internalLevel, format(ctx, "ACTIVE"));         }         ctx.fireChannelActive();     } 

触发操作如下:

      @Override     public ChannelHandlerContext fireChannelActive() {         AbstractChannelHandlerContext next = findContextInbound();         next.invoker().invokeChannelActive(next);         return this;     }     private AbstractChannelHandlerContext findContextInbound() {         AbstractChannelHandlerContext ctx = this;         do {             ctx = ctx.next;         } while (!ctx.inbound);         return ctx;     } 

invokeChannelActive方法实现:

     @Override     public void invokeChannelActive(final ChannelHandlerContext ctx) {         if (executor.inEventLoop()) {             invokeChannelActiveNow(ctx);         } else {             executor.execute(new OneTimeTask() {                 @Override                 public void run() {                     invokeChannelActiveNow(ctx);                 }             });         }     }     public static void invokeChannelActiveNow(final ChannelHandlerContext ctx) {         try {             ((ChannelInboundHandler) ctx.handler()).channelActive(ctx);         } catch (Throwable t) {             notifyHandlerException(ctx, t);         }     } 

2.2 ChannelOutboundHandler.Read()

读的流程:

     @Override     public ChannelHandlerContext read() {         AbstractChannelHandlerContext next = findContextOutbound();         next.invoker().invokeRead(next);         return this;     } 

查找outbound的过程:

     private AbstractChannelHandlerContext findContextOutbound() {         AbstractChannelHandlerContext ctx = this;         do {             ctx = ctx.prev;         } while (!ctx.outbound);         return ctx;     } 

触发读操作:

     @Override     public void invokeRead(final ChannelHandlerContext ctx) {         if (executor.inEventLoop()) {             invokeReadNow(ctx);         } else {             AbstractChannelHandlerContext dctx = (AbstractChannelHandlerContext) ctx;             Runnable task = dctx.invokeReadTask;             if (task == null) {                 dctx.invokeReadTask = task = new Runnable() {                     @Override                     public void run() {                         invokeReadNow(ctx);                     }                 };             }             executor.execute(task);         }     } 

2.3 ChannelOutboundHandler.write()

以实现类LoggingHandler为例:

     @Override     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {         if (logger.isEnabled(internalLevel)) {             logger.log(internalLevel, format(ctx, "WRITE", msg));         }         ctx.write(msg, promise);     } 

具体实现:

     @Override     public ChannelFuture write(Object msg, ChannelPromise promise) {         AbstractChannelHandlerContext next = findContextOutbound();         next.invoker().invokeWrite(next, pipeline.touch(msg, next), promise);         return promise;     } 

写操作的触发

     @Override     public void invokeWrite(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {         if (msg == null) {             throw new NullPointerException("msg");         }         if (!validatePromise(ctx, promise, true)) {             // promise cancelled             ReferenceCountUtil.release(msg);             return;         }          if (executor.inEventLoop()) {             invokeWriteNow(ctx, msg, promise);         } else {             safeExecuteOutbound(WriteTask.newInstance(ctx, msg, promise), promise, msg);         }     } 

立刻触发

     public static void invokeWriteNow(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {         try {             ((ChannelOutboundHandler) ctx.handler()).write(ctx, msg, promise);         } catch (Throwable t) {             notifyOutboundHandlerException(t, promise);         }     } 

小结:

Netty中,可以注册多个handler。ChannelInboundHandler按照注册的先后顺序执行;ChannelOutboundHandler按照注册的先后顺序逆序执行,如下图所示,按照注册的先后顺序对Handler进行排序,request进入Netty后的执行顺序为:

从netty-example分析Netty组件续

参考文献

【1】http://blog.csdn.net/u013252773/article/details/21195593

【2】http://stackoverflow.com/questions/22354135/in-netty4-why-read-and-write-both-in-outboundhandler

正文到此结束
Loading...