这篇文章主要是第一次学习Akka编程,先试试水,探探坑,对Akka和SBT的使用有一个直观的了解,以几个简单的akka编程实例来说明akka的使用。希望在日后的学习和编程中,能有更多自己的体会和经验总结来分享。
Actor实例可以想象成是服务器上的Web服务,你无法控制,只能通过发送消息去请求执行任务或查询信息,而不能直接在Web服务中修改状态或者处理资源。通过发送不可改变的消息,虽然看上去有些限制,但是可以很简单安全的编写并发程序。
一个actor是基于Actor系统的最小单元,就像面向对象系统中的对象实例一样,它也封装了状态和行为。我们无法窥探actor内部的信息,只能通过发送消息来请求状态信息(就像是问一个人,他感觉如何)。actor中有一个存放不可变状态信息的信箱。我们通过发送信息和actor进行通信,当actor收到信息之后,它会运用相关算法来处理具体的信息。
在一个应用程序中,多个actor构成了一套层级系统,像是一个家族或者一个商业组织。一个actor可以认为是一个商业组织的个人。一个actor有一个父亲,称为监督者(supervisor),还有好多孩子,可以认为,在一个商业组织中,主席(actor)下面有多个副主席,副主席也有很多下属随从。
Actor系统的最佳实践是“ 委派任务 ”,尤其是当actor的行为被阻塞的时候。可以想象,在实际商业活动中,主席将要做的工作分配给下面的几个副主席去分别执行,而副主席也会将子任务分配给自己的随从,直到该任务被下属们执行完毕。
Actor模型的一个重要内容是处理故障。在工作工程中,如果出现错误或者抛出异常,actor和其子actor都将暂停,然后发送一条信息给监督者(supervisor)actor,报告出现故障的信号。根据工作任务和故障的性质,监督者actor将会作出几种选择:
现在我用一个最简单的actor编程实例来介绍akka编程,先给出代码:
importakka.actor.Actor importakka.actor.ActorSystem importakka.actor.Props classHelloActorextendsActor{ defreceive = { case"hello"=> println("hello back to you.") case_ => println("huh?") } } objectTest1_HelloActorextendsApp{ // actor need an ActorSystem valsystem = ActorSystem("HelloSystem") // create and start the actor valhelloActor = system.actorOf(Props[HelloActor], name="helloActor") // send two messages helloActor ! "hello" helloActor ! "what" // shutdown the actor system system.shutdown }
代码注解:
hello
作为消息,做出相应打印动作 system.actorOf
创建actor实例 !
方法来发送消息 一个actor system是actors的层级集团,分享公共配置信息(比如分发器dispatchers,部署deployments,远程功能remote capabilities,地址addresses)。它同时也是创建和查询actors的入口。ActorSystem是为你的应用程序分配线程资源的结构。
当你调用 ActorSystem
的 actorOf
方法时,将创建并返回一个 ActorRef
的实例:
def actorOf(props: Props, name: String): ActorRef
。
这个引用用来处理actor,你可以将其看做是处理实际actor的代理人(broker)或包装外观(facade)。ActorRef防止你破坏Actor模型,比如直接处理Actor实例,或直接修改Actor实例中的变量。所以只能通过给actor发送消息方式来执行任务,这种“袖手旁观(不干涉,hands-off)”的方法帮助巩固适宜的编程实践。
ActorRef有以下特点:
下面给出的是两个actor实例相互发送消息进行通信的PingPong示例:
importakka.actor._ caseobjectPingMessage caseobjectPongMessage caseobjectStartMessage caseobjectStopMessage classPing(pong: ActorRef)extendsActor{ varcount =0 defincrementAndPrint {count +=1; println(s"$count:ping")} defreceive = { caseStartMessage => incrementAndPrint pong ! PongMessage casePingMessage => incrementAndPrint if(count >99) { sender ! StopMessage println("ping stopped") context.stop(self) } else sender ! PongMessage case_ => println("Ping got unexpected information") } } classPongextendsActor{ varcount =0 defreceive = { caseStopMessage => println("pong stopped") context.stop(self) casePongMessage => count += 1 println(s"$count:pong") sender ! PingMessage case_ => println("Pong got unexpected information") } } objectPingPangTestextendsApp{ valsystem = ActorSystem("PingPongTest") valpongActor = system.actorOf(Props[Pong], name="pong") valpingActor = system.actorOf(Props(newPing(pongActor)), name = "ping") pingActor ! StartMessage }
代码注释:
ActorSystem
之后; Pong
的actor实例(pongActor对象其实是 ActorRef
的实例); Ping
的actor实例,其构造函数接受 ActorRef
参数; pingActor
发送一个 StartMessage
消息来启动pingActor和pongActor的具体动作; Ping
Actor和 Pong
Actor通过PingMessage和PongMessage相互发送消息, sender
用来引用消息发送源Actor; Ping
通过计数,知道进行了100次消息的发送之后,发送StopMessage来终止actor。分别调用自己的 context.stop
方法来结束 在ActorSystem层面,通过调用 system.actorOf
方法来创建actors;在actor内部,通过调用 context.actorOf
方法来创建子actor。
下面给出一个ParentChild示例:
importakka.actor._ caseclassCreateChild(name: String) caseclassName(name: String) classChildextendsActor{ varname ="No name" overridedefpostStop: Unit = { println(s"D'oh! They killed me ($name): ${self.path}") } defreceive = { caseName(name) =>this.name = name case_ => println(s"Child $name got message.") } } classParentextendsActor{ defreceive = { caseCreateChild(name) => // Parent creates a new Child here println(s"Parent about to create Child ($name) ...") valchild = context.actorOf(Props[Child], name=s"$name") child ! Name(name) case_ => println(s"Parent got some other message.") } } objectParentChildDemoextendsApp{ valactorSystem = ActorSystem("ParentChildTest") valparent = actorSystem.actorOf(Props[Parent], name="Parent") // send messages to Parent to create to child actors parent ! CreateChild("XiaoMing") parent ! CreateChild("XiaoLiang") Thread.sleep(500) // lookup XiaoMing, the kill it println("Sending XiaoMing a PoisonPill ... ") valxiaoming = actorSystem.actorSelection("/user/Parent/XiaoMing") xiaoming ! PoisonPill println("XiaoMing was killed") Thread.sleep(5000) actorSystem.shutdown }
打印结果:
Parent about to create Child (XiaoMing) ... Parent about to create Child (XiaoLiang) ... Sending XiaoMing a PoisonPill ... XiaoMing was killed D'oh! They killed me (XiaoMing): akka://ParentChildTest/user/Parent/XiaoMing D'oh! They killed me (XiaoLiang): akka://ParentChildTest/user/Parent/XiaoLiang
在ActorSystem层面,通过 system.stop(actorRef)
来终止一个actor;在actor内部,使用 context.stop(actorRef)
来结束一个actor。
如果当前有正在处理的消息,对该消息的处理将在actor被终止之前完成,但是邮箱中的后续消息将不会被处理。缺省情况下这些消息会被送到 ActorSystem的 dead letter mailbox
, 但是这取决于邮箱的实现。
actor的终止分两步: 第一步actor将停止对邮箱的处理,向所有子actor发送终止命令,然后处理来自子actor的终止消息直到所有的子actor都完成终止, 最后终止自己(调用postStop,销毁邮箱,向DeathWatch发布Terminated,通知其监管者)。这个过程保证actor系统中的子树以一种有序的方式终止,将终止命令传播到叶子结点并收集它们回送的确认消息给被终止的监管者。如果其中某个actor没有响应(i.e.由于处理消息用了太长时间以至于没有收到终止命令), 整个过程将会被阻塞。
在 ActorSystem.shutdown被调用时, 系统根监管actor会被终止,以上的过程将保证整个系统的正确终止。
postStop()
hook是在actor被完全终止以后调用的。这是为了清理资源:
overridedefpostStop() = { // 关闭文件或数据库连接 }
还有其他两种方式,发送 PoisonPill
消息或者使用 gracefulStop
终止。
你也可以向actor发送 akka.actor.PoisonPill
消息,这个消息处理完成后actor会被终止。PoisonPill与普通消息一样被放进队列,因此会在已经入队列的其它消息之后被执行。
如果你想等待终止过程的结束,或者组合若干actor的终止次序,可以使用gracefulStop。下面给出gracefulStop的代码示例:
importakka.actor._ importakka.pattern.gracefulStop importscala.concurrent.{Await, ExecutionContext, Future} importscala.concurrent.duration._ importscala.language.postfixOps caseobjectTestActorStop classTestActorextendsActor{ defreceive = { caseTestActorStop => context.stop(self) case_ => println("TestActor got message") } overridedefpostStop {println("TestActor: postStop")} } objectGracefulStopTestextendsApp{ valsystem = ActorSystem("GracefulStopTest") valtestActor = system.actorOf(Props[TestActor], name="TestActor") // try to stop the actor graceful try{ valstopped: Future[Boolean] = gracefulStop(testActor,2seconds, TestActorStop) Await.result(stopped, 3seconds) println("testActor was stopped") } catch{ casee: akka.pattern.AskTimeoutException => e.printStackTrace } finally{ system.shutdown } }
gracefulStop(actorRef, timeout)
将返回一个Future实例,当目标actor有处理相关终止动作的消息时,会执行成功。
上面示例中,通过发送TestActorStop消息来终止actor;如果没有处理终止的工作,当超过2s后,Future抛出 akka.pattern.AskTimeoutException
异常。默认情况下,gracefulStop将发送PoisonPill消息。
当深入Akka actors,我们将认识监督者策略(supervisor strategies)概念。当实现了监督者策略,向actor发送一个 Kill
消息,这可以用来重新启动actor。如果使用默认的监督者策略,Kill消息将终止目标actor。
下面是示例代码:
importakka.actor._ classNumber5extendsActor{ defreceive = { case_ => println("Number 5 got a message") } overridedefpreStart { println("Number 5 is alive")} overridedefpostStop { println("Number 5::postStop called")} overridedefpreRestart(reason: Throwable, message: Option[Any]): Unit = { println("Number 5::preRestart called") } overridedefpostRestart(reason: Throwable): Unit = { println("Number 5::postRestart called") } } objectKillTestextendsApp{ valsystem = ActorSystem("KillTestSystem") valnumber5 = system.actorOf(Props[Number5], name="Number5") number5 ! "hello" number5 ! Kill system.shutdown }
打印的信息:
Number5isalive Number5gotamessage [ERROR][01/17/2016 19:20:09.342][KillTestSystem-akka.actor.default-dispatcher-3][akka://KillTestSystem/user/Number5]Kill(akka.actor.ActorKilledException) Number5::postStopcalled
转载请注明作者Jason Ding及其出处
Github博客主页(http://jasonding1354.github.io/)
GitCafe博客主页(http://jasonding1354.gitcafe.io/)
CSDN博客(http://blog.csdn.net/jasonding1354)
简书主页(http://www.jianshu.com/users/2bd9b48f6ea8/latest_articles)
Google搜索jasonding1354进入我的博客主页