上一篇文章 Spring Boot系列21 Spring Websocket实现websocket集群方案讨论 里详细介绍了WebSocket集群的三种方案,并得出结论第三个方案是最好的,本文我们实现第三个方案。
第三个方案如下图
在方案一的基础进行如下修改,新的架构图流程如下:
工程名称:mvc 本文在 Spring Boot系列20 Spring Websocket实现向指定的用户发送消息 的基础进行修改。
<!-- webscoekt 集群 需要 引入支持RabbitMQ, redis--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> 复制代码
application-wscluster.properties
# websocket集群需要配置RabbitMQ spring.rabbitmq.host:192.168.21.3 spring.rabbitmq.virtual-host: /icc-local spring.rabbitmq.username: icc-dev spring.rabbitmq.password: icc-dev # 配置redis spring.redis.database=0 spring.redis.host=192.168.21.4 # spring.redis.password= spring.redis.port=7001 spring.redis.pool.max-idle=8 spring.redis.pool.min-idle=0 spring.redis.pool.max-active=8 spring.redis.pool.max-wait=-1 复制代码
接口IRedisSessionService定义了对redis的操作 IRedisSessionService实现类将用户名称和websocket sessionId的关系存储到redis,提供添加、删除、查询 IRedisSessionService
public interface IRedisSessionService { void add(String name, String wsSessionId); boolean del(String name); String get(String name); } 复制代码
SimulationRedisSessionServiceImpl将用户名称和websocket sessionId的关系存储到redis,提供添加、删除、查询
@Component public class SimulationRedisSessionServiceImpl implements IRedisSessionService { @Autowired private RedisTemplate<String, String> template; // key = 登录用户名称, value=websocket的sessionId private ConcurrentHashMap<String,String> redisHashMap = new ConcurrentHashMap<>(32); /** * 在缓存中保存用户和websocket sessionid的信息 * @param name * @param wsSessionId */ public void add(String name, String wsSessionId){ BoundValueOperations<String,String> boundValueOperations = template.boundValueOps(name); boundValueOperations.set(wsSessionId,24 * 3600, TimeUnit.SECONDS); } /** * 从缓存中删除用户的信息 * @param name */ public boolean del(String name){ return template.execute(new RedisCallback<Boolean>() { @Override public Boolean doInRedis(RedisConnection connection) throws DataAccessException { byte[] rawKey = template.getStringSerializer().serialize(name); return connection.del(rawKey) > 0; } }, true); } /** * 根据用户id获取用户对应的sessionId值 * @param name * @return */ public String get(String name){ BoundValueOperations<String,String> boundValueOperations = template.boundValueOps(name); return boundValueOperations.get(); } } 复制代码
装饰WebSocketHandlerDecorator对象,在连接建立时,保存websocket的session id,其中key为帐号名称;在连接断开时,从缓存中删除用户的sesionId值。此websocket sessionId值用于创建消息的路由键。
@Component public class AuthWebSocketHandlerDecoratorFactory implements WebSocketHandlerDecoratorFactory { private static final Logger log = LoggerFactory.getLogger(AuthWebSocketHandlerDecoratorFactory.class); @Autowired private IRedisSessionService redisSessionService; @Override public WebSocketHandler decorate(WebSocketHandler handler) { return new WebSocketHandlerDecorator(handler) { @Override public void afterConnectionEstablished(final WebSocketSession session) throws Exception { // 客户端与服务器端建立连接后,此处记录谁上线了 Principal principal = session.getPrincipal(); if(principal != null){ String username = principal.getName(); log.info("websocket online: " + username + " session " + session.getId()); redisSessionService.add(username, session.getId()); } super.afterConnectionEstablished(session); } @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception { // 客户端与服务器端断开连接后,此处记录谁下线了 Principal principal = session.getPrincipal(); if(principal != null){ String username = session.getPrincipal().getName(); log.info("websocket offline: " + username); redisSessionService.del(username); } super.afterConnectionClosed(session, closeStatus); } }; } } 复制代码
在 Spring Boot系列20 Spring Websocket实现向指定的用户发送消息 的基础上增加如下功能,将myWebSocketHandlerDecoratorFactory配置到websocket
@Configuration // 此注解开使用STOMP协议来传输基于消息代理的消息,此时可以在@Controller类中使用@MessageMapping @EnableWebSocketMessageBroker public class WebSocketRabbitMQMessageBrokerConfigurer extends AbstractWebSocketMessageBrokerConfigurer { @Autowired private MyPrincipalHandshakeHandler myDefaultHandshakeHandler; @Autowired private AuthHandshakeInterceptor sessionAuthHandshakeInterceptor; @Autowired private AuthWebSocketHandlerDecoratorFactory myWebSocketHandlerDecoratorFactory; @Override public void registerStompEndpoints(StompEndpointRegistry registry) { …. } @Override public void configureMessageBroker(MessageBrokerRegistry registry) { … } /** * 这时实际spring weboscket集群的新增的配置,用于获取建立websocket时获取对应的sessionid值 * @param registration */ @Override public void configureWebSocketTransport(WebSocketTransportRegistration registration) { registration.addDecoratorFactory(myWebSocketHandlerDecoratorFactory); super.configureWebSocketTransport(registration); } } 复制代码
在上文 Spring Boot系列20 Spring Websocket实现向指定的用户发送消息 的基础上,对此类进行修改
@Controller @RequestMapping(value = "/ws") public class TestMQCtl { private static final Logger log = LoggerFactory.getLogger(TestMQCtl.class); @Autowired private AmqpTemplate amqpTemplate; @Autowired private IRedisSessionService redisSessionService; /** * 向执行用户发送请求 * @param msg * @param name * @return */ @RequestMapping(value = "send2user") @ResponseBody public int sendMq2User(String msg, String name){ // 根据用户名称获取用户对应的session id值 String wsSessionId = redisSessionService.get(name); RequestMessage demoMQ = new RequestMessage(); demoMQ.setName(msg); // 生成路由键值,生成规则如下: websocket订阅的目的地 + "-user" + websocket的sessionId值。生成值类似: String routingKey = getTopicRoutingKey("demo", wsSessionId); // 向amq.topi交换机发送消息,路由键为routingKey log.info("向用户[{}]sessionId=[{}],发送消息[{}],路由键[{}]", name, wsSessionId, wsSessionId, routingKey); amqpTemplate.convertAndSend("amq.topic", routingKey, JSON.toJSONString(demoMQ)); return 0; } /** * 获取Topic的生成的路由键 * * @param actualDestination * @param sessionId * @return */ private String getTopicRoutingKey(String actualDestination, String sessionId){ return actualDestination + "-user" + sessionId; } …. } 复制代码
以不同端口启动两个服务启动服务类:WebSocketClusterApplication 以“--spring.profiles.active=wscluster --server.port=8081”参数启动服务A 以“--spring.profiles.active=wscluster --server.port=8082”参数启动服务B
登录模拟帐号:xiaoming登录服务A,xiaoming2登录服务B使用xiaoming登录服务A,并登录websocket http://127.0.0.1:8081/ws/login 使用xiaoming登录,并提交
点击连接,如果连接变灰色,则登录websocket成功
打开另一个浏览器,使用xiaoming2登录服务B,并登录websocket http://127.0.0.1:8082/ws/login 使用xiaoming2登录并提交,最后登录websocket
登录服务A模拟发送页面登录http://127.0.0.1:8081/ws/send,发送消息
此时两个页面收到信息:
xiaoming帐号只收到xiaoming-receive xiaoming2帐号只收到xiaoming2-receive
登录服务B模拟发送页面登录http://127.0.0.1:8082/ws/send,发送消息,和http://127.0.0.1:8081/ws/send 一样发送相同消息,结果是一样
结论无论用户登录的服务A,还是服务B,我们通过以上的代码,我们都可以发送消息到指定的用户,所以我们已经实现websocket集群