转载

Netty 源码分析之 四 Promise 与 Future: 双子星的秘密

永顺大牛写的系列教程 《源码之下无秘密 ── 做最好的 Netty 源码分析教程》 是目前我读过最好的netty源码分析文章。但不知道什么原因,作者在写到第三章的时候停更了。因此,我想尝试凭着个人的理解,续写后边几个章节。

写在最前

永顺前辈已经写完章节有如下:

  • Netty 源码分析之 番外篇 Java NIO 的前生今世
  • Netty 源码分析之 零 磨刀不误砍柴工 源码分析环境搭建
  • Netty 源码分析之 一 揭开 Bootstrap 神秘的红盖头 (客户端)
  • Netty 源码分析之 一 揭开 Bootstrap 神秘的红盖头 (服务器端)
  • Netty 源码分析之 二 贯穿 Netty 的大动脉 ── ChannelPipeline (一)
  • Netty 源码分析之 二 贯穿 Netty 的大动脉 ── ChannelPipeline (二)
  • Netty 源码分析之 三 我就是大名鼎鼎的 EventLoop(一)
  • Netty 源码分析之 三 我就是大名鼎鼎的 EventLoop(二)

续写章节:

  • Netty 源码分析之 四 Promise 与 Future: 双子星的秘密
  • Netty 源码分析之 五 奔腾的血液: ByteBuf
  • Netty 源码分析之 六 流水线处理器: Handler

本文使用的netty版本为4.1.33

Future 和Promise 的关系

Netty内部的io.netty.util.concurrent.Future 继承自java.util.concurrent.Future ,而Promise

是前者的一个特殊实现。

Netty 源码分析之 四 Promise 与 Future: 双子星的秘密

Java原生Future

Java并发编程包下提供了Future 接口。Future在异步编程中表示该异步操作的结果,通过Future 的内部方法可以实现状态检查、取消执行、获取执行结果等操作。内部的方法如下:

// 尝试取消执行
boolean cancel(boolean mayInterruptIfRunning);
// 是否已经被取消执行
boolean isCancelled();
// 是否已经执行完毕
boolean isDone();
// 阻塞获取执行结果
V get() throws InterruptedException, ExecutionException;
// 阻塞获取执行结果或超时后返回
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

Netty对Future 的扩展

原生的Future 功能比较有限,Netty扩展了Future 并增加了以下方法:

  • 增加了更加丰富的状态判断方法

    // 判断是否执行成功
    boolean isSuccess();
    // 判断是否可以取消执行
    boolean isCancellable();
    
  • 支持获取导致I/O操作异常

    Throwable cause();
    
  • 增加了监听回调有关方法,支持future完成后执行用户指定的回调方法

    // 增加回调方法
    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
    // 增加多个回调方法
    Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
    // 删除回调方法
    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
    // 删除多个回调方法
    Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
    
  • 增加了更丰富的阻塞等待结果返回的两类方法。其中一类是sync方法,阻塞等待结果且如果执行失败后向外抛出导致失败的异常;另外一类是await方法,仅阻塞等待结果返回,不向外抛出异常。

    // 阻塞等待,且如果失败抛出异常
    Future<V> sync() throws InterruptedException;
    // 同上,区别是不可中断阻塞等待过程
    Future<V> syncUninterruptibly();
    
    // 阻塞等待
    Future<V> await() throws InterruptedException;
    // 同上,区别是不可中断阻塞等待过程
    Future<V> awaitUninterruptibly();
    

Promise

Promise 接口继续继承了Future ,并增加若干个设置状态并回调的方法:

// 设置成功状态并回调
Promise<V> setSuccess(V result);
boolean trySuccess(V result);
// 设置失败状态并回调
Promise<V> setFailure(Throwable cause);
boolean tryFailure(Throwable cause);
// 设置为不可取消状态
boolean setUncancellable();

可见,Promise 作为一个特殊的Future ,只是增加了一些状态设置方法。所以它常用于传入I/O业务代码中,用于I/O结束后设置成功(或失败)状态,并回调方法。

通过Promise设置I/O执行结果

以客户端连接的注册过程为例,调用链路如下:

io.netty.bootstrap.Bootstrap.connect()
--> io.netty.bootstrap.Bootstrap.doResolveAndConnect()
---->io.netty.bootstrap.AbstractBootstrap.initAndRegister()
------>io.netty.channel.MultithreadEventLoopGroup.register()
-------->io.netty.channel.SingleThreadEventLoop.register()

一直跟踪到SingleThreadEventLoop中,会看到这段代码:

@Override
public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}

此处新建了一个DefaultChannelPromise,构造函数传入了当前的channel以及当前所在的线程this。从第一节的类图我们知道,DefaultChannelPromise同时实现了Future和Promise,具有上述提到的所有方法。

