假设客户端分别发送了两个数据包 D1 和 D2 给服务端,由于服务端一次读取到的字节 数是不确定的,故可能存在以下 4 种情况。
(1)服务端分两次读取到了两个独立的数据包,分别是 D1 和 D2,没有粘包和拆包;
(2)服务端一次接收到了两个数据包,D1 和 D2 粘合在一起,被称为 TCP 粘包;
(3)服务端分两次读取到了两个数据包,第一次读取到了完整的 D1 包和 D2 包的部分 内容,第二次读取到了 D2 包的剩余内容,这被称为 TCP 拆包;
(4)服务端分两次读取到了两个数据包,第一次读取到了 D1 包的部分内容 D1_1,第 二次读取到了 D1 包的剩余内容 D1_2 和 D2 包的整包。
如果此时服务端 TCP 接收滑窗非常小,而数据包 D1 和 D2 比较大,很有可能会发生第 五种可能,即服务端分多次才能将 D1 和 D2 包接收完全,期间发生多次拆包。
由于 TCP 协议本身的机制(面向连接的可靠地协议-三次握手机制)客户端与服务器会 维持一个连接,数据在连接不断开的情况下,可以持续不断地将多个数据包发 往服务器,但是如果发送的网络数据包太小,那么他本身会启用 Nagle 算法(可配置是否启 用)对较小的数据包进行合并然后再发送。那么这样的话,服务器在接收到消息(数据流)的时候就无法区分哪 些数据包是客户端自己分开发送的,这样产生了粘包;服务器在接收到数据库后,放到缓冲 区中,如果消息没有被及时从缓存区取走,下次在取数据的时候可能就会出现一次取出多个 数据包的情况,造成粘包现象。
在凌晨等业务低谷时段,如果发生网络闪断、连接被 Hang 住等问题时,由于没有业务 消息,应用程序很难发现。到了白天业务高峰期时,会发生大量的网络通信失败,严重的会 导致一段时间进程内无法处理业务消息。为了解决这个问题,在网络空闲时采用心跳机制来检测链路的互通性,一旦发现网络故障,立即关闭链路,主动重连。
/** * @author andychen https://blog.51cto.com/14815984 * @description:Netty客户端业务通道处理器类 * 负责处理和服务端的所有IO业务事件处理 */ public class ClientChannelHandler extends SimpleChannelInboundHandler<ByteBuf> { /** * 统计当前通道第几次读取数据 */ private final AtomicInteger counter = new AtomicInteger(0); /** * 客户端读取网络通道数据后处理 * @param channelHandlerContext 通道上下文(很重要,网络IO业务就靠它了) * @param byteBuf 网络传送过来的数据 * @throws Exception */ protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception { //打印接收到的服务端数据和当前统计计数 System.out.println("Accept server data {"+byteBuf.toString(CharsetUtil.UTF_8)+"], the counter is:"+this.counter.incrementAndGet()); } /** * 连接建立成功事件回调 * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //连接建立后,循环10次向服务端发送连续报文 ByteBuf buf = null; String msg = null; for (int i=0;i<10;i++){ if(Constant.STICKUP_SOLUTION_TYPE == StickupSolutionType.SEND_LEN){ UserAddress address = new UserAddress("abc@163.com", "ChengDu.SiChuan"); User user = new User(i,"AndyChen"+i,"WaveBeed"+i, address); ctx.write(user); }else{ msg = "Client message"+i+" data "; if(Constant.STICKUP_SOLUTION_TYPE == StickupSolutionType.LINE_BASED){ msg += "(line_based)"+System.getProperty("line.separator"); } else if(Constant.STICKUP_SOLUTION_TYPE == StickupSolutionType.DELIMITER_BASED){ msg += "(custom_based)"+Constant.CUSTOM_SPLIT_MARK; }else if(Constant.STICKUP_SOLUTION_TYPE == StickupSolutionType.FIXED_LEN){ msg = Constant.FIXED_LEN_CLIENT_TXT; } buf = Unpooled.buffer(msg.length()); buf.writeBytes(msg.getBytes()); ctx.writeAndFlush(buf); } } if(Constant.STICKUP_SOLUTION_TYPE == StickupSolutionType.SEND_LEN){ ctx.flush(); } } /** * 处理器异常集中处理 * @param ctx 处理器上下文 * @param cause 异常 * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
/** * @author andychen https://blog.51cto.com/14815984 * @description:Netty客户端启动器类 */ public class NettyClientStarter { //采用netty内置定时器 private static final HashedWheelTimer timer = new HashedWheelTimer(); /** * 启动客户端 * @param args */ public static void main(String[] args){ try { start(); } catch (InterruptedException e) { e.printStackTrace(); } } /** * 启动客户端 */ private static void start() throws InterruptedException { //线程组 EventLoopGroup group = new NioEventLoopGroup(); try { //Netty客户端启动类 final Bootstrap boot = new Bootstrap(); boot.group(group)//将线程组绑定到启动器 .channel(NioSocketChannel.class)//使用NIO进行网络传输 //绑定服务端连接地址 .remoteAddress(new InetSocketAddress(Constant.SERV_HOST, Constant.SERV_PORT)); /** * 定义监听器 */ final ChannelMonitor<Channel> monitor = new ClientMonitorHandler(Constant.SERV_HOST, Constant.SERV_PORT, timer, boot); boot.handler(new ChannelInitializer<Channel>(){ protected void initChannel(Channel channel) throws Exception { channel.pipeline().addLast(monitor.setMonitorHandlers()); } }); //这里会阻塞,直到连接完成 ChannelFuture future = boot.connect().sync(); System.out.println("Has connected to server:"+Constant.SERV_PORT+" ..."); //这里也会阻塞,直到连接通道关闭 future.channel().closeFuture().sync(); }finally { //线程池组关闭 //group.shutdownGracefully().sync(); } } }
/** * @author andychen https://blog.51cto.com/14815984 * @description:Netty服务端业务通道处理器类 * 负责向对端响应业务应答和服务端相关IO业务处理 */ public class ServerChannelHandler extends ChannelInboundHandlerAdapter { /** * 消息接收计数器 */ private final AtomicInteger counter = new AtomicInteger(0); /** * 接收客户端发送的数据 * @param ctx 处理器上下文 * @param msg 消息 * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String clientData = null; User user = null; if(msg instanceof User){ user = (User)msg; clientData = user.toString(); }else{ clientData = ((ByteBuf)msg).toString(CharsetUtil.UTF_8); } System.out.println("Accept client data ["+clientData+"], the counter is:"+this.counter.incrementAndGet()); //回馈消息给客户端 String toClientData = "Data has been accepted by server"; if(Constant.STICKUP_SOLUTION_TYPE == StickupSolutionType.LINE_BASED || Constant.STICKUP_SOLUTION_TYPE == StickupSolutionType.SEND_LEN){ toClientData += "(line_based)"+System.getProperty("line.separator"); } else if(Constant.STICKUP_SOLUTION_TYPE == StickupSolutionType.DELIMITER_BASED){ toClientData += "(custom_based)"+Constant.CUSTOM_SPLIT_MARK; }else if(Constant.STICKUP_SOLUTION_TYPE == StickupSolutionType.FIXED_LEN){ toClientData = Constant.FIXED_LEN_SERVER_TXT; } ctx.writeAndFlush(Unpooled.copiedBuffer(toClientData.getBytes())); if(Constant.STICKUP_SOLUTION_TYPE == StickupSolutionType.SEND_LEN && null != user){ ctx.fireChannelRead(user); } } /** * * @param ctx 处理器上下文 * @throws Exception */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelReadComplete(); System.out.println("Client data recevied completed!"); } /** * 管道关闭时触发 * @param ctx * @throws Exception */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("Client: "+ctx.channel().remoteAddress()+" channel will close..."); } /** * 异常处理器 * @param ctx 处理器上下文 * @param cause 异常 * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
/** * @author andychen https://blog.51cto.com/14815984 * @description:Netty服务端启动器类 */ public class NettyServerStarter { /** * 启动服务器 * @param args */ public static void main(String[] args) { try { start(); } catch (InterruptedException e) { e.printStackTrace(); } } /** * 启动服务器端 */ private static void start() throws InterruptedException { //线程组 EventLoopGroup mainGroup = new NioEventLoopGroup(); EventLoopGroup workGroup = new NioEventLoopGroup(); try { //服务端启动器 ServerBootstrap boot = new ServerBootstrap(); boot.group(mainGroup, workGroup)//将线程组绑定到启动器 .channel(NioServerSocketChannel.class)//使用NIO进行网络通讯 .localAddress(new InetSocketAddress(Constant.SERV_PORT))//绑定本地端口监听 .childHandler(new ChannelInitializerExt());//为Channel添加业务处理器 //异步绑定服务器端口,sync()方法阻塞,直到绑定完成 ChannelFuture future = boot.bind().sync(); System.out.println("Server address:"+Constant.SERV_PORT+" has bind complete,waiting for data..."); //这里通道会阻塞,直到通道关闭 future.channel().closeFuture().sync(); }finally { //优雅地关闭线程池组 mainGroup.shutdownGracefully().sync(); workGroup.shutdownGracefully().sync(); } } /** * 通道初始化器扩展 * 负责定义初始化各种业务ChannelHandler */ private static class ChannelInitializerExt extends ChannelInitializer<Channel>{ /** * 初始化Channel * @param channel 当前通道 * @throws Exception */ protected void initChannel(Channel channel) throws Exception { ChannelPipeline p = channel.pipeline(); //添加对客户端心跳检测 p.addLast("heartbeat-state",new IdleStateHandler(Constant.HEART_BEAT_TIMEOUT,0,0, TimeUnit.SECONDS)); p.addLast("heartbeat-check", new ServerHeartbeatHandler()); /** * 解决粘包半包问题办法一:字符分隔 */ //支持一:回车分隔 if(Constant.STICKUP_SOLUTION_TYPE == StickupSolutionType.LINE_BASED){ p.addLast("linebase", new LineBasedFrameDecoder(Constant.DECODER_BYTE_MAX_LEN)); } //支持二:自定义分隔符分隔 if(Constant.STICKUP_SOLUTION_TYPE == StickupSolutionType.DELIMITER_BASED) { ByteBuf byteMark = Unpooled.copiedBuffer(Constant.CUSTOM_SPLIT_MARK.getBytes()); p.addLast("custom", new DelimiterBasedFrameDecoder(Constant.DECODER_BYTE_MAX_LEN, byteMark)); } /** * 解决粘包半包问题办法二:固定长度 */ if(Constant.STICKUP_SOLUTION_TYPE == StickupSolutionType.FIXED_LEN) { p.addLast("fixlength", new FixedLengthFrameDecoder(Constant.FIXED_LEN_CLIENT_TXT.length())); } /** * 解决粘包半包问题办法三:带长度 */ if(Constant.STICKUP_SOLUTION_TYPE == StickupSolutionType.SEND_LEN) { p.addLast("sendlen", new LengthFieldBasedFrameDecoder(65535,0,2,0,2)); p.addLast(new ProtoStuffDecoder<User>(User.class)); p.addLast("linebase", new LineBasedFrameDecoder(Constant.DECODER_BYTE_MAX_LEN)); } //业务处理Handler--往往此Handler是注册在管道的最后节点 p.addLast(new ServerChannelHandler()); } } }
/** * @author andychen https://blog.51cto.com/14815984 * @description:客户端心跳检测处理器 */ public class ClientHeartbeatHandler extends ChannelInboundHandlerAdapter { /** * 事件触发器 * @param ctx 处理器上下文 * @param evt 触发对象 * @throws Exception */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { /** * 构建心跳字节序列 */ String seq = Constant.HEART_BEAT_CLIN_MSG; if(Constant.STICKUP_SOLUTION_TYPE == StickupSolutionType.LINE_BASED || Constant.STICKUP_SOLUTION_TYPE == StickupSolutionType.SEND_LEN){ seq += System.getProperty("line.separator"); } else if(Constant.STICKUP_SOLUTION_TYPE == StickupSolutionType.DELIMITER_BASED){ seq += Constant.CUSTOM_SPLIT_MARK; }else if(Constant.STICKUP_SOLUTION_TYPE == StickupSolutionType.FIXED_LEN){ seq = Constant.FIXED_LEN_SERVER_TXT; } final ByteBuf HEARBEAT_SEQ = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer(seq, CharsetUtil.UTF_8));//ISO_8859_1 /** * 发送心跳消息,并在发送失败时关闭连接 */ if(evt instanceof IdleStateEvent){ IdleState state = ((IdleStateEvent)evt).state(); /** * 服务端是数据接收端,那么客户端就是Write * 客户端5秒钟未收到应答,则认为发送心跳包检测 */ if(state == IdleState.WRITER_IDLE){ ctx.writeAndFlush(HEARBEAT_SEQ).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } }else{ //非心跳事件,将它传递给下个处理器处理 super.userEventTriggered(ctx, evt); } } }
/** * @author andychen https://blog.51cto.com/14815984 * @description:服务端心跳检测处理器 */ public class ServerHeartbeatHandler extends ChannelInboundHandlerAdapter { /** * 事件触发器 * @param ctx 处理器上下文 * @param evt 触发对象 * @throws Exception */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { /** * 构建心跳字节序列 */ String seq = Constant.HEART_BEAT_SERV_MSG; if(Constant.STICKUP_SOLUTION_TYPE == StickupSolutionType.LINE_BASED || Constant.STICKUP_SOLUTION_TYPE == StickupSolutionType.SEND_LEN){ seq += System.getProperty("line.separator"); } else if(Constant.STICKUP_SOLUTION_TYPE == StickupSolutionType.DELIMITER_BASED){ seq += Constant.CUSTOM_SPLIT_MARK; }else if(Constant.STICKUP_SOLUTION_TYPE == StickupSolutionType.FIXED_LEN){ seq = Constant.FIXED_LEN_SERVER_TXT; } final ByteBuf HEARBEAT_SEQ = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer(seq, CharsetUtil.UTF_8));//ISO_8859_1 /** * 发送心跳消息,并在发送失败时关闭连接 */ if(evt instanceof IdleStateEvent){ IdleState state = ((IdleStateEvent)evt).state(); /** * 客户端是数据发送端,那么服务器端就是Read * 服务器端5秒钟未收到应答,则认为发送心跳包检测 */ if(state == IdleState.READER_IDLE){ ctx.writeAndFlush(HEARBEAT_SEQ).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } }else{ //非心跳事件,将它传递给下个处理器处理 super.userEventTriggered(ctx, evt); } } }
/** * @author andychen https://blog.51cto.com/14815984 * @description:通道监视器设计类 * 负责定时监听与对端的连接,当连接断开时,自动重试连接 */ @ChannelHandler.Sharable public abstract class ChannelMonitor<T extends Channel> extends ChannelInboundHandlerAdapter implements MonitoredChannelHandler, TimerTask { private final String host;//监视的主机 private final int port;//监视的端口 private final Timer timer;//定时器 private final AbstractBootstrap boot;//启动对象 private final AtomicInteger counter = new AtomicInteger(0);//重试次数统计 public ChannelMonitor(String host, int port, Timer timer, AbstractBootstrap boot) { this.host = host; this.port = port; this.timer = timer; this.boot = boot; } /** * 运行定时任务检测通道是否断开 * @param timeout 定时时间 * @throws Exception */ public void run(Timeout timeout) throws Exception { ChannelFuture future = null; /** * 设置绑定 */ //synchronized (this.boot) { if (this.boot instanceof ServerBootstrap) { ((ServerBootstrap) this.boot).childHandler(new ChannelInitializerExt()); future = this.boot.bind(new InetSocketAddress(this.host, this.port)); } else { this.boot.handler(new ChannelInitializerExt()); future = ((Bootstrap) this.boot).connect(this.host, this.port); } //} /** * 监听通道连接 */ future.addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture channelFuture) throws Exception { boolean success = channelFuture.isSuccess(); System.out.println("Retry channel connection "+(success ? "success" : "fail")+"("+counter.get()+"times)."); if(!success){ channelFuture.channel().pipeline().fireChannelInactive(); } } }); } /** * 连接断开事件处理 * @param ctx * @throws Exception */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("Monitored channel will disconnection..."); System.out.println("Monitored channel disconnected and will auto reconnect..."); int times = this.counter.incrementAndGet(); if(Constant.CH_RETRY_TIMES >= times){ //逐步延长重试时间 int timeout = 4 << times; this.timer.newTimeout(this, timeout, TimeUnit.MILLISECONDS); } //触发对下个断开事件的调用 ctx.fireChannelInactive(); } /** * 初始化器扩展 */ private class ChannelInitializerExt extends ChannelInitializer<T> { //加入handler protected void initChannel(T t) throws Exception { ChannelHandler[] handlers = setMonitorHandlers(); t.pipeline().addLast(handlers); } } }
/** * @author andychen https://blog.51cto.com/14815984 * @description:客户端监视器实现类 */ @ChannelHandler.Sharable public class ClientMonitorHandler extends ChannelMonitor { public ClientMonitorHandler(String host, int port, Timer timer, AbstractBootstrap boot) { super(host, port, timer, boot); } /** * 定义客户端处理器 * @return */ public ChannelHandler[] setMonitorHandlers() { int aLen = (Constant.STICKUP_SOLUTION_TYPE == StickupSolutionType.SEND_LEN ? 7 : 5); ChannelHandler[] handlers = new ChannelHandler[aLen]; handlers[0] = this;//监听器本身也是Handler,这里也加入 handlers[1] = new IdleStateHandler(0, Constant.HEART_BEAT_TIMEOUT+1,0, TimeUnit.SECONDS); handlers[2] = new ClientHeartbeatHandler(); /** * 解决粘包半包问题办法一:字符分隔 */ //支持一:回车分隔 if(Constant.STICKUP_SOLUTION_TYPE == StickupSolutionType.LINE_BASED){ handlers[3] = new LineBasedFrameDecoder(Constant.DECODER_BYTE_MAX_LEN); } //支持二:自定义分隔符分隔 if(Constant.STICKUP_SOLUTION_TYPE == StickupSolutionType.DELIMITER_BASED) { ByteBuf byteMark = Unpooled.copiedBuffer(Constant.CUSTOM_SPLIT_MARK.getBytes()); handlers[3] = new DelimiterBasedFrameDecoder(Constant.DECODER_BYTE_MAX_LEN, byteMark); } /** * 解决粘包半包问题办法二:固定长度 */ if(Constant.STICKUP_SOLUTION_TYPE == StickupSolutionType.FIXED_LEN) { handlers[3] = new FixedLengthFrameDecoder(Constant.FIXED_LEN_SERVER_TXT.length()); } /** * 解决粘包半包问题办法三:带长度 */ if(Constant.STICKUP_SOLUTION_TYPE == StickupSolutionType.SEND_LEN){ handlers[3] = new LengthFieldPrepender(2); handlers[4] = new ProtoStuffEncoder<User>(User.class); handlers[5] = new LineBasedFrameDecoder(Constant.DECODER_BYTE_MAX_LEN); } handlers[aLen-1] = new ClientChannelHandler(); return handlers; } }
/** * @author andychen https://blog.51cto.com/14815984 * @description:定义被监视的ChannelHandler */ public interface MonitoredChannelHandler { /** * 设置被监视的ChannelHandler * @return */ ChannelHandler[] setMonitorHandlers(); }
/** * @author andychen https://blog.51cto.com/14815984 * @description:ProtoStuff解码器类 */ public class ProtoStuffDecoder<T> extends MessageToMessageDecoder<ByteBuf> { private final RuntimeSchema<T> schema; public ProtoStuffDecoder(Class<T> clazz) { this.schema = (RuntimeSchema<T>) RuntimeSchema.createFrom(clazz); } /** * 自定义实现高效解码方法 * @param channelHandlerContext 处理器上下文 * @param byteBuf 缓冲对象 * @param list 对象集合 * @throws Exception */ protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception { int length = byteBuf.readableBytes(); if(0 < length){ byte[] bytes = new byte[length]; byteBuf.getBytes(byteBuf.readerIndex(), bytes, 0, length); T t = this.schema.newMessage(); ProtostuffIOUtil.mergeFrom(bytes, t, this.schema); list.add(t); } } }
/** * @author andychen https://blog.51cto.com/14815984 * @description:序列化编码器 */ public class ProtoStuffEncoder<T> extends MessageToByteEncoder<Object> { private final RuntimeSchema<T> schema; public ProtoStuffEncoder(Class<T> clazz) { this.schema = (RuntimeSchema<T>) RuntimeSchema.createFrom(clazz); } /** * 实现高效序列化方法 * @param channelHandlerContext 处理器上下文 * @param o 传输对象 * @param byteBuf buffer缓冲 * @throws Exception */ protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception { byte[] bytes = ProtostuffIOUtil.toByteArray((T)o, this.schema, LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE)); byteBuf.writeBytes(bytes); } }
/** * @author andychen https://blog.51cto.com/14815984 * @description:系统相关常量配置 * 建议直接从全局应用配置文件中 */ public class Constant { /** * 服务器端ip */ public static final String SERV_HOST = ""; /** * 服务端监听端口 */ public static final int SERV_PORT = 6888; /** * 解码器处理字节最大长度 */ public static final int DECODER_BYTE_MAX_LEN = 1024; /** * 固定长度文本 */ public static final String FIXED_LEN_CLIENT_TXT = "Netty Client Fixed Len Text"; /** * 固定长度文本 */ public static final String FIXED_LEN_SERVER_TXT = "Netty Server Fixed Len Text"; /** * 自定义分隔标记 */ public static final String CUSTOM_SPLIT_MARK = "@#"; /** * 粘包解决方法 */ public static final StickupSolutionType STICKUP_SOLUTION_TYPE = StickupSolutionType.LINE_BASED; /** * 心跳超时时间 */ public static final int HEART_BEAT_TIMEOUT = 4; /** * 服务端心跳 */ public static final String HEART_BEAT_SERV_MSG = "SERV_HEARTBEAT"; /** * 客户端心跳 */ public static final String HEART_BEAT_CLIN_MSG = "CLIN_HEARTBEAT"; /** * 通道重试次数 */ public static final int CH_RETRY_TIMES = 30; }
/** * @author andychen https://blog.51cto.com/14815984 * @description:粘包解决类型定义 */ public enum StickupSolutionType{ LINE_BASED(1, "回车分隔"), DELIMITER_BASED(2,"自定义分隔"), FIXED_LEN(4, "固定长度"), SEND_LEN(8, "带长度"); public int getCode() { return code; } public String getMsg() { return msg; } private final int code; private final String msg; StickupSolutionType(int code, String msg) { this.code = code; this.msg = msg; } }