 
 
ChannelPipeline是Channel中负责组织ChannelHandler的组件。如上图所示,想象远端在上方,最上面为head;近端为我们的程序,最下面为tail。inbound事件(通常为读到的消息、用户自定义事件等)会自上而下(从head到tail)依次经过各个ChannelInboundHandler;而outbound事件(通常为write消息等)则自下而上(从tail到head)依次经过各个ChannelOutboundHandler处理。
如果要在程序中发起一个事件,可以通过ChannelHandlerContext。ChannelHandlerContext的方法和Channel的方法的区别是:通过ChannelHandlerContext发出的事件会从当前节点传递给下一个ChannelHandler来处理,而通过Channel发出的事件会从pipeline的一端(inbound从head,outbound从tail)开始处理。ChannelPipeline类似Servlet中的Filter,或其他的Interceptor模式。
Inbound事件传播方法:
Outbound事件传播方法包括
ChannelPipeline上的ChannelHandler通常分为以下几类
ChannelPipeline最常用的方法就是在pipeline最后添加ChannelHandler了
// Adds the given ChannelHandler(s) at the end of the pipeline.
ChannelPipeline addLast(ChannelHandler... handlers);
除此之外,pipeline是线程安全的,还能动态地添加删除ChannelHandler。
另外pipeline也包括了fireXxx系列方法(以及flush)
@Override
    ChannelPipelinefireChannelRegistered();
     @Override
    ChannelPipelinefireChannelUnregistered();
    @Override
    ChannelPipelinefireChannelActive();
    @Override
    ChannelPipelinefireChannelInactive();
    @Override
    ChannelPipelinefireExceptionCaught(Throwable cause);
    @Override
    ChannelPipelinefireUserEventTriggered(Object event);
    @Override
    ChannelPipelinefireChannelRead(Object msg);
    @Override
    ChannelPipelinefireChannelReadComplete();
    @Override
    ChannelPipelinefireChannelWritabilityChanged();
    @Override
    ChannelPipelineflush();
 
 很自然的我们可以想到使用双向链表来实现pipeline。
DefaultChannelPipeline中包含了两个特殊的ChannelHandler, head和tail, 实现类分别是HeadContext和TailContext,分别作为链表的头和尾。
两个节点在ChannelPipeline创建的时候被设置。
// Sentinel nodes at the two ends of the pipeline's doubly linked list of contexts.
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;
/**
 * Creates the pipeline for the given channel and links the head and tail sentinels together,
 * so that the list initially contains only those two nodes.
 *
 * @param channel the channel this pipeline belongs to; rejected by
 *                {@code ObjectUtil.checkNotNull} when {@code null}
 */
protected DefaultChannelPipeline(Channel channel){
    // Must be assigned before HeadContext is created: its constructor reads pipeline.channel().
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);
    tail = new TailContext(this);
    head = new HeadContext(this);
    // Empty pipeline: head <-> tail.
    head.next = tail;
    tail.prev = head;
}
 
 HeadContext主要负责把Outbound相关事件交给AbstractChannel.Unsafe来处理,如bind、write等。
final class HeadContextextends AbstractChannelHandlerContext
implements ChannelOutboundHandler,ChannelInboundHandler{
    private final Unsafe unsafe;
    HeadContext(DefaultChannelPipeline pipeline) {
        super(pipeline, null, HEAD_NAME, false, true);
        unsafe = pipeline.channel().unsafe();
        setAddComplete();
    }
    @Override
    public ChannelHandler handler(){
        return this;
    }
    @Override
    public void handlerAdded(ChannelHandlerContext ctx)throws Exception {
        // NOOP
    }
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx)throws Exception {
        // NOOP
    }
    @Override
    public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
        unsafe.bind(localAddress, promise);
    }
    @Override
    public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise)throws Exception {
        unsafe.connect(remoteAddress, localAddress, promise);
    }
    @Override
    public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise)throws Exception {
        unsafe.disconnect(promise);
    }
    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise)throws Exception {
        unsafe.close(promise);
    }
    @Override
    public void deregister(ChannelHandlerContext ctx, ChannelPromise promise)throws Exception {
        unsafe.deregister(promise);
    }
    @Override
    public void read(ChannelHandlerContext ctx){
        unsafe.beginRead();
    }
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)throws Excep
unsafe.write(msg, promise);
    }
    @Override
    public void flush(ChannelHandlerContext ctx)throws Exception {
        unsafe.flush();
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception {
        ctx.fireExceptionCaught(cause);
    }
    @Override
    public void channelRegistered(ChannelHandlerContext ctx)throws Exception {
        invokeHandlerAddedIfNeeded();
        ctx.fireChannelRegistered();
    }
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx)throws Exception {
        ctx.fireChannelUnregistered();
        // Remove all handlers sequentially if channel is closed and unregistered.
        if (!channel.isOpen()) {
            destroy();
        }
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx)throws Exception {
        ctx.fireChannelActive();
        readIfIsAutoRead();
    }
    @Override
    public void channelInactive(ChannelHandlerContext ctx)throws Exception {
        ctx.fireChannelInactive();
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)throws Exception {
        ctx.fireChannelRead(msg);
    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx)throws Exception {
        ctx.fireChannelReadComplete();
        readIfIsAutoRead();
    }
    private void readIfIsAutoRead(){
        if (channel.config().isAutoRead()) {
            channel.read();
        }
    }
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt)throws Exception {
        ctx.fireUserEventTriggered(evt);
    }
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx)throws Exception {
        ctx.fireChannelWritabilityChanged();
    }
}
 
 TailContext的作用主要是最终给一些消息ReferenceCount减一、打印前面没有捕获的异常等。