然后继续将该promise传递进另外一个register方法中:

@Override
public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
}

在该register方法中,继续将promise传递到Unsafe的register方法中,而立即返回了以ChannelFuture的形式返回了该promise。显然这里是一个异步回调处理:上层的业务可以拿到返回的ChannelFuture阻塞等待结果或者设置回调方法,而继续往下传的Promise可以用于设置执行状态并且回调设置的方法。

我们继续往下debug可以看到:

// io.netty.channel.AbstractChannel.AbstractUnsafe.java
@Override
       public final void register(EventLoop eventLoop, final ChannelPromise promise) {
           if (eventLoop == null) {
               throw new NullPointerException("eventLoop");
           }
           if (isRegistered()) {
               // 如果已经注册过,则置为失败
               promise.setFailure(new IllegalStateException("registered to an event loop already"));
               return;
           }
           if (!isCompatible(eventLoop)) {
               // 如果线程类型不兼容,则置为失败
               promise.setFailure(
                       new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
               return;
           }

           AbstractChannel.this.eventLoop = eventLoop;

           if (eventLoop.inEventLoop()) {
               register0(promise);
           } else {
               try {
                   eventLoop.execute(new Runnable() {
                       @Override
                       public void run() {
                           register0(promise);
                       }
                   });
               } catch (Throwable t) {
                   logger.warn(
                           "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                           AbstractChannel.this, t);
                   closeForcibly();
                   closeFuture.setClosed();
                   // 出现异常情况置promise为失败
                   safeSetFailure(promise, t);
               }
           }
       }

       private void register0(ChannelPromise promise) {
           try {
               // 注册之前,先将promise置为不可取消转态
               if (!promise.setUncancellable() || !ensureOpen(promise)) {
                   return;
               }
               boolean firstRegistration = neverRegistered;
               doRegister();
               neverRegistered = false;
               registered = true;

               pipeline.invokeHandlerAddedIfNeeded();
               // promise置为成功
               safeSetSuccess(promise);
               pipeline.fireChannelRegistered();
            
               if (isActive()) {
                   if (firstRegistration) {
                       pipeline.fireChannelActive();
                   } else if (config().isAutoRead()) {
                       beginRead();
                   }
               }
           } catch (Throwable t) {
               // Close the channel directly to avoid FD leak.
               closeForcibly();
               closeFuture.setClosed();
               // 出现异常情况置promise为失败
               safeSetFailure(promise, t);
           }
       }

可见,底层的I/O操作成功与否都可以通过Promise设置状态,并使得外层的ChannelFuture可以感知得到I/O操作的结果。

通过ChannelFuture获取I/O执行结果

我们再来看看被返回的ChannelFuture的用途:

// io.netty.bootstrap.AbstractBootstrap.java

    final ChannelFuture initAndRegister() {
        //...
        ChannelFuture regFuture = config().group().register(channel);
        // 如果异常不为null,则意味着底层的I/O已经失败,并且promise设置了失败异常
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        return regFuture;
    }

这里通过检查失败异常栈是否为空,可以提前检查到I/O是否失败。继续回溯,还可以看到:

// io.netty.bootstrap.AbstractBootstrap.java

 private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        // 如果注册已经成功
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
       // 如果注册尚未完成
       // ...
    }
}

此处,通过ChannelFuture#isDone()方法可以知道底层的注册是否完成,如果完成,则继续进行bind操作。

但是因为注册是个异步操作,如果此时注册可能还没完成,那就会进入如下逻辑:

// io.netty.bootstrap.AbstractBootstrap.java

//...
else {
    // Registration future is almost always fulfilled already, but just in case it's not.
    final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
    regFuture.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            Throwable cause = future.cause();
            if (cause != null) {
                // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                // IllegalStateException once we try to access the EventLoop of the Channel.
                promise.setFailure(cause);
            } else {
                // Registration was successful, so set the correct executor to use.
                // See https://github.com/netty/netty/issues/2586
                promise.registered();

                doBind0(regFuture, channel, localAddress, promise);
            }
        }
    });
    return promise;
}

这里新建了一个新的PendingRegistrationPromise,并为原来的ChannelFuture对象添加了一个回调方法,并在回调中更改PendingRegistrationPromise的状态,而且PendingRegistrationPromise会继续被传递到上层。当底层的Promise状态被设置并且回调,就会进入该回调方法。从而将I/O状态继续向外传递。

DefaultChannelPromise的结果传递实现原理

我们已经了解清楚了Promise和Future的异步模型。再来看看底层是如何实现的。以最常用的DefaultChannelPromise为例,内部非常简单,我们主要看它的父类DefaultPromise:

