服务器推送技术(又名Comet)是建立在ARP基础之上的一种非常实用的技术,它广泛应用于Web端,手机APP应用端等。具体很多场景都需要此技术的支撑,包括扫码登录、扫码支付、网页支付、端到端消息轮播推送、数据实时推送、金融数据实时刷新、图表数据实时变化、即时通讯等等都和此技术息息相关。
我们今天来介绍几种常用的服务器推送技术方案及实战。
Ajax短轮询 - 拉模式
基于Ajax(异步JS和XML)的定时轮询。特点:实现简单、短连接、数据同步不及时、对服务器资源会造成一定压力。此模式广泛应用于:扫描登录、扫码支付、天气更新等(腾讯、京东、阿里一直都在沿用此技术并日渐成熟和稳定)。
基于HTTP长连接和AJAX长轮询(long-polling)方式。特点:无需浏览器或APP端任何单独插件支持、长连接,减少网络(三次)握手和四次挥手、对服务器资源要求较高等。此模式常用于实时消息轮播、金融数据即时刷新、数据图表实时刷新等。JAVA服务器端一般采用Servlet3支持的异步任务、延时结果(DeferedResult)等手段实现。
SSE(Server Send Event) - 推模式
服务器主动推模式。特点:异步非阻塞、多次实时消息的接收,浏览器或APP端可仅发起一次请求、幂等性、长连接等。此模式常用于网页支付、扫码支付等场景。JAVA服务器端采用Spring提供的SseEmitter等实现。
服务器主动推模式。特点:浏览器或APP端需支持或兼容EventSource服务器派发机制、长连接、一次请求,服务器持续多次实时推送。此模式常用于对数据实时性要求较高的场景。
基于上面的了解和学习后,我们就来前后端配合实战一下以上系列的技术方案。
代码实战
/** * @author andychen https://blog.51cto.com/14815984 * @description:EventSource服务器推送控制器类 */ @Controller @RequestMapping(value = "/es") public class EventSourceController { private static final Logger log = LoggerFactory.getLogger(SSEController.class); @GetMapping("/index") public String index(){ return "eventsource"; } /** * 主动持久新闻推送的正确姿势 * @param response 服务器推送响应 */ @RequestMapping("/push") public void pushNews(HttpServletResponse response){ //设置推送内容类型为事件流 response.setContentType("text/event-stream"); //消息流编码格式 response.setCharacterEncoding("utf-8"); PrintWriter writer = null; try { //获取流写入器 writer = response.getWriter(); //连续20次推送消息到客户端 for (int i=0;i< Constant.WRITE_TIMES;i++){ if(writer.checkError()){ log.error("错误发生,将结束推送..."); //数据固定格式:/n/n,双换行符结尾 writer.write("data:end/n/n"); return; } Thread.sleep(2000); writer.write(this.buildSendData()); writer.flush(); } log.info("新闻推送达到阈值,将结束推送..."); writer.write("data:end/n/n"); writer.flush(); } catch (Exception e) { e.printStackTrace(); }finally { if(null != writer){ writer.close(); } } } /** * 构建发送数据 * @return */ private String buildSendData(){ String news = Utility.createNews(); StringBuilder builder = new StringBuilder(""); builder.append("retry:2000/n"); builder.append("data:"); builder.append(news); builder.append("/n/n"); return builder.toString(); } }
<!DOCTYPE html> <html> <head> <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"> <title>EventSource消息持续推技术</title> <link rel="stylesheet" href="/assets/css/index.css" type="text/css"/> <!--兼容低版本或不支持SSE的浏览器--> <script type="text/javascript" src="/assets/js/eventsource.min.js"></script> </head> <body> <div> <span class="title">EventSource消息持续推技术 - 演示</span> </div> <div> <div class="content"> <p id="p_es"></p> </div> </div> <script type="text/javascript" src="/assets/js/jquery-1.9.1.min.js"></script> <script type="text/javascript" src="/assets/js/index.js"></script> <script type="text/javascript"> $(function () { ServerPush.eventSourcePush(); }); </script> </body> </html>
var ServerPush = { /** * Ajax短轮询,定时拉取 */ shortLoopPull: function () { function pullNews(){ $.get("/shortloop/pull", function (data) { console.log(data); $("#p_sloop").html(data); }); } window.setInterval(pullNews, 1500); }, /** * Ajax长轮询,循环拉取 */ longLoopPull: function () { pull(); function pull(){ $.get("/longloop/pull", function (data) { console.log(data); $("#p_lloop").html(data); //再发起请求 pull(); }); } }, /** * SSE异步主动推送 */ ssePush: function () { $("#p_sse").html("正在拉取新闻..."); $.ajax({ type:'get', url:'/sse/push?clientId='+new Date().getTime(), dataType:'text', success:function (data) { console.log(data); //多次推送的消息,通过"data:"固定格式(SseEmitter封装)分开; let items = data.split('data:'); let result = ""; let len = items.length; $.each(items, function (i, item) { result = result + item+"<br>"; if(i === len-1){ $("#p_sse").html(result); } }); }, error: function (error) { $("#p_sse").html(error); } }); }, /** * EventSource服务器持续流推送 */ eventSourcePush: function () { $('#p_es').html(""); //判断是否支持SSE if(window.EventSource){ let msg = ""; let source = new EventSource("/es/push"); /** * 通信建立事件处理 * @param e */ source.onopen = function (e) { msg = "通信连接已建立.."; console.log(msg); $('#p_es').html(msg); }; /** * 消息接收事件处理 * @param e */ source.onmessage = function (e) { let result = e.data; if(result === "end"){ msg = "服务器推送即将完成,停止中..."; console.log(msg); $('#p_es').html(msg); return; } let html = $('#p_es').html(); $('#p_es').html(html+"<br>"+result); }; /** * 异常发生事件处理 * @param error */ source.onerror = function (error) { msg = "通信发生错误:"+error; console.log(msg); $('#p_es').html(msg); } }else{ $("#p_es").html("当前浏览器不支持SSE!"); } } };
/** * @author andychen https://blog.51cto.com/14815984 * @description:SSE服务器推送控制器类 */ @Controller @RequestMapping("/sse") public class SSEController { private static final Logger log = LoggerFactory.getLogger(SSEController.class); /** * 推送请求集合 */ private static final Map<String, SseEmitterExt> requests = new ConcurrentHashMap<>(); /** * 任务执行池 */ private ExecutorService taskPool = Executors.newFixedThreadPool(2); @GetMapping("/index") public String index(){ return "sse"; } /** * SSE主动异步新闻推送 */ @GetMapping(value = "/push") @ResponseBody public SseEmitter pushNews(String clientId){ SseEmitterExt emitter = new SseEmitterExt(); requests.put(clientId, emitter); //注册移除事件 emitter.onCompletion(new CompleteCallback(clientId)); //放到任务池执行 this.taskPool.submit(new PushTask(clientId)); return emitter; } /** * 推送新闻任务 */ private static class PushTask implements Runnable{ private final String clientId; public PushTask(String clientId) { this.clientId = clientId; } /** * 执行任务核心逻辑 */ @Override public void run() { String msg = null; try { SseEmitterExt emitter = requests.get(this.clientId); msg = "正在从新闻池为您推送,请稍后..."; emitter.send(msg); log.info(msg); Thread.sleep(5000); msg = "头条:"+ Utility.createNews(); log.info(msg); emitter.send(msg); //完成推送 emitter.complete(); } catch (Exception e) { e.printStackTrace(); } } } /** * 任务完成回调 */ private static class CompleteCallback implements Runnable{ private final String clientId; public CompleteCallback(String clientId) { this.clientId = clientId; } @Override public void run() { requests.remove(this.clientId); log.info("已移除请求Id: "+this.clientId+" ..."); } } }
<!DOCTYPE html> <html> <head> <meta charset="UTF-8"/> <title>SSE异步推技术</title> <link rel="stylesheet" href="/assets/css/index.css" type="text/css"/> </head> <body> <div> <span class="title">SSE异步推技术 - 演示</span> </div> <div> <div class="content"> <p id="p_sse"></p> </div> </div> <script type="text/javascript" src="/assets/js/jquery-1.9.1.min.js"></script> <script type="text/javascript" src="/assets/js/index.js"></script> <script type="text/javascript"> $(function () { ServerPush.ssePush(); }); </script> </body> </html>
/** * @author andychen https://blog.51cto.com/14815984 * @description:SSE扩展 */ public class SseEmitterExt extends SseEmitter { /** * 扩展支持中文推送 * @param outputMessage */ @Override protected void extendResponse(ServerHttpResponse outputMessage) { outputMessage.getHeaders().setContentType(new MediaType("text", "event-stream", Charset.forName("utf-8"))); super.extendResponse(outputMessage); } }
/** * @author andychen https://blog.51cto.com/14815984 * @description:Ajax长轮询控制类 */ @Controller @RequestMapping("/longloop") public class LongLoopController { /** * 任务线程池: 一个请求一个后台线程处理 */ private ExecutorService taskPool = Executors.newFixedThreadPool(1); @GetMapping("/index") public String index(){ return "longloop"; } /** * 长轮询拉取新闻 */ @GetMapping(value = "/pull", produces = "text/html;charset=UTF-8") @ResponseBody public DeferredResult<String> pullNews(){ final DeferredResult<String> dr = new DeferredResult<>(); this.taskPool.submit(new Runnable() { @Override public void run() { //模拟等待5秒再返回结果 try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } dr.setResult(Utility.createNews()); } }); return dr; } }
<!DOCTYPE html> <html> <head> <meta charset="UTF-8"/> <title>Ajax长轮询拉技术</title> <link rel="stylesheet" href="/assets/css/index.css" type="text/css"/> </head> <body> <div> <span class="title">Ajax长轮询拉技术 - 演示</span> </div> <div> <div class="content"> <p id="p_lloop"></p> </div> </div> <script type="text/javascript" src="/assets/js/jquery-1.9.1.min.js"></script> <script type="text/javascript" src="/assets/js/index.js"></script> <script type="text/javascript"> $(function () { ServerPush.longLoopPull(); }); </script> </body> </html>
/** * @author andychen https://blog.51cto.com/14815984 * @description:Ajax短轮询控制类 */ @Controller @RequestMapping(value = "/shortloop", produces = "text/html;charset=UTF-8") public class ShortLoopController { @GetMapping("/index") public String index(){ return "shortloop"; } /** * 短轮询定时拉取新闻 * @return */ @GetMapping("/pull") @ResponseBody public String pullNews(){ return Utility.createNews(); } }
<!DOCTYPE html> <html> <head> <meta charset="UTF-8"/> <title>Ajax短轮询拉技术</title> <link rel="stylesheet" href="/assets/css/index.css" type="text/css"/> </head> <body> <div> <span class="title">Ajax短轮询拉技术 - 演示</span> </div> <div> <div class="content"> <p id="p_sloop"></p> </div> </div> <script type="text/javascript" src="/assets/js/jquery-1.9.1.min.js"></script> <script type="text/javascript" src="/assets/js/index.js"></script> <script type="text/javascript"> $(function () { ServerPush.shortLoopPull(); }); </script> </body> </html>
实现验证
以上仅为常用的服务器推送技术设计实现,针对手机App端之前常用第三方推送服务极光来实现。还有实时性要求更高的即时通讯协议WebSocket可实现类似IM(Web聊天系统、在线机器人客服系统等)的功能。下次我们就来分享一下这块的研究和一个设计实现。请持续关注!