转载

初识Netty原理

创建一个快速开始的Maven项目,导入Netty4.0版本的依赖(我的JDK是1.8,官方建议1.6以上)。Netty依赖如下:

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.6.Final</version>
</dependency>
复制代码

这里使用了门面日志框架Slf4j,所以需要引入slf4j和日志实现框架logback。

<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.25</version>
</dependency>
<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.2.3</version>
</dependency>
复制代码

服务端程序

先上代码,代码里已有详细注释,具体的介绍后面再说。

public class NettyDiscardServer {
	private static final Logger logger = LoggerFactory.getLogger(NettyDiscardServer.class);

	final int port;
    // 创建一个服务器端的启动器
	ServerBootstrap sb = new ServerBootstrap();

	NettyDiscardServer(int port) {
		this.port = port;
	}

	public void runServer() {
		// 创建反应器线程组
		// 开启一个线程(其实就是创建一个反应器类的对象)用于父通道,负责新连接的监听
		EventLoopGroup boss = new NioEventLoopGroup(1);
		// 使用不带参数的构造函数,默认线程数是CPU核数 * 2
		// 开启一个线程池用于子通道,负责通道的IO事件的处理
		EventLoopGroup workers = new NioEventLoopGroup();

		try {
			// 设置反应器的线程组,一个父线程组,一个子线程组
			sb.group(boss, workers)
					// 设置NIO类型的通道
					.channel(NioServerSocketChannel.class)
					// 设置服务端监听的端口
					.localAddress("127.0.0.1", port)
					// 设置通道的参数
                    // SO_KEEPALIVE指打开TCP心跳检测
					.option(ChannelOption.SO_KEEPALIVE, true)
                    // Netty的内存在堆外直接内存上分配,可避免字节缓冲区的二次拷贝
					.option(ChannelOption.ALLOCATOR,PooledByteBufAllocator.DEFAULT)
                    // 装配子通道流水线
					.childHandler(new ChannelInitializer<SocketChannel>() {
                                // 当有新连接到达时会创建一个通道
								protected void initChannel(SocketChannel ch) {
									// 向子通道流水线添加第一个Handler处理器
									ch.pipeline().addLast(new NettyDiscardHandler());
                                    // 向子通道流水线添加第二个Handler处理器
									ch.pipeline().addLast(new NettyDiscardHandler1());
								}
							});
			// 开始绑定服务器
			// 调用sync同步方法阻塞直到绑定成功
			ChannelFuture channel = sb.bind().sync();
			logger.info("服务器启动成功,监听端口:" + channel.channel().localAddress());
			// 服务器监听通道会一直等待 通道关闭的异步任务结束
			ChannelFuture close = channel.channel().closeFuture();
			// 同步关闭
			close.sync();
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			// 释放所有使用的线程,释放所有资源(包括关闭通道和选择器)
			boss.shutdownGracefully();
			workers.shutdownGracefully();
		}
	}

	public static void main(String[] args) {
		new NettyDiscardServer(8888).runServer();
	}
}
复制代码

业务处理器

在Netty的反应器模式中,所有的业务处理都在Handler处理器中完成。这里写一个简单的入站处理(读取消息不回复),以及演示简单的通道流水线处理。这里两个类我都是做为上面服务器类的内部类实现的。

// 继承netty默认实现的入站处理器,在内部实现自己的业务逻辑
    static class NettyDiscardHandler extends ChannelInboundHandlerAdapter {
        // 读取缓冲区中的数据
		@Override
		public void channelRead(ChannelHandlerContext ctx, Object msg)
				throws Exception {
            // 将数据转为ByteBuf(类似Java NIO中的ByteBuffer)类来处理
			ByteBuf buf = (ByteBuf) msg;
			try {
				logger.info("收到消息 : ");
                // 循环读取缓冲区中的字符
				while (buf.isReadable()) {
					System.out.print((char) buf.readByte());
				}
				System.out.println();
			} finally {
				// 通过 通道处理器上下文 将消息传播到下一个处理器节点
				ctx.fireChannelRead(msg);
			}
		}

		@Override
		public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
				throws Exception {
			// 遇到异常时关闭连接
			cause.printStackTrace();
			ctx.close();
		}
	}

	static class NettyDiscardHandler1 extends ChannelInboundHandlerAdapter {
		@Override
		public void channelRead(ChannelHandlerContext ctx, Object msg)
				throws Exception {
			logger.info("我是第二个处理器");
            // 抛弃消息
            ReferenceCountUtil.release(msg);
		}
	}
复制代码

简单的客户端程序

实现一个简单的想服务器发送消息的客户端。

public class NettyDiscardClient {
	public static void main(String[] args) throws IOException {
		SocketChannel channel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 8888));
		channel.configureBlocking(false);
		// 自旋,等待连接完成
		while (!channel.finishConnect());
        // 向服务端发送一个消息
		ByteBuffer buf = ByteBuffer.allocate(1024);
		buf.put("hello world!".getBytes());
		buf.flip();
		channel.write(buf);
		channel.close();
	}
}
复制代码

