文章首发于公众号【大数据学徒】,感兴趣请搜索 dashujuxuetu 或者文末扫码关注。
因为 Apache Zeppelin 中使用了 Jetty,所以我需要补充一下 Jetty 的相关背景,以便阅读 Zeppelin 的相关代码,其中 WebSocket 是很重要的内容,因此通过阅读学习 Jetty 的 WebSocket API 文档 总结出了本文的内容。
内容提要:
WebSocket 是一种应用层传输协议,可以在单个 TCP 连接上进行全双工通信,允许服务端主动向客户端推送数据,协议开销小,协议头是 ws
(明文) 或 wss
(加密),端口和 HTTP 一样使用 80 或 443,通过 HTTP/1.1 的 Upgrade
头部来建立连接。更多 WebSocket 的介绍请参考 WebSocket 维基百科 。
一个 WebSocket 连接有四种基本的状态:
Jetty 提供了比 Java 原生 WebSocket 更强大的 API,包含了一些服务端和客户端通用的 API,是基于 WebSocket 消息的事件驱动型 API。每个 WebSocket 连接会接收到四种事件:
org.eclipse.jetty.websocket.api.Session
对象,这是一个重要的对象,一般通过它来和对端通信。 WebSocket 的最基本用法是 对一个 POJO 类使用 WebSocket 注解 ,比如:
import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage; import org.eclipse.jetty.websocket.api.annotations.WebSocket; // WebSocket 注解 @WebSocket(maxTextMessageSize = 64 * 1024) public class AnnotatedEchoSocket { // onWebSocketMessage注解,用于标记收到消息时的处理函数 @OnWebSocketMessage public void onText(Session session, String message) { if (session.isOpen()) { System.out.printf("Echoing back message [%s]%n", message); // 将收到的消息原样返回 session.getRemote().sendString(message, null); } } } 复制代码
可用的注解有以下几种:
注解 | 作用 |
---|---|
@WebSocket | 必需的类级别注解,标记 POJO 类为 WebSocket |
@OnWebSocketConnect | 可选的方法级别注解,标记处理 Connect 事件的函数 |
@OnWebSocketClose | 可选的方法级别注解,标记处理 Close 事件的函数 |
@OnWebSocketMessage | 可选的方法级别注解,标记处理 Message 事件的函数 |
@OnWebSocketError | 可选的方法级别注解,标记处理 Error 事件的函数 |
@OnWebSocketFrame | 可选的方法级别注解,标记处理 Frame 事件的函数 |
Jetty 也提供了 WebSocketListener
接口,有上面的 5 个方法,以及 WebSocketAdapter
类,实现了这个接口,不过都是空实现,但提供了以下非常实用的方法来检查 Session 的状态。
服务端有两个重要的抽象类和接口, WebSocketServlet
和 WebSocketCreator
,使用 WebSocketServlet
的例子:
@WebServlet(name = "MyEcho WebSocket Servlet", urlPatterns = {"/echo"}) public class MyEchoServlet extends WebSocketServlet { @Override public void configure(WebSocketServletFactory factory) { // set a 10 second timeout factory.getPolicy().setIdleTimeout(10000); // register MyEchoSocket as the WebSocket to create on Upgrade factory.register(MyEchoSocket.class); } } 复制代码
类上有一个注解,表明这个 Servlet
会在 /echo
这个 URI 上收到 HTTP Upgrade 请求时创建,这个类有一个 configure()
方法,参数是 WebSocketServletFactory
类型,在这个方法内需要做的事,除了设置一些参数以外,需要 使用 register()
方法注册用来处理请求的 Servlet
,在这个例子中注册了 MyEchoServlet
自身,也可以不这么做,另外一种做法是, 调用 WebSocketServletFactory
的 setCreator()
方法,不直接注册 Servlet
,而是让设置的 WebSocketCreator
来创建合适的 Servlet
,看一个 WebSocketCreator
的例子:
public class MyAdvancedEchoCreator implements WebSocketCreator { private MyBinaryEchoSocket binaryEcho; private MyEchoSocket textEcho; public MyAdvancedEchoCreator() { // 创建好可以复用的 Socket this.binaryEcho = new MyBinaryEchoSocket(); this.textEcho = new MyEchoSocket(); } @Override public Object createWebSocket(ServletUpgradeRequest req, ServletUpgradeResponse resp) { for (String subprotocol : req.getSubProtocols()) { if ("binary".equals(subprotocol)) { resp.setAcceptedSubProtocol(subprotocol); return binaryEcho; } if ("text".equals(subprotocol)) { resp.setAcceptedSubProtocol(subprotocol); return textEcho; } } // 其它情况就返回 null return null; } } 复制代码
WebSocketCreator
接口需要实现 createWebSocket(ServletUpgradeRequest, ServletUpgradeResponse)
方法,这里 MyAdvancedEchoCreator
实现类,根据子协议是文本类型还是二进制类型生成了不同的 WebSocket
来处理请求。
看起来这里使用了抽象工厂模式,但我对设计模式所知有限,就不展开讲了。
客户端需要的依赖是:
<dependency> <groupId>org.eclipse.jetty.websocket</groupId> <artifactId>websocket-client</artifactId> <version>${project.version}</version> </dependency> 复制代码
直接看一个客户端例子,很清晰:
public class SimpleEchoClient { public static void main(String[] args) { // 指定服务端 URI String destUri = "ws://echo.websocket.org"; if (args.length > 0) { destUri = args[0]; } // 创建 WebSocketClient 对象 WebSocketClient client = new WebSocketClient(); SimpleEchoSocket socket = new SimpleEchoSocket(); try { client.start(); URI echoUri = new URI(destUri); ClientUpgradeRequest request = new ClientUpgradeRequest(); // 发送 HTTP Upgrade 请求,建立连接 client.connect(socket, echoUri, request); System.out.printf("Connecting to : %s%n", echoUri); // 等待关闭 socket.awaitClose(5, TimeUnit.SECONDS); } catch (Throwable t) {...} finally {...} } } 复制代码
欢迎交流讨论,吐槽建议,分享收藏。