实现基于Netty的“请求-响应”同步通信机制。
Netty提供了异步IO和同步IO的统一实现,但是我们的需求其实和IO的同步异步并无关系。我们的关键是要实现请求-响应这种典型的一问一答交互方式。要实现这个需求,需要解决两个问题:
客户端发送数据后,服务端返回响应结果的时候,怎么和客户端的请求正确匹配起来呢,(即一个请求对应一个自己的响应)?
解决思路:通过客户端唯一的RequestId,服务端返回的响应中需要包含该RequestId,这样客户端就可以通过RequestId来正确匹配请求响应。
请求线程会在发出请求后,同步等待服务端的返回。因此,就需要解决,Netty客户端在接受到响应之后,怎么通知请求线程结果。
解决思路:客户端线程在发送请求后,进入等待,服务器返回响应后,根据RequestId来唤醒客户端的请求线程,并把结果返回给请求线程。
利用Java中的CountDownLatch类来实现同步Future。
具体过程是:客户端发送请求后将<请求ID,Future>的键值对保存到一个缓存中,这时候用Future等待结果,挂住请求线程;当Netty客户端收到服务端的响应后,响应线程根据请求ID从缓存中取出Future,然后设置响应结果到Future中。这个时候利用CountDownLatch的通知机制,通知请求线程。请求线程从Future中拿到响应结果,然后做业务处理。
缓存使用google的guava
首先引入依赖
<!-- guava --> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>28.0-jre</version> </dependency> <!-- Netty --> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.39.Final</version> </dependency>
SyncFuture: 同步的Future。这个是核心,通过这个工具类来实现线程等待。
package com.topinfo.ci.netty.client; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; public class SyncFuture<T> implements Future<T> { // 因为请求和响应是一一对应的,因此初始化CountDownLatch值为1。 private CountDownLatch latch = new CountDownLatch(1); // 需要响应线程设置的响应结果 private T response; // Futrue的请求时间,用于计算Future是否超时 private long beginTime = System.currentTimeMillis(); public SyncFuture() { } @Override public boolean cancel(boolean mayInterruptIfRunning) { return false; } @Override public boolean isCancelled() { return false; } @Override public boolean isDone() { if (response != null) { return true; } return false; } // 获取响应结果,直到有结果才返回。 @Override public T get() throws InterruptedException { latch.await(); return this.response; } // 获取响应结果,直到有结果或者超过指定时间就返回。 @Override public T get(long timeout, TimeUnit unit) throws InterruptedException { if (latch.await(timeout, unit)) { return this.response; } return null; } // 用于设置响应结果,并且做countDown操作,通知请求线程 public void setResponse(T response) { this.response = response; latch.countDown(); } public long getBeginTime() { return beginTime; } }
NettyClient: 有消息同步的和异步的方法,具体内容如下:
package com.topinfo.ci.netty.client; import java.util.concurrent.TimeUnit; import javax.annotation.PostConstruct; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.CharsetUtil; /** *@Description: Netty客户端 *@Author:杨攀 *@Since:2019年9月26日下午8:54:59 */ @Component public class NettyClient { private static final Logger LOGGER = LoggerFactory.getLogger(NettyClient.class); private EventLoopGroup group = new NioEventLoopGroup(); /** *@Fields DELIMITER : 自定义分隔符,服务端和客户端要保持一致 */ public static final String DELIMITER = "@@"; /** * @Fields hostIp : 服务端ip */ private String hostIp = "192.168.90.96"; /** * @Fields port : 服务端端口 */ private int port= 8888; /** * @Fields socketChannel : 通道 */ private SocketChannel socketChannel; /** *@Fields clientHandlerInitilizer : 初始化 */ @Autowired private NettyClientHandlerInitilizer clientHandlerInitilizer; /** * @Description: 启动客户端 * @Author:杨攀 * @Since: 2019年9月12日下午4:43:21 */ @SuppressWarnings("unchecked") @PostConstruct public void start() { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) // 指定Channel .channel(NioSocketChannel.class) // 服务端地址 .remoteAddress(hostIp, port) // 将小的数据包包装成更大的帧进行传送,提高网络的负载,即TCP延迟传输 .option(ChannelOption.SO_KEEPALIVE, true) // 将小的数据包包装成更大的帧进行传送,提高网络的负载,即TCP延迟传输 .option(ChannelOption.TCP_NODELAY, true) .handler(clientHandlerInitilizer); // 连接 ChannelFuture channelFuture = bootstrap.connect(); //客户端断线重连逻辑 channelFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if(future.isSuccess()) { LOGGER.info("连接Netty服务端成功..."); }else { LOGGER.info("连接Netty服务端失败,进行断线重连..."); final EventLoop loop =future.channel().eventLoop(); loop.schedule(new Runnable() { @Override public void run() { LOGGER.info("连接正在重试..."); start(); } }, 20, TimeUnit.SECONDS); } } }); socketChannel = (SocketChannel) channelFuture.channel(); } /** *@Description: 消息发送 *@Author:杨攀 *@Since: 2019年9月12日下午5:08:47 *@param message */ public void sendMsg(String message) { String msg = message.concat(NettyClient.DELIMITER); ByteBuf byteBuf = Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8); ChannelFuture future = socketChannel.writeAndFlush(byteBuf); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if(future.isSuccess()) { System.out.println("===========发送成功"); }else { System.out.println("------------------发送失败"); } } }); } /** *@Description: 发送同步消息 *@Author:杨攀 *@Since: 2019年9月12日下午5:08:47 *@param message */ public String sendSyncMsg(String message, SyncFuture<String> syncFuture) { String result = ""; String msg = message.concat(NettyClient.DELIMITER); ByteBuf byteBuf = Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8); try { ChannelFuture future = socketChannel.writeAndFlush(byteBuf); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if(future.isSuccess()) { System.out.println("===========发送成功"); }else { System.out.println("------------------发送失败"); } } }); // 等待 8 秒 result = syncFuture.get(8, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } return result; } public String getHostIp() { return hostIp; } public void setHostIp(String hostIp) { this.hostIp = hostIp; } public int getPort() { return port; } public void setPort(int port) { this.port = port; } }
NettyClientHandlerInitilizer: 初始化
package com.topinfo.ci.netty.client; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.Delimiters; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil; @Component public class NettyClientHandlerInitilizer extends ChannelInitializer<Channel> { /** *@Fields clientHandler : 客户端处理 */ @Autowired private NettyClientHandler clientHandler; @Override protected void initChannel(Channel ch) throws Exception { // 通过socketChannel去获得对应的管道 ChannelPipeline channelPipeline = ch.pipeline(); /* * channelPipeline中会有很多handler类(也称之拦截器类) * 获得pipeline之后,可以直接.addLast添加handler */ ByteBuf buf = Unpooled.copiedBuffer(NettyClient.DELIMITER.getBytes()); channelPipeline.addLast("framer", new DelimiterBasedFrameDecoder(1024*1024*2, buf)); //channelPipeline.addLast("decoder",new StringDecoder(CharsetUtil.UTF_8)); //channelPipeline.addLast("encoder",new StringEncoder(CharsetUtil.UTF_8)); channelPipeline.addLast(clientHandler); } }
NettyClientHandler: 客户端处理类,实现了接收
package com.topinfo.ci.netty.client; import java.util.concurrent.TimeUnit; import io.netty.buffer.ByteBuf; import io.netty.util.CharsetUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import com.topinfo.ci.netty.service.NettyClientService; import com.topinfo.ci.netty.utils.ExceptionUtil; import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.EventLoop; import io.netty.channel.SimpleChannelInboundHandler; @Component @ChannelHandler.Sharable // 标注一个channel handler可以被多个channel安全地共享 public class NettyClientHandler extends SimpleChannelInboundHandler<ByteBuf> { private static final Logger LOGGER = LoggerFactory.getLogger(NettyClientHandler.class); @Autowired private NettyClientService service; @Autowired private NettyClient nettyClient; /** * @Description: 服务端发生消息给客户端,会触发该方法进行接收消息 * @Author:杨攀 * @Since: 2019年9月12日下午5:03:31 * @param ctx * @param byteBuf * @throws Exception */ @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception { String msg = byteBuf.toString(CharsetUtil.UTF_8); LOGGER.info("客户端收到消息:{}", msg); //service.ackMsg(msg); service.ackSyncMsg(msg); // 同步消息返回 } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { LOGGER.info("请求连接成功..."); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { LOGGER.info("连接被断开..."); // 使用过程中断线重连 final EventLoop eventLoop = ctx.channel().eventLoop(); eventLoop.schedule(new Runnable() { @Override public void run() { // 重连 nettyClient.start(); } }, 20, TimeUnit.SECONDS); super.channelInactive(ctx); } /** * 处理异常, 一般将实现异常处理逻辑的Handler放在ChannelPipeline的最后 * 这样确保所有入站消息都总是被处理,无论它们发生在什么位置,下面只是简单的关闭Channel并打印异常信息 * * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); // 输出到日志中 ExceptionUtil.getStackTrace(cause); Channel channel = ctx.channel(); if (channel.isActive()) { ctx.close(); } } }
NettyClientServiceImpl: 客户端封装实现类, 它接口就不贴出来了。
package com.topinfo.ci.netty.service.impl; import com.topinfo.ci.netty.bean.RealDataInfo; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cache.Cache; import org.springframework.cache.CacheManager; import org.springframework.stereotype.Service; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.google.common.cache.RemovalListener; import com.google.common.cache.RemovalNotification; import com.topinfo.ci.netty.bean.Message; import com.topinfo.ci.netty.client.NettyClient; import com.topinfo.ci.netty.client.SyncFuture; import com.topinfo.ci.netty.service.NettyClientService; import com.topinfo.ci.netty.utils.AESUtil; @Service public class NettyClientServiceImpl implements NettyClientService { private static final Logger LOGGER = LoggerFactory.getLogger(NettyClientServiceImpl.class); //缓存接口这里是LoadingCache,LoadingCache在缓存项不存在时可以自动加载缓存 private static LoadingCache<String, SyncFuture> futureCache = CacheBuilder.newBuilder() //设置缓存容器的初始容量为10 .initialCapacity(100) // maximumSize 设置缓存大小 .maximumSize(10000) //设置并发级别为20,并发级别是指可以同时写缓存的线程数 .concurrencyLevel(20) // expireAfterWrite设置写缓存后8秒钟过期 .expireAfterWrite(8, TimeUnit.SECONDS) //设置缓存的移除通知 .removalListener(new RemovalListener<Object, Object>() { @Override public void onRemoval(RemovalNotification<Object, Object> notification) { LOGGER.debug("LoadingCache: {} was removed, cause is {}",notification.getKey(), notification.getCause()); } }) //build方法中可以指定CacheLoader,在缓存不存在时通过CacheLoader的实现自动加载缓存 .build(new CacheLoader<String, SyncFuture>() { @Override public SyncFuture load(String key) throws Exception { // 当获取key的缓存不存在时,不需要自动添加 return null; } }); @Autowired private NettyClient nettyClient; @Autowired private CacheManager cacheManager; @Override public boolean sendMsg(String text, String dataId, String serviceId) { LOGGER.info("发送的内容:{}", text); //TODO //nettyClient.sendMsg(json); return true; } @Override public String sendSyncMsg(String text, String dataId, String serviceId) { SyncFuture<String> syncFuture = new SyncFuture<String>(); // 放入缓存中 futureCache.put(dataId, syncFuture); // 封装数据 JSONObject object = new JSONObject(); object.put("dataId", dataId); object.put("text", text); // 发送同步消息 String result = nettyClient.sendSyncMsg(object.toJSONString(), syncFuture); return result; } @Override public void ackSyncMsg(String msg) { LOGGER.info("ACK确认信息: {}",msg); JSONObject object =JSON.parseObject(msg); String dataId = object.getString("dataId"); // 从缓存中获取数据 SyncFuture<String> syncFuture = futureCache.getIfPresent(dataId); // 如果不为null, 则通知返回 if(syncFuture != null) { syncFuture.setResponse(msg); } } }
TestController: 测试TestController。
package com.topinfo.ci.netty.controller; import com.alibaba.fastjson.JSON; import com.topinfo.ci.netty.bean.CmwSensoralert; import com.topinfo.ci.netty.bean.Equip; import com.topinfo.ci.netty.bean.JsonResult; import com.topinfo.ci.netty.bean.RealDataInfo; import com.topinfo.ci.netty.mapper.SensorAlertMapper; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import com.topinfo.ci.netty.service.NettyClientService; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @RestController @RequestMapping("/test") public class TestController { @Autowired private NettyClientService clientService; @Autowired private SensorAlertMapper sensorAlertMapper; @RequestMapping("/sendSyncMsg") public String sendSyncMsg(String dataId, String text) { String serviceId = "mmmm"; String result = clientService.sendSyncMsg(text, dataId, serviceId); return "result:"+result ; } }
测试,完美实现了“请求-响应”的效果。
package com.topinfo.ju.ccon.netty.server; import java.net.InetSocketAddress; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; @Component public class NettyServer { private static final Logger LOGGER = LoggerFactory.getLogger(NettyServer.class); /** *@Fields DELIMITER : 自定义分隔符,服务端和客户端要保持一致 */ public static final String DELIMITER = "@@"; /** * @Fields boss : boss 线程组用于处理连接工作, 默认是系统CPU个数的两倍,也可以根据实际情况指定 */ private EventLoopGroup boss = new NioEventLoopGroup(); /** * @Fields work : work 线程组用于数据处理, 默认是系统CPU个数的两倍,也可以根据实际情况指定 */ private EventLoopGroup work = new NioEventLoopGroup(); /** * @Fields port : 监听端口 */ private Integer port = 8888; @Autowired private NettyServerHandlerInitializer handlerInitializer; /** * @throws InterruptedException * @Description: 启动Netty Server * @Author:杨攀 * @Since: 2019年9月12日下午4:21:35 */ @PostConstruct public void start() throws InterruptedException { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(boss, work) // 指定Channel .channel(NioServerSocketChannel.class) // 使用指定的端口设置套接字地址 .localAddress(new InetSocketAddress(port)) // 服务端可连接队列数,对应TCP/IP协议listen函数中backlog参数 .option(ChannelOption.SO_BACKLOG, 1024) // 设置TCP长连接,一般如果两个小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文 .childOption(ChannelOption.SO_KEEPALIVE, true) // 将小的数据包包装成更大的帧进行传送,提高网络的负载,即TCP延迟传输 .childOption(ChannelOption.TCP_NODELAY, true) .childHandler(handlerInitializer); ChannelFuture future = bootstrap.bind().sync(); if (future.isSuccess()) { LOGGER.info("启动 Netty Server..."); } } @PreDestroy public void destory() throws InterruptedException { boss.shutdownGracefully().sync(); work.shutdownGracefully().sync(); LOGGER.info("关闭Netty..."); } }
package com.topinfo.ju.ccon.netty.server; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil; @Component public class NettyServerHandlerInitializer extends ChannelInitializer<Channel> { /** *@Fields serverHandler : 服务处理 */ @Autowired private NettyServerHandler serverHandler; @Override protected void initChannel(Channel ch) throws Exception { // 通过socketChannel去获得对应的管道 ChannelPipeline channelPipeline = ch.pipeline(); /* * channelPipeline中会有很多handler类(也称之拦截器类) * 获得pipeline之后,可以直接.addLast添加handler */ ByteBuf buf = Unpooled.copiedBuffer(NettyServer.DELIMITER.getBytes()); channelPipeline.addLast("framer", new DelimiterBasedFrameDecoder(1024*1024*2, buf)); //channelPipeline.addLast("decoder",new StringDecoder(CharsetUtil.UTF_8)); //channelPipeline.addLast("encoder",new StringEncoder(CharsetUtil.UTF_8)); // 自定义解码器,粘包/拆包/断包 channelPipeline.addLast(serverHandler); } }
package com.topinfo.ju.ccon.netty.server; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import com.alibaba.fastjson.JSONObject; import com.topinfo.ju.ccon.netty.bean.Message; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.CharsetUtil; @Component @ChannelHandler.Sharable //标注一个channel handler可以被多个channel安全地共享 public class NettyServerHandler extends SimpleChannelInboundHandler<ByteBuf> { private static final Logger LOGGER = LoggerFactory.getLogger(NettyServerHandler.class); public static AtomicInteger nConnection = new AtomicInteger(0); @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { String txt = msg.toString(CharsetUtil.UTF_8); LOGGER.info("收到客户端的消息:{}", txt); ackMessage(ctx, txt); } /** *@Description: 确认消息 *@Author:杨攀 *@Since: 2019年9月17日上午11:22:27 *@param ctx *@param message */ public void ackMessage(ChannelHandlerContext ctx, String message) { //自定义分隔符 String msg = message+NettyServer.DELIMITER; ByteBuf byteBuf = Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8); //回应客户端 ctx.writeAndFlush(byteBuf); } /** *@Description: 每次来一个新连接就对连接数加一 *@Author:杨攀 *@Since: 2019年9月16日下午3:04:42 *@param ctx *@throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { nConnection.incrementAndGet(); LOGGER.info("请求连接...{},当前连接数: :{}", ctx.channel().id(),nConnection.get()); } /** *@Description: 每次与服务器断开的时候,连接数减一 *@Author:杨攀 *@Since: 2019年9月16日下午3:06:10 *@param ctx *@throws Exception */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { nConnection.decrementAndGet(); LOGGER.info("断开连接...当前连接数: :{}", nConnection.get()); } /** *@Description: 连接异常的时候回调 *@Author:杨攀 *@Since: 2019年9月16日下午3:06:55 *@param ctx *@param cause *@throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { super.exceptionCaught(ctx, cause); // 打印错误日志 cause.printStackTrace(); Channel channel = ctx.channel(); if(channel.isActive()){ ctx.close(); } } }
核心代码基本就这些,希望对大家有帮助。