1.什么是akka?
AKKA 是一个用于构建高并发、分布式和容错应用程序的开源框架。它基于Actor模型,提供了强大的并发抽象和工具,适用于各种业务场景。以下是一些使用AKKA框架的常见业务场景的示例:
- 实时数据处理:AKKA提供了轻量级的Actor模型,可以用于处理实时数据流。您可以创建多个Actor来处理数据的不同部分,并使用消息传递机制进行通信和协调。这在实时监控、实时分析和实时推送等场景中非常有用。
- 并发任务执行:AKKA的Actor模型使得并发任务的执行变得简单。您可以将任务分解为多个独立的Actor,并让它们并行地执行。每个Actor可以负责处理一部分任务,并通过消息传递进行协调和结果汇总。这在批处理、并行计算和任务调度等场景中非常有用。
- 分布式系统:AKKA提供了分布式Actor模型,可以在多个节点上分布Actor的实例。这使得构建分布式系统变得更加容易。您可以使用AKKA的远程Actor和集群功能来实现分布式的任务分发、数据共享和容错机制。
- 微服务架构:AKKA可以作为构建微服务架构的基础。每个微服务可以由一个或多个Actor组成,并使用消息传递进行通信。AKKA的容错机制和监督策略可以帮助实现高可用性和容错性的微服务。
- 实时通信和聊天应用:AKKA提供了高效的消息传递机制,适用于实时通信和聊天应用。每个用户可以由一个Actor表示,消息可以通过Actor之间的邮箱进行传递。这使得实现实时聊天、通知和协作功能变得更加简单。
Actor模型简介
Actor由状态(state)、行为(Behavior)和邮箱(mailBox)三部分组成
- 状态:Actor中的状态指的是Actor对象的变量信息,状态由Actor自己管理,避免了并发环境下的锁和内存原子性等问题
- 行为:行为指定的是Actor中计算逻辑,通过Actor接收到消息来改变Actor的状态
- 邮箱:邮箱是Actor和Actor之间的通信桥梁,邮箱内部通过FIFO消息队列来存储发送方Actor消息,接受方Actor从邮箱队列中获取消息
Actor 模型及其说明
- Akka 处理并发的方法基于 Actor 模型。(示意图)
- 在基于 Actor 的系统里,所有的事物都是 Actor,就好像在面向对象设计里面所有的事物都是 对象一样。
- Actor 模型是作为一个并发模型设计和架构的。Actor 与 Actor 之间只能通过消息通信,如图 的信封
- Actor 与 Actor 之间只能用消息进行通信,当一个 Actor 给另外一个 Actor 发消息,消息是有 顺序的 (消息队列),只需要将消息投寄的相应的邮箱即可。
- 怎么处理消息是由接收消息的 Actor 决定的,发送消息 Actor 可以等待回复,也可以异步处理 【ajax】
- ActorSystem 的职责是负责创建并管理其创建的 Actor, ActorSystem 是单例的 (可以 ActorSystem 是一个工厂,专门创建 Actor),一个 JVM 进程中有一个即可,而 Acotr 是可以有多个的。
- Actor 模型是对并发模型进行了更高的抽象。
- Actor 模型是异步、非阻塞、高性能的事件驱动编程模型。[案例:说明 什么是异步、非阻塞,最 经典的案例就是 ajax 异步请求处理]
- Actor 模型是轻量级事件处理 (1GB 内存可容纳百万级别个 Actor),因此处理大并发性能高.
2.代码工程
实验目的
基于AKKA actor模型编程
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>springboot-demo</artifactId>
<groupId>com.et</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>akka</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.13</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
</project>
actor
传递string和int参数
package com.et.akka.actor;
import akka.actor.AbstractActor;
public class ActorNormal extends AbstractActor {
//process msg
@Override
public Receive createReceive() {
//Process a specific type of message, such as a string type message
Receive build = receiveBuilder().match(String.class,(msg)-> {
System.out.println(msg);
sender().tell("response", self());
}).match(Integer.class,(msg)-> {
System.out.println(msg+"1");
}).build();
return build;
}
}
传递对象参数
package com.et.akka.actor;
import akka.actor.AbstractActor;
import com.et.akka.model.User;
public class ActorStruct extends AbstractActor {
private final User user;
public ActorStruct(User userModel){
this.user = userModel;
}
//process msg
@Override
public Receive createReceive() {
Receive build = receiveBuilder().match(String.class,(msg)-> {
System.out.println(msg);
sender().tell(" I am a result of ActorStruct:"+user.getName(), self());
}).match(Integer.class,(msg)-> {
System.out.println(msg+"1");
}).build();
return build;
}
}
controller
package com.et.akka.controller;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.et.akka.actor.ActorNormal;
import com.et.akka.actor.ActorStruct;
import com.et.akka.model.User;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import java.util.concurrent.TimeUnit;
@RestController
public class AkkaController {
@GetMapping(value = "/Akka/AkkaSendString")
@ResponseBody
public void AkkaSendString() {
//Creates system management objects for all management actors
ActorSystem actorSystem = ActorSystem.create();
//use actorSystem.actorOf to define actorNormal as ActorRef
ActorRef actor = actorSystem.actorOf(Props.create(ActorNormal.class), "actorNormal");
//Send message Object msg (the content of the message, any type of data), final ActorRef sender (indicates that there is no sender (actually an Actor called deadLetters))
actor.tell("kiba", ActorRef.noSender());
}
@GetMapping(value = "/Akka/AkkaSendInt")
@ResponseBody
public void AkkaSendInt() {
ActorSystem actorSystem = ActorSystem.create();
ActorRef actor = actorSystem.actorOf(Props.create(ActorNormal.class), "actorNormal");
actor.tell(518, ActorRef.noSender());//send int
}
@GetMapping(value = "/Akka/AkkaAsk")
@ResponseBody
public void AkkaAsk() {
ActorSystem actorSystem = ActorSystem.create();
ActorRef actor = actorSystem.actorOf(Props.create(ActorNormal.class), "actorNormal");
Timeout timeout = new Timeout(Duration.create(2, TimeUnit.SECONDS));
Future<Object> future = Patterns.ask(actor, "hello", timeout);
try {
Object obj = Await.result(future, timeout.duration());
String reply = obj.toString();
System.out.println("reply msg: " + reply);
} catch (Exception e) {
e.printStackTrace();
}
}
@GetMapping(value = "/Akka/AkkaAskStruct")
@ResponseBody
public void AkkaAskStruct() {
ActorSystem actorSystem = ActorSystem.create();
ActorRef actor = actorSystem.actorOf(Props.create(ActorStruct.class,new User(1,"kiba")), "actorNormal");
Timeout timeout = new Timeout(Duration.create(2, TimeUnit.SECONDS));
Future<Object> future = Patterns.ask(actor, "hello", timeout);
try {
Object obj = Await.result(future, timeout.duration());
String reply = obj.toString();
System.out.println("reply msg: " + reply);
} catch (Exception e) {
e.printStackTrace();
}
}
}
model
package com.et.akka.model;
import lombok.AllArgsConstructor;
import lombok.Data;
/**
* @author liuhaihua
* @version 1.0
* @ClassName User
* @Description todo
* @date 2024/09/11/ 9:45
*/
@Data
@AllArgsConstructor
public class User {
private int age;
private String name;
}
3.测试
- 启动Spring Boot应用
- 访问http://127.0.0.1:8088/Akka/AkkaSendString
- 访问http://127.0.0.1:8088/Akka/AkkaSendInt
- 访问http://127.0.0.1:8088/Akka/AkkaAsk
- 访问http://127.0.0.1:8088/Akka/AkkaAskStruct
4.引用