// result字段的原子更新器
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
        AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
// 缓存执行结果的字段
private volatile Object result;
// promise所在的线程
private final EventExecutor executor;
// 一个或者多个回调方法
private Object listeners;
// 阻塞线程数量计数器  
private short waiters;

设置状态

以设置成功状态为例(setSuccess):

@Override
public Promise<V> setSuccess(V result) {
    if (setSuccess0(result)) {
        // 调用回调方法
        notifyListeners();
        return this;
    }
    throw new IllegalStateException("complete already: " + this);
}

private boolean setSuccess0(V result) {
    return setValue0(result == null ? SUCCESS : result);
}

private boolean setValue0(Object objResult) {
    // 原子修改result字段为objResult
    if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
        RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
        checkNotifyWaiters();
        return true;
    }
    return false;
}

private synchronized void checkNotifyWaiters() {
    if (waiters > 0) {
        // 如果有其他线程在等待该promise的结果,则唤醒他们
        notifyAll();
    }
}

设置promise的状态其实就是原子地修改result字段为传入的执行结果。值得注意的是,result字段带有volatile关键字来确保多线程之间的可见性。另外,设置完毕状态后,会尝试唤醒所有在阻塞等待该promise返回结果的线程。

其他设置状态方法不再赘言,基本上大同小异。

阻塞线程以等待执行结果

上文提到其他线程会阻塞等待该promise返回结果,具体实现以sync方法为例:

@Override
public Promise<V> sync() throws InterruptedException {
    // 阻塞等待
    await();
    // 如果有异常则抛出
    rethrowIfFailed();
    return this;
}

@Override
public Promise<V> await() throws InterruptedException {
    if (isDone()) {
        // 如果已经完成,直接返回
        return this;
    }
    // 可以被中断
    if (Thread.interrupted()) {
        throw new InterruptedException(toString());
    }
    //检查死循环
    checkDeadLock();

    synchronized (this) {
        while (!isDone()) {
            // 递增计数器(用于记录有多少个线程在等待该promise返回结果)
            incWaiters();
            try {
                // 阻塞等待结果
                wait();
            } finally {
                // 递减计数器
                decWaiters();
            }
        }
    }
    return this;
}

所有调用sync方法的线程,都会被阻塞,直到promise被设置为成功或者失败。这也解释了为何Netty客户端或者服务端启动的时候一般都会调用sync方法,本质上都是阻塞当前线程而异步地等待I/O结果返回,如下:

Bootstrap bootstrap = new Bootstrap();
ChannelFuture future = bootstrap.group(new NioEventLoopGroup(10))
        .channel(NioSocketChannel.class)
        .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                // 管道中添加基于换行符分割字符串的解析器
                ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                // 管道中添加字符串编码解码器
                ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
                ch.pipeline().addLast(new StringEncoder(Charset.forName("UTF-8")));
                // 管道中添加服务端处理逻辑
                ch.pipeline().addLast(new MyClientEchoHandler());
            }
        }).connect("127.0.0.1", 9898).sync();

future.channel().closeFuture().sync();

回调机制

@Override
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
    checkNotNull(listener, "listener");

    synchronized (this) {
        // 添加回调方法
        addListener0(listener);
    }

    if (isDone()) {
        // 如果I/O操作已经结束,直接触发回调
        notifyListeners();
    }

    return this;
}

private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
    if (listeners == null) {
        // 只有一个回调方法直接赋值
        listeners = listener;
    } else if (listeners instanceof DefaultFutureListeners) {
        // 将回调方法添加到DefaultFutureListeners内部维护的listeners数组中
        ((DefaultFutureListeners) listeners).add(listener);
    } else {
        // 如果有多个回调方法,新建一个DefaultFutureListeners以保存更多的回调方法
        listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);
    }
}

从上边可以看到,添加回调方法完成之后,会立即检查promise是否已经完成;如果promise已经完成,则马上调用回调方法。

总结

Netty的Promise和Future机制是基于Java并发包下的Future 开发的。其中Future支持阻塞等待、添加回调方法、判断执行状态等,而Promise主要是支持状态设置相关方法。当底层I/O操作通过Promise改变执行状态,我们可以通过同步等待的Future立即得到结果。

因此,就像永顺大牛标题所言,在Netty的异步模型里,Promise和Future就像是双子星一般紧密相连。但我觉得这两者更像是量子纠缠里的两个电子,因为改变其中一个方的状态,另外一方能够马上感知。

至此,Promise和Future的核心原理已经分析完毕。

原文  https://blog.duval.top/2020/07/04/Netty-源码分析之-四-Promise-与-Future-双子星的秘密/
正文到此结束
Loading...