Reposted

Netty Source Code Analysis 3: ChannelPipeline

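ChannelPipeline is the component of a Channel that organizes its ChannelHandlers. Picture the remote peer at the top and our program at the bottom: head is the topmost node and tail the bottommost. An inbound event, typically a message that was read or a user-defined event, travels from top to bottom through each ChannelInboundHandler. An outbound event, typically a write, travels in the opposite direction through each ChannelOutboundHandler.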

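To trigger an event from program code, use a ChannelHandlerContext. The difference between the ChannelHandlerContext methods and the Channel methods is that an event fired on a ChannelHandlerContext is passed to the next ChannelHandler, while an event fired on the Channel starts from the end of the pipeline (head for inbound events, tail for outbound events). ChannelPipeline resembles Filter in the Servlet API, or other implementations of the Interceptor pattern.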
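The difference between the two starting points can be illustrated with a toy model. This is a minimal sketch in plain Java, not Netty code: the class `PropagationSketch`, the node type `Ctx`, and the methods `fireRead` and `onRead` are hypothetical names that only mimic the roles of ChannelHandlerContext#fireChannelRead and the handler callback.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model (not Netty code): inbound propagation from head vs. from the current node. */
final class PropagationSketch {
    /** A node in the chain; plays the role of a ChannelHandlerContext. */
    static final class Ctx {
        final String name;
        Ctx next;

        Ctx(String name) {
            this.name = name;
        }

        /** Mimics ctx.fireChannelRead(msg): hands the event to the NEXT node only. */
        void fireRead(String msg, List<String> log) {
            if (next != null) {
                next.onRead(msg, log);
            }
        }

        /** Mimics the handler callback: record the visit, then keep propagating. */
        void onRead(String msg, List<String> log) {
            log.add(name + ":" + msg);
            fireRead(msg, log);
        }
    }

    /** Builds the chain head -> a -> b -> tail and returns the nodes in order. */
    static Ctx[] build() {
        Ctx head = new Ctx("head");
        Ctx a = new Ctx("a");
        Ctx b = new Ctx("b");
        Ctx tail = new Ctx("tail");
        head.next = a;
        a.next = b;
        b.next = tail;
        return new Ctx[] {head, a, b, tail};
    }

    /** Event fired on the channel or pipeline: processing starts at head. */
    static List<String> fromPipeline(String msg) {
        List<String> log = new ArrayList<>();
        build()[0].onRead(msg, log);
        return log;
    }

    /** Event fired on the context of handler "a": processing starts after "a". */
    static List<String> fromContextA(String msg) {
        List<String> log = new ArrayList<>();
        build()[1].fireRead(msg, log);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(fromPipeline("m"));  // visits head, a, b, tail
        System.out.println(fromContextA("m"));  // visits b, tail only
    }
}
```

In this model, firing from the context of "a" skips head and "a" itself, which is why a handler that wants the whole pipeline to see an event fires it on the channel instead.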

Inbound event propagation methods:

  • ChannelHandlerContext#fireChannelRegistered()
  • ChannelHandlerContext#fireChannelActive()
  • ChannelHandlerContext#fireChannelRead(Object)
  • ChannelHandlerContext#fireChannelReadComplete()
  • ChannelHandlerContext#fireExceptionCaught(Throwable)
  • ChannelHandlerContext#fireUserEventTriggered(Object)
  • ChannelHandlerContext#fireChannelInactive()
  • ChannelHandlerContext#fireChannelUnregistered()
  • ChannelHandlerContext#fireChannelWritabilityChanged()

Outbound event propagation methods:

  • ChannelHandlerContext#bind(SocketAddress, ChannelPromise)
  • ChannelHandlerContext#connect(SocketAddress, SocketAddress, ChannelPromise)
  • ChannelHandlerContext#write(Object, ChannelPromise)
  • ChannelHandlerContext#flush()
  • ChannelHandlerContext#read()
  • ChannelHandlerContext#close(ChannelPromise)
  • ChannelHandlerContext#disconnect(ChannelPromise)
  • ChannelHandlerContext#deregister(ChannelPromise)

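The ChannelHandlers on a ChannelPipeline usually fall into the following categories:

  • Protocol Decoder - converts binary data into Java objects, or one kind of Java object into another
  • Protocol Encoder - converts Java objects into binary data, or one kind of Java object into another
  • Business-logic Handler - implements the business logic for the different events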
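The three roles can be pictured as a chain of functions. The following is only a conceptual sketch in plain Java, not Netty code: `HandlerRolesSketch` and its fields are hypothetical names, and real Netty decoders and encoders work on ByteBuf inside handlers rather than on byte arrays inside functions.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

/** Toy model (not Netty code) of the decoder -> business logic -> encoder roles. */
final class HandlerRolesSketch {
    /** Decoder role: binary data -> Java object (here, a String). */
    static final Function<byte[], String> DECODER =
            bytes -> new String(bytes, StandardCharsets.UTF_8);

    /** Business-logic role: reacts to the decoded request. */
    static final Function<String, String> BUSINESS =
            request -> "echo:" + request;

    /** Encoder role: Java object -> binary data. */
    static final Function<String, byte[]> ENCODER =
            response -> response.getBytes(StandardCharsets.UTF_8);

    /** Runs an inbound payload through all three roles in order. */
    static byte[] handle(byte[] inbound) {
        return DECODER.andThen(BUSINESS).andThen(ENCODER).apply(inbound);
    }

    public static void main(String[] args) {
        byte[] out = handle("hi".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(out, StandardCharsets.UTF_8));
    }
}
```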

The most commonly used ChannelPipeline method is the one that appends ChannelHandlers to the end of the pipeline:

ChannelPipeline addLast(ChannelHandler... handlers);

In addition, the pipeline is thread-safe, and ChannelHandlers can be added and removed dynamically.

The pipeline also declares the fireChannelXxx-style methods (and flush):

    @Override
    ChannelPipeline fireChannelRegistered();

    @Override
    ChannelPipeline fireChannelUnregistered();

    @Override
    ChannelPipeline fireChannelActive();

    @Override
    ChannelPipeline fireChannelInactive();

    @Override
    ChannelPipeline fireExceptionCaught(Throwable cause);

    @Override
    ChannelPipeline fireUserEventTriggered(Object event);

    @Override
    ChannelPipeline fireChannelRead(Object msg);

    @Override
    ChannelPipeline fireChannelReadComplete();

    @Override
    ChannelPipeline fireChannelWritabilityChanged();

    @Override
    ChannelPipeline flush();

DefaultChannelPipeline

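DefaultChannelPipeline is the implementation of ChannelPipeline.

A doubly linked list is a natural way to implement a pipeline.

DefaultChannelPipeline contains two special ChannelHandlers, head and tail, implemented by HeadContext and TailContext. They serve as the first and last nodes of the list.

Both nodes are set up when the ChannelPipeline is created.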

final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;

protected DefaultChannelPipeline(Channel channel){
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);
    tail = new TailContext(this);
    head = new HeadContext(this);
    head.next = tail;
    tail.prev = head;
}
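The value of the two fixed end nodes is that inserting or removing a handler never has to deal with an empty list or a null neighbour. The following is a minimal sketch in plain Java under that assumption, not Netty code: `LinkedPipelineSketch`, `Node`, `remove`, and `names` are hypothetical names, and nodes carry only a name instead of a handler.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (not Netty code) of a pipeline as a doubly linked list with sentinel nodes. */
final class LinkedPipelineSketch {
    static final class Node {
        final String name;
        Node prev;
        Node next;

        Node(String name) {
            this.name = name;
        }
    }

    final Node head = new Node("head");
    final Node tail = new Node("tail");

    LinkedPipelineSketch() {
        // The two sentinels are linked to each other up front, as in the constructor above.
        head.next = tail;
        tail.prev = head;
    }

    /** Inserts a node just before tail; tail.prev is never null thanks to head. */
    synchronized LinkedPipelineSketch addLast(String name) {
        Node node = new Node(name);
        Node prev = tail.prev;
        node.prev = prev;
        node.next = tail;
        prev.next = node;
        tail.prev = node;
        return this;
    }

    /** Unlinks the first node with the given name; the sentinels are never removed. */
    synchronized boolean remove(String name) {
        for (Node n = head.next; n != tail; n = n.next) {
            if (n.name.equals(name)) {
                n.prev.next = n.next;
                n.next.prev = n.prev;
                return true;
            }
        }
        return false;
    }

    /** Returns the node names from head to tail. */
    synchronized List<String> names() {
        List<String> out = new ArrayList<>();
        for (Node n = head; n != null; n = n.next) {
            out.add(n.name);
        }
        return out;
    }

    public static void main(String[] args) {
        LinkedPipelineSketch p = new LinkedPipelineSketch().addLast("decoder").addLast("business");
        System.out.println(p.names());
        p.remove("decoder");
        System.out.println(p.names());
    }
}
```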

HeadContext

HeadContext is mainly responsible for handing outbound events such as bind and write to AbstractChannel.Unsafe for processing.

final class HeadContext extends AbstractChannelHandlerContext
        implements ChannelOutboundHandler, ChannelInboundHandler {
    private final Unsafe unsafe;
    HeadContext(DefaultChannelPipeline pipeline) {
        super(pipeline, null, HEAD_NAME, false, true);
        unsafe = pipeline.channel().unsafe();
        setAddComplete();
    }
    @Override
    public ChannelHandler handler() {
        return this;
    }
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        // NOOP
    }
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        // NOOP
    }
    @Override
    public void bind(
            ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
            throws Exception {
        unsafe.bind(localAddress, promise);
    }
    @Override
    public void connect(
            ChannelHandlerContext ctx,
            SocketAddress remoteAddress, SocketAddress localAddress,
            ChannelPromise promise) throws Exception {
        unsafe.connect(remoteAddress, localAddress, promise);
    }
    @Override
    public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        unsafe.disconnect(promise);
    }
    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        unsafe.close(promise);
    }
    @Override
    public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        unsafe.deregister(promise);
    }
    @Override
    public void read(ChannelHandlerContext ctx) {
        unsafe.beginRead();
    }
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        unsafe.write(msg, promise);
    }
    @Override
    public void flush(ChannelHandlerContext ctx) throws Exception {
        unsafe.flush();
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.fireExceptionCaught(cause);
    }
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        invokeHandlerAddedIfNeeded();
        ctx.fireChannelRegistered();
    }
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelUnregistered();
        // Remove all handlers sequentially if channel is closed and unregistered.
        if (!channel.isOpen()) {
            destroy();
        }
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelActive();
        readIfIsAutoRead();
    }
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelInactive();
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.fireChannelRead(msg);
    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelReadComplete();
        readIfIsAutoRead();
    }
    private void readIfIsAutoRead() {
        if (channel.config().isAutoRead()) {
            channel.read();
        }
    }
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        ctx.fireUserEventTriggered(evt);
    }
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelWritabilityChanged();
    }
}

TailContext

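TailContext mainly performs final cleanup: it decrements the reference count of messages that reach the end of the pipeline, logs exceptions that no earlier handler caught, and so on.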

final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
    TailContext(DefaultChannelPipeline pipeline) {
        super(pipeline, null, TAIL_NAME, true, false);
        setAddComplete();
    }
    @Override
    public ChannelHandler handler() {
        return this;
    }
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception { }
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        onUnhandledInboundChannelActive();
    }
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        onUnhandledInboundChannelInactive();
    }
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        onUnhandledChannelWritabilityChanged();
    }
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception { }
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { }
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        onUnhandledInboundUserEventTriggered(evt);
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        onUnhandledInboundException(cause);
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        onUnhandledInboundMessage(msg);
    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        onUnhandledInboundChannelReadComplete();
    }
}

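addLast implementation

addLast places a ChannelHandler at the end of the pipeline; internally the handler is inserted just before tail. It takes optional executor and name parameters.

executor is the thread pool that runs this ChannelHandler's processing. If it is not set, or null is passed, the eventLoop of the corresponding channel is used.

If a handler does time-consuming work, it is common to supply a custom thread pool for it so that the eventLoop is not blocked and I/O events are still handled promptly.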
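The fallback rule for the executor can be sketched with plain JDK executors. This is a toy model, not Netty code: `ExecutorChoiceSketch`, `pick`, `runOn`, and the thread names are hypothetical, and a single-threaded ExecutorService merely stands in for the eventLoop.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Toy model (not Netty code): a handler runs on its own executor, else on the event loop. */
final class ExecutorChoiceSketch {
    /** Uses the handler's own executor if one was supplied, otherwise the event loop. */
    static ExecutorService pick(ExecutorService own, ExecutorService eventLoop) {
        return own != null ? own : eventLoop;
    }

    /** Runs a task on the chosen executor and reports which thread executed it. */
    static String runOn(ExecutorService own, ExecutorService eventLoop) throws Exception {
        Callable<String> task = () -> Thread.currentThread().getName();
        return pick(own, eventLoop).submit(task).get();
    }

    /** Returns the executing thread names for "no executor" and "custom executor". */
    static String[] demo() {
        ExecutorService eventLoop =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "event-loop"));
        ExecutorService business =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "business"));
        try {
            return new String[] {runOn(null, eventLoop), runOn(business, eventLoop)};
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            eventLoop.shutdown();
            business.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] threads = demo();
        System.out.println(threads[0] + ", " + threads[1]);
    }
}
```

With a slow handler placed on the separate pool, the thread that stands in for the eventLoop stays free for I/O work, which is the motivation given above.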

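name gives the handler a name on this ChannelPipeline, which is convenient for later operations such as replace. If no name is passed, one is generated automatically.

addLast first locks on this to ensure thread safety, because updating the linked-list pointers takes several steps.

  1. Create a ChannelHandlerContext. A ChannelHandlerContext can be thought of as the intersection of a ChannelPipeline and a ChannelHandler: both can be obtained from the ctx.
  2. Append it to the end of the list, just before tail.
  3. Mark the ChannelHandlerContext as added and trigger the ChannelHandler's handlerAdded event.

One more detail: the Channel may not have finished registering when addLast runs. Calling handlerAdded at that point would cause an ordering problem, with added happening before register. So addLast checks whether the channel is registered; if not, the callback is put in a queue and executed once registration completes.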

public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        checkMultiplicity(handler);
        newCtx = newContext(group, filterName(name, handler), handler);
        addLast0(newCtx);
        // If the registered is false it means that the channel was not registered on an eventloop yet.
        // In this case we add the context to the pipeline and add a task that will call
        // ChannelHandler.handlerAdded(...) once the channel is registered.
        if (!registered) {
            newCtx.setAddPending();
            callHandlerCallbackLater(newCtx, true);
            return this;
        }
        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            newCtx.setAddPending();
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    callHandlerAdded0(newCtx);
                }
            });
            return this;
        }
    }
    callHandlerAdded0(newCtx);
    return this;
}
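The postponed handlerAdded callback is easier to see in isolation. The following is a minimal sketch in plain Java, not Netty code: `DeferredAddSketch`, `register`, and the `events` list are hypothetical, and the sketch leaves out the branch of the real method that re-dispatches to the handler's executor.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (not Netty code): postpone handlerAdded callbacks until registration. */
final class DeferredAddSketch {
    private boolean registered;
    private final List<Runnable> pendingCallbacks = new ArrayList<>();
    /** Records the order in which things happen, for inspection. */
    final List<String> events = new ArrayList<>();

    /** Adds a handler; its callback runs immediately only if already registered. */
    synchronized DeferredAddSketch addLast(String name) {
        Runnable callback = () -> events.add("handlerAdded:" + name);
        if (!registered) {
            pendingCallbacks.add(callback); // run later, in insertion order
        } else {
            callback.run();
        }
        return this;
    }

    /** Marks the channel as registered, then runs the postponed callbacks once. */
    synchronized void register() {
        registered = true;
        events.add("registered");
        for (Runnable callback : pendingCallbacks) {
            callback.run();
        }
        pendingCallbacks.clear();
    }

    public static void main(String[] args) {
        DeferredAddSketch p = new DeferredAddSketch().addLast("decoder");
        p.register();
        p.addLast("business");
        System.out.println(p.events);
    }
}
```

In this model a handler added before registration sees its callback only after "registered" is recorded, while a handler added afterwards is called back right away.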

Creating the ChannelHandlerContext

private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
    return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}

final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {
    private final ChannelHandler handler;
    DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        this.handler = handler;
    }
    @Override
    public ChannelHandler handler() {
        return handler;
    }
    ...
}

Linked-list operation: append to the end of the list

private void addLast0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev;
    newCtx.next = tail;
    prev.next = newCtx;
    tail.prev = newCtx;
}

Marking the ChannelHandlerContext as added and calling back the ChannelHandler's handlerAdded

private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
    try {
        ctx.handler().handlerAdded(ctx);
        ctx.setAddComplete();
    } catch (Throwable t) {
        ...
    }
}

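Summary

ChannelPipeline is the component that holds the ChannelHandlers of a Channel. Internally it is a doubly linked list whose nodes are ChannelHandlerContext objects, and each ChannelHandlerContext is in turn constructed from a ChannelPipeline and a ChannelHandler.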

Original article: https://liuzhengyang.github.io/2018/07/14/netty-3-channelpipeline/