转载

我的开源项目|动态字节码技术实现的asyncframework,已支持带返回值的方法声明为异步方法

这是 ASM+ 动态字节码技术实现的一个异步框架,只需要在你的接口上添加一个 @AsyncFunction 注解即可让这个方法异步执行!

其实这个东西我们并不陌生, asyncframework@AsyncFunction 注解与 spring 框架的 @Async 异步注解实现的功能一样。

A:既然 spring 都已经提供这样的功能,你为什么还要实现一个这样的框架呢? Q:因为我之前封装组件的时候有需要用到,但又不想为了使用这个功能就把 spring 依赖到项目中,会比较臃肿。其次,也是因为喜欢折腾,想要把自己的想法实现。

asyncframework 可以取代 spring@Async 使用,只要封装一个 starter 包,依靠 spring 提供的 BeanPostProcess 实现无缝整合。但 spring 都已经提供了,我就不想去造轮子了, asyncframework 我推荐是在非 spring 项目中使用。

版本更新

2019 年我在微信公众号发表过一篇文章,当时介绍了如果在接口的某个方法上添加一个注解就能让这个方法异步执行的思路,但当时并没有实现带返回值的方法也能异步执行的功能,而 2020 年的今天,我已经把这个功能实现了,也是绞尽脑汁才做出来的。

另外,如果你也对字节码感兴趣,我非常推荐你阅读这个框架的源码,浓缩的都是精华,十几个类包含了设计模式的使用、字节码、以及框架的设计思想,也能较好的理解 spring@Async 是怎么实现的。

在实现支持带返回值的方法异步执行这个功能时,遇到了两个大难题:

难点一:带返回值的方法如何去实现异步?

难点二:如何编写字节码实现泛型接口的代理类?

如何使用asyncframework ?

第一步:项目中添加依赖

maven:

<dependency>
  <groupId>com.github.wujiuye</groupId>
  <artifactId>asyncframework</artifactId>
  <version>1.2.0-RELEASE</version>
</dependency>
复制代码

gradle:

implementation 'com.github.wujiuye:asyncframework:1.2.0-RELEASE'
复制代码

第二步:定义接口以及编写接口的实现类

A:为什么需要定义接口? Q:因为我之前遇到的需求是使用接口,其实想不用写接口更加简单,后面想玩的时候我再加上去。

定义接口:

/**
 * @author wujiuye
 * @version 1.0 on 2019/11/24 
 */
public interface AsyncMessageSubscribe {

    /**
     * 异步无返回值
     *
     * @param queue
     */
    @AsyncFunction
    void pullMessage(String queue);

    /**
     * 异步带返回值
     *
     * @param s1
     * @param s2
     * @return
     */
    @AsyncFunction
    AsyncResult<String> doAction(String s1, String s2);

}
复制代码

编写实现类:

public class Test{

     /**
     * 接口的实现
     */
    private AsyncMessageSubscribe impl = new AsyncMessageSubscribe() {
        @Override
        public void pullMessage(String queue) {
            System.out.println(queue + ", current thread name:" + Thread.currentThread().getName());
        }

        @Override
        public AsyncResult<String> doAction(String s1, String s2) {
            System.out.println("s1==>" + s1 + ", s2==>" + s2);
            return new AsyncResult<>("hello wujiuye! current thread name:" + Thread.currentThread().getName());
        }
    };

}
复制代码

第三步:配置全局线程池

public class Test{
    ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
}
复制代码

第四步:使用 AsyncProxyFactory 创建代理对象

调用 AsyncProxyFactorygetInterfaceImplSupporAsync 方法创建一个代理类,需要指定异步执行使用哪个线程池,以及接口的实现类。

public class AsmProxyTest {
 
     @Test
     public void testAutoProxyAsync() throws Exception {
         ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
         AsyncMessageSubscribe proxy = AsmProxyFactory.getInterfaceImplSupporAsync(AsyncMessageSubscribe.class, impl, executorService);
         // 异步不带返回值 
         proxy.pullMessage("wujiuye");
         // 异步带返回值 
         AsyncResult<String> asyncResult = proxy.doAction("sssss", "ddd");
         System.out.println(asyncResult.get());
         System.in.read();
     }
 
 }

复制代码