运行服务器和客户端。其中可以看到消息被两个处理器处理了,这就是通道的流水线处理。

[main] INFO discardServer.NettyDiscardServer - 服务器启动成功,监听端口:/127.0.0.1:8888
[nioEventLoopGroup-3-1] INFO discardServer.NettyDiscardServer - 收到消息 : hello world!
[nioEventLoopGroup-3-1] INFO discardServer.NettyDiscardServer - 我是第二个处理器
复制代码

服务器启动代码分析

Netty提供了一个非常便利的工厂类Bootstrap,可以用它来方便的完成netty组件的组装和配置。

父子通道

在Netty中,将有接收关系的NioServerSocketChannel和NioSocketChannel,叫做父子通道。

  • NioServerSocketChannel负责服务器端新连接的监听和接收,叫做父通道。
  • NioSocketChannel负责通道间数据的传输,叫做子通道。

EventLoopGroup线程组

在Netty中,一个EventLoop就是一个反应器。由于Netty是多线程版本的反应器模式,所以使用EventLoopGroup来实现多线程版本的反应器。

EventLoopGroup一般常用两个构造函数,一个是带参数的(用于指定线程数),一个是不带参数的。下面来看一下内部源码。

// 带参数,直接指定线程数
public NioEventLoopGroup(int nThreads) {
    this(nThreads, (Executor) null);
}
// 不带参数,内部调用带参数的构造函数,传入的参数为0
public NioEventLoopGroup() {
    this(0);
}
复制代码

对于无参的构造函数,传入的线程数居然是0,带着疑问继续往源码里钻。经过层层调用,最后找到了,是调用了父类MultithreadEventLoopGroup的构造函数。

protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}
复制代码

可以发现,如果传入的线程数为0,则把默认的DEFAULT_EVENT_LOOP_THREADS赋值给线程数nThreads。这个DEFAULT_EVENT_LOOP_THREADS是多少呢,在父类中找找看。

static {
    // 将CPU核数 * 2 作为默认线程数
    DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));
    // 如果日志开启了debug模式,则打印出开启的线程数
    if (logger.isDebugEnabled()) {
        logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
    }
}
复制代码

通过源码可以发现,如果传入一个无参的构造函数,则默认开始CPU核数 * 2个EventLoop线程。

另外,开篇的服务器端代码中,创建了两个线程组实例。其中一个用于负责新连接的监听和连接,查询父通道的IO事件(这里用带参数的构造函数创建了一个线程)。还有一个线程组用于负责查询所有子通道的IO事件,并执行Handler处理器中的业务处理。

Bootstrap的启动流程

Bootstrap可以方便的组装和配置Netty的组件。下面介绍一下流程。

  1. 创建一个服务器端的启动器ServerBootstrap。
ServerBootstrap sb = new ServerBootstrap();
复制代码
  1. 创建反应器线程组,并赋值给ServerBootstrap。
EventLoopGroup boss = new NioEventLoopGroup(1);
EventLoopGroup workers = new NioEventLoopGroup();
sb.group(boss, workers);  // 对应的方法原型 ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup)
复制代码
  1. 设置通道的IO类型,这里服务器端一般使用NioServerSocketChannel,也就是Java NIO类型。
sb.channel(NioServerSocketChannel.class) // 对应的方法原型 B channel(Class<? extends C> channelClass)
复制代码
  1. 设置监听的地址和端口。
sb.localAddress("127.0.0.1", port) // 对应的方法原型 B localAddress(String inetHost, int inetPort)
复制代码
  1. 设置传输通道的配置选项,这里是给父通道接收连接通道设置一些选项。对于ChannelOption常量的分析。参考 Netty支持的ChannelOption分析
sb.option(ChannelOption.SO_KEEPALIVE, true)
sb.option(ChannelOption.ALLOCATOR,PooledByteBufAllocator.DEFAULT) // 对应的方法原型 B option(ChannelOption<T> option, T value)
复制代码
  1. 装配子通道的Pipeline流水线,通过调用childHandler方法,传入一个ChannelInitializer通道初始化类的实例(这个实例的泛型需要与前面的启动器中设置的通道类型对应)。在父通道成功接收一个连接,并创建成功一个子通道后,就会初始化子通道,这个实例就会被调用。在这个实例中有一个initChannel初始化方法,用于向子通道流水线添加业务处理器。
sb.childHandler(new ChannelInitializer<SocketChannel>() {
	protected void initChannel(SocketChannel ch) {
	    ch.pipeline().addLast(new NettyDiscardHandler());
		ch.pipeline().addLast(new NettyDiscardHandler1());
	}
});
复制代码
  1. 绑定服务器新连接的监听端口。内部通过调用一个doBind(final SocketAddress localAddress)创建一个之前设置的新连接通道类型实例,并将这个通道实例和地址端口绑定。这里调用了同步阻塞方法,阻塞到地址端口绑定成功为止,服务器正式启动啦。
