转载

hadoop/spark关闭钩子研究

标签: hadoop spark 研究 | 发表时间:2016-02-06 01:39 | 作者:distantlight1

出处:http://www.iteye.com

引子:在使用spark和hadoop的时候,遇到一些进程退出时的报错。因此顺便研究了一下jvm以及一些开源框架的关闭钩子的机制。这篇文章不涉及底层native实现,仅限Java层面

1.jvm关闭钩子

注册jvm关闭钩子通过Runtime.addShutdownHook(),实际调用ApplicationShutdownHooks.add()。后者维护了一个钩子集合IdentityHashMap<Thread, Thread> hooks

ApplicationShutdownHooks类初始化的时候,会注册一个线程到Shutdown类

static {     try {         Shutdown.add(1, false,             new Runnable() {                 public void run() {                     runHooks();                 }             }         );         hooks = new IdentityHashMap<>();     } catch (IllegalStateException e) {         hooks = null;     } }

Shutdown类里也维护了一个钩子集合

private static final Runnable[] hooks = new Runnable[MAX_SYSTEM_HOOKS];

这个集合是分优先级的(优先级就是下标数值),自定义的钩子优先级默认是1,也就是最先执行。关闭钩子最终触发就是从这个集合进行

应用关闭时,以System.exit()为例,依次调用Runtime.exit()、Shutdow.exit()

Shutdown执行jvm退出逻辑,并维护了若干关闭状态

private static final int RUNNING = 0; // 初始状态,开始关闭 private static final int HOOKS = 1; // 运行钩子 private static final int FINALIZERS = 2; // 运行finalizer private static int state = RUNNING;  static void exit(int status) {     boolean runMoreFinalizers = false;     synchronized (lock) {  // 根据退出码status参数做不同处理         if (status != 0) runFinalizersOnExit = false; // 只有正常退出才会运行finalizer         switch (state) {         case RUNNING:       // 执行钩子并修改状态             state = HOOKS;             break;         case HOOKS:         // 执行钩子             break;         case FINALIZERS:    // 执行finalizer             if (status != 0) {                 halt(status); // 如果是异常退出,直接退出进程。halt()底层是native实现,这时不会执行finalizer             } else {      // 正常退出则标记是否需要执行finalizer                 runMoreFinalizers = runFinalizersOnExit;             }             break;         }     }      if (runMoreFinalizers) { // 如果有需要,就执行finalizer,注意只有state=FINALIZERS会走这个分支         runAllFinalizers();         halt(status);     }      synchronized (Shutdown.class) {         // 这里执行state= HOOKS逻辑,包括执行钩子和finalizer         sequence();         halt(status);     } }  private static void sequence() {     synchronized (lock) {         if (state != HOOKS) return;     }     runHooks(); // 执行钩子,这里会依次执行hooks数组里的各线程     boolean rfoe; // finalizer逻辑     synchronized (lock) {         state = FINALIZERS;         rfoe = runFinalizersOnExit;     }     if (rfoe) runAllFinalizers(); }  private static void runHooks() {     for (int i=0; i < MAX_SYSTEM_HOOKS; i++) {         try {             Runnable hook;             synchronized (lock) {                 currentRunningHook = i;                 hook = hooks[i];             }  // 由于之前注册了ApplicationShutdownHooks的钩子线程,这里又会回调ApplicationShutdownHooks.runHooks             if (hook != null) hook.run();         } catch(Throwable t) {             if (t instanceof ThreadDeath) {                 ThreadDeath td = (ThreadDeath)t;                 throw td;             }         }     } }  static void runHooks() {     Collection<Thread> threads;     synchronized(ApplicationShutdownHooks.class) {         threads = hooks.keySet();         hooks = null;     }      // 注意ApplicationShutdownHooks里的钩子之间是没有优先级的,如果定义了多个钩子,那么这些钩子会并发执行     for (Thread hook : threads) {         hook.start();     }     for (Thread hook : threads) {         try {             hook.join();         } catch (InterruptedException x) { }     } }

2.Spring关闭钩子

Spring在AbstractApplicationContext里维护了一个shutdownHook属性,用来关闭Spring上下文。但这个钩子不是默认生效的,需要手动调用ApplicationContext.registerShutdownHook()来开启

