我们基于Hadoop 1.2.1源码分析MapReduce V1的处理流程。
本文不打算深入地详细分析TaskTracker某个具体的处理流程,而是概要地分析TaskTracker在MapReduce框架中的主要负责处理那些事情,是我们能够在宏观上了解TaskTracker端都做了哪些工作。我尽量将TaskTracker端的全部要点内容提出来,但是涉及到详细的分析,只是点到为止,后续会对相应模块的处理流程结合代码进行分析。
TaskTracker主要负责MapReduce计算集群中Task运行的管理,所以TaskTracker要管理的事情比较多。一个MapReduce Job由很多的Task组成,而一个Job的所有Task被分成几个相斥的子集,每个子集被分配到某一个TaskTracker上去运行,所以一个TaskTracker管理运行了一个Job的所有Task的一个子集,也就是说TaskTracker不仅要维护每个Job对应的一个Task的子集,还要维护这些Task所属的Job的运行状态,对于Job/Task的状态的管理都是与JobTracker通过RPC通信保持状态的同步。
下面是TaskTracker端的主要组件,如下图所示:
为了了解TaskTracker中各个组件都负责处理哪些工作,我们通过下表来简要地说明各个组件的功能,如下表所示:
组件名称 | 组件功能 |
localFs: FileSystem | TaskTracker本地文件系统,用来管理本地文件和目录 |
systemFS: FileSystem | HDFS分布式文件系统,可以访问HDFS,用来检索Job/Task对应的资源文件等。 |
TrackerDistributedCacheManager | TrackerDistributedCacheManager负责跨Job的缓存的管理,每个Job会对应一个TaskDistributedCacheManager实例。比如,每次TaskTracker被分配执行一个Job的一组Task,此时需要将该Job对应的资源文件和split相关数据从HDFS下载到TaskTracker本地,这些文件都需要进行管理,包括位置查询、文件访问、文件清理等。 |
TaskTrackerInstrumentation | 用来管理TaskTracker上运行的一些Task的监控数据,主要是采集某些点的数据,如Task完成时、Task失败时、Task超时时等,目前该组件中都是空实现。 |
IndexCache | Map阶段需要输出临时文件,要对MapTask的输出写入TaskTracker本地文件系统,需要对这些输出数据进行分区(partition),IndexCache负责管理分区文件的相关信息。 |
UserLogManager | 负责管理TaskTracker节点上执行Task输出的日志信息,目前通过UserLogEvent定义了JVM_FINISHED、JOB_STARTED,、JOB_COMPLETED,、DELETE_JOB这4种事件,通过UserLogManager可以实现日志记录的输出。 |
ACLsManager | 用来控制MapReduce管理员管理Job和Queue级别操作的访问权限。 |
NodeHealthCheckerService | 用来检测节点之间的心跳服务。 |
ResourceCalculatorPlugin | 用来计算系统的资源的插件,默认使用的是LinuxResourceCalculatorPlugin实现,可以方便地访问系统中的资源信息状态,如内存、CPU。 |
JvmManager | 为了保证TaskTracker与实际Task(MapTask/ReduceTask)运行的隔离性,会将Task在单独的JVM实例中运行,JvmManager用来管理Task运行所在的JVM实例的信息,包括创建/销毁JVM实例等操作。 |
LocalStorage | 管理TaskTracker本地文件系统的存储目录信息,如访问本地目录失败、检测目录可用性等。 |
LocalDirAllocator | 管理TaskTracker本地目录分配,初始化LocalDirAllocator基于配置mapred.local.dir指定的目录,它采用的Round-Robin方式,在Task运行之前需要写一个启动Task的脚本文件,使用LocalDirAllocator来控制对应文件的读写。 |
JettyBugMonitor | 在Map阶段输出中间结果,Reduce阶段会基于HTTP协议(基于Jetty)来拷贝属于自己的分区,为了解决Jetty已知的一些类存在的Bug,它们可能会影响TaskTracker,通过检测Jetty所在JVM实例使用CPU量,当超过配置的值时终止TaskTracker进程。 |
MapOutputServlet | TaskTracker上启动一个Jetty容器,该Servlet用来负责暴露HTTP接口,供其它运行ReduceTask的TaskTracker拉取Map输出文件。 |
jobClient: InterTrackerProtocol | 与JobTracker进行RPC通信的代理(Proxy)对象。 |
taskReportServer: Server | TaskTracker节点上启动的RPC Server,在其上运行的Task,在运行过程中会向TaskTracker汇报状态,使TaskTracker知道Task的运行状态报告。 |
CleanupQueue | 负责清理Job或Task运行完成后遗留下的一些不再使用的文件或目录。 |
TaskTrackerStatus | 维护TaskTracker当前的状态信息,主要包括:TaskTracker的配置信息、TaskTracker上资源状态信息、TaskTracker上运行的Task的状态报告信息。 |
JobTokenSecretManager | 用来管理Job运行的令牌相关信息。 |
ShuffleServerInstrumentation | 管理Job运行过程中,shuffle阶段的监控数据,包括一组计数器:serverHandlerBusy、outputBytes、failedOutputs、successOutputs、exceptionsCaught。 |
TaskController | 用来管理Task的初始化、完成、清理工作,还负责启动和终止Task运行所在的JVM实例。 |
HttpServer | 用来处理Map输出的Jetty容器,其中MapOutputServlet会注册到该HTTP server中。 |
ShuffleExceptionTracker | 跟踪Shuffle阶段出现异常情况的信息。 |
MapEventsFetcherThread | 跟踪每个运行的Job对应的ReduceTask的Shuffle阶段,如果有Map完成,会对应着TaskCompletionEvent触发该线程,从已经完成的Map所在节点拷贝Map输出的中间结果数据,为ReduceTask运行做准备。 |
ReduceTaskLauncher | 启动ReduceTask。 |
MapTaskLauncher | 启动MapTask。 |
TaskCleanupThread | 负责清理Job/Task执行完成后遗留的文件或目录。 |
TaskMemoryManagerThread | 管理在该TaskTracker上运行的Task使用内存的信息。 |
通过上表,我们可以了解到TaskTracker端各个组件的基本功能,也稍微了解到组件之间的一些关系。下面,我们从TaskTracker抽象层次的视角,来分析组件之间的关系和交互,概要地描述一些主要的处理流程:
下面,我们分别分析上述列举的5个处理流程:
TaskTracker周期性地向JobTracker发送心跳报告,将TaskTracker上运行的Task的状态信息、节点资源信息、节点健康状况信息封装到TaskTrackerStatus对象中,通过RPC调用heartbeat将心跳发送到JobTracker端,并返回HeartbeatResponse,其中心跳响应对象中包包含了JobTracker分配的任务,通过TaskAction这种指令(包括:LaunchTaskAction/KillTaskAction/CommitTaskAction)列表的方式进行指派。TaskTracker解析RPC调用返回的心跳响应,根据TaskAction指令列表,执行具体的操作。
TaskTracker处理心跳响应的流程,如下序列图所示:
TaskTracker收到心跳响应,首先会检查是否存在需要恢复的Job,如果存在,则会检查要进行恢复的Job的状态,从而将需要进行恢复的Job对应的Task加入到恢复队列中,等待调度运行。
接着,TAskTracker会检查TaskAction指令的类型,根据其实际类型,执行对应的处理流程:
由于某些重要的处理流程,如启动一个Task的详细流程,我们会在后续单独写几篇文章,用更加合适的方式来详细分析。
这里,我们介绍一下MapReduce计算中是如何实现Job的恢复的,包括JobTracker端和TaskTracker端之间的简单交互流程。JobTracker存在一个系统目录(System Directory),默认值为/tmp/hadoop/mapred/system,也可以根据配置项mapred.system.dir指定该值。当JobClient提交一个Job到JobTracker时,JobTracker会首先将该Job的信息写入到JobTracker的系统目录下,每个Job对应一个以Job ID为名称的子目录,以便JobTracker因为重启,能够恢复这些Job的运行。我们可以看一下JobTracker中submitJob方法中保存Job信息的实现,代码如下所示:
// Store the job-info in a file so that the job can be recovered // later (if at all) // Note: jobDir & jobInfo are owned by JT user since we are using // his fs object if (!recovered) { Path jobDir = getSystemDirectoryForJob(jobId); // Job目录,例如:/tmp/hadoop/mapred/system/job_200912121733_0002 FileSystem.mkdirs(fs, jobDir, new FsPermission(SYSTEM_DIR_PERMISSION)); FSDataOutputStream out = fs.create(getSystemFileForJob(jobId)); // Job文件,例如:/tmp/hadoop/mapred/system/job_200912121733_0002/job-info jobInfo.write(out); // 将JobInfo结构对应的数据写入到Job文件 out.close(); }
上面代码中,JobInfo主要包含了JobID信息、用户名称、Job提交目录(例如,/tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/ ,该目录是在JobClient提交Job时在HDFS上创建的,用于将该Job所需要的资源都拷贝到该Job对应的提交目录下面,便于后续JobTracker能够读取这些数据)。
如果JobTracker因为某些原因重新启动了,那么在JobTracker重启之后,需要从JobTracker的系统目中读取这些Job的信息,以便能够恢复这些尚未完成的Job的运行,并以HeartbeatResponse的结构,在TaskTracker发送Heartbeat的时候响应给TaskTracker,TaskTracker解析响应数据,然后去恢复这些Job的运行。
上面的序列图中,我们可以看到,当TaskTracker发送Heartbeat并收到响应后,从HeartbeatResponse中解析取出需要Recovered的Job,并进行处理,代码如下所示:
// Check if the map-event list needs purging Set<JobID> jobs = heartbeatResponse.getRecoveredJobs(); // 获取到需要Recovered的Job列表 if (jobs.size() > 0) { synchronized (this) { // purge the local map events list for (JobID job : jobs) { RunningJob rjob; synchronized (runningJobs) { // TaskTracker维护的当前在其上运行的Job列表 rjob = runningJobs.get(job); if (rjob != null) { synchronized (rjob) { FetchStatus f = rjob.getFetchStatus(); if (f != null) { f.reset(); } } } } } // Mark the reducers in shuffle for rollback synchronized (shouldReset) { for (Map.Entry<TaskAttemptID, TaskInProgress> entry : runningTasks.entrySet()) { if (entry.getValue().getStatus().getPhase() == Phase.SHUFFLE) { this.shouldReset.add(entry.getKey()); // 将处于SHUFFLE阶段的Task放到shouldReset集合中 } } } } }
通过上面的代码我们可以看到,当JobTracker重启的时候,已经在TaskTracker上运行的属于某些Job的Task可能无法立即感知到,对应的Job仍然存在于TaskTracker的runningJobs集合中。在JobTracker重启之后,TaskTracker所发送的第一个Heartbeat返回的响应数据中,应该会存在需要Recovered的Job列表,所以这时在TaskTracker端只需要从runningJobs中取出需要Recovered的Job,并查看其是否存在Fetch状态,如果存在,应该重新设置状态(主要对应于MapEventsFetcherThread 维护的TaskCompletionEvent列表,触发ReduceTask拉取MapTask的输出中间结果),以便该Job的各个Task恢复运行。如果该ReduceTask正在运行于SHUFFLE阶段,需要将对应的Job的MapTask的输出拷贝到该ReduceTask所在的节点上,通过调用FetchStatus的reset方法重置状态,这样就重新恢复了ReduceTask的运行。
由于MapReduce用户程序包含用户代码,可能会存在Bug,为了不因为用户代码存在的Bug影响TaskTracker服务,所以MapReduce采用了隔离Task运行的方式来运行MapTask/ReduceTask。在运行Task时,会单独创建一个独立的JVM实例,让Task的代码再该JVM实例中加载运行,TaskTracker需要跟踪该JVM实例中运行的Task的状态。在TaskTracker端,加载一个运行Task的JVM实例,是通过org.apache.hadoop.mapred.Child类来实现。下面,我们看一下Child类是如何实现Task加载运行的,如下面序列图所示:
Child类包含一个入口主方法main,在运行的时候需要传递对应的参数,来运行MapTask和ReduceTask,通过上面序列图我们可以看出,命令行输入如下5个参数:
有了上述参数,就可以获取到一个Task运行所需要的全部资源,如一个Task处理哪一个Split,一个Task对应的Job配置信息,还可以方便TaskTracker监控该Task实例所在的JVM的状态。该Child创建时,会创建一个到TaskTracker的RPC代理对象,通过该RPC连接向TaskTracker汇报Task执行进度及其状态信息。然后,一切运行Task的基本条件都已经具备,接下来从该Task对应的Job的代码(job.jar)开始加载任务处理类,如果是MapTask则执行MapTask运行的处理流程,如果是ReduceTask则执行ReduceTask的处理流程,最后,断开Task汇报状态的RPc连接,Task运行结束。
在Child类中加载启动Task,如果是MapTask,则执行MapTask对应的处理流程,如下序列图所示:
启动一个MapTask运行,包含4个阶段,我们通过运行各个阶段的方法来表示:
在MapTask运行过程中, 如果阶段或者状态发生变化,要与TaskTracker进行通信,汇报状态,并更新TaskTracker维护的关于Task和Job对应的状态数据。最后,Task运行完成,也要通知TaskTracker。
在Child类中加载启动Task,如果是ReduceTask,则执行ReduceTask对应的处理流程,如下序列图所示:
启动一个ReduceTask运行,与MapTask的处理流程有很大的不同,它包含3个阶段,如下所示:
执行reduce阶段,比MapTask复杂的多。在ReduceTask运行过程中,也会周期性地与TaskTracker通信,汇报Task运行进度和状态,以保证与TaskTracker所维护的Task的状态数据同步。当ReduceTask完成后,如果有输出的话,最终的结果数据会输出到HDFS中保存。
本文基于 署名-非商业性使用-相同方式共享 4.0 许可协议发布,欢迎转载、使用、重新发布,但务必保留文章署名时延军(包含链接),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我联系。