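Most distributed applications need some stateful components in order to provide consistency and fault tolerance. Atomix is an embeddable library that helps make distributed resources fault-tolerant and consistent.
It offers a rich set of APIs for managing its resources, such as collections, groups, and concurrency utilities.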
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springboot-demo</artifactId>
        <groupId>com.et</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>atomix</artifactId>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <atomix.version>3.1.12</atomix.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.atomix</groupId>
            <artifactId>atomix</artifactId>
            <version>${atomix.version}</version>
        </dependency>
        <dependency>
            <groupId>io.atomix</groupId>
            <artifactId>atomix-raft</artifactId>
            <version>${atomix.version}</version>
        </dependency>
    </dependencies>
</project>
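import java.io.File;
import java.util.Collections;
import java.util.List;

import io.atomix.cluster.AtomixCluster;
import io.atomix.cluster.discovery.BootstrapDiscoveryProvider;
import io.atomix.core.Atomix;
import io.atomix.protocols.raft.partition.RaftPartitionGroup;

// LOCAL_DATA_DIR and groupName are assumed to be fields of the enclosing class; their definitions are not shown here.
private static Atomix buildAtomix() {
    // Single-node cluster: "node1" is the only Raft member
    List<String> raftMembers = Collections.singletonList("node1");
    // Build the Atomix instance
    return Atomix.builder(AtomixCluster.class.getClassLoader())
            .withClusterId("my-cluster")
            .withMemberId("node1")
            .withHost("127.0.0.1")
            .withPort(6789)
            .withMembershipProvider(BootstrapDiscoveryProvider.builder().build())
            // Management group: holds primitive metadata
            .withManagementGroup(RaftPartitionGroup.builder("system")
                    .withNumPartitions(1)
                    .withDataDirectory(new File(LOCAL_DATA_DIR, "system"))
                    .withMembers(raftMembers)
                    .build())
            // Data group: holds the primitives created below
            .addPartitionGroup(RaftPartitionGroup.builder(groupName)
                    .withNumPartitions(raftMembers.size())
                    .withDataDirectory(new File(LOCAL_DATA_DIR, "data"))
                    .withMembers(raftMembers)
                    .build())
            .build();
}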
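The builder above lists a single Raft member, which is convenient for a demo but leaves no room for failure. As a rough illustration of why, the sketch below computes the majority size that Raft-style consensus needs and how many members can be lost before that majority is unreachable. `RaftQuorumSketch` and its methods are hypothetical names used only for this example; they are not part of Atomix.

```java
// Majority arithmetic for a Raft-style cluster; a standalone illustration, not Atomix code.
final class RaftQuorumSketch {

    /** Smallest number of members that forms a strict majority of the cluster. */
    static int majority(int clusterSize) {
        if (clusterSize < 1) {
            throw new IllegalArgumentException("clusterSize must be >= 1");
        }
        return clusterSize / 2 + 1;
    }

    /** Number of members that can fail while a majority can still be formed. */
    static int tolerableFailures(int clusterSize) {
        return clusterSize - majority(clusterSize);
    }

    public static void main(String[] args) {
        for (int n : new int[] {1, 3, 5}) {
            System.out.printf("members=%d majority=%d tolerableFailures=%d%n",
                    n, majority(n), tolerableFailures(n));
        }
    }
}
```

With one member the majority is that member itself, so any failure stops the cluster; three members are the smallest configuration that survives the loss of one node.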
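// Requires: io.atomix.core.map.AsyncAtomicMap, io.atomix.primitive.Recovery,
// io.atomix.protocols.raft.MultiRaftProtocol, io.atomix.utils.time.Versioned,
// java.util.concurrent.CompletableFuture. MAX_RETRIES is assumed to be defined elsewhere.
Atomix atomix = buildAtomix();
// Start Atomix and join the cluster (blocks until startup completes)
atomix.start().join();
// Create the atomic map
AsyncAtomicMap<Object, Object> asyncAtomicMap = atomix.atomicMapBuilder("myCfgName")
        .withProtocol(MultiRaftProtocol.builder(groupName)
                .withRecoveryStrategy(Recovery.RECOVER)
                .withMaxRetries(MAX_RETRIES)
                .build())
        .withReadOnly(false)
        .build()
        .async();
// Store a value and wait for the write to complete before reading it back
asyncAtomicMap.put("HBLOG", "http://www.liuhaihua.cn").join();
// Query the value
CompletableFuture<Versioned<Object>> myBlog = asyncAtomicMap.get("HBLOG");
// get() throws checked exceptions, so the enclosing method must declare or handle them
Versioned<Object> objectVersioned = myBlog.get();
System.out.printf("value:%s version:%s%n", objectVersioned.value(), objectVersioned.version());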
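The map lookup above returns a value together with a version. To show what such a pair is useful for, here is a minimal in-memory sketch of a versioned map with a version-checked replace (optimistic concurrency). `VersionedMapSketch` and `Entry` are hypothetical names for this example, and the version is assumed to be a plain counter; how Atomix actually assigns versions is not covered here.

```java
import java.util.HashMap;
import java.util.Map;

// In-memory illustration of value/version pairs; not the Atomix implementation.
final class VersionedMapSketch<K, V> {

    /** Immutable value/version pair. */
    static final class Entry<V> {
        private final V value;
        private final long version;

        Entry(V value, long version) {
            this.value = value;
            this.version = version;
        }

        V value() { return value; }

        long version() { return version; }
    }

    private final Map<K, Entry<V>> entries = new HashMap<>();
    private long nextVersion = 1; // assumption: versions come from a simple counter

    /** Stores the value under a fresh version and returns the previous entry, or null. */
    synchronized Entry<V> put(K key, V value) {
        return entries.put(key, new Entry<>(value, nextVersion++));
    }

    /** Returns the current entry for the key, or null if there is none. */
    synchronized Entry<V> get(K key) {
        return entries.get(key);
    }

    /** Replaces the value only if the stored version still equals expectedVersion. */
    synchronized boolean replace(K key, long expectedVersion, V newValue) {
        Entry<V> current = entries.get(key);
        if (current == null || current.version() != expectedVersion) {
            return false; // someone else wrote in between, or the key is missing
        }
        entries.put(key, new Entry<>(newValue, nextVersion++));
        return true;
    }

    public static void main(String[] args) {
        VersionedMapSketch<String, String> map = new VersionedMapSketch<>();
        map.put("HBLOG", "first");
        Entry<String> current = map.get("HBLOG");
        System.out.printf("value:%s version:%d%n", current.value(), current.version());
        System.out.println("replace with matching version: "
                + map.replace("HBLOG", current.version(), "second"));
        System.out.println("replace with stale version: "
                + map.replace("HBLOG", current.version(), "third"));
    }
}
```

A writer that read an older version has its update rejected instead of silently overwriting a newer value, which is the main reason to expose the version alongside the value.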
// Leader elector
// Requires: io.atomix.core.election.AsyncLeaderElector, io.atomix.core.election.Leadership,
// io.atomix.cluster.Member, java.time.Duration, java.util.Map
AsyncLeaderElector leaderElector = atomix.leaderElectorBuilder("leader")
        .withProtocol(MultiRaftProtocol.builder(groupName)
                .withRecoveryStrategy(Recovery.RECOVER)
                .withMaxRetries(MAX_RETRIES)
                .withMaxTimeout(Duration.ofMillis(15000L))
                .build())
        .withReadOnly(false)
        .build()
        .async();
// Get the local member (the current node)
Member localMember = atomix.getMembershipService().getLocalMember();
System.out.println("localMember:" + localMember.toString());
String topic = "this is a topic";
// Run an election for the given topic; the result describes the member elected as leader
Leadership leadership = (Leadership) leaderElector.run(topic, localMember.toString()).get();
System.out.println("==========" + leadership);
// Look up the current leadership for the topic
Leadership topicLeadership = (Leadership) leaderElector.getLeadership(topic).get();
System.out.println("------------>" + topicLeadership);
// Print the leader for every topic
Map topicLeadershipMaps = (Map) leaderElector.getLeaderships().get();
System.out.println("++++++++++++" + topicLeadershipMaps.toString());
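To make the per-topic election above easier to picture, the following sketch models it in memory: the first candidate to run for a topic becomes leader, later candidates wait in order, and the term increases whenever leadership changes hands. `LeaderElectionSketch` and its methods are hypothetical, and the first-come ordering and term counting are simplifying assumptions rather than a description of Atomix internals.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory illustration of per-topic leader election; not the Atomix implementation.
final class LeaderElectionSketch {
    private final Map<String, List<String>> candidatesByTopic = new HashMap<>();
    private final Map<String, Long> termByTopic = new HashMap<>();

    /** Registers the candidate for the topic and returns the topic's current leader. */
    synchronized String run(String topic, String candidate) {
        List<String> candidates = candidatesByTopic.computeIfAbsent(topic, t -> new ArrayList<>());
        if (!candidates.contains(candidate)) {
            candidates.add(candidate);
            if (candidates.size() == 1) {
                // The topic had no leader, so this candidate starts a new term.
                termByTopic.merge(topic, 1L, Long::sum);
            }
        }
        return candidates.get(0);
    }

    /** Removes the candidate; if it was the leader, the next candidate takes over. */
    synchronized void withdraw(String topic, String candidate) {
        List<String> candidates = candidatesByTopic.get(topic);
        if (candidates == null) {
            return;
        }
        boolean wasLeader = !candidates.isEmpty() && candidates.get(0).equals(candidate);
        candidates.remove(candidate);
        if (wasLeader && !candidates.isEmpty()) {
            termByTopic.merge(topic, 1L, Long::sum);
        }
    }

    /** Returns the current leader for the topic, or null if there is none. */
    synchronized String leader(String topic) {
        List<String> candidates = candidatesByTopic.get(topic);
        return (candidates == null || candidates.isEmpty()) ? null : candidates.get(0);
    }

    /** Returns the current term for the topic, or 0 if no election has happened. */
    synchronized long term(String topic) {
        return termByTopic.getOrDefault(topic, 0L);
    }

    public static void main(String[] args) {
        LeaderElectionSketch elector = new LeaderElectionSketch();
        String topic = "this is a topic";
        System.out.println("leader after node1 runs: " + elector.run(topic, "node1"));
        System.out.println("leader after node2 runs: " + elector.run(topic, "node2"));
        elector.withdraw(topic, "node1");
        System.out.println("leader after node1 withdraws: " + elector.leader(topic)
                + " term=" + elector.term(topic));
    }
}
```

In the single-node demo the local member is the only candidate, so it is always elected; the queueing behaviour only becomes visible once several members run for the same topic.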
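Note: the Java-based Atomix is no longer maintained. It is used here for learning purposes only, as a way to get some hands-on experience with a distributed system.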