最近正在学习netty,跟着教程写了一个基于WebSocket的网页聊天室,对netty有了一定的了解,现在正好项目使用到长连接,选用了netty。
项目目标:客户端A(网页)和服务端通过WebSocket进行通信,客户端B和服务端通过Socket通信,把客户端B的数据传输到客户端A,桥梁为服务端
Socket服务端监听8090端口,长连接服务端监听8089端口,客户端A连接到8089端口,客户端B连接到8090端口
由于是需要对两个端口数据进行不同处理,所以我们创建两个 ServerBootstrap
,分别绑定两个端口,一个 ServerGzhBootstrap
处理客户端B和服务端的socket通信; ServerWxQBootstrap
处理客户端A和服务端之间的WebSocket长连接通信
ServerInitializer
,实现 ChannelInitializer
,负责初始化客户端B和服务端通信的处理器Handler WebSocketChannelInitializer
,实现 ChannelInitializer
,负责初始化客户端A和服务端长连接通信的处理器Handler ServerInitializer
添加一个自定义 SimpleChannelInboundHandler
负责处理客户端B和服务端socket通信 WebSocketChannelInitializer
添加一个自定义 SimpleChannelInboundHandler
负责处理客户端A和服务端WebSocket长连接通信 网页聊天室作为客户端A,客户端B通过Socket通信并接收控制台的输入作为通信数据传递给服务端,服务端再传递给客户端A
问题:
netty中SimpleChannelInboundHandler类的泛型中指定了传入的消息的类型,只能接收这种类型的消息,客户端B发送的String类型消息与客户端A接收的TextWebSocketFrame类型不同,客户端A无法接收。
解决方法:
我们把客户端B发送的String类型消息在Socket服务端接收到,要将其发送给客户端A(需要将其封装成TextWebSocketFrame类型才能发送给客户端A),而且我们就必须要有客户端A的channel,我们才可以调用 writeAndFlush
方法把数据写入客户端A
那就是 ChannelGroup
,我们定义一个类保存全部Channel客户端作为全局ChannelGroup,每次有客户端Channel创建( handlerAdded
方法),我们就把它保存到该全局ChannelGroup中,每次channel使用完毕,ChannelGroup会为我们自动删除其中无用的channel,这样我们就可以获取所有的客户端channel
客户端A和客户端B很大一个区别就是端口号,我们可以通过端口号来判断是客户端A还是客户端B
public class GlobalChannelGroup { public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); }
public class Server { public static void main(String[] args) throws InterruptedException { //两个事件循环组 boss获取连接发送 worker接收处理 EventLoopGroup boss = new NioEventLoopGroup(); EventLoopGroup worker = new NioEventLoopGroup(); try { //server启动器 ServerBootstrap serverWxQBootstrap = new ServerBootstrap(); ServerBootstrap serverGzhBootstrap = new ServerBootstrap(); // 定义组 // channel(反射) // 定义处理器(自定义):连接上channel后执行init System.out.println("启动server"); serverGzhBootstrap.group(boss, worker).channel(NioServerSocketChannel.class) .childHandler(new ServerInitializer()); serverWxQBootstrap.group(boss, worker).channel(NioServerSocketChannel.class) .childHandler(new WebSocketChannelInitializer()); //绑定端口,同步 ChannelFuture wxq = serverGzhBootstrap.bind(8090).sync(); ChannelFuture gzh = serverWxQBootstrap.bind(8089).sync(); gzh.channel().closeFuture().sync(); wxq.channel().closeFuture().sync(); } finally { boss.shutdownGracefully(); worker.shutdownGracefully(); } } }
public class ServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4)); pipeline.addLast(new LengthFieldPrepender(4)); pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));//用于解码 pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));//用于编码 pipeline.addLast(new ServerHandler());//自定义处理器 } }
public class ServerHandler extends SimpleChannelInboundHandler<String> { @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println(ctx.channel().remoteAddress()+","+msg); ctx.channel().writeAndFlush("消息已经进入数据库,正在赶往微信墙!"); GlobalChannelGroup.channelGroup.forEach(o->{ //如果端口以8089结尾,说明这个channel是客户端A if (o.localAddress().toString().endsWith("8089")){ TextWebSocketFrame text = new TextWebSocketFrame(o.remoteAddress() + "发送消息:" + msg + "/n"); o.writeAndFlush(text); } }); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println(ctx.channel().remoteAddress()+":连接到微信墙模式成功!"); int size = GlobalChannelGroup.channelGroup.size(); System.out.println("当前微信墙连接数:"+(size==0?0:size-1)); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); GlobalChannelGroup.channelGroup.add(channel); } }
public class WebSocketChannelInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new HttpServerCodec()); pipeline.addLast(new ChunkedWriteHandler()); //用于将http数据聚合到一起发送一个请求 fullHttpRequest pipeline.addLast(new HttpObjectAggregator(8192)); pipeline.addLast(new WebSocketServerProtocolHandler("/"));//传入websocket path pipeline.addLast(new TextWebSocketHandler());//传入websocket path } }
public class TextWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { int size = GlobalChannelGroup.channelGroup.size(); System.out.println("当前微信墙连接数:"+(size==0?0:size-1)); System.out.println("收到消息:"+msg.text()); Channel channel = ctx.channel(); GlobalChannelGroup.channelGroup.forEach(o->{ if (o.localAddress().toString().endsWith("8090")){ o.writeAndFlush(msg.text()); }else { TextWebSocketFrame text = new TextWebSocketFrame(o.remoteAddress() + "发送消息:" + msg.text() + "/n"); o.writeAndFlush(text); } }); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { Channel ch = ctx.channel(); GlobalChannelGroup.channelGroup.add(ch); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println(ctx.channel().remoteAddress()+":离开聊天室"); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { Channel ch = ctx.channel(); System.out.println(ch.remoteAddress()+":连接到聊天室"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("异常"); ctx.close(); } }
客户端把控制台的标准输入作为参数传入,创建客户端channel时将其发送
public class GzhClient { public static void main(String[] args) { EventLoopGroup eventExecutors = null; ChannelFuture channelFuture = null; try{ // while (true) { eventExecutors = new NioEventLoopGroup(); Scanner scanner = new Scanner(System.in); String json = scanner.nextLine(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(eventExecutors).channel(NioSocketChannel.class) .handler(new GzhClientInitializer(json)); System.out.println("启动客户端"); channelFuture = bootstrap.connect("localhost", 8090).sync(); // } // channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { eventExecutors.shutdownGracefully(); } } }
public class GzhClientInitializer extends ChannelInitializer<SocketChannel> { private String json; public GzhClientInitializer(String json){ this.json = json; } @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4)); pipeline.addLast(new LengthFieldPrepender(4)); pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8)); pipeline.addLast(new GzhClientHandler(json)); } }
public class GzhClientHandler extends SimpleChannelInboundHandler<String> { private String json; public GzhClientHandler(String json){ this.json = json; } @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println("client receive:"+msg); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("进入微信墙模式,清发消息:"); ctx.writeAndFlush(json); } }
服务端接收到消息,打印出来并准备转发给客户端A(也就是网页聊天室)
网页聊天室接收到服务端发送的消息