ChannelFuture channel = sb.bind().sync();
复制代码
  1. 自我阻塞,直到通道关闭。当通道关闭时,closeFuture实例的sync方法会返回。
// 获取一个closeFuture实例
ChannelFuture close = channel.channel().closeFuture();
// 自我阻塞,直到通道关闭,就返回。
close.sync();
复制代码
  1. 关闭EventLoopGroup。同时会关闭内部的反应器线程以及选择器和所有子通道。并释放掉底层的资源。
boss.shutdownGracefully();
workers.shutdownGracefully();
复制代码

Netty中的反应器模式

反应器模式中的IO处理流程

在Reactor模式中的IO处理流程图如下: [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-XWq76zbM-1575900038358)( i.loli.net/2019/12/09/… )]

  1. 通道注册 :首先需要将通道注册到选择器中,通道中就绪的IO事件才能被选择器查询到。 2. 查询选择 :一个反应器对应一个线程,不断的轮询查找选择器中的IO就绪事件。
  2. 事件分发 :如果查询到就绪的IO事件,就分发给绑定在对应通道上的处理器去处理。
  3. 业务处理 :完成真正的IO操作和业务处理。

Netty中的Channel组件

Netty中没有使用Java NIO的Channel,而是实现了自己的Channel通道组件。由于Netty还能处理OIO,所以也封装了对应OIO通道组件。

但是在Netty中使用最多的还是NioSocketChannel(异步非阻塞TCP Socket传输通道),NioServerSocketChannel(异步非阻塞TCP Socket服务器端监听通道)。

其中有个好玩的通道就是EmbeddedChannel(嵌入式通道),这个通道主要是用于单元测试的(它可以让开发人员在开发中方便的模拟入站与出站的操作,而不进行底层实际的传输)。因为Netty实际开发中主要就是开发业务处理器,但是对于业务处理器的测试需要用到客户端和服务器的操作,每次测试都要开启两个程序测试很麻烦,所以就有了EmbeddedChannel。除了不进行传输之外,其它处理流程和真实的传输通道的是一样的。

其中主要的API如下:

名称 说明
writeInbound (Object... msgs) 向通道中写入inbound入站数据,模拟通道收到客户端的数据
readInbound() 从EmbeddedChannel通道中读取入站数据,也就是返回入站处理器处理完的数据,如果没有数据,返回NULL
writeOutbound (Object... msgs) 向通道中写入outbound出站数据,模拟通道发送数据到客户端。
readOutbound() 从EmbeddedChannel通道中读取出站数据,类似readInbound。
flush() 结束EmbeddedChannel,它会调用通道的close方法,并释放所有数据的底层资源。

Netty中的Reactor反应器

Netty中的反应器有多个实现类,对应于Channel通道。NioSocketChannel对应的反应器类是NioEventLoop。

NioEventLoop有两个重要成员,一个是Thread线程类的成员,一个是Java NIO选择器的成员。

一个NioEventLoop拥有一个Thread线程,负责一个Java NIO选择器的IO事件轮询。但是一个NioEventLoop反应器可以对应多个NioSocketChannel通道。

Netty中的Handler处理器

Netty中有两大类的Handler处理器。他们都继承与ChannelHandler接口。

  • ChannelInboundHandler入站处理器,最常用的channelRead用于从通道中读取数据并处理。对应的Netty默认实现为ChannelInboundHandlerAdapter入站处理适配器。
  • 是ChannelIOutboundHandler出站处理器,最常用的channelWrite用于把数据写入通道中。对应的Netty默认实现为ChannelOutboundHandlerAdapter出站处理适配器。

Handler处理器的生命周期:添加处理器到流水线 -> 注册到流水线中 -> 通道激活后回调 -> 入站方法回调 -> 底层连接关闭 -> 移除注册 -> 从流水线中删除。

Netty的Pipeline流水线

在Netty中通道和处理器实例的关系为一对多。这里就用到了通道流水线(ChannelPipeline)。一个通道(有一个pipeline成员)拥有一个通道流水线。

通道流水线ChannelPipeline基于责任链模式设计的,内部是一个双向链表结构,每一个节点对应一个ChannelHandlerContext处理器上下文对象(处理器添加到流水线时,会创建一个通道处理器上下文,里面包裹了一个ChannelHandler处理器对象,并关联着ChannelPipeline流水线。用于控制流水线上处理器的传播),可以支持动态的增加和删除处理器上下文对象。

在通道流水线中,IO事件按照既定的顺序处理。入站处理器的执行次序为从前往后,而出站处理器的次序为从后往前。

原文  https://juejin.im/post/5df84c076fb9a016194b006f
正文到此结束
Loading...