前面两节我们介绍了学习Netty的准备知识,分别是:
如果大家没有看过这两节的话, 推荐先去看看, 因为这两节是基础知识
Netty 的介绍,优点,特性等已经在第一节中介绍过,这里不再重复,接下来我们先给大家看一个 Netty 的小例子, 直接通过完整的例子让大家对 Netty 有一个清晰的认识:
案例说明:
服务端接收客户端发送的消息
我们先创建项目, 配置环境
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.50.Final</version> </dependency> 复制代码
基于 maven
的方式引入 netty
,相信不用再多说什么
public class IMNettyServer { public static final String HOST = "127.0.0.1"; public static final int PORT = 45882; public static void main(String[] args) { new IMNettyServer().start_server(); } private void start_server() { NioEventLoopGroup boss = new NioEventLoopGroup(); NioEventLoopGroup work = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap() // 反应器组 .group(boss, work) // 绑定端口 .localAddress(PORT) // NIO类型的通道 .channel(NioServerSocketChannel.class) // 通道的参数 .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) // 装配子通道流水线 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { // 添加处理器到流水线中 socketChannel.pipeline().addLast(new IMNettyServerHandler()); } }); ChannelFuture future = bootstrap.bind().sync(); System.out.println("服务器启动成功, 监听端口:" + future.channel().localAddress()); future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { work.shutdownGracefully(); boss.shutdownGracefully(); } } } 复制代码
@ChannelHandler.Sharable public class IMNettyServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { System.out.println("客户端通道成功注册"); super.channelRegistered(ctx); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("客户端监听者:" + ctx.channel().localAddress()); super.channelActive(ctx); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("读取数据"); ByteBuf buf = (ByteBuf) msg; byte[] bytes = new byte[buf.readableBytes()]; buf.getBytes(0, bytes); System.out.println("客户端:" + new String(bytes)); super.channelRead(ctx, msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { System.out.println("通道缓存区已经读完"); super.channelReadComplete(ctx); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("客户端" + ctx.channel().localAddress() + "断开"); super.channelInactive(ctx); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("发生了异常"); super.exceptionCaught(ctx, cause); } } 复制代码
大家看到上面的代码, 估计很多都会蒙圈, 别着急, 我们看完客户端的代码之后, 一点一点的剖析它们
public class IMNettyClient { public static void main(String[] args) { new IMNettyClient().connect(); } private void connect() { NioEventLoopGroup work = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap() // 反应器组 .group(work) // 通道类 .channel(NioSocketChannel.class) // 连接服务端 .remoteAddress(IMNettyServer.HOST, IMNettyServer.PORT) // 设置 .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) // 装配通道流水线 .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { // 添加处理器到流水线 socketChannel.pipeline().addLast("read", new ImNettyClientHandler()); } }); ChannelFuture sync = bootstrap .connect() .sync(); sendMsg(sync); } catch (InterruptedException e) { e.printStackTrace(); } } private void sendMsg(ChannelFuture sync) { Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String msg = scanner.next(); ByteBuf buf = sync.channel().alloc().buffer(); buf.writeBytes(msg.getBytes()); sync.channel().writeAndFlush(buf); } } } 复制代码
从代码上, 我们可以看到, 客户端的编码和服务端编码大同小异, 区别就在于 Netty 各个组件, 下面我们就来剖析这个小案例
前面已经说过, Netty 是基于 Reactor反应器模式 设计出来的高性能,高可扩展的为网络服务器和客户端程序而提供的异步事件驱动基础框架和工具, 那么我们看看在 Netty 中式如果体现的
回顾 反应器模式 的处理流程
通道注册选择器,并指定对应的IO事件
一个反应器负责一个线程,不断轮询,查询选择器中的IO事件
将查询到的IO事件,分发给于IO事件有绑定关系的Handler业务处理器
通道是 Netty 中非常重要的组件, 从上面的流程来看,大家肯定也明白:
所以我们首先来了解 Netty 中的通道组件
Netty 不直接使用 NIO 的通道组件,而是 Netty 针对不同的通信协议,对每一种通信协议都进行了自己的封装,而且 Netty 不仅仅只是支持异步,还对 标准阻塞式IO 进行了封装
所以 Netty 中的每一种协议的通道,都有 异步IO 和 同步IO 两种版本,
不过在 Netty 4.x
中, 同步IO
被标注为过时类,所以就不介绍他们了,大家可以去看代码,同步IO都是以 Oio
开头的类
常见的 Netty通道 如下所示:
异步非阻塞socket服务端监听通道,在上面服务端代码中已经用到
异步非阻塞socket客户端监听通道,在上面客户端代码中已经用到
可以说, 上面两个通道是我们在Netty开发TCP协议最常用的通道
异步非阻塞UDP传输通道
可以看到,在 Netty
中通过调用 channel()
方法,传入指定通道类,就可以了
// 指定客户端通道 channel(NioSocketChannel.class); // 指定服务端通道 channel(NioServerSocketChannel.class); 复制代码
在 NIO版Reactor反应器模式 中,反应器会负责事件处理线程,不断轮询,通过 Selector选择器 查询注册的IO事件,而在 Netty 中,也是有这样的存在,那就是: EventLoopGroup
该类是一个接口类,我们在 Netty
中主要使用其实现类: NioEventLoopGroup
Netty中的反应器模式肯定是多线程版本的,所以
NioEventLoopGroup是多线程版本中的Reactor反应器模式的实现类,除了包含IO事件外, NioEventLoopGroup 还存在一个重要属性: Thread线程类成员:用于指定内部的线程数 ,类似于线程池的概念,其内部执行思路和之前是一致的:
一个 NioEventLoop 拥有一个Thread线程,负责一个选择器的IO事件轮询
在上面的代码中,我们可以看到我们采用的是无参构造方式,
NioEventLoopGroup work = new NioEventLoopGroup(); 复制代码
猜一猜,无参的构造函数内部的线程数是多少?
无参的构造函数的内部线程数为最大可用的CPU处理器数量的2倍
查看上面服务端代码,我们为服务端提供了两个 NioEventLoopGroup
NioEventLoopGroup boss = new NioEventLoopGroup(); NioEventLoopGroup work = new NioEventLoopGroup(); group(boss, work) 复制代码
我们可以这样理解,
说完了反应器,我们来聊一聊处理器
之前我们讲到,可供选择器监控的IO事件类型包括:
SelectionKey.OP_READ
SelectionKey.OP_WRITE
SelectionKey.OP_CONNECT
SelectionKey.OP_ACCEPT
NioEventLoop 反应器内部有一个选择器执行以上事件的查询,然后进行事件的分发,目的地就是我们定义的Handler处理器
childHandler()
来装配定义的处理器 childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { // 自定义处理器 socketChannel.pipeline().addLast(new IMNettyServerHandler()); } }); 复制代码
handler()
来装配定义的处理器 handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { // 自定义处理器 socketChannel.pipeline().addLast("read", new ImNettyClientHandler()); } }); 复制代码
先聊处理器,随后再聊 流水线 的问题
所有的业务处理都是在处理器中完成,在 Netty 中,处理器分为两大类, 一类是 入站处理器 ,另一类是 出站处理器
在 Netty
中,入站处理器是由 Netty
从通道底层触发,通过层层传递,然后调用 ChannelInboundHandler
进行的逻辑处理。基本上所有的业务处理都是通过入站处理器来进行处理的
单一句话估计看不懂,我们通过具体的例子来说明:以 OP_READ 事件为例
OP_READ
事件后,会被 NioEventLoop
查询到 ChannelInboundHandler
入站处理器,调用其中的方法 channelRead()
channelRead()
中,我们可以从通道中读取到数据,进行业务逻辑操作 从上面的处理过程中我们可以看到, Netty的入站处理器 的触发方向: 从底层通道到ChannelInboundHandler入站处理器
ChannelInboundHandler
是一个接口类,在 Netty
中,我们一般使用其子类 ChannelInboundHandlerAdapter
其中几个重要的方法,我们来一一看看
channelRegistered
当有客户端连接进来,会触发此方法
channelActive
当有客户端连接成功后,会触发此方法,我们可以通过此方法监控客户端连接地址,在线人数等等业务功能
channelRead
当通道缓冲区可读,会触发通道可读事件,在此方法中,我们可以获取通道缓冲区的数据
channelReadComplete
当通道缓冲区可读完后, Netty 会触发通道读取完成事件
channelInactive
当连接被断开或者不可用, Netty 会触发此方法,我们可以在此方法中做用户退出连接等业务功能
exceptionCaught
当通道处理过程中发生异常, Netty 会触发异常捕获事件
接下来我们看下各个方法的执行顺序,请看下面的代码,将其中的方法都重写,通过输出查看执行顺序
public class IMNettyServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { System.out.println("客户端通道成功注册"); super.channelRegistered(ctx); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("客户端监听者:" + ctx.channel().localAddress()); super.channelActive(ctx); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("读取数据"); super.channelRead(ctx, msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { System.out.println("通道缓存区已经读完"); super.channelReadComplete(ctx); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("客户端" + ctx.channel().localAddress() + "断开"); super.channelInactive(ctx); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("发生了异常"); super.exceptionCaught(ctx, cause); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { System.out.println("生命周期方法:add"); super.handlerAdded(ctx); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { System.out.println("生命周期方法:remove"); super.handlerRemoved(ctx); } } 复制代码
大家可以猜一猜
生命周期方法:add 客户端通道成功注册 客户端监听者:/127.0.0.1:45882 复制代码
读取数据 客户端:aaa 通道缓存区已经读完 读取数据 客户端:asdf 通道缓存区已经读完 复制代码
其中, 这两者方法是会多次执行的,只要有数据发送过来就会执行到
通道缓存区已经读完 发生了异常 客户端/127.0.0.1:45882断开 生命周期方法:remove 复制代码
因为我是强制关掉客户端的,所以会触发异常
了解完了 入站处理器 ,那么我们再来看看出站处理器
出站处理器表示的是 ChannelOutboundHandler
到通道的某次IO操作,也就是说,在应用程序完成业务处理后,可以通过该处理器将处理结果写入底层通道,然后发送给另外一端。最常用的一个方法就是 write()
所以出站处理器的触发方向为: Netty上层的通道,去操作底层Java IO通道
ChannelOutboundHandler
是一个接口类,在 Netty
中,我们一般使用其子类 ChannelOutboundHandlerAdapter
其中几个重要的方法,我们来一一看看
bind
监听地址绑定:完成底层 Java IO通道的IO地址绑定。如果是TCP传输协议,方法用于服务端
connect
连接服务端,完成服务端的连接操作。如果是TCP协议,方法用于客户端
write
写数据到底层,完成 Netty 通道向底层Java IO通道的数据写入。 此方法只是触发操作,并不是完成实际的数据写入操作, 后面我们学到编解码器,我们就能真正明白这个意思:一般我们可以将版本号,魔数,消息长度等信息写在这里
flush
刷新数据,将缓冲区的数据写到对端
read
从底层读取数据
disConnect
断开服务端连接,如果TCP协议,方法用于客户端
close
主动关闭通道
说明:实在话,我使用出站处理器使用到的地方略少,或者说我对出站处理器的使用也是一知半解,我就不多说,评论区给大家开放,大家可以尽情发言,我们一起来讨论
下面我们来看看绑定通道和Handler处理器之间关系的特殊组件
我们来梳理下 Netty的反应器模式 中,各个组件之间的关系:
在这种情况下,为了能够很好的协调各个组件,保证应用程序的正常运行, Netty 为我们提供了一个特殊的组件: ChannelPipline ,我们叫它 流水线
因为它像一条管道,将绑定到一个通道的多个Handler处理器串在一起,所有被添加进来的Handler处理器都是这条管道上的节点,就好像是工厂里的流水作业
ChannelPipline被设计成一个 双向链表 的结构,可以支持动态添加、删除Handler业务处理器,比如
addLast
addFirst
remove
...
在流水线中,入站处理器和出站处理器的执行顺序是不同的
入站处理器的顺序是从前往后,按照我们在流水线中添加处理器的顺序来执行的, 比如
socketChannel.pipeline().addLast(new InHandlerA()); socketChannel.pipeline().addLast(new InHandlerB()); socketChannel.pipeline().addLast(new InHandlerB()); 复制代码
那么,他们在流水线中的结构是 A --> B --> C 的结构,执行顺序也就是 A --> B --> C
出站处理器的顺序是从后往前,按照我们在流水线中添加处理器顺序的倒序来执行,比如还是上面的添加方式,不过是出站处理器
socketChannel.pipeline().addLast(new OutHandlerA()); socketChannel.pipeline().addLast(new OutHandlerB()); socketChannel.pipeline().addLast(new OutHandlerB()); 复制代码
他们在流水线中的结构是 A --> B --> C 的结构,但是他们是从后往前来执行的,所以执行顺序是 C --> B --> A
大家可以亲自验证下
每一个来自通道的IO事件,都会进入流水线中处理器,那么处理器在处理的过程中会遇到3中情况
上面第三种情况说到,我们可以手动截断流水线的传递,那么我们来看看如何截断,
用入站处理器来说明:
public class IMNettyServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("读取数据"); // super.channelRead(ctx, msg); } } 复制代码
每一个重写的方法,我们都会调用父类的方法,如果我们 不调用父类的方法 ,那么流水线将终止向下传递
那么,相信大家还记得上面的代码, Netty 是如何向流水线中装配处理器
// 服务端 childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new IMNettyServerHandler()); } }); // 客户端 handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { // 这里的代码和服务端的Handler基本一样, 就不再展示 socketChannel.pipeline().addLast("read", new ImNettyClientHandler()); } }); 复制代码
这里涉及到一个类: ChannelInitializer
,也叫 通道初始化处理器
,这里只要我们实现 initChannel()
方法,得到 新接收的通道(参数)
,我们就可以将处理器装配到流水线之中
Bootstrap是 Netty 为我们提供的一个便利的工厂类,通过这个类可以来完成 Netty 的客户端或服务端的组件组装以及程序的初始化。
Netty 为我们提供了两个启动类,且提供了非常方便的链式调用的方式
Bootstrap
客户端启动类
ServerBootstrap
服务端启动类
从上面的代码中我们可以看到,我们通过 option()
对服务端通道或者客户端通道设置了一系列选项,下面我们来看一些常用的选项
此为TCP参数,用来设置每个TCP socket在内核中的发送缓冲区和接收缓冲区的大小
此为TCP参数,表示底层TCP协议的心跳机制。true为连接保持心跳,默认为false
此为TCP参数,表示服务器端接收连接的队列长度,如果队列满,客户端拒绝连接。
window默认为200,其他操作系统为128.
此为TCP参数,表示设置广播模式
定义ByteBuf的实例方式,下一节我们介绍 ByteBuf
到此,有关 Netty 的基础知识就全部完成,代码上其实就是一些固定写法,重要是要理解 Netty 的模式等
关于上面的点,有什么写的不好的,或者写的有问题的,欢迎大家指正出来,以上都是最基础的知识点,更深入的知识我们一起学习,也欢迎大家来跟我一起讨论。
下面的是我用 Hexo + NexT 新搭建的个人知识体系源,最新内容都会先放到这里, 欢迎大家来访
最基础,什么都没有,O(∩_∩)O哈哈~
我的知识体系总结