在自行维护ApplicationContext(而不是托管给tomcat之类的容器时),注意尽量使用ApplicationContext.registerShutdownHook()或者手动调用ApplicationContext.close()来关闭Spring上下文,否则应用退出时可能会残留资源

public void registerShutdownHook() {   if (this.shutdownHook == null) {    this.shutdownHook = new Thread() {     @Override     public void run() {      // 这里会调用Spring的关闭逻辑,包括资源清理,bean的销毁等      doClose();     }    };    // 这里会把spring的钩子注册到jvm关闭钩子    Runtime.getRuntime().addShutdownHook(this.shutdownHook);   }  }

3.Hadoop关闭钩子

Hadoop客户端初始化时,org.apache.hadoop.util.ShutdownHookManager会向Runtime注册一个钩子线程。ShutdownHookManager是一个单例类,并维护了一个钩子集合

private Set<HookEntry> hooks;  static {     Runtime.getRuntime().addShutdownHook(       new Thread() {         @Override         public void run() {           MGR.shutdownInProgress.set(true);  // MGR是本类的单例           for (Runnable hook: MGR.getShutdownHooksInOrder()) {             try {               hook.run();             } catch (Throwable ex) {               LOG.warn("ShutdownHook '" + hook.getClass().getSimpleName() +                        "' failed, " + ex.toString(), ex);             }           }         }       }     );   }

这里HookEntry是hadoop封装的钩子类,HookEntry是带优先级的,一个priority属性。MGR.getShutdownHooksInOrder()方法会按priority依次(单线程)执行钩子

默认挂上的钩子就一个:org.apache.hadoop.fs.FileSystem$Cache$ClientFinalizer(priority=10),这个钩子用来清理hadoop FileSystem缓存以及销毁FileSystem实例。这个钩子是在第一次hadoop IO发生时(如FileSystem.get)lazy加载

此外调用FileContext.deleteOnExit()方法也会通过注册钩子

hadoop集群(非客户端)启动时,还会注册钩子清理临时路径

4.SparkContext关闭钩子

Spark也有关闭钩子管理类org.apache.spark.util.ShutdownHookManager,结构与hadoop的ShutdownHookManager基本类似

hadoop 2.x开始,spark的ShutdownHookManager会挂一个SparkShutdownHook钩子到hadoop的ShutdownHookManager(priority=40),用来实现SparkContext的清理逻辑。hadoop 1.x没有ShutdownHookManager,所以SparkShutdownHook直接挂在jvm上

def install(): Unit = {   val hookTask = new Runnable() {     // 执行钩子的回调进程,根据priority依次执行钩子     override def run(): Unit = runAll()   }   Try(Utils.classForName("org.apache.hadoop.util.ShutdownHookManager")) match {     case Success(shmClass) =>       val fsPriority = classOf[FileSystem].getField("SHUTDOWN_HOOK_PRIORITY").get(null).asInstanceOf[Int]       val shm = shmClass.getMethod("get").invoke(null)       shm.getClass().getMethod("addShutdownHook", classOf[Runnable], classOf[Int]).invoke(shm, hookTask, Integer.valueOf(fsPriority + 30))      case Failure(_) => // hadoop 1.x        Runtime.getRuntime.addShutdownHook(new Thread(hookTask, "Spark Shutdown Hook"));   } }

顺便说一下,hadoop的FileSystem实例底层默认是复用的,所以如果执行了两次fileSystem.close(),第二次会报错FileSystem Already Closed异常(即使表面上是对两个实例执行的)

一个典型的场景是同时使用Spark和Hadoop-Api,Spark会创建FileSystem实例,Hadoop-Api也会创建,由于底层复用,两者其实是同一个。因为关闭钩子的存在,应用退出时会执行两次FileSystem.close(),导致报错。解决这个问题的办法是在hdfs-site.xml增加以下配置,关闭FileSystem实例复用

<property>         <name>fs.hdfs.impl.disable.cache</name>         <value>true</value>     </property>

5.总结

以下为相关调用逻辑整理

紫红色箭头表示钩子注册,蓝色箭头表示钩子触发

蓝色、黄色、红色线框分别表示Spring、Hadoop、Spark相关代码

hadoop/spark关闭钩子研究

已有 0 人发表留言,猛击->> 这里 <<-参与讨论

ITeye推荐

  • —软件人才免语言低担保 赴美带薪读研!—
原文  http://itindex.net/detail/55171-hadoop-spark-研究
正文到此结束
Loading...