最近在学习 Akka ,因为Scala的新版本已将Actors迁移到Akka [ 1 ](教材就是那本烂到一定程度的 Akka in action ,排版字体代码示例一如既往的Manning式的糟糕)——至于为什么要学习Actors,大概是因为 红宝书 里提出了一些对异步编程组合子的设计要求。
官方定义:
Akka is a toolkit and runtime for building highly concurrent, distributed, and resilient message-driven applications on the JVM.
做为一个消息驱动的响庆式工具集,Akka具有如下特性:
同时,借助 Actor model,Akka实现了消息驱动,并为上层屏蔽了线程相关细节,提供了简洁的编程接口。
Actor model 最早是由Carl Hewitt在1973定义,由Erlang OTP (Open Telecom Platform) 发扬光大,其消息传递更加符合面向对象的原始意图。Actor model属于并发计算模型 ,通过对Actors并发原语的定义和使用,来避免使用者直接接触多线程并发或线程池等基础概念。
传统的并发模型是基于多线程之间的共享内存,使用同步(锁)方法防止写争夺;而Actor使用的是消息模型,每个Actor在同一时间处理最多一个消息,可以发送消息给其他Actors,保证了 单写原则 ,从而避免了多线程写争夺。
关于这个Actors, InfoQ 上有一个很生动的解释:
你可以将Actors当作是一群人,他们互相之间不会面对面地交流,而只是通过邮件的方式进行沟通。
Actors之间进行通信时,它们通过Actor URI 来进行定位,格式如下:
如 akka.tcp://backend@0.0.0.0:2551/user/tooSimple
就是一个合法的Actors URI,它代表的意思是:这个Actor 运行在backend系统,走的是akka.tcp协议管道,本机2551端口,路径为根结点下的tooSimple结点
Actors在收到消息时,它可以做出以下决策(可多选):
这个决策模型,在代码中看起来是这样的(根据消息的不同,会有不同的决策):
def receive = { case Withdraw(amount) => { if(this.balance > amount) { this.balance -= this.balance } } case Deposit(amount) => { this.balance += amount } }
actor ! message
一个Actor的生命周期由如下事件组成(重启与否,由Actor的监督者/父结点来决定:
来看一个演示Actors生命周期的代码片断(LifeCycleHook是用于输出生命周期事件的Actor,TestActor是与它交互的Actor):
import akka.actor.{ActorSystem, Props, ActorLogging, Actor} class LifeCycleHook extends Actor with ActorLogging { println("Constructor") override def preStart() { println("preStart") } override def preRestart(reason: Throwable, message: Option[Any]): Unit = { println("preRestart") super.preRestart(reason, message) } override def postRestart(reason: Throwable): Unit = { println("postRestart") super.postRestart(reason) } override def postStop() { println("postStop") } def receive = { case "restart" => throw new IllegalStateException("force restart") case msg: AnyRef => println("Receive") } } class TestActor extends Actor { def receive = { case _ => println("Received") } } object app extends App { implicit val system = ActorSystem("my-system") val testActor = system.actorOf(Props[TestActor], "Sender") val testActorRef = system.actorOf(Props[LifeCycleHook], "LifeCycleHook") testActorRef ! "restart" Thread.sleep(2000) testActorRef.tell("msg", testActor) system.stop(testActorRef) Thread.sleep(1000) system.stop(testActor) system.shutdown() }
它的输出:
Actor model里只有消息,只关注和传递消息,没有共享数据结构,Locking-free,这也是Akka能提供高性能计算模型的原因
PS: Akka也有.NET平台的实现 => { http://getakka.net/ }