1.什么是Spring Cloud Consul?
Spring Cloud Consul 是 Spring Cloud 提供的对
HashiCorp Consul 的支持。它是一种基于服务网格的工具,用于实现服务注册、发现、配置管理和健康检查。
主要功能包括:
- 服务注册与发现:通过 Consul 的服务注册功能,Spring Cloud Consul 可以实现微服务的动态注册和发现,简化服务间通信。
- 分布式配置管理:通过 Consul 的 Key/Value 存储机制,提供对分布式配置的管理。
- 健康检查:支持服务实例的健康检查,确保只有健康的实例可供其他服务调用。
- 选举与分布式锁:通过 Consul 的会话机制,支持分布式锁和领导选举。
Spring Cloud Consul 的选举机制
Spring Cloud Consul 的选举机制基于
Consul 会话(Session) 和
键值存储(Key/Value Store) 实现分布式领导选举。
工作原理:
- 会话创建:
- 服务实例向 Consul 创建一个会话(Session),这是一个临时的、与实例绑定的对象。
- 会话带有 TTL(生存时间),需要定期续约,保持活跃状态。
- 获取锁(Lock):
- 通过将一个 Key 的值设置为当前会话 ID,服务尝试获取该 Key 的锁。
- Consul 使用 CAS(Compare and Swap)操作来确保只有一个服务实例可以成功获取锁。
- 锁定成功:
- 成功获取锁的服务实例被视为领导者(Leader)。
- 其他实例会定期尝试获取锁,但只能等待当前锁被释放或超时。
- 锁释放或失效:
- 如果领导实例未能及时续约会话(例如宕机或网络中断),Consul 会释放与该会话相关联的锁,其他实例可以竞争成为新的领导者。
2.环境搭建
run Consul Agent
docker run -d --name=dev-consul -p 8500:8500 consul
web ui
http://localhost:8500
3.代码工程
实验目标
- 使用 Consul 提供的会话机制和键值存储来实现 分布式领导选举。
- 通过
@InboundChannelAdapter
和 @ServiceActivator
实现周期性检查领导身份并执行领导任务。
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>springcloud-demo</artifactId>
<groupId>com.et</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>LeaderElection</artifactId>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- Spring Cloud Starter Consul Discovery -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-consul-discovery</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
</dependency>
</dependencies>
</project>
LeaderElectionConfig.java
package com.et;
import jakarta.annotation.PreDestroy;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.web.client.RestTemplate;
@Configuration
public class LeaderElectionConfig {
private static final String LEADER_KEY = "service/leader";
private static final String CONSUL_URL = "http://localhost:8500";
private String sessionId;
@Bean
@InboundChannelAdapter(value = "leaderChannel", poller = @Poller(fixedDelay = "5000"))
public MessageSource<String> leaderMessageSource() {
return () -> {
// Implement logic to check if this instance is the leader
boolean isLeader = checkLeadership();
return MessageBuilder.withPayload(isLeader ? "I am the leader" : "I am not the leader").build();
};
}
@Bean
@ServiceActivator(inputChannel = "leaderChannel")
public MessageHandler leaderMessageHandler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
// Implement logic to perform leader-specific tasks
}
};
}
private final RestTemplate restTemplate = new RestTemplate();
public LeaderElectionConfig() {
this.sessionId = createSession();
}
private String createSession() {
String url = CONSUL_URL + "/v1/session/create";
HttpHeaders headers = new HttpHeaders();
HttpEntity<String> entity = new HttpEntity<>("{\"Name\": \"leader-election-session\"}", headers);
//ResponseEntity<String> response = restTemplate.postForEntity(url, entity, String.class);
// PUT
ResponseEntity<String> response = restTemplate.exchange(url, HttpMethod.PUT, entity, String.class);
// Extract session ID from response
return response.getBody().split("\"")[3]; // This is a simple way to extract the session ID
}
public boolean checkLeadership() {
String url = CONSUL_URL + "/v1/kv/" + LEADER_KEY + "?acquire=" + sessionId;
HttpHeaders headers = new HttpHeaders();
HttpEntity<String> entity = new HttpEntity<>(headers);
ResponseEntity<Boolean> response = restTemplate.exchange(url, HttpMethod.PUT, entity, Boolean.class);
return Boolean.TRUE.equals(response.getBody());
}
public void releaseLeadership() {
String url = CONSUL_URL + "/v1/kv/" + LEADER_KEY + "?release=" + sessionId;
HttpHeaders headers = new HttpHeaders();
HttpEntity<String> entity = new HttpEntity<>(headers);
ResponseEntity<Boolean> response = restTemplate.exchange(url, HttpMethod.PUT, entity, Boolean.class);
if (Boolean.TRUE.equals(response.getBody())) {
System.out.println("Released leadership successfully");
} else {
System.out.println("Failed to release leadership");
}
}
@PreDestroy
public void onExit() {
releaseLeadership();
}
}
代码解释
- 初始化:
- 启动时通过
createSession()
向 Consul 注册会话。
- 周期性任务:
- 每 5 秒通过
checkLeadership()
检查领导身份。
- 如果是领导者,执行特定任务(如打印日志、执行业务逻辑)。
- 释放资源:
- 应用关闭时,通过
releaseLeadership()
释放锁。
LeaderElectionApplication.java
package com.et;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.integration.config.EnableIntegration;
@SpringBootApplication
@EnableDiscoveryClient
@EnableIntegration
public class LeaderElectionApplication {
public static void main(String[] args) {
SpringApplication.run(LeaderElectionApplication.class, args);
}
}
配置文件
node1
server.port=8081
spring.cloud.consul.discovery.enabled=true
spring.cloud.consul.discovery.register=true
spring.application.name=leader-election-example
spring.cloud.consul.host=localhost
spring.cloud.consul.port=8500
spring.cloud.consul.discovery.instance-id=${spring.application.name}:${spring.application.instance_id:${random.value}}
node2
server.port=8082
spring.cloud.consul.discovery.enabled=true
spring.cloud.consul.discovery.register=true
spring.application.name=leader-election-example
spring.cloud.consul.host=localhost
spring.cloud.consul.port=8500
spring.cloud.consul.discovery.instance-id=${spring.application.name}:${spring.application.instance_id:${random.value}}
4.测试
启动node1节点
启动node2节点
通过控制台观察日志,其中只有一台机器能选为主机
5.引用