
Hadoop2 Source Code Analysis: YARN's Service Library and Event Library

1. Overview

In the earlier article "Hadoop2 Source Code Analysis: A YARN RPC Example", I introduced YARN's RPC mechanism along with a code demonstration. Today we continue with YARN's service library and event library. The outline is as follows:

  • Introduction to the service library and event library
  • Usage example
  • Screenshot preview

Let's get started with today's content.

2. The Service Library and Event Library

2.1 The Service Library

YARN manages long-lived objects through a service object model, whose main characteristics are as follows:

  • A service-managed object has four states: constructed, initialized, started, and stopped. They are defined in the Service interface of the org.apache.hadoop.service package, as shown below:
public enum STATE {
  /** Constructed but not initialized */
  NOTINITED(0, "NOTINITED"),

  /** Initialized but not started or stopped */
  INITED(1, "INITED"),

  /** started and not stopped */
  STARTED(2, "STARTED"),

  /** stopped. No further state transitions are permitted */
  STOPPED(3, "STOPPED");

  /**
   * An integer value for use in array lookup and JMX interfaces.
   * Although {@link Enum#ordinal()} could do this, explicitly
   * identify the numbers gives more stability guarantees over time.
   */
  private final int value;

  /**
   * A name of the state that can be used in messages
   */
  private final String statename;

  private STATE(int value, String name) {
    this.value = value;
    this.statename = name;
  }

  /**
   * Get the integer value of a state
   * @return the numeric value of the state
   */
  public int getValue() {
    return value;
  }

  /**
   * Get the name of a state
   * @return the state's name
   */
  @Override
  public String toString() {
    return statename;
  }
}
The abstract base class AbstractService implements this interface:

public abstract class AbstractService implements Service {

    // ......

}

Reading the code, we can see that a service object implements the Service interface, which defines the four basic service states: constructed, initialized, started, and stopped. The AbstractService class is the base implementation of the Service interface.
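The four states and the legal transitions between them can be modeled in a few lines of plain Java. The following is a simplified sketch for illustration only, not Hadoop's implementation (the names SimpleService and ServiceState are invented here):

```java
// A simplified model of the service lifecycle: four states, strict
// transition order, and a terminal STOPPED state.
enum ServiceState { NOTINITED, INITED, STARTED, STOPPED }

class SimpleService {
    private ServiceState state = ServiceState.NOTINITED;

    public void init() {
        if (state != ServiceState.NOTINITED) {
            throw new IllegalStateException("cannot init from " + state);
        }
        state = ServiceState.INITED;
    }

    public void start() {
        if (state != ServiceState.INITED) {
            throw new IllegalStateException("cannot start from " + state);
        }
        state = ServiceState.STARTED;
    }

    public void stop() {
        // STOPPED is terminal: no further transitions are permitted
        state = ServiceState.STOPPED;
    }

    public ServiceState getState() {
        return state;
    }
}
```

Calling start() before init(), for instance, fails with an IllegalStateException, which mirrors the guarantee the real state model enforces.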

  • Any change in a service's state can trigger other actions, for example:
public void start() {
  if (isInState(STATE.STARTED)) {
    return;
  }
  //enter the started state
  synchronized (stateChangeLock) {
    if (stateModel.enterState(STATE.STARTED) != STATE.STARTED) {
      try {
        startTime = System.currentTimeMillis();
        serviceStart();
        if (isInState(STATE.STARTED)) {
          //if the service started (and isn't now in a later state), notify
          if (LOG.isDebugEnabled()) {
            LOG.debug("Service " + getName() + " is started");
          }
          notifyListeners();
        }
      } catch (Exception e) {
        noteFailure(e);
        ServiceOperations.stopQuietly(LOG, this);
        throw ServiceStateException.convert(e);
      }
    }
  }
}

Here, a listener action is triggered: state changes are observed globally, exceptions are caught and reported, and so on.
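The notifyListeners() call above is essentially an observer pattern. The self-contained sketch below illustrates the idea of notifying registered listeners on a state change; it is not Hadoop's API (ListenableService and StateListener are invented names):

```java
import java.util.ArrayList;
import java.util.List;

// A listener is told the service name and the new state on every transition.
interface StateListener {
    void stateChanged(String serviceName, String newState);
}

class ListenableService {
    private final String name;
    private final List<StateListener> listeners = new ArrayList<>();
    private String state = "NOTINITED";

    ListenableService(String name) {
        this.name = name;
    }

    void register(StateListener l) {
        listeners.add(l);
    }

    void start() {
        state = "STARTED";
        // mirror notifyListeners(): every registered listener observes
        // the transition after it has taken effect
        for (StateListener l : listeners) {
            l.stateChanged(name, state);
        }
    }

    String getState() {
        return state;
    }
}
```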

  • Services can be combined by composition, which makes it easy to manage them as a unit. In YARN, a non-composite service can extend AbstractService directly; a composite service should extend CompositeService instead.
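The composition idea can be sketched as follows: a parent service starts its children in registration order and stops them in reverse order, which is the behavior CompositeService provides. This is a simplified stand-alone model, not the real CompositeService (ManagedService and SimpleComposite are invented names):

```java
import java.util.ArrayList;
import java.util.List;

// A tiny service that records its lifecycle calls into a shared log.
class ManagedService {
    final String name;
    final List<String> log;

    ManagedService(String name, List<String> log) {
        this.name = name;
        this.log = log;
    }

    void start() { log.add(name + ":start"); }
    void stop()  { log.add(name + ":stop"); }
}

// A composite service: children start in registration order,
// and stop in reverse order so dependencies unwind cleanly.
class SimpleComposite extends ManagedService {
    private final List<ManagedService> children = new ArrayList<>();

    SimpleComposite(String name, List<String> log) {
        super(name, log);
    }

    void addService(ManagedService s) {
        children.add(s);
    }

    @Override
    void start() {
        for (ManagedService s : children) {
            s.start(); // forward order
        }
        super.start();
    }

    @Override
    void stop() {
        super.stop();
        for (int i = children.size() - 1; i >= 0; i--) {
            children.get(i).stop(); // reverse order
        }
    }
}
```

Managing children this way is exactly why composition "makes them easier to manage as a unit": one start()/stop() on the parent drives the whole tree.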

2.2 The Event Library

In YARN, the core services, including ResourceManager, NodeManager, MRAppMaster, and so on, are at heart built around a central asynchronous dispatcher. The relationship between YARN events and event handlers is defined in org.apache.hadoop.yarn.event. To use the YARN event library, you first define a central asynchronous dispatcher, AsyncDispatcher, which is responsible for processing and forwarding events. Then, according to your business needs, you define a series of Event types and EventHandler implementations and register them with the central dispatcher, which manages all events and schedules their handling uniformly. The flow is shown in the figure below:

(Figure: event flow between the central asynchronous dispatcher, events, and event handlers)
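The dispatching flow just described can be modeled with a queue and a single worker thread. The sketch below is a deliberately simplified stand-in for AsyncDispatcher, not Hadoop's code (MiniDispatcher and MiniHandler are invented names); it shows only the register/dispatch mechanics:

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Handlers receive events routed to them by the central dispatcher.
interface MiniHandler {
    void handle(Object event);
}

class MiniDispatcher {
    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
    private final Map<Class<?>, MiniHandler> handlers = new ConcurrentHashMap<>();
    private final Thread worker;
    private volatile boolean running = true;

    MiniDispatcher() {
        // A single dispatcher thread pulls events off the queue and
        // routes each one to the handler registered for its class.
        worker = new Thread(() -> {
            while (running || !queue.isEmpty()) {
                Object event;
                try {
                    event = queue.poll(10, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    return;
                }
                if (event == null) {
                    continue;
                }
                MiniHandler h = handlers.get(event.getClass());
                if (h != null) {
                    h.handle(event); // handled on the dispatcher thread
                }
            }
        });
        worker.start();
    }

    void register(Class<?> eventClass, MiniHandler handler) {
        handlers.put(eventClass, handler);
    }

    void dispatch(Object event) {
        queue.add(event); // returns immediately; processing is asynchronous
    }

    void stop() {
        running = false; // worker drains the queue, then exits
        try {
            worker.join();
        } catch (InterruptedException ignored) {
        }
    }
}
```

Producers call dispatch() and return immediately; only the dispatcher thread runs handlers, which is the same decoupling the real AsyncDispatcher gives YARN's services.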

3. Usage Example

Next, let's write some example code and walk through this flow in practice.

  • First, the JMRAppMaster class:
package cn.hadoop.task.exec;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;

import cn.hadoop.task.CompositeService;
import cn.hadoop.task.JobEvent;
import cn.hadoop.task.JobEventType;
import cn.hadoop.task.TaskEvent;
import cn.hadoop.task.TaskEventType;

/**
 * @Date Jul 22, 2015
 *
 * @Author dengjie
 *
 * @Note TODO
 */
public class JMRAppMaster extends CompositeService {

  private Dispatcher dispatcher; // AsyncDispatcher
  private String jobID;
  private int taskNumber; // number of tasks
  private String[] taskIDs; // all task IDs

  public JMRAppMaster(String name, String jobID, int taskNumber) {
    super(name);
    this.jobID = jobID;
    this.taskNumber = taskNumber;
    taskIDs = new String[taskNumber];
    for (int i = 0; i < taskNumber; i++) {
      taskIDs[i] = new String(this.jobID + "_task_" + i);
    }
  }

  public void serviceInit(Configuration conf) throws Exception {
    dispatcher = new AsyncDispatcher(); // default AsyncDispatcher
    dispatcher.register(JobEventType.class, new JobEventDispatcher()); // register the job handler
    dispatcher.register(TaskEventType.class, new TaskEventDispatcher()); // register the task handler
    addService((Service) dispatcher);
    super.serviceInit(conf);
  }

  public Dispatcher getDispatcher() {
    return dispatcher;
  }

  private class JobEventDispatcher implements EventHandler<JobEvent> {
    @SuppressWarnings("unchecked")
    public void handle(JobEvent event) {
      if (event.getType() == JobEventType.JOB_KILL) {
        System.out.println("Receive JOB_KILL event, killing all the tasks");
        for (int i = 0; i < taskNumber; i++) {
          dispatcher.getEventHandler().handle(new TaskEvent(taskIDs[i], TaskEventType.T_KILL));
        }
      } else if (event.getType() == JobEventType.JOB_INIT) {
        System.out.println("Receive JOB_INIT event, scheduling tasks");
        for (int i = 0; i < taskNumber; i++) {
          dispatcher.getEventHandler().handle(new TaskEvent(taskIDs[i], TaskEventType.T_SCHEDULE));
        }
      }
    }
  }

  private class TaskEventDispatcher implements EventHandler<TaskEvent> {
    public void handle(TaskEvent event) {
      if (event.getType() == TaskEventType.T_KILL) {
        System.out.println("Receive T_KILL event of task id " + event.getTaskID());
      } else if (event.getType() == TaskEventType.T_SCHEDULE) {
        System.out.println("Receive T_SCHEDULE event of task id " + event.getTaskID());
      }
    }
  }
}

In addition, a few supporting classes are needed. Their counterparts can be found in the Hadoop source tree, so I won't paste their code here; you can locate the corresponding classes in the Hadoop project. The relevant class names are shown in the figure below:

(Figure: names of the supporting classes)
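For orientation, the supporting event classes that JMRAppMaster uses likely look roughly like the following, modeled on the AbstractEvent pattern in org.apache.hadoop.yarn.event. This is a hedged reconstruction, not the exact code from the article's project; in particular, the local AbstractEvent class here is a stand-in for Hadoop's real one:

```java
// Stand-in for org.apache.hadoop.yarn.event.AbstractEvent:
// an event carries its type, an enum constant.
abstract class AbstractEvent<TYPE extends Enum<TYPE>> {
    private final TYPE type;

    AbstractEvent(TYPE type) {
        this.type = type;
    }

    public TYPE getType() {
        return type;
    }
}

// The event types the example dispatches on.
enum JobEventType { JOB_KILL, JOB_INIT }
enum TaskEventType { T_KILL, T_SCHEDULE }

// A job-level event, identified by its job ID.
class JobEvent extends AbstractEvent<JobEventType> {
    private final String jobID;

    JobEvent(String jobID, JobEventType type) {
        super(type);
        this.jobID = jobID;
    }

    public String getJobID() {
        return jobID;
    }
}

// A task-level event, identified by its task ID.
class TaskEvent extends AbstractEvent<TaskEventType> {
    private final String taskID;

    TaskEvent(String taskID, TaskEventType type) {
        super(type);
        this.taskID = taskID;
    }

    public String getTaskID() {
        return taskID;
    }
}
```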

Next is a test class that exercises the example flow we just wrote.

  • The JMRAppMasterTest class:

package cn.hadoop.rpc.test.yarn.task;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

import cn.hadoop.task.JobEvent;
import cn.hadoop.task.JobEventType;
import cn.hadoop.task.exec.JMRAppMaster;

/**
 * @Date Jul 22, 2015
 *
 * @Author dengjie
 *
 * @Note TODO
 */
public class JMRAppMasterTest {

  @SuppressWarnings({ "unchecked", "resource" })
  public static void main(String[] args) {
    String jobID = "job_20150723_11";
    JMRAppMaster appMaster = new JMRAppMaster("Simple MRAppMaster Test", jobID, 10);
    YarnConfiguration conf = new YarnConfiguration(new Configuration());
    try {
      appMaster.serviceInit(conf);
      appMaster.serviceStart();
    } catch (Exception e) {
      e.printStackTrace();
    }
    appMaster.getDispatcher().getEventHandler().handle(new JobEvent(jobID, JobEventType.JOB_KILL));
    appMaster.getDispatcher().getEventHandler().handle(new JobEvent(jobID, JobEventType.JOB_INIT));
  }
}

4. Screenshot Preview

After finishing the code, we run it and observe the whole flow. The output is shown in the screenshot below:

(Screenshot: console output of the example run)

5. Summary

When writing this code, you can refer to the YARN portion of the Hadoop source tree; running and debugging the example is the best way to grasp the flow of the event library and service library and how they work. Also, while coding it is best to import the relevant source files into your own project rather than referencing the JARs alone: since we are running a single module in isolation, we need to relax the access modifiers of some methods in the source, and depending only on the JARs would leave those access restrictions in place and prevent the code from running. Keep this in mind when running and debugging.

6. Closing Remarks

That's all for this post. If you run into any problems while studying this material, feel free to discuss them in the group or send me an email, and I will do my best to answer. Let's keep learning together!
