最近在检查Spring Integration区域时,我注意到与 Hazelcast datagrid 的非常好的集成。在以下位置查看:
https://github.com/spring-projects/spring-integration-extensions/tree/master/spring-integration-hazelcast
如果您使用Hazelcast,那么您可以从Hazelcast支持的各种分布式数据结构中提供Spring Integration通道基础架构,如:
真正令我印象深刻的是Spring Integration团队通过Hazelcast实施集群领导者选举。让我们来看看并测试它。但首先要做的事情是:
演示任务:我们有两个Spring Boot微服务,每10秒产生一个随机值放入分布式IMap。现在只允许一个微服务在某一时刻消费使用IMap的数据。为了使它变得有点辣,微服务应该在消息传递给其他节点之后放弃其领导。
通过Spring Integration Cluster Leadership解决方案:
首先,您需要将Spring Boot MicroService添加到领导游戏中:
@Bean <b>public</b> Candidate nodeService1Candidate() { <b>final</b> NodeCandidate candidate = <b>new</b> NodeCandidate(<font>"service1"</font><font>, HazelcastConfiguration.ROLE_JOB_MAP); <b>return</b> candidate; } @Bean <b>public</b> LeaderInitiator initiator() { <b>final</b> LeaderInitiator leaderInitiator = <b>new</b> LeaderInitiator(hazelcastConfiguration.hazelcastInstance(), nodeService1Candidate()); <b>return</b> leaderInitiator; } </font>
但这不是全部。我们的目标是在领导权被授予后从IMap开始数据消费,另一方面在领导被撤销后停止数据消费。
为此,我们需要监听org.springframework.integration.leader.DefaultCandidate子类NodeCandidate中的onGranted和onRevoked事件。第一个构造函数参数是节点id,第二个是角色名称。阅读 Spring Integration角色 ,但我不会使用它们。我将手动启动IMap更改生成者。
为了了解数据更改,SI Hazelcast集成提供了HazelcastEventDrivenMessageProducer,它可以监听分布式IMap更改并将适当的数据更改事件委派给Spring Integration通道基础结构。
@Configuration <b>public</b> <b>class</b> HazelcastConfiguration { . . @Bean <b>public</b> IMap<String, String> getDistributedMapForJobInput() { <b>return</b> hazelcastInstance().getMap(INPUT_JOB_MAP); } @Bean <b>public</b> MessageChannel inputJobChannel() { <b>return</b> <b>new</b> DirectChannel(); } @Bean <b>public</b> HazelcastEventDrivenMessageProducer hazelcastEventDrivenMessageProducer() { <b>final</b> HazelcastEventDrivenMessageProducer producer = <b>new</b> HazelcastEventDrivenMessageProducer( getDistributedMapForJobInput() ); producer.setOutputChannel(inputJobChannel()); producer.setCacheEventTypes(<font>"ADDED,REMOVED,UPDATED,CLEAR_ALL"</font><font>); producer.setCacheListeningPolicy(CacheListeningPolicyType.SINGLE); producer.setAutoStartup(false); <b>return</b> producer; } } </font>
setAutostartup(false)的注意事项。我们希望让这位生产者在获得领导时能够开始启动:
<font><i>/** * Created by tomask79 on 24.08.17. */</i></font><font> <b>public</b> <b>class</b> NodeCandidate <b>extends</b> DefaultCandidate { @Autowired <b>private</b> HazelcastConfiguration hazelcastConfiguration; <b>public</b> NodeCandidate(String nodeId, String role) { <b>super</b>(nodeId, role); } @Override <b>public</b> <b>void</b> onGranted(Context ctx) { <b>super</b>.onGranted(ctx); System.out.println(</font><font>"Leader granted to: "</font><font>+ctx.toString()); hazelcastConfiguration.hazelcastEventDrivenMessageProducer().start(); } @Override <b>public</b> <b>void</b> onRevoked(Context ctx) { <b>super</b>.onRevoked(ctx); System.out.println(</font><font>"Leader revoked to: "</font><font>+ctx.toString()); hazelcastConfiguration.hazelcastEventDrivenMessageProducer().stop(); } } </font>
最后一项任务是消费来自分布式IMap的消息并放弃领导,以便其他节点可以接受工作并享受一些乐趣。因此,让我们声明ServiceActivator监听来自jobInputChannel DirectChannel 的数据:
@Bean @ServiceActivator(inputChannel =“inputJobChannel”) <b>public</b> MessageHandler logger(){ <b>return</b> <b>new</b> LogAndGiveInitiatorHandler(); }
将消息记录到标准输出:
<font><i>/** * Created by tomask79 on 24.08.17. */</i></font><font> <b>public</b> <b>class</b> LogAndGiveInitiatorHandler implements MessageHandler{ @Autowired <b>private</b> JobServices jobServices; @Override <b>public</b> <b>void</b> handleMessage(Message<?> message) throws MessagingException { System.out.println(message.toString()); System.out.println(</font><font>"Waiting for another node to take the work...!"</font><font>); jobServices.giveUp(); <b>try</b> { Thread.sleep(5000); } <b>catch</b> (InterruptedException e) { e.printStackTrace(); } System.out.println(</font><font>"........"</font><font>); } } </font>
并命令微服务放弃其领导地位:
<font><i>/** * Created by tomask79 on 10.08.17. */</i></font><font> @Service <b>public</b> <b>class</b> JobServices { @Autowired <b>private</b> LeaderInitiator initiator; . . <b>public</b> <b>void</b> giveUp() { <b>if</b> (initiator.getContext().isLeader()) { System.out.println(</font><font>"Giving up on leadership: "</font><font>+initiator.getContext().toString()); initiator.getContext().yield(); } } } </font>
就是这样!让我们测试整个包。
输出:
[INFO] Reactor Summary: <p>[INFO] <p>[INFO] spring-cloud-cluster-demo .......................... SUCCESS [ 0.412 s] <p>[INFO] spring-microservice-hazelcast ...................... SUCCESS [ 2.380 s] <p>[INFO] spring-microservice-service1 ....................... SUCCESS [ 3.685 s] <p>[INFO] spring-microservice-service2 ....................... SUCCESS [ 2.745 s] <p>[INFO] ------------------------------------------------------------------------ <p>[INFO] BUILD SUCCESS <p>[INFO] ------------------------------------------------------------------------ <p>[INFO] Total time: 10.047 s <p>[INFO] Finished at: 2017-08-28T19:57:53+02:00 <p>[INFO] Final Memory: 40M/532M <p>[INFO] ------------------------------------------------------------------------
现在打开两个终端并运行:
要验证两个微服务是否形成有效的Hazelcast集群,您应该看到类似的内容:
Members [2] { Member [192.168.1.112]:5702 Member [192.168.1.112]:5701 <b>this</b> }
在形成Hazelcast群集设置后,您应该看到以下输出
第一终端(获取领导权并放弃给服务2):
[st-leadership-0] com.example.hazelcast.NodeCandidate : DefaultCandidate{role=leader, id=service1} has been granted leadership; context: HazelcastContext{role=leader, id=service1, isLeader=<b>true</b>} Leader granted to: HazelcastContext{role=leader, id=service1, isLeader=<b>true</b>} <p>[st-leadership-0] .h.i.HazelcastEventDrivenMessageProducer : started hazelcastEventDrivenMessageProducer GenericMessage [payload=EntryEventMessagePayload [key=service18eff005d-6da8-4fb8-b747-f977ad8e1544, value=a61b5f9a-1b96-493d-b240-61ccb549ba17, oldValue=<b>null</b>], headers={hazelcast_cacheName=randomInputDataMap, hazelcast_member=/192.168.1.112:5702, id=f9c5455b-b42d-3ab7-ec49-9bd33db9ec5f, hazelcast_eventType=ADDED, timestamp=1503945864993}] Waiting <b>for</b> another node to take the work...! Giving up on leadership: HazelcastContext{role=leader, id=service1, isLeader=<b>true</b>}
第二终端(获取领导权并放弃给服务1)
Leader granted to: HazelcastContext{role=leader, id=service2, isLeader=<b>true</b>} 2017-08-28 20:47:08.001 INFO 1357 --- [st-leadership-0] .h.i.HazelcastEventDrivenMessageProducer : started hazelcastEventDrivenMessageProducer 2017-08-28 20:47:08.019 INFO 1357 --- [ main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 8082 (http) 2017-08-28 20:47:08.029 INFO 1357 --- [ main] c.e.SpringMicroserviceServiceComponent : Started SpringMicroserviceServiceComponent in 12.807 seconds (JVM running <b>for</b> 13.507) ........ GenericMessage [payload=EntryEventMessagePayload [key=service249cde108-5045-4b77-84f7-cdc9f524df04, value=c4fe775f-e44d-4f10-ac43-0fe7157c0e67, oldValue=<b>null</b>], headers={hazelcast_cacheName=randomInputDataMap, hazelcast_member=/192.168.1.112:5701, id=df474178-1ff1-35e5-e1e1-d3f6f25d6d68, hazelcast_eventType=ADDED, timestamp=1503946037904}] Waiting <b>for</b> another node to take the work...! Giving up on leadership: HazelcastContext{role=leader, id=service2, isLeader=<b>true</b>}
总结
只是一些想法。如果在生产过程中流经系统的消息数量每天只有几千(我们在Embedit的生产系统中的速率),那么Hazelcast肯定是一种有过度杀伤力工作。建议始终使用JMS / AMPQ以循环方式将数据分发到您的节点。 但是当处理存储在内存中的大数据时。你不应该错过由Hazelcast支持的Spring Integration Election算法。