A:还要创建代理类去调用,我直接 new 一个 Runnable 放线程池执行不是更方便? Q:确实如此,但如果通过包扫描自动创建代理对象,那就不一样了。 spring 就是通过 BeanPostProcess 实现的。

异步带返回值的实现原理

spring 项目中,如果想在带返回值的方法上添加 @Async 注解,就需要方法返回值类型为 AsyncResult ,我也去看了一下 spring 的源码,发现 AsyncResult 是一个 Future ,但仅仅只是依靠 Future 还是实现不了的。因为 AsyncResult 是我们在方法执行完成的时候才返回的,方法没执行之前我怎么取到这个 AsyncResult ?并且调用这个 AsyncResultget 方法获取到的结果就是最终的方法返回的结果?

我想到的方法是代理,没错,就是代理代理又代理。我也实现了一个 AsyncResult ,也是一个 Future ,只不过这个 Futureget 方法并不是阻塞的,因为不需要阻塞。

/**
 * 异步方法的返回参数
 *
 * @author wujiuye 2020/03/27
 */
public class AsyncResult<T> implements Future<T> {

    private T result;

    public AsyncResult(T result) {
        this.result = result;
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return false;
    }

    @Override
    public boolean isCancelled() {
        return false;
    }

    @Override
    public boolean isDone() {
        return true;
    }

    @Override
    public T get() throws InterruptedException, ExecutionException {
        return result;
    }

    @Override
    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return get();
    }

    /**
     * 由字节码调用
     *
     * @param future 提交到线程池执行返回的future
     * @param <T>
     * @return
     */
    public static <T> AsyncResult<T> newAsyncResultProxy(final Future<AsyncResult<T>> future) {
        return new AsyncResult<T>(null) {
            @Override
            public T get() throws InterruptedException, ExecutionException {
                AsyncResult<T> asyncResult = future.get();
                return asyncResult.get();
            }
        };
    }

}
复制代码

newAsyncResultProxy 方法才是整个异步实现的最关键一步,该方法是给字节码生成的代理对象调用的。

我们知道, ExecutorServicesubmit 方法支持提交一个 Callable 带返回值的任务,并且 submit 方法返回一个 Future ,调用这个 Futureget 方法当前线程会阻塞,直到任务执行结束。那么我们可以把 AsyncResultget 方法代理出来,当调用 AsyncResultget 方法时,实际上是去调用 ExecutorServicesubmit 方法返回的那个 Futureget 方法。对使用者屏蔽了这个阻塞获取结果的实现过程。

以前面的使用例子为例,下面的测试代码便是动态字节码要实现的,这便是框架得以支持带返回值的方法异步执行的原理。

public class Test2{

    /**
     * 接口的实现
     */
    private AsyncMessageSubscribe impl = new AsyncMessageSubscribe() {
        @Override
        public void pullMessage(String queue) {
            System.out.println(queue + ", current thread name:" + Thread.currentThread().getName());
        }

        @Override
        public AsyncResult<String> doAction(String s1, String s2) {
            System.out.println("s1==>" + s1 + ", s2==>" + s2);
            return new AsyncResult<>("hello wujiuye! current thread name:" + Thread.currentThread().getName());
        }
    };

    // 提交到线程池执行的Callable
    public static class AsyncMessageSubscribe_doActionCallable implements Callable<AsyncResult<String>> {
        private AsyncMessageSubscribe target;
        private String param1;
        private String param2;

        public AsyncMessageSubscribe_doActionCallable(AsyncMessageSubscribe var1, String var2, String var3) {
            this.target = var1;
            this.param1 = var2;
            this.param2 = var3;
        }

        public AsyncResult<String> call() throws Exception {
            return this.target.doAction(this.param1, this.param2);
        }
    }

    @Test
    public void test2() throws ExecutionException, InterruptedException {
        AsyncMessageSubscribe_doActionCallable callable = new AsyncMessageSubscribe_doActionCallable(impl, "wujiuye", "hello");
        Future result = executorService.submit(callable);
        AsyncResult<String> asyncResult = AsyncResult.newAsyncResultProxy(result);
        System.out.println(asyncResult.get());
    }

}
复制代码

编写字节码实现泛型接口需要注意的地方

test2 方法中的代码,对应字节码的实现如下,源码在 FutureFunctionHandler 这个类中。

public class FutureFunctionHandler implements AsyncFunctionHandler{
        /**
             * asyncMethod有返回值,且返回值类型为Future的处理
             *
             * @param classWriter          类改写器
             * @param interfaceClass       接口
             * @param asyncMethod          异步方法
             * @param proxyObjClass        接口的实现类
             * @param executorServiceClass 线程池的类型
             */
        @Override
        public void doOverrideAsyncFunc(ClassWriter classWriter, Class<?> interfaceClass, Method asyncMethod, Class<?> proxyObjClass, Class<? extends ExecutorService> executorServiceClass) {
              ...........
            // invoke submit callable
            methodVisitor.visitVarInsn(ALOAD, 0);
            methodVisitor.visitFieldInsn(GETFIELD, ByteCodeUtils.getProxyClassName(proxyObjClass), "executorService", Type.getDescriptor(executorServiceClass));
            methodVisitor.visitVarInsn(ALOAD, index);
            if (!executorServiceClass.isInterface()) {
                methodVisitor.visitMethodInsn(INVOKEVIRTUAL, executorServiceClass.getName().replace(".", "/"),
                        "submit", ByteCodeUtils.getFuncDesc(Future.class, Callable.class), false);
            } else {
                methodVisitor.visitMethodInsn(INVOKEINTERFACE, executorServiceClass.getName().replace(".", "/"),
                        "submit", ByteCodeUtils.getFuncDesc(Future.class, Callable.class), true);
            }
            // 将返回值存到操作数栈
            methodVisitor.visitVarInsn(ASTORE, ++index);

            // 再来一层代理,对外部屏蔽线程阻塞等待
            methodVisitor.visitVarInsn(ALOAD, index);
            methodVisitor.visitMethodInsn(INVOKESTATIC, AsyncResult.class.getName().replace(".", "/"),
                    "newAsyncResultProxy", ByteCodeUtils.getFuncDesc(AsyncResult.class, Future.class),
                    false);

            methodVisitor.visitInsn(ARETURN);
            ..............
        }
}
复制代码

线程池在调用 AsyncMessageSubscribe_doActionCallable 这个 Callable 的时候,它查找的方法是

java/util/concurrent/Callable.call:()Ljava.lang.Object;
复制代码

因为 Callable 是个泛型接口。

如果把实现类的签名和 call 方法的签名改为下面这样反而不行。

类的签名:Ljava/lang/Object;Ljava/util/concurrent/Callable<Lcom/wujiuye/asyncframework/handler/async/AsyncResult<Ljava/lang/String;>;>;"
call方法的签名:()Lcom/wujiuye/asyncframework/handler/async/AsyncResult<Ljava/lang/String;>;
复制代码

因为泛型 <T> 编译后的描述符是 Ljava.lang.Object;

AsyncResult 泛型类。(选部分)

public class AsyncResult<T> implements Future<T> {

    private T result;
    
    @Override
    public T get() throws InterruptedException, ExecutionException {
        return result;
    }

}
复制代码

AsyncResult 泛型类编译后的字节码信息。(选部分)

public class com.wujiuye.asyncframework.handler.async.AsyncResult<T> implements java.util.concurrent.Future<T> {
  private T result;
    descriptor: Ljava/lang/Object;

  public T get() throws java.lang.InterruptedException, java.util.concurrent.ExecutionException;
    descriptor: ()Ljava/lang/Object;
    Code:
       0: aload_0
       1: getfield      #2  // Field result:Ljava/lang/Object;
       4: areturn

复制代码

类型 Tdescriptor (类型描述符)为 Ljava/lang/Object; ,以及 get 方法中, getfield 指令指定的类型描述符也是 Ljava/lang/Object;

Callable 接口也是泛型接口,编译后 call 方法的描述符便是 ()Ljava.lang.Object;

@FunctionalInterface
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}
复制代码

所以,如果通过字节码实现 Callable 接口, call 方法不要设置方法签名,设置方法签名意味着也要改变方法的描述符,一改变就会导致线程池中调用这个 Callablecall 方法抛出抽象方法调用错误,原因是根据 Callable 接口的 call 方法的描述符在其实现类中找不到对应的 call 方法。

原文  https://juejin.im/post/5e7e71f7e51d4547052ce4cc
正文到此结束
Loading...