Promise使用commonJs规范中的一中异步编程解决方案,比传统的解决方案—回调函数和事件—更合理和更强大。
在java中,多线程编程相对来说是一件比较麻烦的事情,虽然在java concurrent
包中提供了一系列工具,但是我们想知道线程何时结束、获取线程执行结果、异常处理一直是件比较麻烦的事。 future.get()
会阻塞当前线程。Goolge-Guava Concurrent中的Service和ServiceManager很好地解决了这一问题,但是使用繁琐。某些时候我们需要线程a结束后,拿到线程a的结果立即执行线程b,可能会使用guava的ListenableFuture添加监听,可能得逻辑如下
public static void main(String[] args) throws Exception{ ExecutorService pool = Executors.newFixedThreadPool(1); ListeningExecutorService service = MoreExecutors.listeningDecorator(pool); MoreExecutors.addDelayedShutdownHook(service,3,TimeUnit.SECONDS); ListenableFuture<Integer> listenableFuture = service.submit(()->{ Random random = new Random(); int i=0; while (i<3){ i++; Console.log(random.nextInt(100)); Thread.sleep(100); } return random.nextInt(100); }); Futures.addCallback(listenableFuture, new FutureCallback<Integer>() { @Override public void onSuccess(@Nullable Integer result) { Console.log("执行结果:"+result); //执行线程b ..... Console.log("执行线程b"); ..... } @Override public void onFailure(Throwable t) { Console.log("线程执行发生错误"); t.printStackTrace(); } }); Console.log("主程序结束了"); }
输出结果
主程序结束了 47 62 48 执行结果:33 执行线程b
可以看到线程b的执行是嵌套在线程a的成功回调中的,然后线程b又是一个回调,做前端的知道ajax的回调地狱是多么痛苦和复杂
ajax(xxx,function(r1){ ajax(xxx,function(r2)){ ajax(xxx,function(r3){ }) } })
在上面java例子中,“主程序结束了”最先被打印,假如需要在springMVC的controller中返回线程b的执行结果,意味着需要在线程b执行结束前阻塞当前线程,这又该怎么做?如果以后需求变化,需要线程a和线程a1共同的执行结果去执行线程b,那改动会相当麻烦。
在前端开发中,也经常会遇到同样的问题,commonJs提出的Promise A+规范就很好地解决了这一个问题,ES6已经实现了这个规范。
java promise是一个开源项目,github地址: promise
是Promise A+规范的java实现版本,使用Promise可以很好地解决上面例子的监听回调问题,以第一个例子为例,用java Promise实现如下
ExecutorService pool = Promise.pool(1); IPromise promiseA = new Promise.Builder().pool(pool).promiseHanler(executor -> { //PromiseA的业务逻辑 Random random = new Random(); int i=0; while (i<3){ i++; System.out.println(random.nextInt(100)); Thread.sleep(100); } return random.nextInt(100); }).build(); promiseA.then(resultA -> {//PromiseA的成功回调 //在promiseA的回调中创建PromiseB IPromise promiseB = new Promise.Builder().pool(pool).externalInput(resultA) .promiseHanler(executor -> { //promiseB的业务逻辑 String bResult = "b:"+executor.getExternalInput(); return bResult; }).build(); //返回PromiseB return promiseB; }).then(resultB -> {//PromiseB的成功回调 System.out.println(resultB); return resultB; }).pCatch(e->{//捕获PromiseA和PromiseB的异常 e.printStackTrace(); return null; }); pool.shutdown(); System.out.println("主程序结束了");
打印结果如下
65 97 85 b:24 主程序结束了
从上面可以看到PromiseA和PromiseB是链式调用的,在promiseA的回调中创建并返回了promiseB,但是promiseB的回调是在外层调用的,假如需要顺序执行a->b-c->d四个线程,调用顺序如下
new PromiseA() .then(dataA->new PromiseB())//A的回调 .then(dataB->new PromiseC())//B的回调 .then(dataC->new PromiseD())//C的回调 .then(dataD->xxx)//D的回调 .pCatch(error->xxxx)//捕获中间可能产生的异常
promise规范可以参考Promise A+规范。其中ES6 Promise对象 在Promise A+规范上做了一些补充。java promise在使用上基本与ES6 Promise对象保持一致,部分地方有些许不同,后面会做出说明。 Promise的三个状态
Promise是IPromise的实现,Promise实例一经创建,将立即异步执行,部分接口如下
then(null,onRejectedExecutor)的别名,但返回不同于then,出现异常时可以选择不拒绝接下来Promise的执行,可用于异常修正,类似于try{}catch{}
该方法会尝试捕获当前promise的异常,最终返回一个新Promise,当被捕获Promise处于不同的状态时有不同的行为
指定一个监听器,在promise状态转为fulfilled或rejected调用,该方法不会阻塞线程执行,可以多次调用指定多个监听器
listen的别名,行为同listen
获取promise的当前状态
获取promise fulfilled状态下的终值,其余状态下时为null
获取promise rejected状态下的拒因,其余状态下为null
获取promise对应异步任务的future
尝试取消promise对应的异步任务,底层调用future.cancel(true)。fulfilled或rejected状态下无效。
Promise对象生成器
指定一个线程池用于执行promise任务,如果不指定,每一个promise都将启动一个线程
指定promise执行器,在promiseHanler的run方法中实现线程的具体业务逻辑,注意==promise对象一经创建,将立即执行其中的逻辑==
向Promise注入一个外部参数,可以在指定PromiseHandler时通过PromiseExecutor.getExternalInput()获取
int i = 3; IPromise p = new Promise.Builder() .externalInput(i).promiseHanler(new PromiseHandler() { public Object run(PromiseExecutor executor) { Integer args = (Integer) executor.getExternalInput(); return args*2; } }).build();
指定一个promise x,使当前promise接受 x 的状态
ExecutorService fixedPool = Promise.pool(1); IPromise promise1 = new Promise.Builder().pool(fixedPool).promiseHanler(executor->3).build(); IPromise promise2 = new Promise.Builder().pool(fixedPool) .promise(promise1) .promiseHanler(executor->4+(Integer) executor.getPromiseInput()) .build() .then(resolvedData->{ System.out.println(resolvedData); return resolvedData; }, rejectedReason-> rejectedReason.printStackTrace());
最终结果返回7,。如果promise1在执行过程中抛出异常e,promise2将被拒绝执行,将会以e作为拒因,状态为rejected返回一个新的Promise,最终会执行 rejectedReason-> rejectedReason.printStackTrace()
回调。
创建一个Promise实例
将多个 Promise 实例p1,...pn,包装成一个新的 Promise 实例 p,只有当p1-pn的状态都转为fulfilled时,p的状态才为fulfilled,此时p1-pn的返回值包装为一个数组Object[r1,...rn]作为p的终值。
只要p1-pn中任意一个被rejected,p的状态就转为rejected,将第一个被rejected的promise的拒因作为p的拒因,并尝试取消其余promise的执行(内部调用future.cancel(true))
将多个 Promise p1,...pn实例,包装成一个新的 Promise 实例 p,只要p1-pn有一个状态发生改变,p的状态立即改变。并尝试取消其余promise的执行(内部调用future.cancel(true))
第一个改变的promise的状态和数据作为p的状态和数据
创建一个终值为null、fulfilled状态的promise
创建一个终值为object、fulfilled状态的promise
static IPromise resolve(Object object,List args)
将object的then方法以异步方式执行,then方法的执行结果作为Promise的终值
static IPromise resolve(Object object,String methodName,List args)
将object的指定方法以异步方式执行,该方法的执行结果作为Promise的终值,目标方法的参数必须按顺序包含在List中,如object.doSomething(int a,Map b),用resolve执行为
List args = new ArrayList() args.add(1); args.add(map) Promise.resolve(object,"doSomething",args);
创建一个拒因为reason、rejected状态的promise
static IPromise pTry(Object object,String methodName,List args)
将object的指定方法以同步方式执行,该方法的执行结果作为Promise的终值,如果object为IPromise实例,将忽略methodName和args参数,异步执行该实例。
该方法是以Promise统一处理同步和异步方法,不管object是同步操作还是异步操作,都可以使用then指定下一步流程,用pCatch方法捕获异常,避免开发中出现以下情况
try{ object.doSomething(args1,args2);//可能会抛出异常 promise.then(resolvedData->{ //一些逻辑 }).then(resolvedData->{ //一些逻辑 }).pCatch(e->{ //异常处理逻辑 }) }catch(Exception e){ //异常处理逻辑 }
使用pTry,可以简化异常处理
List args = new ArrayList(){args1,args2}; Promise.pTry(object,"doSomething",args) .then(resolvedData->{ //一些逻辑 }).then(resolvedData->{ //一些逻辑 }).pCatch(e->{ //异常处理逻辑 })
定义异步逻辑的接口
run方法中实现具体的业务逻辑,最终run方式是在线程的call方法执行,如果run方法中含有wait、sleep...等锁操作,可能需要自行处理 InterruptedException
。因为该线程可能被外部调用cancel()或interrupt()方法
promise状态处理
将Promise对象的状态从“未完成”变为“成功”(即从pending变为fulfilled)。注意该方法一经调用,promise状态将不可改变,如下例,在调用executor.resolve(3);后,return之前抛出一个异常,promise的状态依旧是fulfilled,终值为3。
new Promise.Builder().promiseHanler(new PromiseHandler(){ @Override public Object run(PromiseExecutor executor) { executor.resolve(3); throw new RuntimeException("error"); return null; } }).build()
在run方法中executor.resolve(3)等同于return 3
@Override public Object run(PromiseExecutor executor) { return 3; }
大多数情况下建议直接使用return返回promise的终值。
将Promise对象的状态从“未完成”变为“失败”(即从pending变为fulfilled)
获取通过 new Promise.Builder().externalInput(Object externalInput)
方法注入的参数,具体参考 Promise.Builder#externalInput(Object externalInput)
获内部promise的执行结果。通过new Promise.Builder().promise(promise1)指定的promise1的执行结果。具体参考 Promise.Builder#promise(IPromise promise)
fulfilled回调接口
状态转为fulfilled时的回调,返回值可以是IPromise实例或普通对象。如果object是IPromise实例,object作为then方法的返回值,如果object是个普通对象,以object作为终值、状态为fulfilled包装一个新Promise作为then方法的返回值
rejected回调接口
当Promise转变为rejected状态时的回调
rejected回调接口
当发生异常时的回调,最终返回一个Promise或普通对象,如果是一个普通对象,这个对象将作为下一个Promise的终值
当Promise执行结束时的回调(无论是fulfilled还是rejected)
new Promise.Builder().promiseHanler(new PromiseHandler(){ @Override public Object run(PromiseExecutor executor) { executor.resolve(3);//返回异步执行结果3 return null; } }).build().then(new OnFulfilledExecutor() { @Override public Object onFulfilled(Object resolvedData) { Integer i = ((Integer)resolvedData)+1;//获取上一个promsie执行结果3,执行+1 System.out.println(i);//输出执行结果4 //创建一个新的promise,将3作为该promise的输入 IPromise p = new Promise.Builder().externalInput(i).promiseHanler(new PromiseHandler() { @Override public Object run(PromiseExecutor executor) { //获取外部输入4 Integer args = (Integer) executor.getExternalInput(); executor.resolve(args*2);//执行 4x2 return null; } }).build(); return p;//返回该promise p } }) .then(new OnFulfilledExecutor() {//执行p的回调 @Override public Object onFulfilled(Object args) { System.out.println(args);//输出p的执行结果 return args; } }, new OnRejectedExecutor() {//捕获可能出现的异常 @Override public void onRejected(Throwable rejectedReason) throws Exception { rejectedReason.printStackTrace(); } });
结果
ExecutorService fixedPool = Promise.pool(1);//创建一个线程池 //创建promise1 IPromise promise1 = new Promise.Builder().pool(fixedPool).promiseHanler(executor->3).build(); //创建promise2 IPromise promise2 = new Promise.Builder().pool(fixedPool) .promise(promise1)//让promise2接受promise1的状态,优先执行promise1 .promiseHanler(executor->{ //获取promise1的执行结果,执行promise2的逻辑 return 4+(Integer) executor.getPromiseInput(); }) .build() .then(resolvedData->{ System.out.println(resolvedData);//打印promise2的执行结果 return resolvedData; }, rejectedReason-> rejectedReason.printStackTrace()); System.out.println("end"); fixedPool.shutdown();
结果
7 end
new Promise.Builder().promiseHanler(executor -> 3).build().then(resolvedData->{ System.out.println("a:"+resolvedData); return new Promise.Builder().promiseHanler(executor -> { executor.reject(new RuntimeException("err"));//抛出异常 return null; }).build(); }).then(resolvedData1 -> {//fulfilled回调 System.out.println("b:"+resolvedData1); return resolvedData1; },rejectReason -> {//rejected回调 System.err.println("c:"+rejectReason); });
结果
a:3 c:java.lang.RuntimeException: err
new Promise.Builder().promiseHanler(executor -> 0).build() .then(res0->{ System.out.println("a:"+res0);//输出 a:0 Thread.sleep(100); return 1;//返回1 }).then(res1 -> { throw new RuntimeException("throw error");//抛出异常 }).then(res2->{ Thread.sleep(100); System.out.println("b:"+res2); return 2; }).pCatch(e->{ Thread.sleep(100); System.out.println("c:");//输出c: e.printStackTrace(); return 3; }).then(res3->{ Thread.sleep(100); System.out.println("d:"+res3);//输出d:3 return 4; });
结果
a:0 c: runtimeException:throw error d:3
从上面结果可以看出,在res1出抛出异常后,拒绝了res2处的执行,被pCatch捕获,pCatch返回3,被包装成终值为3、fulfilled状态的promise,在res3打印d:3。
IPromise p1 = new Promise.Builder().promiseHanler(executor -> { Thread.sleep(1000); return 1; }).build(); IPromise p2 = new Promise.Builder().promiseHanler(executor -> { Thread.sleep(4000); return 2; }).build(); IPromise p3 = new Promise.Builder().promiseHanler(executor -> { Thread.sleep(2000); return 3; }).build(); long s = System.currentTimeMillis(); Promise.all(p1,p2,p3).then(resolvedData -> { Object[] datas = (Object[])resolvedData; for(Object d:datas){ System.out.println(d); } return null; },e->e.printStackTrace()); System.out.println("耗时:"+(System.currentTimeMillis()-s));
结果
1 2 3 耗时:4033
Map<String,Boolean> p1Flag = new HashMap<>(); p1Flag.put("flag",true); IPromise p1 = new Promise.Builder().externalInput(p1Flag).promiseHanler(executor -> { while (((Map<String,Boolean>)executor.getExternalInput()).get("flag")){ //do something System.out.println("p1 正在执行任务"); } System.out.println("p1任务完成,正常结束"); return 1; }).build(); IPromise p2 = new Promise.Builder().promiseHanler(executor -> { while (!Thread.currentThread().isInterrupted()){ System.out.println("执行p2正常逻辑"); } System.err.println("p2线程被取消"); return 2; }).build(); IPromise p3 = new Promise.Builder().promiseHanler(executor -> { Thread.sleep(10); throw new RuntimeException("p3抛出异常"); }).build(); IPromise p4 = new Promise.Builder().finalPromise("4",true).build(); long s = System.currentTimeMillis(); Promise.all(p1,p2,p3,p4).then(resolvedData -> { Object[] datas = (Object[])resolvedData; for(Object d:datas){ System.out.println(d); } return null; },e->e.printStackTrace()); System.out.println("耗时:"+(System.currentTimeMillis()-s)); p1Flag.put("flag",false);
可能的结果如下
p1 正在执行任务 p1 正在执行任务 执行p2正常逻辑 执行p2正常逻辑 p1 正在执行任务 runtimeException:p3抛出异常 p2线程被取消 p1 正在执行任务 p1 正在执行任务 p1 正在执行任务 p1任务完成,正常结束
从上面结果可以看出,开始p1和p2都在正常执行,当p3抛出异常后,Promise.all方法立即返回p3的异常并打印,同时取消p1和p2的执行,由于p2判断了线程状态 Thread.currentThread().isInterrupted()
,所以p2执行了正常的退出逻辑。p1仍然在执行,并没有被取消掉,最后打印p1任务完成,正常结束是因为程序末尾执行了 p1Flag.put("flag",false);
,否则p1会永远循环打印。
public class ThenTest { public Integer then(int a,int b){ //打印当前执行现场名称 System.out.println(Thread.currentThread().getName()); return a+b; } public static void main(String[] args){ //打印主线程名称 System.out.println(Thread.currentThread().getName()); List arg = new ArrayList<>(); arg.add(1); arg.add(2); //将ThenTest实例then方法异步执行 Promise.resolve(new ThenTest(),arg).then(resolvedData -> { System.out.println(resolvedData); return resolvedData; }).pCatch(e->{ e.printStackTrace(); return 1; }); } }
结果
main promise-thread-0 3