最近的一个项目:需要使用 Android App 作为 Socket 的服务端,并且一个端口能够同时监听 TCP/Web Socket 协议。
自然而然,项目决定采用 Netty 框架。Netty 服务端在收到客户端发来的消息后,能够做出相应的业务处理。在某些场景下,服务端也需要给客户端 App/网页发送消息。
首先,定义好 NettyServer,它使用 object
声明表示是一个单例。用于 Netty 服务端的启动、关闭以及发送消息。
object NettyServer { private val TAG = "NettyServer" private var channel: Channel?=null private lateinit var listener: NettyServerListener<String> private lateinit var bossGroup: EventLoopGroup private lateinit var workerGroup: EventLoopGroup var port = 8888 set(value) { field = value } var webSocketPath = "/ws" set(value) { field = value } var isServerStart: Boolean = false private set fun start() { object : Thread() { override fun run() { super.run() bossGroup = NioEventLoopGroup(1) workerGroup = NioEventLoopGroup() try { val b = ServerBootstrap() b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel::class.java) .localAddress(InetSocketAddress(port)) .childOption(ChannelOption.SO_KEEPALIVE, true) .childOption(ChannelOption.SO_REUSEADDR, true) .childOption(ChannelOption.TCP_NODELAY, true) .childHandler(NettyServerInitializer(listener,webSocketPath)) // Bind and start to accept incoming connections. val f = b.bind().sync() Log.i(TAG, NettyServer::class.java.name + " started and listen on " + f.channel().localAddress()) isServerStart = true listener.onStartServer() f.channel().closeFuture().sync() } catch (e: Exception) { Log.e(TAG, e.localizedMessage) e.printStackTrace() } finally { isServerStart = false listener.onStopServer() disconnect() } } }.start() } fun disconnect() { workerGroup.shutdownGracefully() bossGroup.shutdownGracefully() } fun setListener(listener: NettyServerListener<String>) { this.listener = listener } // 异步发送TCP消息 fun sendMsgToClient(data: String, listener: ChannelFutureListener) = channel?.run { val flag = this.isActive if (flag) { this.writeAndFlush(data + System.getProperty("line.separator")).addListener(listener) } flag } ?: false // 同步发送TCP消息 fun sendMsgToClient(data: String) = channel?.run { if (this.isActive) { return this.writeAndFlush(data + System.getProperty("line.separator")).awaitUninterruptibly().isSuccess } false } ?: false // 异步发送WebSocket消息 fun sendMsgToWS(data: String,listener: ChannelFutureListener) = channel?.run { val flag = this.isActive if (flag) { this.writeAndFlush(TextWebSocketFrame(data)).addListener(listener) } flag } ?: false // 同步发送TCP消息 fun sendMsgToWS(data: String) = channel?.run { if (this.isActive) { return this.writeAndFlush(TextWebSocketFrame(data)).awaitUninterruptibly().isSuccess } false } ?: false /** * 切换通道 * 设置服务端,与哪个客户端通信 * @param channel */ fun selectorChannel(channel: Channel?) { this.channel = channel } } 复制代码
NettyServerInitializer 是服务端跟客户端连接之后使用的 childHandler:
class NettyServerInitializer(private val mListener: NettyServerListener<String>,private val webSocketPath:String) : ChannelInitializer<SocketChannel>() { @Throws(Exception::class) public override fun initChannel(ch: SocketChannel) { val pipeline = ch.pipeline() pipeline.addLast("active",ChannelActiveHandler(mListener)) pipeline.addLast("socketChoose", SocketChooseHandler(webSocketPath)) pipeline.addLast("string_encoder",StringEncoder(CharsetUtil.UTF_8)) pipeline.addLast("linebased",LineBasedFrameDecoder(1024)) pipeline.addLast("string_decoder",StringDecoder(CharsetUtil.UTF_8)) pipeline.addLast("commonhandler", CustomerServerHandler(mListener)) } } 复制代码
NettyServerInitializer 包含了多个 Handler:连接使用的ChannelActiveHandler,协议选择使用的 SocketChooseHandler,TCP 消息使用的 StringEncoder、LineBasedFrameDecoder、StringDecoder,以及最终处理消息的 CustomerServerHandler。
ChannelActiveHandler:
@ChannelHandler.Sharable class ChannelActiveHandler(var mListener: NettyServerListener<String>) : ChannelInboundHandlerAdapter() { @Throws(Exception::class) override fun channelActive(ctx: ChannelHandlerContext) { val insocket = ctx.channel().remoteAddress() as InetSocketAddress val clientIP = insocket.address.hostAddress val clientPort = insocket.port Log.i("ChannelActiveHandler","新的连接:$clientIP : $clientPort") mListener.onChannelConnect(ctx.channel()) } } 复制代码
SocketChooseHandler 通过读取消息来区分是 WebSocket 还是 Socket。如果是 WebSocket 的话,去掉 Socket 使用的相关 Handler。
class SocketChooseHandler(val webSocketPath:String) : ByteToMessageDecoder() { @Throws(Exception::class) override fun decode(ctx: ChannelHandlerContext, `in`: ByteBuf, out: List<Any>) { val protocol = getBufStart(`in`) if (protocol.startsWith(WEBSOCKET_PREFIX)) { PipelineAdd.websocketAdd(ctx,webSocketPath) ctx.pipeline().remove("string_encoder") ctx.pipeline().remove("linebased") ctx.pipeline().remove("string_decoder") } `in`.resetReaderIndex() ctx.pipeline().remove(this.javaClass) } private fun getBufStart(`in`: ByteBuf): String { var length = `in`.readableBytes() if (length > MAX_LENGTH) { length = MAX_LENGTH } // 标记读位置 `in`.markReaderIndex() val content = ByteArray(length) `in`.readBytes(content) return String(content) } companion object { /** 默认暗号长度为23 */ private val MAX_LENGTH = 23 /** WebSocket握手的协议前缀 */ private val WEBSOCKET_PREFIX = "GET /" } } 复制代码
StringEncoder、LineBasedFrameDecoder、StringDecoder 都是 Netty 内置的编、解码器。其中,LineBasedFrameDecoder 用于解决 TCP粘包/拆包的问题。
CustomerServerHandler:
@ChannelHandler.Sharable class CustomerServerHandler(private val mListener: NettyServerListener<String>) : SimpleChannelInboundHandler<Any>() { @Throws(Exception::class) override fun channelReadComplete(ctx: ChannelHandlerContext) { } override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) { cause.printStackTrace() ctx.close() } @Throws(Exception::class) override fun channelRead0(ctx: ChannelHandlerContext, msg: Any) { val buff = msg as ByteBuf val info = buff.toString(CharsetUtil.UTF_8) Log.d(TAG,"收到消息内容:$info") } @Throws(Exception::class) override fun channelRead(ctx: ChannelHandlerContext, msg: Any) { if (msg is WebSocketFrame) { // 处理 WebSocket 消息 val webSocketInfo = (msg as TextWebSocketFrame).text().trim { it <= ' ' } Log.d(TAG, "收到WebSocketSocket消息:$webSocketInfo") mListener.onMessageResponseServer(webSocketInfo , ctx.channel().id().asShortText()) } else if (msg is String){ // 处理 Socket 消息 Log.d(TAG, "收到socket消息:$msg") mListener.onMessageResponseServer(msg, ctx.channel().id().asShortText()) } } // 断开连接 @Throws(Exception::class) override fun channelInactive(ctx: ChannelHandlerContext) { super.channelInactive(ctx) Log.d(TAG, "channelInactive") val reAddr = ctx.channel().remoteAddress() as InetSocketAddress val clientIP = reAddr.address.hostAddress val clientPort = reAddr.port Log.d(TAG,"连接断开:$clientIP : $clientPort") mListener.onChannelDisConnect(ctx.channel()) } companion object { private val TAG = "CustomerServerHandler" } } 复制代码
客户端也需要一个启动、关闭、发送消息的 NettyTcpClient,并且 NettyTcpClient 的创建采用 Builder 模式。
class NettyTcpClient private constructor(val host: String, val tcp_port: Int, val index: Int) { private lateinit var group: EventLoopGroup private lateinit var listener: NettyClientListener<String> private var channel: Channel? = null /** * 获取TCP连接状态 * * @return 获取TCP连接状态 */ var connectStatus = false /** * 最大重连次数 */ var maxConnectTimes = Integer.MAX_VALUE private set private var reconnectNum = maxConnectTimes private var isNeedReconnect = true var isConnecting = false private set var reconnectIntervalTime: Long = 5000 private set /** * 心跳间隔时间 */ var heartBeatInterval: Long = 5 private set//单位秒 /** * 是否发送心跳 */ var isSendheartBeat = false private set /** * 心跳数据,可以是String类型,也可以是byte[]. */ private var heartBeatData: Any? = null fun connect() { if (isConnecting) { return } val clientThread = object : Thread("Netty-Client") { override fun run() { super.run() isNeedReconnect = true reconnectNum = maxConnectTimes connectServer() } } clientThread.start() } private fun connectServer() { synchronized(this@NettyTcpClient) { var channelFuture: ChannelFuture?=null if (!connectStatus) { isConnecting = true group = NioEventLoopGroup() val bootstrap = Bootstrap().group(group) .option(ChannelOption.TCP_NODELAY, true)//屏蔽Nagle算法试图 .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) .channel(NioSocketChannel::class.java as Class<out Channel>?) .handler(object : ChannelInitializer<SocketChannel>() { @Throws(Exception::class) public override fun initChannel(ch: SocketChannel) { if (isSendheartBeat) { ch.pipeline().addLast("ping", IdleStateHandler(0, heartBeatInterval, 0, TimeUnit.SECONDS)) //5s未发送数据,回调userEventTriggered } ch.pipeline().addLast(StringEncoder(CharsetUtil.UTF_8)) ch.pipeline().addLast(StringDecoder(CharsetUtil.UTF_8)) ch.pipeline().addLast(LineBasedFrameDecoder(1024))//黏包处理,需要客户端、服务端配合 ch.pipeline().addLast(NettyClientHandler(listener, index, isSendheartBeat, heartBeatData)) } }) try { channelFuture = bootstrap.connect(host, tcp_port).addListener { if (it.isSuccess) { Log.d(TAG, "连接成功") reconnectNum = maxConnectTimes connectStatus = true channel = channelFuture?.channel() } else { Log.d(TAG, "连接失败") connectStatus = false } isConnecting = false }.sync() // Wait until the connection is closed. channelFuture.channel().closeFuture().sync() Log.d(TAG, " 断开连接") } catch (e: Exception) { e.printStackTrace() } finally { connectStatus = false listener.onClientStatusConnectChanged(ConnectState.STATUS_CONNECT_CLOSED, index) if (channelFuture != null) { if (channelFuture.channel() != null && channelFuture.channel().isOpen) { channelFuture.channel().close() } } group.shutdownGracefully() reconnect() } } } } fun disconnect() { Log.d(TAG, "disconnect") isNeedReconnect = false group.shutdownGracefully() } fun reconnect() { Log.d(TAG, "reconnect") if (isNeedReconnect && reconnectNum > 0 && !connectStatus) { reconnectNum-- SystemClock.sleep(reconnectIntervalTime) if (isNeedReconnect && reconnectNum > 0 && !connectStatus) { Log.e(TAG, "重新连接") connectServer() } } } /** * 异步发送 * * @param data 要发送的数据 * @param listener 发送结果回调 * @return 方法执行结果 */ fun sendMsgToServer(data: String, listener: MessageStateListener) = channel?.run { val flag = this != null && connectStatus if (flag) { this.writeAndFlush(data + System.getProperty("line.separator")).addListener { channelFuture -> listener.isSendSuccss(channelFuture.isSuccess) } } flag } ?: false /** * 同步发送 * * @param data 要发送的数据 * @return 方法执行结果 */ fun sendMsgToServer(data: String) = channel?.run { val flag = this != null && connectStatus if (flag) { val channelFuture = this.writeAndFlush(data + System.getProperty("line.separator")).awaitUninterruptibly() return channelFuture.isSuccess } false }?:false fun setListener(listener: NettyClientListener<String>) { this.listener = listener } /** * Builder 模式创建NettyTcpClient */ class Builder { /** * 最大重连次数 */ private var MAX_CONNECT_TIMES = Integer.MAX_VALUE /** * 重连间隔 */ private var reconnectIntervalTime: Long = 5000 /** * 服务器地址 */ private var host: String? = null /** * 服务器端口 */ private var tcp_port: Int = 0 /** * 客户端标识,(因为可能存在多个连接) */ private var mIndex: Int = 0 /** * 是否发送心跳 */ private var isSendheartBeat: Boolean = false /** * 心跳时间间隔 */ private var heartBeatInterval: Long = 5 /** * 心跳数据,可以是String类型,也可以是byte[]. */ private var heartBeatData: Any? = null fun setMaxReconnectTimes(reConnectTimes: Int): Builder { this.MAX_CONNECT_TIMES = reConnectTimes return this } fun setReconnectIntervalTime(reconnectIntervalTime: Long): Builder { this.reconnectIntervalTime = reconnectIntervalTime return this } fun setHost(host: String): Builder { this.host = host return this } fun setTcpPort(tcp_port: Int): Builder { this.tcp_port = tcp_port return this } fun setIndex(mIndex: Int): Builder { this.mIndex = mIndex return this } fun setHeartBeatInterval(intervalTime: Long): Builder { this.heartBeatInterval = intervalTime return this } fun setSendheartBeat(isSendheartBeat: Boolean): Builder { this.isSendheartBeat = isSendheartBeat return this } fun setHeartBeatData(heartBeatData: Any): Builder { this.heartBeatData = heartBeatData return this } fun build(): NettyTcpClient { val nettyTcpClient = NettyTcpClient(host!!, tcp_port, mIndex) nettyTcpClient.maxConnectTimes = this.MAX_CONNECT_TIMES nettyTcpClient.reconnectIntervalTime = this.reconnectIntervalTime nettyTcpClient.heartBeatInterval = this.heartBeatInterval nettyTcpClient.isSendheartBeat = this.isSendheartBeat nettyTcpClient.heartBeatData = this.heartBeatData return nettyTcpClient } } companion object { private val TAG = "NettyTcpClient" private val CONNECT_TIMEOUT_MILLIS = 5000 } } 复制代码
Android 的客户端相对而言比较简单,需要的 Handler 包括:支持心跳的 IdleStateHandler, TCP 消息需要使用的 Handler (跟服务端一样分别是StringEncoder、StringDecoder、LineBasedFrameDecoder),以及对收到 TCP 消息进行处理的 NettyClientHandler。
NettyClientHandler:
class NettyClientHandler(private val listener: NettyClientListener<String>, private val index: Int, private val isSendheartBeat: Boolean, private val heartBeatData: Any?) : SimpleChannelInboundHandler<String>() { /** * * 设定IdleStateHandler心跳检测每x秒进行一次读检测, * 如果x秒内ChannelRead()方法未被调用则触发一次userEventTrigger()方法 * * @param ctx ChannelHandlerContext * @param evt IdleStateEvent */ override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any) { if (evt is IdleStateEvent) { if (evt.state() == IdleState.WRITER_IDLE) { //发送心跳 if (isSendheartBeat) { if (heartBeatData == null) { ctx.channel().writeAndFlush("Heartbeat" + System.getProperty("line.separator")!!) } else { if (heartBeatData is String) { Log.d(TAG, "userEventTriggered: String") ctx.channel().writeAndFlush(heartBeatData + System.getProperty("line.separator")!!) } else if (heartBeatData is ByteArray) { Log.d(TAG, "userEventTriggered: byte") val buf = Unpooled.copiedBuffer((heartBeatData as ByteArray?)!!) ctx.channel().writeAndFlush(buf) } else { Log.d(TAG, "userEventTriggered: heartBeatData type error") } } } else { Log.d(TAG, "不发送心跳") } } } } /** * * 客户端上线 * * @param ctx ChannelHandlerContext */ override fun channelActive(ctx: ChannelHandlerContext) { Log.d(TAG, "channelActive") listener.onClientStatusConnectChanged(ConnectState.STATUS_CONNECT_SUCCESS, index) } /** * * 客户端下线 * * @param ctx ChannelHandlerContext */ override fun channelInactive(ctx: ChannelHandlerContext) { Log.d(TAG, "channelInactive") } /** * 客户端收到消息 * * @param channelHandlerContext ChannelHandlerContext * @param msg 消息 */ override fun channelRead0(channelHandlerContext: ChannelHandlerContext, msg: String) { Log.d(TAG, "channelRead0:") listener.onMessageResponseClient(msg, index) } /** * @param ctx ChannelHandlerContext * @param cause 异常 */ override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) { Log.e(TAG, "exceptionCaught") listener.onClientStatusConnectChanged(ConnectState.STATUS_CONNECT_ERROR, index) cause.printStackTrace() ctx.close() } companion object { private val TAG = "NettyClientHandler" } } 复制代码
启动 NettyServer:
private fun startServer() { if (!NettyServer.isServerStart) { NettyServer.setListener(this@MainActivity) NettyServer.port = port NettyServer.webSocketPath = webSocketPath NettyServer.start() } else { NettyServer.disconnect() } } 复制代码
NettyServer 异步发送 TCP 消息:
NettyServer.sendMsgToClient(msg, ChannelFutureListener { channelFuture -> if (channelFuture.isSuccess) { msgSend(msg) } }) 复制代码
NettyServer 异步发送 WebSocket 消息:
NettyServer.sendMsgToWS(msg, ChannelFutureListener { channelFuture -> if (channelFuture.isSuccess) { msgSend(msg) } }) 复制代码
Demo 可以通过 startServer 来启动 Socket 服务端,也可以在启动之前点击 configServer 来修改服务端的端口以及 WebSocket 的 Endpoint。
NettyTcpClient 通过 Builder 模式创建:
mNettyTcpClient = NettyTcpClient.Builder() .setHost(ip) //设置服务端地址 .setTcpPort(port) //设置服务端端口号 .setMaxReconnectTimes(5) //设置最大重连次数 .setReconnectIntervalTime(5) //设置重连间隔时间。单位:秒 .setSendheartBeat(false) //设置发送心跳 .setHeartBeatInterval(5) //设置心跳间隔时间。单位:秒 .setHeartBeatData("I'm is HeartBeatData") //设置心跳数据,可以是String类型,也可以是byte[],以后设置的为准 .setIndex(0) //设置客户端标识.(因为可能存在多个tcp连接) .build() mNettyTcpClient.setListener(this@MainActivity) //设置TCP监听 复制代码
启动、关闭客户端连接:
private fun connect() { Log.d(TAG, "connect") if (!mNettyTcpClient.connectStatus) { mNettyTcpClient.connect()//连接服务器 } else { mNettyTcpClient.disconnect() } } 复制代码
NettyTcpClient 异步发送 TCP 消息到服务端:
mNettyTcpClient.sendMsgToServer(msg, object : MessageStateListener { override fun isSendSuccss(isSuccess: Boolean) { if (isSuccess) { msgSend(msg) } } }) 复制代码
Demo 的客户端 App 也可以在启动之前点击 configClient 来修改要连接的服务端 IP 、端口。
WebSocket 的测试可以通过: www.websocket-test.com/
Netty Server 端跟网页通信:
WebSocket在线测试:
借助 Kotlin 的特性以及 Netty 框架,我们在 Android 上也实现了一个 Socket 服务端。
本文 demo github 地址: github.com/fengzhizi71…
本文的例子很简单,只是发送简单的消息。在实际生产环境中,我们采用的消息格式可能是 json ,因为 json 更加灵活,通过解析 json 获取消息的内容。
参考资料: