个人博客
www.milovetingting.cn
本文介绍基于Netty实现的服务端与客户端通信的简单使用方法,并在此基础上实现一个简单的服务端-客户端指令通信的Demo。
Netty是一个NIO客户端-服务器框架,可以快速轻松地开发网络应用程序,例如协议服务器和客户端。它极大地简化了网络编程,例如TCP和UDP套接字服务器的开发。提供一个异步事件驱动的网络应用程序框架和工具,以快速开发可维护的高性能和高可扩展性协议服务器和客户端。
以上内容摘选自 netty.io/wiki/user-g…
Netty具有以下特点:
以上内容摘选自 netty.io/
Netty的使用,可以参照Netty的官方文档,这里以4.x为例来演示Netty在服务端和客户端上使用。文档地址: netty.io/wiki/user-g…
这里用Eclipse来进行开发,服务端和客户端都放在一个工程里。
新建Java工程
首先需要导入netty的jar包。这里使用netty-all-4.1.48.Final.jar。
新建NettyServer类
public class NettyServer { private int mPort; public NettyServer(int port) { this.mPort = port; } public void run() { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) // 指定连接队列大小 .option(ChannelOption.SO_BACKLOG, 128) //KeepAlive .childOption(ChannelOption.SO_KEEPALIVE, true) //Handler .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel channel) throws Exception { channel.pipeline().addLast(new NettyServerHandler()); } }); ChannelFuture f = b.bind(mPort).sync(); if (f.isSuccess()) { LogUtil.log("Server,启动Netty服务端成功,端口号:" + mPort); } // f.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { // workerGroup.shutdownGracefully(); // bossGroup.shutdownGracefully(); } } } 复制代码
在初始化时,需要指定Handle,用来处理Channel相关业务。
public class NettyServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { LogUtil.log("Server,channelActive"); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { LogUtil.log("Server,接收到客户端发来的消息:" + msg); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { LogUtil.log("Server,exceptionCaught"); cause.printStackTrace(); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { LogUtil.log("Server,channelInactive"); } } 复制代码
经过上面这些步骤后,服务端最基本的设置就完成了。
客户端和服务端在初始化时大体是类似的,不过相比服务端要简单一些。
public class NettyClient { private String mHost; private int mPort; private NettyClientHandler mClientHandler; private ChannelFuture mChannelFuture; public NettyClient(String host, int port) { this.mHost = host; this.mPort = port; } public void connect() { EventLoopGroup workerGroup = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); mClientHandler = new NettyClientHandler(); b.group(workerGroup).channel(NioSocketChannel.class) // KeepAlive .option(ChannelOption.SO_KEEPALIVE, true) // Handler .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel channel) throws Exception { channel.pipeline().addLast(mClientHandler); } }); mChannelFuture = b.connect(mHost, mPort).sync(); if (mChannelFuture.isSuccess()) { LogUtil.log("Client,连接服务端成功"); } mChannelFuture.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { workerGroup.shutdownGracefully(); } } } 复制代码
public class NettyClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { LogUtil.log("Client,channelActive"); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { LogUtil.log("Client,接收到服务端发来的消息:" + msg); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { LogUtil.log("Client,exceptionCaught"); cause.printStackTrace(); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { LogUtil.log("Client,channelInactive"); } } 复制代码
到这里,客户端最基本设置就完成了。
新建一个Main类,用于测试服务端和客户端是否能正常连接。
public class Main { public static void main(String[] args) { try { String host = "127.0.0.1"; int port = 12345; NettyServer server = new NettyServer(port); server.run(); Thread.sleep(1000); NettyClient client = new NettyClient(host, port); client.connect(); } catch (Exception e) { e.printStackTrace(); } } } 复制代码
运行main方法,输出日志如下:
2020-4-13 0:11:02--Server,启动Netty服务端成功,端口号:12345 2020-4-13 0:11:03--Client,channelActive 2020-4-13 0:11:03--Client,连接服务端成功 2020-4-13 0:11:03--Server,channelActive 复制代码
可以看到,客户端成功连接上了服务端,服务端和客户端里设置的Handler的channelActive方法都会回调。
在服务端与客户端连接成功后,我们往往需要在双方间进行通信。这里假定,在连接成功后,服务端给客户端发送一个欢迎信息"你好,客户端",而客户端在收到服务端的消息后,也给服务端回复一个消息"你好,服务端"。下面来实现具体的功能。
修改服务端NettyServerHandler中的channelActive方法和channelRead方法,在channelActive方法中给客户端发送消息,在channelRead方法中解析客户端发来的消息
public class NettyServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { LogUtil.log("Server,channelActive"); ByteBuf byteBuf = Unpooled.copiedBuffer("你好,客户端", Charset.forName("utf-8")); ctx.writeAndFlush(byteBuf); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; byte[] buffer = new byte[buf.readableBytes()]; buf.readBytes(buffer); String message = new String(buffer, "utf-8"); LogUtil.log("Server,接收到客户端发来的消息:" + message); } } 复制代码
修改客户端NettyClientHandler中的channelRead方法,当收到服务端的消息时,回复服务端
public class NettyClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; byte[] buffer = new byte[buf.readableBytes()]; buf.readBytes(buffer); String message = new String(buffer,"utf-8"); LogUtil.log("Client,接收到服务端发来的消息:" + message); ByteBuf byteBuf = Unpooled.copiedBuffer("你好,服务端", Charset.forName("utf-8")); ctx.writeAndFlush(byteBuf); } } 复制代码
运行后,输出日志如下:
2020-4-13 0:29:16--Server,启动Netty服务端成功,端口号:12345 2020-4-13 0:29:17--Client,channelActive 2020-4-13 0:29:17--Client,连接服务端成功 2020-4-13 0:29:17--Server,channelActive 2020-4-13 0:29:17--Client,接收到服务端发来的消息:你好,客户端 2020-4-13 0:29:17--Server,接收到客户端发来的消息:你好,服务端 复制代码
可以看到,服务端与客户端已经可以正常通信。
在实际的使用场景中,可能会存在短时间内大量数据发送的问题。我们模拟这个场景。在客户端连接上服务端后,服务端给客户端发送100个消息,而为便于分析,客户端在收到服务端消息后,不作回复。
修改服务端中NettyServerHandler的channelActive方法
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { LogUtil.log("Server,channelActive"); for (int i = 0; i < 100; i++) { ByteBuf byteBuf = Unpooled.copiedBuffer("你好,客户端", Charset.forName("utf-8")); ctx.writeAndFlush(byteBuf); } } 复制代码
修改客户端中NettyClientHandler的channelRead方法
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; byte[] buffer = new byte[buf.readableBytes()]; buf.readBytes(buffer); String message = new String(buffer, "utf-8"); LogUtil.log("Client,接收到服务端发来的消息:" + message); //ByteBuf byteBuf = Unpooled.copiedBuffer("你好,服务端", Charset.forName("utf-8")); //ctx.writeAndFlush(byteBuf); } 复制代码
运行后,输出的部分结果如下:
2020-4-13 0:35:28--Server,启动Netty服务端成功,端口号:12345 2020-4-13 0:35:29--Client,channelActive 2020-4-13 0:35:29--Client,连接服务端成功 2020-4-13 0:35:29--Server,channelActive 2020-4-13 0:35:29--Client,接收到服务端发来的消息:你好,客户端 2020-4-13 0:35:29--Client,接收到服务端发来的消息:你好,客户端你好,客户端你好,客户端你好,客户端你好,客户端你好,客户端你好,客户端你好,客户端你好,客户端你好,客户端你好,客户端你好,客户端你好,客户端你好,客户端你好,客户端你好,客户端你好,客户端 2020-4-13 0:35:29--Client,接收到服务端发来的消息:你好,客户端 复制代码
可以看到,出现了多条消息"粘"在一起的情况。
TCP是个"流"协议,所谓流,就是没有界限的一串数据。TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行包的划分,所以在业务上认为,一个完整的包可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。
以上内容摘选自 TCP粘包/拆包与Netty解决方案
在没有 Netty 的情况下,用户如果自己需要拆包,基本原理就是不断从 TCP 缓冲区中读取数据,每次读取完都需要判断是否是一个完整的数据包 如果当前读取的数据不足以拼接成一个完整的业务数据包,那就保留该数据,继续从 TCP 缓冲区中读取,直到得到一个完整的数据包。 如果当前读到的数据加上已经读取的数据足够拼接成一个数据包,那就将已经读取的数据拼接上本次读取的数据,构成一个完整的业务数据包传递到业务逻辑,多余的数据仍然保留,以便和下次读到的数据尝试拼接。
以上内容摘选自 彻底理解Netty,这一篇文章就够了
而使用Netty,则解决这个问题的方法就简单多了。Netty已经提供了四个拆包器:
在这里,我们选用分隔符拆包器
首先定义分隔符
public class Config { public static final String DATA_PACK_SEPARATOR = "#$&*"; } 复制代码
在服务端的channelHandler配置中,需要增加
@Override protected void initChannel(SocketChannel channel) throws Exception { //这个配置需要在添加Handler前设置 channel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,Unpooled.copiedBuffer(Config.DATA_PACK_SEPARATOR.getBytes()))); channel.pipeline().addLast(new NettyServerHandler()); } 复制代码
在客户端的channelHandler的配置中,同样也需要增加
@Override protected void initChannel(SocketChannel channel) throws Exception { //这个配置需要在添加Handler前设置 channel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,Unpooled.copiedBuffer(Config.DATA_PACK_SEPARATOR.getBytes()))); channel.pipeline().addLast(new NettyServerHandler()); } 复制代码
发送数据时,在数据的末尾增加分隔符:
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { LogUtil.log("Server,channelActive"); for (int i = 0; i < 100; i++) { ByteBuf byteBuf = Unpooled.copiedBuffer("你好,客户端"+Config.DATA_PACK_SEPARATOR, Charset.forName("utf-8")); ctx.writeAndFlush(byteBuf); } } 复制代码
运行后,可以发现,已经解决"粘包"与"拆包"的问题。
在网络应用中,为了判断连接是否还存在,一般会通过发送心跳包来检测。在Netty中,配置心跳包的步骤如下
在客户端的channelHandler的配置中,需要增加
@Override protected void initChannel(SocketChannel channel) throws Exception { channel.pipeline().addLast(new IdleStateHandler(5, 5, 10)); //... } 复制代码
在NettyClientHandler中,重写userEventTriggered方法
@Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { IdleStateEvent event = (IdleStateEvent) evt; LogUtil.log("Client,Idle:" + event.state()); switch (event.state()) { case READER_IDLE: break; case WRITER_IDLE: ByteBuf byteBuf = Unpooled.copiedBuffer("心跳^v^v", Charset.forName("utf-8")); break; case ALL_IDLE: break; default: super.userEventTriggered(ctx, evt); break; } } 复制代码
当写空闲达到配置的时间时,往服务端发送一个心跳消息
运行后,日志输出如下:
2020-4-13 1:22:50--Server,启动Netty服务端成功,端口号:12345 2020-4-13 1:22:51--Client,channelActive 2020-4-13 1:22:51--Client,连接服务端成功 2020-4-13 1:22:51--Server,channelActive 2020-4-13 1:22:51--Client,接收到服务端发来的消息:你好,客户端 2020-4-13 1:22:56--Client,Idle:WRITER_IDLE 2020-4-13 1:22:56--Server,接收到客户端发来的消息:心跳^v^ 2020-4-13 1:22:56--Client,Idle:READER_IDLE 2020-4-13 1:23:01--Client,Idle:WRITER_IDLE 2020-4-13 1:23:01--Server,接收到客户端发来的消息:心跳^v^ 2020-4-13 1:23:01--Client,Idle:READER_IDLE 复制代码
可以看到,心跳包按我们配置的时间正常输出了。
我们上面在发送数据时,需要通过ByteBuf来转换String,而通过配置编码,解码器,我们就可以直接发送字符串。配置如下:
在服务端与客户端的channelHandler分别增加以下配置:
@Override protected void initChannel(SocketChannel channel) throws Exception { //... //这个配置需要在添加Handler前设置 channel.pipeline().addLast("encoder", new StringEncoder()); channel.pipeline().addLast("decoder", new StringDecoder()); //... } 复制代码
在发送消息时,则可以直接通过 ctx.writeAndFlush("心跳^v^" + Config.DATA_PACK_SEPARATOR)
的形式来发送。
到此,最简单的服务端与客户端通信的Demo已经完成。源码地址: github.com/milovetingt…
在上面的基础上,我们来实现一个下面的需求:
客户端需要登录到服务端
客户端登录成功后,服务端可以给客户端发送指令消息,客户端在收到消息及处理完消息后,都需要上报给服务端
为便于程序扩展,我们将客户端连接服务端的部分抽取出来。通过一个接口来定义连接的方法,而连接的具体实现由子类来实现。
定义接口
public interface IConnection { /** * 连接服务器 * * @param host 服务器地址 * @param port 端口 * @param callback 连接回调 */ public void connect(String host, int port, IConnectionCallback callback); } 复制代码
在这里还需要定义连接的回调接口
public interface IConnectionCallback { /** * 连接成功 */ public void onConnected(); } 复制代码
具体的连接实现类
public class NettyConnection implements IConnection { private NettyClient mClient; @Override public void connect(String host, int port, IConnectionCallback callback) { if (mClient == null) { mClient = new NettyClient(host, port); mClient.setConnectionCallBack(callback); mClient.connect(); } } } 复制代码
为便于管理连接,定义一个连接的管理类
public class ConnectionManager implements IConnection { private static IConnection mConnection; private ConnectionManager() { } static class ConnectionManagerInner { private static ConnectionManager INSTANCE = new ConnectionManager(); } public static ConnectionManager getInstance() { return ConnectionManagerInner.INSTANCE; } public static void initConnection(IConnection connection) { mConnection = connection; } private void checkInit() { if (mConnection == null) { throw new IllegalAccessError("please invoke initConnection first!"); } } @Override public void connect(String host, int port, IConnectionCallback callback) { checkInit(); mConnection.connect(host, port, callback); } } 复制代码
调用连接:
public class Main { public static void main(String[] args) { try { String host = "127.0.0.1"; int port = 12345; NettyServer server = new NettyServer(port); server.run(); Thread.sleep(1000); ConnectionManager.initConnection(new NettyConnection()); ConnectionManager.getInstance().connect(host, port, new IConnectionCallback() { @Override public void onConnected() { LogUtil.log("Main,onConnected");); } }); } catch (Exception e) { e.printStackTrace(); } } } 复制代码
在调用connect方法前,需要先调用initConnection来指定具体的连接类
在连接成功后,服务端会给客户端发送一个欢迎的消息。为便于管理,我们定义一个消息Bean
public class Msg { /** * 欢迎 */ public static final int TYPE_WELCOME = 0; public int type; public String msg; } 复制代码
服务端发送消息
public class NettyServerHandler extends ChannelInboundHandlerAdapter { private ChannelHandlerContextWrapper mChannelHandlerContextWrapper; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { LogUtil.log("Server,channelActive"); mChannelHandlerContextWrapper = new ChannelHandlerContextWrapper(ctx); MsgUtil.sendWelcomeMsg(mChannelHandlerContextWrapper); } } 复制代码
在这里,通过定义一个ChannelHandlerContextWrapper类来统一管理消息分隔符
public class ChannelHandlerContextWrapper { private ChannelHandlerContext mContext; public ChannelHandlerContextWrapper(ChannelHandlerContext context) { this.mContext = context; } /** * 包装writeAndFlush方法 * * @param object */ public void writeAndFlush(Object object) { mContext.writeAndFlush(object + Config.DATA_PACK_SEPARATOR); } } 复制代码
再进一步,通过定义MsgUtil类来封装发送欢迎消息
public class MsgUtil { /** * 发送欢迎消息 * * @param wrapper */ public static void sendWelcomeMsg(ChannelHandlerContextWrapper wrapper) { Msg msg = new Msg(); msg.type = Msg.TYPE_WELCOME; msg.msg = "你好,客户端"; wrapper.writeAndFlush(Global.sGson.toJson(msg)); } } 复制代码
对于客户端而言,为方便处理消息,我们需要定义一个方法来接收消息。通过在IConnection接口中新增一个registerMsgCallback方法来实现
public interface IConnection { /** * 连接服务器 * * @param host 服务器地址 * @param port 端口 * @param callback 连接回调 */ public void connect(String host, int port, IConnectionCallback callback); /** * 注册消息回调 * * @param callback */ public void registerMsgCallback(IMsgCallback callback); } 复制代码
在这里,还需要新增IMsgCallback接口
public interface IMsgCallback { /** * 接收到消息时的回调 * * @param msg */ public void onMsgReceived(Msg msg); } 复制代码
对应到实现类
public class NettyConnection implements IConnection { private NettyClient mClient; @Override public void connect(String host, int port, IConnectionCallback callback) { if (mClient == null) { mClient = new NettyClient(host, port); mClient.setConnectionCallBack(callback); mClient.connect(); } } @Override public void registerMsgCallback(IMsgCallback callback) { if (mClient == null) { throw new IllegalAccessError("please invoke connect first!"); } mClient.registerMsgCallback(callback); } } 复制代码
在客户端,为便于处理消息,我们对消息类型进行划分
修改消息Bean
public class Msg { /** * 欢迎 */ public static final int TYPE_WELCOME = 0; /** * 心跳 */ public static final int TYPE_HEART_BEAT = 1; /** * 登录 */ public static final int TYPE_LOGIN = 2; public static final int TYPE_COMMAND_A = 3; public static final int TYPE_COMMAND_B = 4; public static final int TYPE_COMMAND_C = 5; public int type; public String msg; } 复制代码
假定消息是串行的,需要一个一个地处理。为便于管理消息,增加MsgQueue类
public class MsgQueue { private PriorityBlockingQueue<Msg> mQueue; private boolean using; private MsgQueue() { mQueue = new PriorityBlockingQueue<>(128, new Comparator<Msg>() { @Override public int compare(Msg msg1, Msg msg2) { int res = msg2.priority - msg1.priority; if (res == 0 && msg1.time != msg2.time) { return (int) (msg2.time - msg1.time); } return res; } }); } public static MsgQueue getInstance() { return MsgQueueInner.INSTANCE; } private static class MsgQueueInner { private static final MsgQueue INSTANCE = new MsgQueue(); } /** * 将消息加入消息队列 * * @param msg */ public void enqueueMsg(Msg msg) { mQueue.add(msg); } /** * 从消息队列获取消息 * * @return */ public synchronized Msg next() { if (using) { return null; } Msg msg = mQueue.poll(); if (msg != null) { makeUse(true); } return msg; } /** * 标记使用状态 * * @param use */ public synchronized void makeUse(boolean use) { using = use; } /** * 是否能够使用 * * @return */ public synchronized boolean canUse() { return !using; } } 复制代码
增加消息的分发类MsgDispatcher
public class MsgDispatcher { private static Map<Integer, Class<? extends IMsgHandler>> mHandlerMap; static { mHandlerMap = new HashMap<>(); mHandlerMap.put(Msg.TYPE_WELCOME, WelcomeMsgHandler.class); mHandlerMap.put(Msg.TYPE_HEART_BEAT, HeartBeatMsgHandler.class); mHandlerMap.put(Msg.TYPE_LOGIN, HeartBeatMsgHandler.class); mHandlerMap.put(Msg.TYPE_COMMAND_A, CommandAMsgHandler.class); mHandlerMap.put(Msg.TYPE_COMMAND_B, CommandBMsgHandler.class); mHandlerMap.put(Msg.TYPE_COMMAND_C, CommandCMsgHandler.class); } public static void dispatch() { if (MsgQueue.getInstance().canUse()) { Msg msg = MsgQueue.getInstance().next(); if (msg == null) { return; } dispatch(msg); } } public static void dispatch(Msg msg) { try { IMsgHandler handler = (IMsgHandler) Class.forName(mHandlerMap.get(msg.type).getName()).newInstance(); handler.handle(msg); } catch (InstantiationException e) { e.printStackTrace(); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (ClassNotFoundException e) { e.printStackTrace(); } } } 复制代码
定义IMsgHandler,在这里定义了处理的方法,具体实现由子类实现
public interface IMsgHandler { /** * 处理消息 * * @param msg */ public void handle(Msg msg); } 复制代码
为统一管理,定义Base类BaseCommandHandler
public abstract class BaseCommandHandler implements IMsgHandler { @Override public void handle(Msg msg) { execute(msg); } public final void execute(Msg msg) { LogUtil.log("Client,received command:" + msg); doHandle(msg); MsgQueue.getInstance().makeUse(false); LogUtil.log("Client,report command:" + msg); MsgDispatcher.dispatch(); } public abstract void doHandle(Msg msg); } 复制代码
在BaseCommandHandler中,定义execute方法,顺序调用:上报消息已接收成功、处理消息、上报消息已处理完成。这里的消息上报部分,都只是输出一个日志来代替,在实际的业务中,可以抽取出一个抽象方法,让子类来实现。
定义子类,继承自BaseCommandHandler
public class LoginMsgHandler extends BaseCommandHandler { @Override public void doHandle(Msg msg) { LogUtil.log("Client,handle msg:" + msg); } } 复制代码
对应的心跳类型消息、欢迎类型消息等,都可以新增对应的处理类来实现,这里不再展开。
接收到消息时的处理
public class Main { public static void main(String[] args) { try { String host = "127.0.0.1"; int port = 12345; NettyServer server = new NettyServer(port); server.run(); Thread.sleep(1000); ConnectionManager.initConnection(new NettyConnection()); ConnectionManager.getInstance().connect(host, port, new IConnectionCallback() { @Override public void onConnected() { LogUtil.log("Main,onConnected"); ConnectionManager.getInstance().registerMsgCallback(new IMsgCallback() { @Override public void onMsgReceived(Msg msg) { MsgQueue.getInstance().enqueueMsg(msg); MsgDispatcher.dispatch(); } }); } }); } catch (Exception e) { e.printStackTrace(); } } } 复制代码
修改消息Bean,增加登录的请求和响应
public class Msg { /** * 欢迎 */ public static final int TYPE_WELCOME = 0; /** * 心跳 */ public static final int TYPE_HEART_BEAT = 1; /** * 登录 */ public static final int TYPE_LOGIN = 2; public static final int TYPE_COMMAND_A = 3; public static final int TYPE_COMMAND_B = 4; public static final int TYPE_COMMAND_C = 5; public int type; public String msg; public int priority; public long time; /** * 登录请求信息 * * @author Administrator * */ public static class LoginRuquestInfo { /** * 用户名 */ public String user; /** * 密码 */ public String pwd; @Override public String toString() { return "LoginRuquestInfo [user=" + user + ", pwd=" + pwd + "]"; } } /** * 登录响应信息 * * @author Administrator * */ public static class LoginResponseInfo { /** * 登录成功 */ public static final int CODE_SUCCESS = 0; /** * 登录失败 */ public static final int CODE_FAILED = 100; /** * 响应码 */ public int code; /** * 响应数据 */ public String data; public static class ResponseData { public String token; } @Override public String toString() { return "LoginResponseInfo [code=" + code + ", data=" + data + "]"; } } } 复制代码
发送登录请求
public class Main { public static void main(String[] args) { try { String host = "127.0.0.1"; int port = 12345; NettyServer server = new NettyServer(port); server.run(); Thread.sleep(1000); ConnectionManager.initConnection(new NettyConnection()); ConnectionManager.getInstance().connect(host, port, new IConnectionCallback() { @Override public void onConnected() { LogUtil.log("Main,onConnected"); ConnectionManager.getInstance().registerMsgCallback(new IMsgCallback() { @Override public void onMsgReceived(Msg msg) { MsgQueue.getInstance().enqueueMsg(msg); MsgDispatcher.dispatch(); } }); Msg msg = new Msg(); msg.type = Msg.TYPE_LOGIN; Msg.LoginRuquestInfo request = new LoginRuquestInfo(); request.user = "wangyz"; request.pwd = "wangyz"; Gson gson = new Gson(); msg.msg = gson.toJson(request); ConnectionManager.getInstance().sendMsg(msg); } }); } catch (Exception e) { e.printStackTrace(); } } } 复制代码
这里,引入Gson,将消息Bean转成json字符串后发送。
对应到服务端,为便于解析出消息,也需要对应的修改消息的Bean。服务端对消息的具体分发与处理,和客户端类似,这里不再展开。
由于篇幅限制,Demo中指令的优先级处理,模拟服务端指令下发等,这里没有再进一步详细介绍,具体可以参考源码: github.com/milovetingt…