在说JDK的异步Future之前,先简单介绍一下JDK自带的Future机制.
首先先上一段代码
public class JDKFuture { static ExecutorService executors = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(16)); public static void main(String[] args) throws Exception{ int cnt = 1; Future[] jdkFuture=new Future[cnt]; Object jdkFutureResult; for(int i = 0;i < cnt; i++){ jdkFuture[i] = executors.submit(new JDKCallable(i)); } System.out.println(String.format("%s 在 %s 即将获取任务执行结果", Thread.currentThread(), new Date())); jdkFutureResult = jdkFuture[0].get(); System.out.println(String.format("%s 在 %s 任务结果获取完毕 %s", Thread.currentThread(), new Date(), jdkFutureResult)); executors.shutdown(); } static class JDKCallable implements Callable{ int index; JDKCallable(int ind){ this.index = ind; } public Object call() throws Exception { try { System.out.println(String.format("线程 [%s] 提交任务[%s]", Thread.currentThread(), this.index)); // 耗时2秒,模拟耗时操作 Thread.sleep(2000); System.out.println(String.format("线程 [%s] 执行任务[%s]执行完毕", Thread.currentThread(), this.index)); }catch(InterruptedException e){ e.printStackTrace(); } return String.format("任务%s执行结果",this.index); } } } 复制代码
输出结果为:
线程 [Thread[pool-1-thread-1,5,main]] 提交任务[0] Thread[main,5,main] 在 Mon Dec 16 16:40:38 CST 2019 即将获取任务执行结果 线程 [Thread[pool-1-thread-1,5,main]] 执行任务[0]执行完毕 Thread[main,5,main] 在 Mon Dec 16 16:40:40 CST 2019 任务结果获取完毕 任务0执行结果 复制代码
可以看到主线程在使用 future.get()
的时候,因为子线程还未处理完返回结果而导致主线程活生生的等了2秒钟(耗时操作),这也是JDK自带的Future机制不够完善的地方.因为jdk自身的future机制不够完善,所以Netty自实现了一套Future机制.
Netty的Future是异步的,那他是怎么实现的呢?接下来就从源码开始探究.
先看一下 Netty 的 Future
和 Promise
这两个接口
/** * The result of an asynchronous operation * 异步操作的结果 * 对状态的判断、添加listener、获取结果 */ public interface Future<V> extends java.util.concurrent.Future<V> { boolean isSuccess(); boolean isCancellable(); Throwable cause(); Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener); Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners); Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener); Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners); Future<V> sync() throws InterruptedException; Future<V> syncUninterruptibly(); Future<V> await() throws InterruptedException; Future<V> awaitUninterruptibly(); boolean await(long timeout, TimeUnit unit) throws InterruptedException; boolean await(long timeoutMillis) throws InterruptedException; boolean awaitUninterruptibly(long timeout, TimeUnit unit); boolean awaitUninterruptibly(long timeoutMillis); V getNow(); @Override boolean cancel(boolean mayInterruptIfRunning); } 复制代码
Promise是一个特殊的Future,它可写,可写意味着可以修改里面的结果.
/** * Special {@link Future} which is writable. * 一个可写的特殊的Future * 继承 Future, 继承的方法就不列出 */ public interface Promise<V> extends Future<V> { /** * Marks this future as a success and notifies all * listeners. * If it is success or failed already it will throw an {@link IllegalStateException}. * 将这个 future 标记为 success 并且通知所有的 listeners * 如果已经成功或者失败将会抛出异常 */ Promise<V> setSuccess(V result); /** * Marks this future as a success and notifies all * listeners. * * @return {@code true} if and only if successfully marked this future as * a success. Otherwise {@code false} because this future is * already marked as either a success or a failure. * 尝试设置结果,成功返回true, 失败 false, 上面的方法设置失败会抛出异常 */ boolean trySuccess(V result); // 这2个跟上面的差不多 Promise<V> setFailure(Throwable cause); boolean tryFailure(Throwable cause); /** * Make this future impossible to cancel. * * @return {@code true} if and only if successfully marked this future as uncancellable or it is already done * without being cancelled. {@code false} if this future has been cancelled already. */ boolean setUncancellable(); } 复制代码
看到这里都同学都默认是用netty写过程序的~,还没写过的话可以看看官方文档或者我的另一篇Netty使用.
接下来就开始源码的解读.
总所周知!
,我们使用Netty开发的时候,写出数据用的是 writeAndFlush(msg)
, 至于 write(msg)
嘛, 不就是少了个 flush (没错,是我比较懒).
在大家知道 channel().write
和 ctx.write
的区别后, 我们就从 channel().write
开始讲起.
不行,我感觉还是要说一下一些补充的,不然心里不舒服.
Netty中有一个 pipeline
,也就是事件调用链,开发的时候在调用链里面加入自己处理事件的handle,但是在这条 pipeline 中, Netty给我们加上了 Head
和 tail
这两个handle,方便Netty框架处理事件.
先看 DefaultChannelPipeline 的初始化,在初始化代码里给我们添加了2个handle, head 和 tail, 这2个东西很有用,为什么这么说呢?详情看后面解答
protected DefaultChannelPipeline(Channel channel) { this.channel = (Channel)ObjectUtil.checkNotNull(channel, "channel"); this.succeededFuture = new SucceededChannelFuture(channel, (EventExecutor)null); this.voidPromise = new VoidChannelPromise(channel, true); // ChannelInboundHandler this.tail = new DefaultChannelPipeline.TailContext(this); // ChannelInboundHandler && ChannelOutboundHandler this.head = new DefaultChannelPipeline.HeadContext(this); this.head.next = this.tail; this.tail.prev = this.head; } 复制代码
没错,还是从 channel().write(msg)
开始说起(为什么我要用还是).
跟踪代码 channel().write(), 首先会调用到 DefaultChannelPipeline的 writeAndFlush 方法.
###1.DefaultChannelPipeline#writeAndFlush
public final ChannelFuture writeAndFlush(Object msg) { return this.tail.writeAndFlush(msg); } 复制代码
this.tail 就是上面构造函数里面初始化的 tailHandle
, 而 write
是出栈事件, 会从 tailHandle 开始往前传递,最后传递到 headHandle
(怎么感觉好像提前剧透了).
public ChannelFuture writeAndFlush(Object msg) { // 这里new了一个 promise, 然后这个promise将会一直传递,一直传递..... return this.writeAndFlush(msg, this.newPromise()); } 复制代码
接下来来到了 AbstractChannelHandlerContext
的 writeAndFlush.
/** * 执行 write and flush 操作 * @param msg * @param promise */ private void invokeWriteAndFlush(Object msg, ChannelPromise promise) { // 这个方法在 ChannelHandler#handlerAdded 调用后,才会返回 true if (invokeHandler()) { // write 继续传递 invokeWrite0(msg, promise); // flush data invokeFlush0(); } else { writeAndFlush(msg, promise); } } private void write(Object msg, boolean flush, ChannelPromise promise) { // 查找下一个 OutboundHandle, 因为是要输出 AbstractChannelHandlerContext next = findContextOutbound(); final Object m = pipeline.touch(msg, next); // 下一个 OutboundHandle 所在的线程 EventExecutor executor = next.executor(); // 如果在是同一个线程(由于Netty的channel在一个ThreadPool中只绑定一个Thread, 不同线程的话也意味着是不同线程池) if (executor.inEventLoop()) { // 在同一个线程池(这里意味着同一个线程)中, if (flush) { next.invokeWriteAndFlush(m, promise); } else { next.invokeWrite(m, promise); } } else { // 在不同线程池(不同线程池那自然就是不同线程),需要创建一个任务,提交到下一个线程池 final AbstractWriteTask task; if (flush) { // 提交给下一个线程池 && flush task = WriteAndFlushTask.newInstance(next, m, promise); } else { task = WriteTask.newInstance(next, m, promise); } // 因为是 write 事件, so 接下来提交任务到下一个 OutboundHandle(出栈) 所在的线程, 由它执行 if (!safeExecute(executor, task, promise, m)) { // We failed to submit the AbstractWriteTask. We need to cancel it so we decrement the pending bytes // and put it back in the Recycler for re-use later. // // See https://github.com/netty/netty/issues/8343. // 任务提交失败,取消任务 task.cancel(); } } } 复制代码
接下来本篇文章最重要的地方了, **HeadContext **!
HeadContext的write和flush方法 实际上都是调用 unsafe的方法实现.
// 如果是 writeAndFlush ,调用 write后会调用flush @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { // 这个调用 AbstrachUnsafe.write unsafe.write(msg, promise); } // 这是 unsafe 的 write 方法 @Override public final void write(Object msg, ChannelPromise promise) { assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; // outboundBuffer = null 表明 channel已经关闭并且需要将 future 结果设置为 false if (outboundBuffer == null) { // If the outboundBuffer is null we know the channel was closed and so // need to fail the future right away. If it is not null the handling of the rest // will be done in flush0() // See https://github.com/netty/netty/issues/2362 safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION); // release message now to prevent resource-leak ReferenceCountUtil.release(msg); return; } int size; try { msg = filterOutboundMessage(msg); size = pipeline.estimatorHandle().size(msg); if (size < 0) { size = 0; } } catch (Throwable t) { safeSetFailure(promise, t); ReferenceCountUtil.release(msg); return; } // 将 msg添加进 buffer 中 outboundBuffer.addMessage(msg, size, promise); } 复制代码
/** * write 之后再调用这个 flush */ @Override public final void flush() { assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { return; } // buffer 标记为可以被 flush outboundBuffer.addFlush(); // 接下来就是真正的 flush flush0(); } 复制代码
ChannelOutboundBuffer 简单来说就是 存储当前channel写出的数据
, 并且在调用flush的时候将他们都写出去.
跟着源码一直走,在flush0之后,最终会调用到 AbstractNioMessageChannel#doWrite
方法.(上面还有doRead方法,是接收数据的时候调用的)
@Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { final SelectionKey key = selectionKey(); final int interestOps = key.interestOps(); for (;;) { // Object msg = in.current(); if (msg == null) { // Wrote all messages. // 判断写事件 if ((interestOps & SelectionKey.OP_WRITE) != 0) { key.interestOps(interestOps & ~SelectionKey.OP_WRITE); } break; } try { // 循环写出数据 boolean done = false; for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) { // 真正的写出数据 // 最终会调用 javaChannel().send(nioData, mi); // 很眼熟吧,这个是java nio的方法,注册的时候也是javaChannel().register() if (doWriteMessage(msg, in)) { done = true; break; } } // 成功写出,从 buffer 中移除刚才写出的数据 if (done) { in.remove(); } else { // Did not write all messages. // 写出失败 if ((interestOps & SelectionKey.OP_WRITE) == 0) { key.interestOps(interestOps | SelectionKey.OP_WRITE); } break; } } catch (Exception e) { // 出错后是否继续写出后面的数据 if (continueOnWriteError()) { in.remove(e); } else { throw e; } } } } 复制代码
到上面位置,数据是写出去了,那promise的相关作用呢?没看出来啊?
说实话,这个藏得挺深,居然! 放在了 buffer.remove() 里
!
public boolean remove() { // 刚写出去数据的Entry Entry e = flushedEntry; if (e == null) { clearNioBuffers(); return false; } Object msg = e.msg; // 这个就是writeAndFlush 的时候 new 的 DefaultPromise() ChannelPromise promise = e.promise; int size = e.pendingSize; // buffer 中移除 removeEntry(e); if (!e.cancelled) { // only release message, notify and decrement if it was not canceled before. ReferenceCountUtil.safeRelease(msg); // !!! 划重点 !!! // 这里设置了 promise 的结果, 调用了 trySuccess, 通知所有 listener // !!! 划重点 !!! safeSuccess(promise); decrementPendingOutboundBytes(size, false, true); } // recycle the entry // 重置Entry的信息,方便重用. // 跟 Entry entry = Entry.newInstance(msg, size, total(msg), promise); 相对应, newInstance 是获取一个缓存的 Entry e.recycle(); return true; } 复制代码
promise 通知所有 listener 是在写数据成功,并且在 buffer.remove()
调用的时候在里面 safeSuccess(promise)
, 最终调用 Promise 的 trySuccess()
从而触发 notifyListeners()
通知所有 listeners.
这个是在 Promise#trySuccess的时候调用的,通知所有listeners操作已经完成.
/** * 通知监听者,任务已经完成 */ private void notifyListeners() { // 获取future所属线程(池) EventExecutor executor = executor(); // 执行通知是当前线程 则直接回调信息 // currentThread == this.executor if (executor.inEventLoop()) { // 获取 ThreadLocal 变量 final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get(); // listen 的层级数 final int stackDepth = threadLocals.futureListenerStackDepth(); if (stackDepth < MAX_LISTENER_STACK_DEPTH) { threadLocals.setFutureListenerStackDepth(stackDepth + 1); try { // 通知所有的 listener notifyListenersNow(); } finally { threadLocals.setFutureListenerStackDepth(stackDepth); } return; } } // 如果 executor 不是当前线程, 则交给 future 所属 executor 去执行 // 意思是添加通知的 executor 可能是前面的 executor , 然后到后面的 executor 也就是当前线程才执行通知 // 此时将通知交回给之前的 executor // 执行通知的不是当前线程, 封装成一个任务, 由之前提交的线程完成通知(回调) // 到这里就是 Netty Future 异步的原因, 任务在其它线程执行完毕后, 封装成任务交还给 创建 Future的线程,由该线程完成回调 safeExecute(executor, new Runnable() { @Override public void run() { notifyListenersNow(); } }); } 复制代码
最后,觉得文章有帮助的同学不要忘了点个赞(我就是来骗赞的),你们的每个赞对我来说都非常重要,非常感谢你们能看到这里!!!