因为最近工作的关系,要把异步任务从应用服务器中拆分到专门的异步处理服务器中。
是采用MQ的方式将任务消息发出,在服务端进行处理,如下图所示:
Sample Flowchart Template (2).png
这种方案是采用MQ作为中间的媒介,在服务端采用线程池异步处理任务,处理完成之后将结果发送到MQ中,客户端采用侦听的方式得到结果继续进行处理。
这种方案的不足是,可能在某些需求的情况下,需要将结果存放到共享的HashMap或者Threadlocal中进行存放结果,客户端会一直阻塞,直到得到结果,从多线程的角度来说,还是用了共享变量,虽然共享变量可能是线程安全的,但是从并发模型的角度来讲,并不是一个最好的方式。
采用比较流行的Akka框架来实现。
Akka的五大特性
易于构建并行和分布式应用
可靠性(Resilient by Design)系统具备自愈能力,在本地/远程都有监护。
高性能(High Performance)在单机中每秒可发送50000000个消息。内存占用小,1GB内存中可保存2500000个actors。
弹性,无中心(Elastic — Decentralized)自适应的负责均衡,路由,分区,配置
可扩展(Extensible) 可以使用Akka 扩展包进行扩展。
因为之前一直研究Scala,Scala的多线程处理的性能是非常高的,那基于Scala语言而开发出来的Akka框架得到了广泛使用。那么接下来我将使用一个非常简单的例子,以及一些测试用例展现一下它的性能。代码如下:
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
/**
* PROJECT_NAME: akkademo
* DATE: 16/2/27
* CREATE BY: chao.cheng
**/
public class ToStringActor extends UntypedActor {
@Override
public void onReceive(Object message) {
System.out.println(message.toString());
try {
Thread.sleep(500);
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
ActorSystem system = ActorSystem.create("toStringActor");
final ActorRef toString = system.actorOf(Props.create(ToStringActor.class),"toString");
for(int i=0;i<10000000;i++) {
toString.tell("test"+i,toString);
}
System.out.println("[结束]=======================");
}
}
程序的简单说明:采用事件的机制,循环发送一千万条数据,通过onReceive方法异步处理任务。
用VisualVM工具截图可以看到:
DFF8C31B-3886-4F75-A56B-EA78F85A6067.png
后台其实自适应只起了三个线程在运行,分别是dispatcher-2,dispatcher-3,dispatcher-4。