网络环境(NetworkEnvironment)是TaskManager进行网络通信的主对象,主要用于跟踪中间结果并负责所有的数据交换。每个TaskManager的实例都包含一个网络环境对象,在TaskManager启动时创建。NetworkEnvironment管理着多个协助通信的关键部件,它们是:
当NetworkEnvironment被初始化时,它首先根据配置创建网络缓冲池(NetworkBufferPool)。创建NetworkBufferPool时需要指定Buffer数目、单个Buffer的大小以及Buffer所基于的内存类型,这些信息都是可配置的并封装在配置对象NetworkEnvironmentConfiguration中。
NetworkEnvironment对象包含了上面列举的网络I/O相关的各种部件,这些对象并不随着NetworkEnvironment对象实例化而被立即实例化,它们的实例化会被延后到NetworkEnvironment对象跟TaskManager以及JobManager 关联 (associate)上之后。TaskManager在启动后会向JobManager注册,随后NetworkEnvironment的associateWithTaskManagerAndJobManager方法会得到调用,在其中所有的辅助部件都会得到实例化:
this.partitionManager = new ResultPartitionManager(); this.taskEventDispatcher = new TaskEventDispatcher(); this.partitionConsumableNotifier = new JobManagerResultPartitionConsumableNotifier( executionContext,jobManagerGateway,taskManagerGateway,jobManagerTimeout); this.partitionStateChecker = new JobManagerPartitionStateChecker(jobManagerGateway, taskManagerGateway); final Option<NettyConfig> nettyConfig = configuration.nettyConfig(); connectionManager = nettyConfig.isDefined() ? new NettyConnectionManager(nettyConfig.get()) : new LocalConnectionManager(); try { //启动网络连接管理器 connectionManager.start(partitionManager, taskEventDispatcher, networkBufferPool); } catch (Throwable t) { throw new IOException("Failed to instantiate network connection manager: " + t.getMessage(), t); }
当然在TaskManager触发stop动作之后,在其postStop逻辑中,也会跟JobManager进行 解关联 操作。从而触发NetworkEnvironment的disassociate方法。在disassociate方法中,上述所有的辅助通信部件也将会被释放或回收资源。
在任务执行的核心逻辑中,有一个步骤是需要将自身(Task)注册到网络栈(也就是这里的NetworkEnvironment)。该步骤会调用NetworkEnvironment的实例方法registerTask进行注册,注册之后NetworkEnvironment会对任务的通信进行管理:
public void registerTask(Task task) throws IOException { //获得当前任务对象所生产的结果分区集合 final ResultPartition[] producedPartitions = task.getProducedPartitions(); //同时获得所有的结果分区写入器 final ResultPartitionWriter[] writers = task.getAllWriters(); //正常情况下结果分区数与写入器的数目应该是相等的 if (writers.length != producedPartitions.length) { throw new IllegalStateException("Unequal number of writers and partitions."); } ResultPartitionConsumableNotifier jobManagerNotifier; synchronized (lock) { if (isShutdown) { throw new IllegalStateException("NetworkEnvironment is shut down"); } //如果当前网络环境对象还没有跟TaskManager进行关联,那么说明调用的时机出现问题,直接抛出异常 if (!isAssociated()) { throw new IllegalStateException("NetworkEnvironment is not associated with a TaskManager"); } //遍历任务的每个结果分区,依次进行初始化 for (int i = 0; i < producedPartitions.length; i++) { final ResultPartition partition = producedPartitions[i]; final ResultPartitionWriter writer = writers[i]; BufferPool bufferPool = null; try { //用网络缓冲池创建本地缓冲池,该缓冲池是非固定大小的且请求的缓冲个数是结果分区的子分区个数 bufferPool = networkBufferPool.createBufferPool( partition.getNumberOfSubpartitions(), false); //将本地缓冲池注册到结果分区 partition.registerBufferPool(bufferPool); //结果分区会被注册到结果分区管理器 partitionManager.registerResultPartition(partition); } catch (Throwable t) { if (bufferPool != null) { bufferPool.lazyDestroy(); } if (t instanceof IOException) { throw (IOException) t; } else { throw new IOException(t.getMessage(), t); } } //向任务事件分发器注册结果分区写入器 taskEventDispatcher.registerWriterForIncomingTaskEvents(writer.getPartitionId(), writer); } //获得任务的所有输入闸门 final SingleInputGate[] inputGates = task.getAllInputGates(); //遍历输入闸门,为它们设置缓冲池 for (SingleInputGate gate : inputGates) { BufferPool bufferPool = null; try { //为每个输入闸门设置本地缓冲池,这里创建的本地缓冲池也非固定大小的,且初始化的缓冲数为其包含的输入信道数 bufferPool = networkBufferPool.createBufferPool(gate.getNumberOfInputChannels(), false); gate.setBufferPool(bufferPool); } catch (Throwable t) { if (bufferPool != null) { bufferPool.lazyDestroy(); } if (t instanceof IOException) { throw (IOException) t; } else { throw new IOException(t.getMessage(), t); } } } jobManagerNotifier = partitionConsumableNotifier; } //遍历所有的结果分区 for (ResultPartition partition : producedPartitions) { //如果某个结果分区的消费者是主动部署的 if (partition.getEagerlyDeployConsumers()) { //则直接通知JobManager,让其告知消费者任务,当前结果分区可被消费 jobManagerNotifier.notifyPartitionConsumable( partition.getJobId(), partition.getPartitionId()); } } }
从任务被注册到NetworkEnvironment对象的代码段中,我们能够得到一些信息。NetworkEnvironment对象会为当前任务生产端的每个ResultPartition都创建本地缓冲池,缓冲池中的Buffer数为结果分区的子分区数,同时为当前任务消费端的InputGate创建本地缓冲池,缓冲池的Buffer数为InputGate所包含的输入信道数。这些缓冲池都是非固定大小的,也就是说他们会按照网络缓冲池内存段的使用情况进行重平衡。
微信扫码关注公众号:Apache_Flink
QQ扫码关注QQ群:Apache Flink学习交流群(123414680)