ChannelPipeline是Channel的负责组织ChannelHandler的组件,如上图所示,想象远端为上方,最上面为head,近端为我们的程序,最下面为tail。一个inbound事件,通常为读到的消息、用户自定义事件等会从上而下经过各个ChannelInboundHandler。而Outbound事件通常为write消息等,会经过ChannelOutboundHandler处理。
如果要在程序中发起一个事件,可以通过ChannelHandlerContext,ChannelHandlerContext的方法和Channel方法的区别是ChannelHandlerContext的事件会传递给下一个ChannelHandler来处理,而Channel发出的事件会从头ChannelHandler(head或tail)开始处理。ChannelPipeline类似Servlet中的Filter,或其他的Interceptor模式。
Inbound事件传播方法:
Outbound事件传播方法包括
ChannelPipeline上的ChannelHandler通常分为以下几类
ChannelPipeline最常用的方法就是在pipeline最后添加ChannelHandler了
ChannelPipeline addLast(ChannelHandler... handlers);
除此之外,pipeline是线程安全的,还能动态地添加删除ChannelHandler。
另外pipeline也包括了firestChannelxxx方法
@Override ChannelPipelinefireChannelRegistered(); @Override ChannelPipelinefireChannelUnregistered(); @Override ChannelPipelinefireChannelActive(); @Override ChannelPipelinefireChannelInactive(); @Override ChannelPipelinefireExceptionCaught(Throwable cause); @Override ChannelPipelinefireUserEventTriggered(Object event); @Override ChannelPipelinefireChannelRead(Object msg); @Override ChannelPipelinefireChannelReadComplete(); @Override ChannelPipelinefireChannelWritabilityChanged(); @Override ChannelPipelineflush();
很自然的我们可以想到使用双向链表来实现pipeline。
DefaultChannelPipeline中包含了两个特殊的ChannelHandler, head和tail, 实现类分别是HeadContext和TailContext,分别作为队列的头和尾。
两个节点在ChannelPipeline创建的时候被设置。
final AbstractChannelHandlerContext head; final AbstractChannelHandlerContext tail; protected DefaultChannelPipeline(Channel channel){ this.channel = ObjectUtil.checkNotNull(channel, "channel"); succeededFuture = new SucceededChannelFuture(channel, null); voidPromise = new VoidChannelPromise(channel, true); tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; }
HeadContext主要负责把Outbound相关事件交给AbstractChannel.Unsafe来处理,如bind、write等。
final class HeadContextextends AbstractChannelHandlerContext implements ChannelOutboundHandler,ChannelInboundHandler{ private final Unsafe unsafe; HeadContext(DefaultChannelPipeline pipeline) { super(pipeline, null, HEAD_NAME, false, true); unsafe = pipeline.channel().unsafe(); setAddComplete(); } @Override public ChannelHandler handler(){ return this; } @Override public void handlerAdded(ChannelHandlerContext ctx)throws Exception { // NOOP } @Override public void handlerRemoved(ChannelHandlerContext ctx)throws Exception { // NOOP } @Override public void bind( ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { unsafe.bind(localAddress, promise); } @Override public void connect( ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise)throws Exception { unsafe.connect(remoteAddress, localAddress, promise); } @Override public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise)throws Exception { unsafe.disconnect(promise); } @Override public void close(ChannelHandlerContext ctx, ChannelPromise promise)throws Exception { unsafe.close(promise); } @Override public void deregister(ChannelHandlerContext ctx, ChannelPromise promise)throws Exception { unsafe.deregister(promise); } @Override public void read(ChannelHandlerContext ctx){ unsafe.beginRead(); } @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)throws Excep unsafe.write(msg, promise); } @Override public void flush(ChannelHandlerContext ctx)throws Exception { unsafe.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception { ctx.fireExceptionCaught(cause); } @Override public void channelRegistered(ChannelHandlerContext ctx)throws Exception { invokeHandlerAddedIfNeeded(); ctx.fireChannelRegistered(); } @Override public void channelUnregistered(ChannelHandlerContext ctx)throws Exception { ctx.fireChannelUnregistered(); // Remove all handlers sequentially if channel is closed and unregistered. if (!channel.isOpen()) { destroy(); } } @Override public void channelActive(ChannelHandlerContext ctx)throws Exception { ctx.fireChannelActive(); readIfIsAutoRead(); } @Override public void channelInactive(ChannelHandlerContext ctx)throws Exception { ctx.fireChannelInactive(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg)throws Exception { ctx.fireChannelRead(msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx)throws Exception { ctx.fireChannelReadComplete(); readIfIsAutoRead(); } private void readIfIsAutoRead(){ if (channel.config().isAutoRead()) { channel.read(); } } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt)throws Exception { ctx.fireUserEventTriggered(evt); } @Override public void channelWritabilityChanged(ChannelHandlerContext ctx)throws Exception { ctx.fireChannelWritabilityChanged(); } }
TailContext的作用主要是最终给一些消息ReferenceCount减一、打印前面没有捕获的异常等。
final class TailContextextends AbstractChannelHandlerContextimplements ChannelInboundHandler{ TailContext(DefaultChannelPipeline pipeline) { super(pipeline, null, TAIL_NAME, true, false); setAddComplete(); } @Override public ChannelHandler handler(){ return this; } @Override public void channelRegistered(ChannelHandlerContext ctx)throws Exception { } @Override public void channelUnregistered(ChannelHandlerContext ctx)throws Exception { } @Override public void channelActive(ChannelHandlerContext ctx)throws Exception { onUnhandledInboundChannelActive(); } @Override public void channelInactive(ChannelHandlerContext ctx)throws Exception { onUnhandledInboundChannelInactive(); } @Override public void channelWritabilityChanged(ChannelHandlerContext ctx)throws Exception { onUnhandledChannelWritabilityChanged(); } @Override public void handlerAdded(ChannelHandlerContext ctx)throws Exception { } @Override public void handlerRemoved(ChannelHandlerContext ctx)throws Exception { } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt)throws Exception { onUnhandledInboundUserEventTriggered(evt); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception { onUnhandledInboundException(cause); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg)throws Exception { onUnhandledInboundMessage(msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx)throws Exception { onUnhandledInboundChannelReadComplete(); } }
addLast将一个ChannelHandler放在pipeline最后,内部实现是在tail之前。有可选参数executor和name。
executor表示执行ChannelHandler处理的线程池,如果没有设置或传入null则使用对应channel的eventLoop。
通常如果有耗时很大的处理,则会自定义一个线程池来执行,避免阻塞eventLoop导致不能及时处理IO事件。
name可以给这个ChannelPipeline上的Handler定义一个名字,方便之后replace等操作,如果没有传入则会自动生成一个。
addLast首先给this加锁,来保证线程安全,因为其中的队列指针操作有很多步骤。
这里还有一个细节,就是Channel在执行addLast的时候可能还没有完成register,如果此时回调handlerAdded则会
导致顺序问题先发生了added再register。所以这里判断是否已经注册,如果没有则先放到一个队列中,等注册完成后再执行。
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized (this) { checkMultiplicity(handler); newCtx = newContext(group, filterName(name, handler), handler); addLast0(newCtx); // If the registered is false it means that the channel was not registered on an eventloop yet. // In this case we add the context to the pipeline and add a task that will call // ChannelHandler.handlerAdded(...) once the channel is registered. if (!registered) { newCtx.setAddPending(); callHandlerCallbackLater(newCtx, true); return this; } EventExecutor executor = newCtx.executor(); if (!executor.inEventLoop()) { newCtx.setAddPending(); executor.execute(new Runnable() { @Override public void run() { callHandlerAdded0(newCtx); } }); return this; } } callHandlerAdded0(newCtx); return this;
创建ChannelHandlerContext
private AbstractChannelHandlerContextnewContext(EventExecutorGroup group, String name, ChannelHandlerhandler){ return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler); } final class DefaultChannelHandlerContextextends AbstractChannelHandlerContext{ private final ChannelHandler handler; DefaultChannelHandlerContext( DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) { super(pipeline, executor, name, isInbound(handler), isOutbound(handler)); if (handler == null) { throw new NullPointerException("handler"); } this.handler = handler; } @Override public ChannelHandlerhandler(){ return handler; } ... }
链表操作,放到队尾
private void addLast0(AbstractChannelHandlerContext newCtx) { AbstractChannelHandlerContext prev = tail.prev; newCtx.prev = prev; newCtx.next = tail; prev.next = newCtx; tail.prev = newCtx; }
设置ChannelHandlerContext已经添加、回调ChannelHandler的handlerAdded
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx){ try { ctx.handler().handlerAdded(ctx); ctx.setAddComplete(); } catch (Throwable t) { ... }
ChannelPipeline是保存Channel上的ChannelHandler的组件,内部是双向链表结构,我们看到节点保存的是
ChannelHandlerContext,而ChannelHandlerContext又是通过ChannelPipeline和ChannelHandler构造出来的。