final class TailContextextends AbstractChannelHandlerContextimplements ChannelInboundHandler{
    TailContext(DefaultChannelPipeline pipeline) {
        super(pipeline, null, TAIL_NAME, true, false);
        setAddComplete();
    }
    @Override
    public ChannelHandler handler(){
        return this;
    }
    @Override
    public void channelRegistered(ChannelHandlerContext ctx)throws Exception { }
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx)throws Exception { }
    @Override
    public void channelActive(ChannelHandlerContext ctx)throws Exception {
        onUnhandledInboundChannelActive();
    }
    @Override
    public void channelInactive(ChannelHandlerContext ctx)throws Exception {
        onUnhandledInboundChannelInactive();
    }
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx)throws Exception {
        onUnhandledChannelWritabilityChanged();
    }
    @Override
    public void handlerAdded(ChannelHandlerContext ctx)throws Exception { }
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx)throws Exception { }
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt)throws Exception {
        onUnhandledInboundUserEventTriggered(evt);
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception {
        onUnhandledInboundException(cause);
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)throws Exception {
        onUnhandledInboundMessage(msg);
    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx)throws Exception {
        onUnhandledInboundChannelReadComplete();
    }
}
 
 addLast将一个ChannelHandler放在pipeline最后,内部实现是在tail之前。有可选参数executor和name。
executor表示执行ChannelHandler处理的线程池,如果没有设置或传入null则使用对应channel的eventLoop。
通常如果有耗时很大的处理,则会自定义一个线程池来执行,避免阻塞eventLoop导致不能及时处理IO事件。
name可以给这个ChannelPipeline上的Handler定义一个名字,方便之后replace等操作,如果没有传入则会自动生成一个。
addLast首先给this加锁,来保证线程安全,因为其中的队列指针操作有很多步骤。
这里还有一个细节:Channel在执行addLast的时候可能还没有完成register,如果此时就回调handlerAdded,则会导致顺序问题,即先发生了added,后发生register。所以这里会判断是否已经注册,如果没有,则先把回调放到一个队列中,等注册完成后再执行。
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        checkMultiplicity(handler);
        newCtx = newContext(group, filterName(name, handler), handler);
        addLast0(newCtx);
        // If the registered is false it means that the channel was not registered on an eventloop yet.
        // In this case we add the context to the pipeline and add a task that will call
        // ChannelHandler.handlerAdded(...) once the channel is registered.
        if (!registered) {
            newCtx.setAddPending();
            callHandlerCallbackLater(newCtx, true);
            return this;
        }
        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            newCtx.setAddPending();
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    callHandlerAdded0(newCtx);
                }
            });
            return this;
        }
    }
    callHandlerAdded0(newCtx);
    return this;
 
 创建ChannelHandlerContext
private AbstractChannelHandlerContextnewContext(EventExecutorGroup group, String name, ChannelHandlerhandler){
    return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
final class DefaultChannelHandlerContextextends AbstractChannelHandlerContext{
    private final ChannelHandler handler;
    DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        this.handler = handler;
    }
    @Override
    public ChannelHandlerhandler(){
        return handler;
    }
    ...
}
 
 链表操作,放到队尾
/**
 * Splices {@code newCtx} into the doubly linked list directly in front of the tail sentinel.
 * The new node's own links are set first, then its neighbours are pointed at it.
 *
 * @param newCtx the context to link in as the last node before the tail
 */
private void addLast0(AbstractChannelHandlerContext newCtx) {
    final AbstractChannelHandlerContext formerLast = tail.prev;

    // Wire up the new node.
    newCtx.prev = formerLast;
    newCtx.next = tail;

    // Make both neighbours point at it.
    formerLast.next = newCtx;
    tail.prev = newCtx;
}
 
 设置ChannelHandlerContext已经添加、回调ChannelHandler的handlerAdded
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx){
    try {
        ctx.handler().handlerAdded(ctx);
        ctx.setAddComplete();
    } catch (Throwable t) {
        ...
}
 
 ChannelPipeline是保存Channel上的ChannelHandler的组件,内部是双向链表结构,我们看到节点保存的是
ChannelHandlerContext,而ChannelHandlerContext又是通过ChannelPipeline和ChannelHandler构造出来的。