这种方式实现简单,前端通过 setInterval
定时去请求接口来获取最新的数据,当实时性要求不高,更新频率低的情况下可以使用这种方式。但是当实时性很高的时候,我们的 请求会很频繁,服务器的消耗非常大
,而且每次请求的时候服务端的数据可能还没有改变,导致很多请求都是没有意义的。
setInterval(function () { // 请求接口操作 // 。。。 }, 3000 ); 复制代码
WebSocket是基于 TCP协议 的,它是全双工通信的,服务端可以向客户端发送信息,客户端同样可以向服务器发送指令,常用于聊天应用中。
SpringBoot提供了websocket的starter
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency> 复制代码
注入 ServerEndpointExporter
,这个bean会自动注册使用了 @ServerEndpoint
注解声明的Websocket endpoint
@Configuration public class WebSocketConfig { @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } } 复制代码
创建一个服务类:
@ServerEndpoint
注解,设置WebSocket连接点的服务地址。 AtomicInteger
用于记录连接数 ConcurrentHashMap
用于存放连接信息 @OnOpen
注解表明该方法在建立连接后调用 @OnClose
注解表明该方法在断开连接后调用 @OnError
注解表明该方法在连接异常调用 @OnMessage
注解表明该方法在收到客户端消息后调用 @ServerEndpoint("/websocket/{userId}") @Component public class WebSocketServer { private final static Logger logger = LoggerFactory.getLogger(WebSocketServer.class); /** * 当前连接数 */ private static AtomicInteger count = new AtomicInteger(0); /** * 使用map对象,便于根据userId来获取对应的WebSocket,或者放redis里面 */ private static Map<String, WebSocketServer> websocketMap = new ConcurrentHashMap<>(); /** * 与某个客户端的连接会话,需要通过它来给客户端发送数据 */ private Session session; /** * 对应的用户ID */ private String userId = ""; /** * 连接建立成功调用的方法 */ @OnOpen public void onOpen(Session session, @PathParam("userId") String userId) { try { this.session = session; this.userId = userId; websocketMap.put(userId, this); // 数量+1 count.getAndIncrement(); logger.info("websocket 新连接:{}", userId); } catch (Exception e) { logger.error("websocket 新建连接 IO异常"); } } /** * 连接关闭调用的方法 */ @OnClose public void onClose() { // 删除 websocketMap.remove(this.userId); // 数量-1 count.getAndDecrement(); logger.info("close websocket : {}", this.userId); } /** * 收到客户端消息后调用的方法 * * @param message 客户端发送过来的消息 */ @OnMessage public void onMessage(String message) { logger.info("来自客户端{}的消息:{}", this.userId, message); } @OnError public void onError(Throwable error) { logger.info("websocket 发生错误,移除当前websocket:{},err:{}", this.userId, error.getMessage()); websocketMap.remove(this.userId); // 数量-1 count.getAndDecrement(); } /** * 发送消息 (异步发送) * * @param message 消息主题 */ private void sendMessage(String message) { this.session.getAsyncRemote().sendText(message); } /** * 向指定用户发送信息 * * @param userId 用户id * @param wsInfo 信息 */ public static void sendInfo(String userId, String wsInfo) { if (websocketMap.containsKey(userId)) { websocketMap.get(userId).sendMessage(wsInfo); } } /** * 群发消息 */ public static void batchSendInfo(String wsInfo, List<String> ids) { ids.forEach(userId -> sendInfo(userId, wsInfo)); } /** * 群发所有人 */ public static void batchSendInfo(String wsInfo) { websocketMap.forEach((k, v) -> v.sendMessage(wsInfo)); } /** * 获取当前连接信息 */ public static List<String> getIds() { return new ArrayList<>(websocketMap.keySet()); } /** * 获取当前连接数量 */ public static int getUserCount() { return count.intValue(); } } 复制代码
@RestController @RequestMapping("/ws") public class WebSocketController { @GetMapping("/push/{message}") public ResponseEntity<String> push(@PathVariable(name = "message") String message) { WebSocketServer.batchSendInfo(message); return ResponseEntity.ok("WebSocket 推送消息给所有人"); } } 复制代码
在 resources/static
下创建 ws.html
,将WebSocket的地址设为服务类中 @ServerEndpoint
注解所配置的地址
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>WebSocket</title> </head> <body> <div id="message"></div> </body> <script> let websocket = null; // 用时间戳模拟登录用户 const username = new Date().getTime(); // alert(username) //判断当前浏览器是否支持WebSocket if ('WebSocket' in window) { console.log("浏览器支持Websocket"); websocket = new WebSocket('ws://localhost:8080/websocket/' + username); } else { alert('当前浏览器 不支持 websocket'); } //连接发生错误的回调方法 websocket.onerror = function () { setMessageInnerHTML("WebSocket连接发生错误"); }; //连接成功建立的回调方法 websocket.onopen = function () { setMessageInnerHTML("WebSocket连接成功"); }; //接收到消息的回调方法 websocket.onmessage = function (event) { setMessageInnerHTML(event.data); }; //连接关闭的回调方法 websocket.onclose = function () { setMessageInnerHTML("WebSocket连接关闭"); }; //监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。 window.onbeforeunload = function () { closeWebSocket(); }; //关闭WebSocket连接 function closeWebSocket() { websocket.close(); } //将消息显示在网页上 function setMessageInnerHTML(innerHTML) { document.getElementById('message').innerHTML += innerHTML + '<br/>'; } </script> </html> 复制代码
启动项目,访问 http://localhost:8080/ws.html
,开启连接。调用消息推送接口 http://localhost:8080/ws/push/hello
,查看网页显示信息。
SseEmitter是SpringMVC(4.2+)提供的一种技术,它是基于 Http协议 的,相比WebSocket,它更轻量,但是它只能从服务端向客户端 单向 发送信息。在SpringBoot中我们无需引用其他jar就可以使用。
AtomicInteger
用于记录连接数 ConcurrentHashMap
用于存放连接信息 SseEmitter
给前端。超时间设为0表示永不过期 completionCallBack
timeoutCallBack
errorCallBack
SseEmitter.send()
public class SseEmitterServer { private static final Logger logger = LoggerFactory.getLogger(SseEmitterServer.class); /** * 当前连接数 */ private static AtomicInteger count = new AtomicInteger(0); /** * 使用map对象,便于根据userId来获取对应的SseEmitter,或者放redis里面 */ private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>(); /** * 创建用户连接并返回 SseEmitter * * @param userId 用户ID * @return SseEmitter */ public static SseEmitter connect(String userId) { // 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutException SseEmitter sseEmitter = new SseEmitter(0L); // 注册回调 sseEmitter.onCompletion(completionCallBack(userId)); sseEmitter.onError(errorCallBack(userId)); sseEmitter.onTimeout(timeoutCallBack(userId)); sseEmitterMap.put(userId, sseEmitter); // 数量+1 count.getAndIncrement(); logger.info("创建新的sse连接,当前用户:{}", userId); return sseEmitter; } /** * 给指定用户发送信息 */ public static void sendMessage(String userId, String message) { if (sseEmitterMap.containsKey(userId)) { try { // sseEmitterMap.get(userId).send(message, MediaType.APPLICATION_JSON); sseEmitterMap.get(userId).send(message); } catch (IOException e) { logger.error("用户[{}]推送异常:{}", userId, e.getMessage()); removeUser(userId); } } } /** * 群发消息 */ public static void batchSendMessage(String wsInfo, List<String> ids) { ids.forEach(userId -> sendMessage(wsInfo, userId)); } /** * 群发所有人 */ public static void batchSendMessage(String wsInfo) { sseEmitterMap.forEach((k, v) -> { try { v.send(wsInfo, MediaType.APPLICATION_JSON); } catch (IOException e) { logger.error("用户[{}]推送异常:{}", k, e.getMessage()); removeUser(k); } }); } /** * 移除用户连接 */ public static void removeUser(String userId) { sseEmitterMap.remove(userId); // 数量-1 count.getAndDecrement(); logger.info("移除用户:{}", userId); } /** * 获取当前连接信息 */ public static List<String> getIds() { return new ArrayList<>(sseEmitterMap.keySet()); } /** * 获取当前连接数量 */ public static int getUserCount() { return count.intValue(); } private static Runnable completionCallBack(String userId) { return () -> { logger.info("结束连接:{}", userId); removeUser(userId); }; } private static Runnable timeoutCallBack(String userId) { return () -> { logger.info("连接超时:{}", userId); removeUser(userId); }; } private static Consumer<Throwable> errorCallBack(String userId) { return throwable -> { logger.info("连接异常:{}", userId); removeUser(userId); }; } } 复制代码
@RestController @RequestMapping("/sse") public class SseEmitterController { /** * 用于创建连接 */ @GetMapping("/connect/{userId}") public SseEmitter connect(@PathVariable String userId) { return SseEmitterServer.connect(userId); } @GetMapping("/push/{message}") public ResponseEntity<String> push(@PathVariable(name = "message") String message) { SseEmitterServer.batchSendMessage(message); return ResponseEntity.ok("WebSocket 推送消息给所有人"); } } 复制代码
在 resources/static
下创建 ws.html
,将EventSource的地址设为创建连接的地址
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>SseEmitter</title> </head> <body> <button onclick="closeSse()">关闭连接</button> <div id="message"></div> </body> <script> let source = null; // 用时间戳模拟登录用户 const userId = new Date().getTime(); if (!!window.EventSource) { // 建立连接 source = new EventSource('http://localhost:8080/sse/connect/' + userId); /** * 连接一旦建立,就会触发open事件 * 另一种写法:source.onopen = function (event) {} */ source.addEventListener('open', function (e) { setMessageInnerHTML("建立连接。。。"); }, false); /** * 客户端收到服务器发来的数据 * 另一种写法:source.onmessage = function (event) {} */ source.addEventListener('message', function (e) { setMessageInnerHTML(e.data); }); /** * 如果发生通信错误(比如连接中断),就会触发error事件 * 或者: * 另一种写法:source.onerror = function (event) {} */ source.addEventListener('error', function (e) { if (e.readyState === EventSource.CLOSED) { setMessageInnerHTML("连接关闭"); } else { console.log(e); } }, false); } else { setMessageInnerHTML("你的浏览器不支持SSE"); } // 监听窗口关闭事件,主动去关闭sse连接,如果服务端设置永不过期,浏览器关闭后手动清理服务端数据 window.onbeforeunload = function () { closeSse(); }; // 关闭Sse连接 function closeSse() { source.close(); const httpRequest = new XMLHttpRequest(); httpRequest.open('GET', 'http://localhost:8080/sse/close/' + userId, true); httpRequest.send(); console.log("close"); } // 将消息显示在网页上 function setMessageInnerHTML(innerHTML) { document.getElementById('message').innerHTML += innerHTML + '<br/>'; } </script> </html> 复制代码
启动项目,访问网页 http://localhost:8080/sse.html
建立连接。调用发送信息接口 http://localhost:8080/sse/push/hello
,查看网页显示信息。
所有代码均上传至Github上,方便大家访问
>>>>>> 消息推送之 WebSocket 和 SseEmitter <<<<<<
创作不易,如果各位觉得有帮助, 求点赞 支持