阅读对象:本文适合SpringBoot 初学者及对SpringBoot感兴趣的童鞋阅读。
背景介绍:在企业级 WEB 应用开发中,为了更好的用户体验&提升响应速度,往往会将一些耗时费力的请求 (Excel导入or导出,复杂计算, etc.) 进行***异步化***处理。 由此带来的一个重要的问题是***如何通知用户任务状态***,常见的方法大致分为2类4种:
HTTP Polling HTTP Long-Polling Server-Sent Events (SSE) WebSocket
是一种非常简单的实现方式。就是client通过***定时任务***不断得重复请求服务器,从而获取新消息,而server按时间顺序提供自上次请求以后发生的单个或多个消息。
短轮询的优点非常明显,就是实现简单。当两个方向上的数据都非常少,并且请求间隔不是非常密集时,这种方法就会非常有效。例如,新闻评论信息可以每半分钟更新一次,这对用户来说是可以的。
它得缺点也是非常明显,一旦我们对数据实时性要求非常高时,为了保证消息的及时送达,请求间隔必须缩短,在这种情况下,会加剧服务器资源的浪费,降低服务的可用性。另一个缺点就是在消息的数量较少时,将会有大量的 request
做无用功,进而也导致服务器资源的浪费。
长轮询的官方定义是:
The server attempts to "hold open" (notimmediately reply to) each HTTP request, responding only when there are events to deliver. In this way, there is always a pending request to which the server can reply for the purpose of delivering events as they occur, thereby minimizing the latency in message delivery.
如果与 Polling
的方式相比,会发现 Long-Polling
的优点是通过hold open HTTP request 从而减少了无用的请求。
大致步骤为:
请求超时
。 获取到新消息
或 请求超时
,进行消息处理并发起下一次请求。 Long-Polling
的缺点之一也是服务器资源的浪费,因为它和 Polling
的一样都属于***被动获取***,都需要不断的向服务器请求。在并发很高的情况下,对服务器性能是个严峻的考验。
Note:因为以上2两种方式的实现都比较简单,所以我们这里就不做代码演示了。接下来我们重点介绍一下 Server-Sent Events
及 WebSocket
。
下面我们将通过一个***下载文件***的案例进行演示 SSE
和 WebSocket
的消息推送,在这之前,我们先简单说一下我们项目的结构,整个项目基于SpringBoot 构建。
首先我们定义一个供前端访问的API DownloadController
@RestController public class DownloadController { private static final Logger log = getLogger(DownloadController.class); @Autowired private MockDownloadComponent downloadComponent; @GetMapping("/api/download/{type}") public String download(@PathVariable String type, HttpServletRequest request) { // (A) HttpSession session = request.getSession(); String sessionid = session.getId(); log.info("sessionid=[{}]", sessionid); downloadComponent.mockDownload(type, sessionid); // (B) return "success"; // (C) } } 复制代码
type
参数用于区分使用哪种推送方式,这里为 sse
, ws
, stomp
这三种类型。 MockDownloadComponent
用于异步模拟下载文件的过程。 success
,用于表明 下载开始 。 在 DownloadController
中我们调用 MockDownloadComponent
的 mockDownload()
的方法进行模拟真正的下载逻辑。
@Component public class MockDownloadComponent { private static final Logger log = LoggerFactory.getLogger(DownloadController.class); @Async // (A) public void mockDownload(String type, String sessionid) { for (int i = 0; i < 100; i++) { try { TimeUnit.MILLISECONDS.sleep(100); // (B) int percent = i + 1; String content = String.format("{/"username/":/"%s/",/"percent/":%d}", sessionid, percent); // (C) log.info("username={}'s file has been finished [{}]% ", sessionid, percent); switch (type) { // (D) case "sse": SseNotificationController.usesSsePush(sessionid, content); break; case "ws": WebSocketNotificationHandler.usesWSPush(sessionid, content); break; case "stomp": this.usesStompPush(sessionid, content); break; default: throw new UnsupportedOperationException(""); } } catch (InterruptedException e) { e.printStackTrace(); } } } } 复制代码
@Async
让使其 异步化
。 {"username":"abc","percent":1}
。 type
选择消息推送方式。 SSE
是W3C定义的一组API规范,这使服务器能够通过HTTP将数据推送到Web页面,它具有如下特点:
key-value
格式& UTF-8
编码的文本数据流,我们可以在消息 payload
中可以使用 JSON
或者 XML
或自定义数据格式。 Note:IE 浏览器可通过第三方JS库进行支持SSE
从Spring 4.2开始支持SSE规范,我们只需要在 Controller
中返回 SseEmitter
对象即可。
Note:Spring 5 中提供了Spring Webflux 可以更加方便的使用SSE,但是为更贴近我们的实际项目,所以文本仅演示使用Spring MVC SSE。
我们在 服务器端 定义一个 SseNotificationController
用于和客户端处理和保存 SSE
连接. 其 endpoint
为 /api/sse-notification
。
@RestController public class SseNotificationController { public static final Map<String, SseEmitter> SSE_HOLDER = new ConcurrentHashMap<>(); // (A) @GetMapping("/api/sse-notification") public SseEmitter files(HttpServletRequest request) { long millis = TimeUnit.SECONDS.toMillis(60); SseEmitter sseEmitter = new SseEmitter(millis); // (B) HttpSession session = request.getSession(); String sessionid = session.getId(); SSE_HOLDER.put(sessionid, sseEmitter); return sseEmitter; } /** * 通过sessionId获取对应的客户端进行推送消息 */ public static void usesSsePush(String sessionid, String content) { // (C) SseEmitter emitter = SseNotificationController.SSE_HOLDER.get(sessionid); if (Objects.nonNull(emitter)) { try { emitter.send(content); } catch (IOException | IllegalStateException e) { log.warn("sse send error", e); SseNotificationController.SSE_HOLDER.remove(sessionid); } } } } 复制代码
SSE_HOLDER
保存了所有客户端的 SseEmitter
,用于后续通知对应客户端。 SseEmitter
对象, 它是SpringMVC提供用于操作SSE的类。 usesSsePush()
提供根据sessionId向对应客户端发送消息。发送只需要调用 SseEmitter
的 send()
方法即可。 至此服务端已经完成,我们使用Vue编写客户端 Download.html
进行测试。核心代码如下:
usesSSENotification: function () { var tt = this; var url = "/api/sse-notification"; var sseClient = new EventSource(url); // (A) sseClient.onopen = function () {...}; // (B) sseClient.onmessage = function (msg) { // (C) var jsonStr = msg.data; console.log('message', jsonStr); var obj = JSON.parse(jsonStr); var percent = obj.percent; tt.sseMsg += 'SSE 通知您:已下载完成' + percent + "%/r/n"; if (percent === 100) { sseClient.close(); // (D) } }; sseClient.onerror = function () { console.log("EventSource failed."); }; } 复制代码
/api/sse-notification
WebSocket
类似于标准的TCP连接,它是IETF(RFC 6455)定义的通过TCP进行实时全双工通信一种通信方式,这意味这它的功能更强大,常用于如股票报价器,聊天应用。
相比于SSE,它不仅可以双向通信,而且甚至还能处理音频/视频等二进制内容。
Note:使用 WebSocket
,在高并发情况下,服务器将拥有许多长连接。这对网络代理层组件及 WebSocket
服务器都是一个不小的性能挑战,我们需要考虑其负载均衡方案。同时连接安全等问题也不容忽视。
Spring 4提供了一个新的 Spring-WebSocket
模块,用于适应各种WebSocket引擎,它与Java WebSocket API标准(JSR-356)兼容,并且提供了额外的增强功能。
Note: 对于应用程序来说,直接使用WebSocket API会大大增加开发难度,所以Spring为我们提供了STOMP over WebSocket 更高级别的API使用WebSocket。在本文中将会分别演示通过low level API及higher level API进行演示。
如果想在SpringBoot中使用WebSocket,首先需要引入 spring-boot-starter-websocket
依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency> 复制代码
然后就可以配置相关信息,我们先通过low level API进行演示。
首先需要自定义一个 WebSocketNotificationHandler
用于处理WebSocket 的连接及消息处理。我们只需要实现 WebSocketHandler
或子类 TextWebSocketHandler
BinaryWebSocketHandler
。
public class WebSocketNotificationHandler extends TextWebSocketHandler { private static final Logger log = getLogger(WebSocketNotificationHandler.class); public static final Map<String, WebSocketSession> WS_HOLDER= new ConcurrentHashMap<>(); // (A) @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { // (B) String httpSessionId = (String) session.getAttributes().get(HttpSessionHandshakeInterceptor.HTTP_SESSION_ID_ATTR_NAME); WS_HOLDER.put(httpSessionId, session); } @Override protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { log.info("handleTextMessage={}", message.getPayload()); } public static void usesWSPush(String sessionid, String content) { // (C) WebSocketSession wssession = WebSocketNotificationHandler.WS_HOLDER.get(sessionid); if (Objects.nonNull(wssession)) { TextMessage textMessage = new TextMessage(content); try { wssession.sendMessage(textMessage); } catch (IOException | IllegalStateException e) { WebSocketNotificationHandler.SESSIONS.remove(sessionid); } } } } 复制代码
WS_HOLDER
用于保存客户端的 WebSocket Session
afterConnectionEstablished()
方法,当连接建立之后,按 sessionId
将 WebSocket Session
保存至 WS_HOLDER
,用于后续向client推送消息。 sessionId
获取对应 WebSocket Session
,并调用 WebSocket Session
的 sendMessage(textMessage)
方法向client发送消息。 使用 @EnableWebSocket
开启WebSocket,并实现 WebSocketConfigurer
进行配置。
@Configuration @EnableWebSocket public class WebSocketConfig implements WebSocketConfigurer { @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { WebSocketNotificationHandler notificationHandler = new WebSocketNotificationHandler(); registry.addHandler(notificationHandler, "/ws-notification") // (A) .addInterceptors(new HttpSessionHandshakeInterceptor()) // (B) .withSockJS(); // (C) } } 复制代码
WebSocketNotificationHandler
注册至 WebSocketHandlerRegistry
. HttpSessionHandshakeInterceptor
是一个内置的拦截器,用于传递HTTP会话属性到WebSocket会话。当然你也可以通过 HandshakeInterceptor
接口实现自己的拦截器。 server端至此就基本大功告成,接下来我们来完善一下client端 Download.html
,其核心方法如下:
usesWSNotification: function () { var tt = this; var url = "http://localhost:8080/ws-notification"; var sock = new SockJS(url); // (A) sock.onopen = function () { console.log('open'); sock.send('test'); }; sock.onmessage = function (msg) { // (B) var jsonStr = msg.data; console.log('message', jsonStr); var obj = JSON.parse(jsonStr); var percent = obj.percent; tt.wsMsg += 'WS 通知您:已下载完成' + percent + "%/r/n"; if (percent === 100) { sock.close(); } }; sock.onclose = function () { console.log('ws close'); }; } 复制代码
callback
,我们可以在该方法中处理我们的消息。 WebSocket虽然定义了两种类型的消息,文本和二进制,但是针对消息的内容没有定义,为了更方便的处理消息,我们希望Client和Server都需要就某种协议达成一致,以帮助处理消息。那么,有没有已经造好的轮子呢?答案肯定是有的。这就是STOMP。
STOMP是一种简单的面向文本的消息传递协议,它其实是消息队列的一种协议, 和AMQP,JMS是平级的。 只不过由于它的简单性恰巧可以用于定义WS的消息体格式。虽然STOMP是面向文本的协议,但消息的内容也可以是二进制数据。同时STOMP 可已使用任何可靠的双向流网络协议,如TCP和WebSocket,目前很多服务端消息队列都已经支持了STOMP, 比如RabbitMQ, ActiveMQ等。
它结构是一种基于帧的协议,一帧由一个命令,一组可选的Header和一个可选的Body组成。
COMMAND header1:value1 header2:value2 Body^@ 复制代码
客户端可以使用 SEND
或 SUBSCRIBE
命令发送或订阅消息。 通过 destination
标记述消息应由谁来接收处理,形成了类似于MQ的发布订阅机制。
STOMP的优势也非常明显,即:
最重要的是, Spring STOMP
为我们提供了能够像Spring MVC一样的编程模型,减少了我们的学习成本。
下面将我们的DEMO稍作调整,使用 Spring STOMP
来实现消息推送,在本例中我们使用 SimpleBroker
模式,我们的应用将会内置一个 STOMP Broker
,将所有信息保存至内存中。
具体代码如下:
@Configuration @EnableWebSocketMessageBroker // (A) public class WebSocketBrokerConfig implements WebSocketMessageBrokerConfigurer { @Override public void registerStompEndpoints(StompEndpointRegistry registry) { registry.addEndpoint("/ws-stomp-notification") .addInterceptors(httpSessionHandshakeInterceptor()) // (B) .setHandshakeHandler(httpSessionHandshakeHandler()) // (C) .withSockJS(); } @Override public void configureMessageBroker(MessageBrokerRegistry registry) { registry.setApplicationDestinationPrefixes("/app") // (D) .enableSimpleBroker("/topic", "/queue"); // (E) } @Bean public HttpSessionHandshakeInterceptor httpSessionHandshakeInterceptor() { return new HttpSessionHandshakeInterceptor(); } @Bean public HttpSessionHandshakeHandler httpSessionHandshakeHandler() { return new HttpSessionHandshakeHandler(); } } 复制代码
@EnableWebSocketMessageBroker
注解开启支持STOMP HttpSessionHandshakeHandler
,其主要作用是按sessionId标记识别连接。 destination
已 /app
开头时,将会把该消息路由到server端的对应的消息处理方法中。***(在本例中无实际意义)*** HttpSessionHandshakeHandler
代码如下:
public class HttpSessionHandshakeHandler extends DefaultHandshakeHandler { @Override protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) { String sessionId = (String) attributes.get(HttpSessionHandshakeInterceptor.HTTP_SESSION_ID_ATTR_NAME); return new HttpSessionPrincipal(sessionId); } } 复制代码
当我们需要向client发送消息时,只需要注入 SimpMessagingTemplate
对象即可,是不是感觉非常熟悉?! 没错,这种 Template
模式和我们日常使用的 RestTemplate
JDBCTemplate
是一样的。 我们只需要调用 SimpMessagingTemplate
的 convertAndSendToUser()
方法即可向对应用户发送消息了。
private void usesStompPush(String sessionid, String content) { String destination = "/queue/download-notification"; messagingTemplate.convertAndSendToUser(sessionid, destination, content); } 复制代码
在浏览器端,client可以使用stomp.js和sockjs-client进行如下连接:
usesStompNotification: function () { var tt = this; var url = "http://localhost:8080/ws-stomp-notification"; // 公共topic // var notificationTopic = "/topic/download-notification"; // 点对点广播 var notificationTopic = "/user/queue/download-notification"; // (A) var socket = new SockJS(url); var stompClient = Stomp.over(socket); stompClient.connect({}, function (frame) { console.log("STOMP connection successful"); stompClient.subscribe(notificationTopic, function (msg) { // (B) var jsonStr = msg.body; var obj = JSON.parse(jsonStr); var percent = obj.percent; tt.stompMsg += 'STOMP 通知您:已下载完成' + percent + "%/r/n"; if (percent === 100) { stompClient.disconnect() } }); }, function (error) { console.log("STOMP protocol error " + error) }) } 复制代码
/user/
为前缀,Spring STOMP会把以 /user/
为前缀的消息交给 UserDestinationMessageHandler
进行处理并发给特定的用户,当然这个 /user/
是可以通过 WebSocketBrokerConfig
进行个性化配置的,为了简单起见,我们这里就使用默认配置,所以我们的topic url就是 /user/queue/download-notification
。 stompClient
消息处理callback进行消息处理。 在文中为大家简单讲解了几种常用的消息推送方案,并通过一个下载案例重点演示了 SSE
及 WebSocket
这两种server push模式的消息推送。当然还有很多细节并没有在文中说明,建议大家下载 源码 对照参考。
相比较这几种模式,小编认为如果我们的需求仅仅是 向客户端推送消息 ,那么使用 SSE
的性价比更高一些, Long-Polling
次之。使用 WebSocket
有一种杀鸡用牛刀的感觉,并且给我们系统也带来了更多的复杂性,得不偿失,所以不太推荐。而 Polling
虽然实现方式最简单且兼容性最强,但是其效率过低,所以不建议使用。当然如果您有其他见解,欢迎留言讨论交流。
文中示例源码: github.com/leven-space…
如果您觉得这篇文章有用,请留下您的小:heartpulse::heartpulse:,我是一枚Java小学生,欢迎大家吐槽留言。