本文以及接下来的几篇文章将介绍Flink运行时TaskManager间进行数据交换的核心部分——基于Netty通信框架远程请求ResultSubpartition。作为系列文章的第一篇,先列出需要了解的基础对象。
Netty连接管理器(NettyConnectionManager)是连接管理器接口(ConnectionManager)针对基于Netty的远程连接管理的实现者。它是TaskManager中负责网络通信的网络环境对象(NetworkEnvironment)的核心部件之一。
一个TaskManager中可能同时运行着很多任务实例,有时某些任务需要消费某远程任务所生产的结果分区,有时某些任务可能会生产结果分区供其他任务消费。所以对一个TaskManager来说,其职责并非单一的,它既可能充当客户端的角色也可能充当服务端角色。因此,一个NettyConnectionManager会同时管理着一个Netty客户端(NettyClient)和一个Netty服务器(NettyServer)实例。当然除此之外还有一个Netty缓冲池(NettyBufferPool)以及一个分区请求客户端工厂(PartitionRequestClientFactory,用于创建分区请求客户端PartitionRequestClient),这些对象都在NettyConnectionManager构造器中被初始化。
每个PartitionRequestClientFactory实例都依赖一个NettyClient。也就是说所有PartitionRequestClient底层都共用一个NettyClient。
Netty客户端和服务器对象的启动和停止都是由NettyConnectionManager统一控制的。NettyConnectionManager启动的时机是当TaskManager跟JobManager关联上之后调用NetworkEnvironment的associateWithTaskManagerAndJobManager方法时。而当TaskManager跟JobManager解除关联时停止。
NettyClient和NettyServer在实例化Netty通信的核心对象时都需要配置各自的“字节缓冲分配器”用于为Netty读写数据分配内存单元。Netty自身提供了一个池化的字节缓冲分配器(PooledByteBufAllocator),但Flink又在此基础上进行了包装并提供了Netty缓冲池(NettyBufferPool)。此举的目的是严格控制所创建的分配器(Arena)的个数,转而依赖TaskManager的相关配置指定。
什么是Arena?当指定PooledByteBufAllocator来执行ByteBuf分配时,最终的内存分配工作被委托给类PoolArena。由于Netty通常用于高并发系统,所以各个线程进行内存分配时竞争不可避免,这可能会极大的影响内存分配的效率,为了缓解高并发时的线程竞争,Netty允许使用者创建多个分配器(Arena)来分离锁,提高内存分配效率。
NettyBufferPool在构造器内部以固定的参数实例化PooledByteBufAllocator并作为自己的内部分配器。具体做了哪些限制呢?首先,PooledByteBufAllocator本身既支持堆内存分配也支持堆外内存分配,NettyBufferPool将其限定为只在堆外内存上进行分配。其次, 显式指定了pageSize大小为8192,maxOrder值为11。这两个参数是什么意思呢?Netty中的内存池包含页(page)和块(chunk)两种分配单位,通过PooledByteBufAllocator构造器可以设置页大小(也即pageSize参数),该参数在PooledByteBufAllocator中的默认值为8192,而参数maxOder则用于计算块的大小。
计算公式为:chunkSize = pageSize << maxOrder;因此这里块大小为16MB。
另外,NettyBufferPool通过反射还拿到了PooledByteBufAllocator中的PoolArena分配器对象集合,但此举更多的是出于调试目的。并且显式关闭了对堆内存相关的操作方法。
NettyClient的主要职责是初始化Netty客户端的核心对象,并根据NettyProtocol配置用于客户端事件处理的ChannelPipeline。
NettyClient并不用于发起远程结果子分区请求,该工作将由PartitionRequestClient完成。
一个Netty引导客户端的创建步骤如下:
bootstrap = new Bootstrap();
switch (config.getTransportType()) { case NIO: initNioBootstrap(); break; case EPOLL: initEpollBootstrap(); break; case AUTO: if (Epoll.isAvailable()) { initEpollBootstrap(); LOG.info("Transport type 'auto': using EPOLL."); } else { initNioBootstrap(); LOG.info("Transport type 'auto': using NIO."); } }
Netty自版本4.0.16开始,对于Linux系统提供原生的套接字通信传输支持(也即,epoll机制,借助于JNI调用),这种传输机制拥有更高的性能。
bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override public voidinitChannel(SocketChannel channel)throwsException{ channel.pipeline().addLast(protocol.getClientChannelHandlers()); } });
注意以上设置的是基于NettyPotocol获得的一个ChannelHandler数组组成的管道。
return bootstrap.connect(serverSocketAddress);
以上就是一个Netty客户端从初始化到跟服务器建立连接的大致过程。但这里需要注意的是,一个TaskManager根本上只会存在一个NettyClient对象(对应的也只有一个Bootstrap实例)。但一个TaskManager中的子任务实例很有可能会跟多个不同的远程TaskManager通信,所以同一个Bootstrap实例可能会跟多个目标服务器建立连接,所以它是复用的,这一点不存在问题因为无论跟哪个目标服务器通信,Bootstrap的配置都是不变的。至于不同的RemoteChannel如何跟某个连接建立对应关系,这一点由PartitionRequestClientFactory来保证。
Netty自版本4.0.16开始,对于Linux系统提供原生的套接字通信传输支持(也即,epoll机制,借助于JNI调用),这种传输机制拥有更高的性能。
跟NettyClient一样,NettyServer也会初始化Netty服务端的核心对象,除此之外它会启动对特定端口的侦听并准备接收客户端发起的请求。下面是NettyServer的初始化与启动步骤:
bootstrap = new ServerBootstrap();
switch (config.getTransportType()) { case NIO: initNioBootstrap(); break; case EPOLL: initEpollBootstrap(); break; case AUTO: if (Epoll.isAvailable()) { initEpollBootstrap(); LOG.info("Transport type 'auto': using EPOLL."); } else { initNioBootstrap(); LOG.info("Transport type 'auto': using NIO."); } }
bootstrap.localAddress(config.getServerAddress(), config.getServerPort());
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override public voidinitChannel(SocketChannel channel)throwsException{ channel.pipeline().addLast(protocol.getServerChannelHandlers()); } });
注意以上设置的是基于NettyPotocol获得的一个ChannelHandler数组组成的管道。
bindFuture = bootstrap.bind().syncUninterruptibly();
微信扫码关注公众号:Apache_Flink
QQ扫码关注QQ群:Apache Flink学习交流群(123414680)