在Akka中, 一个 Future
是用来获取某个并发操作的结果的数据结构。这个操作通常是由Actor执行或由Dispatcher直接执行的. 这个结果可以以同步(阻塞)或异步(非阻塞)的方式访问。
Future提供了一种简单的方式来执行并行算法。
Future中的一个常见用例是在不需要使用Actor的情况下并发地执行计算。Future有两种使用方式:
scala.concurrent.Await
使用。 onComplete
、 onSuccess
、 onFailure
方式使用。 为了运行回调和操作,Futures需要有一个 ExecutionContext
。
如果你在作用域内有一个 ActorSystem
,它会它自己派发器用作ExecutionContext,你也可以用ExecutionContext伴生对象提供的工厂方法来将Executors和ExecutorServices进行包裹,或者甚至创建自己的实例。
通过导入 ExecutionContext.Implicits.global
来导入默认的全局执行上下文。你可以把该执行上下文看做是一个线程池,ExecutionContext是在某个线程池执行任务的抽象。
如果在代码中没有导入该执行上下文,代码将无法编译。
第一个例子展示如何创建一个future,然后通过阻塞方式等待其计算结果。虽然阻塞方式不是一个很好的用法,但是可以说明问题。这个例子中,通过在未来某个时间计算1+1,当计算结果后再返回。
importscala.concurrent.{Await,Future} importscala.concurrent.duration._ importscala.concurrent.ExecutionContext.Implicits.global objectFutureBlockDemoextendsApp{ implicit val baseTime = System.currentTimeMillis // create a Future val f = Future{ Thread.sleep(500) 1+1 } // this isblocking(blockingisbad) val result=Await.result(f,1second) // 如果Future没有在Await规定的时间里返回, // 将抛出java.util.concurrent.TimeoutException println(result) Thread.sleep(1000) }
代码解释:
Dispatcher
所执行,代码块的返回结果会被用来完成 Future
。 与从Actor返回的Future不同,这个Future拥有正确的类型, 我们还避免了管理Actor的开销。 Await.result
方法将阻塞1秒时间来等待Future结果返回,如果Future在规定时间内没有返回,将抛出 java.util.concurrent.TimeoutException
异常。 scala.concurrent.duration._
,可以用一种方便的方式来声明时间间隔,如 100 nanos
, 500 millis
, 5 seconds
、 1 minute
、 1 hour
, 3 days
。还可以通过 Duration(100, MILLISECONDS)
, Duration(200, "millis")
来创建时间间隔。 有时你只需要监听 Future
的完成事件,对其进行响应,不是创建新的Future,而仅仅是产生副作用。
通过 onComplete
, onSuccess
, onFailure
三个回调函数来异步执行Future任务,而后两者仅仅是第一项的特例。
使用onComplete的代码示例:
importscala.concurrent.{Future} importscala.concurrent.ExecutionContext.Implicits.global importscala.util.{Failure, Success} importscala.util.Random objectFutureNotBlockextendsApp{ println("starting calculation ...") valf = Future { Thread.sleep(Random.nextInt(500)) 42 } println("before onComplete") f.onComplete{ caseSuccess(value) => println(s"Got the callback, meaning = $value") caseFailure(e) => e.printStackTrace } // do the rest of your work println("A ...") Thread.sleep(100) println("B ....") Thread.sleep(100) println("C ....") Thread.sleep(100) println("D ....") Thread.sleep(100) println("E ....") Thread.sleep(100) Thread.sleep(2000) }
使用onSuccess、onFailure的代码示例:
importscala.concurrent.{Future} importscala.concurrent.ExecutionContext.Implicits.global importscala.util.{Failure, Success} importscala.util.Random objectTest12_FutureOnSuccessAndFailureextendsApp{ valf = Future { Thread.sleep(Random.nextInt(500)) if(Random.nextInt(500) >250)thrownewException("Tikes!")else42 } f onSuccess { caseresult => println(s"Success: $result") } f onFailure { caset => println(s"Exception: ${t.getMessage}") } // do the rest of your work println("A ...") Thread.sleep(100) println("B ....") Thread.sleep(100) println("C ....") Thread.sleep(100) println("D ....") Thread.sleep(100) println("E ....") Thread.sleep(100) Thread.sleep(1000) }
代码解释:
上面两段例子中,Future结构中随机延迟一段时间,然后返回结果或者抛出异常。
然后在回调函数中进行相关处理。
先看一下示例:
importscala.concurrent.{Await,Future, future} importscala.concurrent.ExecutionContext.Implicits.global importscala.util.{Failure,Success} objectReturnFutureextendsApp{ implicit val baseTime = System.currentTimeMillis // `future` methodisanother way to create a future // Itstarts the computation asynchronouslyandretures aFuture[Int] that // will hold the resultofthe computation. def longRunningComputation(i: Int):Future[Int] = future { Thread.sleep(100) i + 1 } // this does notblock longRunningComputation(11).onComplete { caseSuccess(result) => println(s"result = $result") caseFailure(e) => e.printStackTrace } // keep the jvm fromshutting down Thread.sleep(1000) }
代码解释:
上面代码中的longRunningComputation返回一个 Future[Int]
,然后进行相关的异步操作。
其中 future
方法是创建一个future的另一种方法。它将启动一个异步计算并且返回包含计算结果的 Future[T]
。
通常有两种方法来从一个Actor获取回应: 第一种是发送一个消息 actor ! msg
,这种方法只在发送者是一个Actor时有效;第二种是通过一个Future。
使用Actor的 ?
方法来发送消息会返回一个Future。 要等待并获取结果的最简单方法是:
importscala.concurrent.Await importakka.pattern.ask importscala.concurrent.duration._ importakka.util.Timeout implicit val timeout = Timeout(5seconds) val future = actor ? msg val result=Await.result(future, timeout.duration).asInstanceOf[String]
下面是使用 ?
发送消息给actor,并等待回应的代码示例:
importakka.actor._ importakka.pattern.ask importakka.util.Timeout importscala.concurrent.{Await, Future} importscala.language.postfixOps importscala.concurrent.duration._ caseobjectAskNameMessage classTestActorextendsActor{ defreceive = { caseAskNameMessage =>// respond to the 'ask' request sender ! "Fred" case_ => println("that was unexpected") } } objectAskDemoextendsApp{ //create the system and actor valsystem = ActorSystem("AskDemoSystem") valmyActor = system.actorOf(Props[TestActor], name="myActor") // (1) this is one way to "ask" another actor for information implicit valtimeout = Timeout(5seconds) valfuture = myActor ? AskNameMessage valresult = Await.result(future, timeout.duration).asInstanceOf[String] println(result) // (2) a slightly different way to ask another actor for information valfuture2: Future[String] = ask(myActor, AskNameMessage).mapTo[String] valresult2 = Await.result(future2,1second) println(result2) system.shutdown }
代码解释:
Await.result(future, timeout.duration).asInstanceOf[String]
会导致当前线程被阻塞,并等待actor通过它的应答来完成 Future
。但是阻塞会导致性能问题,所以是不推荐的。致阻塞的操作位于 Await.result
和 Await.ready
中,这样就方便定位阻塞的位置。 Future[Any]
,这是因为actor是动态的。 这也是为什么上例中注释(1)使用了 asInstanceOf
。 mapTo
方法来将Future转换到期望的类型。如果转换成功, mapTo
方法会返回一个包含结果的新的 Future,如果不成功,则返回 ClassCastException
异常。 转载请注明作者Jason Ding及其出处
Github博客主页(http://jasonding1354.github.io/)
GitCafe博客主页(http://jasonding1354.gitcafe.io/)
CSDN博客(http://blog.csdn.net/jasonding1354)
简书主页(http://www.jianshu.com/users/2bd9b48f6ea8/latest_articles)
Google搜索jasonding1354进入我的博客主页