永顺大牛写的系列教程 《源码之下无秘密 ── 做最好的 Netty 源码分析教程》 是目前我读过最好的netty源码分析文章。但不知道什么原因,作者在写到第三章的时候停更了。因此,我想尝试凭着个人的理解,续写后边几个章节。
永顺前辈已经写完章节有如下:
续写章节:
本文使用的netty版本为4.1.33
是前者的一个特殊实现。
Java并发编程包下提供了Future
// 尝试取消执行 boolean cancel(boolean mayInterruptIfRunning); // 是否已经被取消执行 boolean isCancelled(); // 是否已经执行完毕 boolean isDone(); // 阻塞获取执行结果 V get() throws InterruptedException, ExecutionException; // 阻塞获取执行结果或超时后返回 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
原生的Future
增加了更加丰富的状态判断方法
// 判断是否执行成功 boolean isSuccess(); // 判断是否可以取消执行 boolean isCancellable();
支持获取导致I/O操作异常
Throwable cause();
增加了监听回调有关方法,支持future完成后执行用户指定的回调方法
// 增加回调方法 Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener); // 增加多个回调方法 Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners); // 删除回调方法 Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener); // 删除多个回调方法 Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
增加了更丰富的阻塞等待结果返回的两类方法。其中一类是sync方法,阻塞等待结果且如果执行失败后向外抛出导致失败的异常;另外一类是await方法,仅阻塞等待结果返回,不向外抛出异常。
// 阻塞等待,且如果失败抛出异常 Future<V> sync() throws InterruptedException; // 同上,区别是不可中断阻塞等待过程 Future<V> syncUninterruptibly(); // 阻塞等待 Future<V> await() throws InterruptedException; // 同上,区别是不可中断阻塞等待过程 Future<V> awaitUninterruptibly();
Promise
// 设置成功状态并回调 Promise<V> setSuccess(V result); boolean trySuccess(V result); // 设置失败状态并回调 Promise<V> setFailure(Throwable cause); boolean tryFailure(Throwable cause); // 设置为不可取消状态 boolean setUncancellable();
可见,Promise
以客户端连接的注册过程为例,调用链路如下:
io.netty.bootstrap.Bootstrap.connect() --> io.netty.bootstrap.Bootstrap.doResolveAndConnect() ---->io.netty.bootstrap.AbstractBootstrap.initAndRegister() ------>io.netty.channel.MultithreadEventLoopGroup.register() -------->io.netty.channel.SingleThreadEventLoop.register()
一直跟踪到SingleThreadEventLoop中,会看到这段代码:
@Override public ChannelFuture register(Channel channel) { return register(new DefaultChannelPromise(channel, this)); }
此处新建了一个DefaultChannelPromise,构造函数传入了当前的channel以及当前所在的线程this。从第一节的类图我们知道,DefaultChannelPromise同时实现了Future和Promise,具有上述提到的所有方法。
然后继续将该promise传递进另外一个register方法中:
@Override public ChannelFuture register(final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise"); promise.channel().unsafe().register(this, promise); return promise; }
在该register方法中,继续将promise传递到Unsafe的register方法中,而立即返回了以ChannelFuture的形式返回了该promise。显然这里是一个异步回调处理:上层的业务可以拿到返回的ChannelFuture阻塞等待结果或者设置回调方法,而继续往下传的Promise可以用于设置执行状态并且回调设置的方法。
我们继续往下debug可以看到:
// io.netty.channel.AbstractChannel.AbstractUnsafe.java @Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { if (eventLoop == null) { throw new NullPointerException("eventLoop"); } if (isRegistered()) { // 如果已经注册过,则置为失败 promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } if (!isCompatible(eventLoop)) { // 如果线程类型不兼容,则置为失败 promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; } AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); // 出现异常情况置promise为失败 safeSetFailure(promise, t); } } } private void register0(ChannelPromise promise) { try { // 注册之前,先将promise置为不可取消转态 if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false; registered = true; pipeline.invokeHandlerAddedIfNeeded(); // promise置为成功 safeSetSuccess(promise); pipeline.fireChannelRegistered(); if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); // 出现异常情况置promise为失败 safeSetFailure(promise, t); } }
可见,底层的I/O操作成功与否都可以通过Promise设置状态,并使得外层的ChannelFuture可以感知得到I/O操作的结果。
我们再来看看被返回的ChannelFuture的用途:
// io.netty.bootstrap.AbstractBootstrap.java final ChannelFuture initAndRegister() { //... ChannelFuture regFuture = config().group().register(channel); // 如果异常不为null,则意味着底层的I/O已经失败,并且promise设置了失败异常 if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }
这里通过检查失败异常栈是否为空,可以提前检查到I/O是否失败。继续回溯,还可以看到:
// io.netty.bootstrap.AbstractBootstrap.java private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } if (regFuture.isDone()) { // 如果注册已经成功 ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { // 如果注册尚未完成 // ... } }
此处,通过ChannelFuture#isDone()方法可以知道底层的注册是否完成,如果完成,则继续进行bind操作。
但是因为注册是个异步操作,如果此时注册可能还没完成,那就会进入如下逻辑:
// io.netty.bootstrap.AbstractBootstrap.java //... else { // Registration future is almost always fulfilled already, but just in case it's not. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.registered(); doBind0(regFuture, channel, localAddress, promise); } } }); return promise; }
这里新建了一个新的PendingRegistrationPromise,并为原来的ChannelFuture对象添加了一个回调方法,并在回调中更改PendingRegistrationPromise的状态,而且PendingRegistrationPromise会继续被传递到上层。当底层的Promise状态被设置并且回调,就会进入该回调方法。从而将I/O状态继续向外传递。
我们已经了解清楚了Promise和Future的异步模型。再来看看底层是如何实现的。以最常用的DefaultChannelPromise为例,内部非常简单,我们主要看它的父类DefaultPromise:
// result字段的原子更新器 @SuppressWarnings("rawtypes") private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER = AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result"); // 缓存执行结果的字段 private volatile Object result; // promise所在的线程 private final EventExecutor executor; // 一个或者多个回调方法 private Object listeners; // 阻塞线程数量计数器 private short waiters;
以设置成功状态为例(setSuccess):
@Override public Promise<V> setSuccess(V result) { if (setSuccess0(result)) { // 调用回调方法 notifyListeners(); return this; } throw new IllegalStateException("complete already: " + this); } private boolean setSuccess0(V result) { return setValue0(result == null ? SUCCESS : result); } private boolean setValue0(Object objResult) { // 原子修改result字段为objResult if (RESULT_UPDATER.compareAndSet(this, null, objResult) || RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) { checkNotifyWaiters(); return true; } return false; } private synchronized void checkNotifyWaiters() { if (waiters > 0) { // 如果有其他线程在等待该promise的结果,则唤醒他们 notifyAll(); } }
设置promise的状态其实就是原子地修改result字段为传入的执行结果。值得注意的是,result字段带有volatile关键字来确保多线程之间的可见性。另外,设置完毕状态后,会尝试唤醒所有在阻塞等待该promise返回结果的线程。
其他设置状态方法不再赘言,基本上大同小异。
上文提到其他线程会阻塞等待该promise返回结果,具体实现以sync方法为例:
@Override public Promise<V> sync() throws InterruptedException { // 阻塞等待 await(); // 如果有异常则抛出 rethrowIfFailed(); return this; } @Override public Promise<V> await() throws InterruptedException { if (isDone()) { // 如果已经完成,直接返回 return this; } // 可以被中断 if (Thread.interrupted()) { throw new InterruptedException(toString()); } //检查死循环 checkDeadLock(); synchronized (this) { while (!isDone()) { // 递增计数器(用于记录有多少个线程在等待该promise返回结果) incWaiters(); try { // 阻塞等待结果 wait(); } finally { // 递减计数器 decWaiters(); } } } return this; }
所有调用sync方法的线程,都会被阻塞,直到promise被设置为成功或者失败。这也解释了为何Netty客户端或者服务端启动的时候一般都会调用sync方法,本质上都是阻塞当前线程而异步地等待I/O结果返回,如下:
Bootstrap bootstrap = new Bootstrap(); ChannelFuture future = bootstrap.group(new NioEventLoopGroup(10)) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { // 管道中添加基于换行符分割字符串的解析器 ch.pipeline().addLast(new LineBasedFrameDecoder(1024)); // 管道中添加字符串编码解码器 ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8"))); ch.pipeline().addLast(new StringEncoder(Charset.forName("UTF-8"))); // 管道中添加服务端处理逻辑 ch.pipeline().addLast(new MyClientEchoHandler()); } }).connect("127.0.0.1", 9898).sync(); future.channel().closeFuture().sync();
@Override public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) { checkNotNull(listener, "listener"); synchronized (this) { // 添加回调方法 addListener0(listener); } if (isDone()) { // 如果I/O操作已经结束,直接触发回调 notifyListeners(); } return this; } private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) { if (listeners == null) { // 只有一个回调方法直接赋值 listeners = listener; } else if (listeners instanceof DefaultFutureListeners) { // 将回调方法添加到DefaultFutureListeners内部维护的listeners数组中 ((DefaultFutureListeners) listeners).add(listener); } else { // 如果有多个回调方法,新建一个DefaultFutureListeners以保存更多的回调方法 listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener); } }
从上边可以看到,添加回调方法完成之后,会立即检查promise是否已经完成;如果promise已经完成,则马上调用回调方法。
Netty的Promise和Future机制是基于Java并发包下的Future
因此,就像永顺大牛标题所言,在Netty的异步模型里,Promise和Future就像是双子星一般紧密相连。但我觉得这两者更像是量子纠缠里的两个电子,因为改变其中一个方的状态,另外一方能够马上感知。
至此,Promise和Future的核心原理已经分析完毕。