很多时候服务都有平滑退出的需求,例如RPC服务在停止之后需要从注册服务摘除节点、从消息队列已经消费的消息需要正常处理完成等。一般地我们希望能让服务在退出前能执行完当前正在执行的任务,这个时候就需要我们在JVM关闭的时候运行一些清理现场的代码。
JDK提供了Java.Runtime.addShutdownHook(Thread hook)方法,允许用户注册一个JVM关闭的钩子。这个钩子可以在以下几种场景被调用:
一般地发布系统会通过kill命令来停止服务。这个时候服务可以接收到关闭信号并执行钩子程序进行清理工作。
假设以下场景,有个生产者往内部队列发消息,有个消费者读取队列消息并执行。当我们停止服务的时候,希望队列的消息都能正常处理完成,代码示例如下:
/** * 服务关闭测试 */ public class ShutDownTest { private static BlockingQueue<Long> queue = new ArrayBlockingQueue<Long>(50); private static AtomicLong taskId = new AtomicLong(0); // 生产任务 private static class ProduceTask implements Runnable { private AtomicBoolean stopped = new AtomicBoolean(false); @Override public void run() { while (!stopped.get()) { long element = taskId.incrementAndGet(); queue.add(element); System.out.println("add element : " + element); try { Thread.sleep(50); } catch (InterruptedException e) { } } } public void setStopped() { stopped.compareAndSet(false, true); System.out.println("stop producer."); } } // 消费任务 private static class ConsumeTask implements Runnable { private AtomicBoolean stopped = new AtomicBoolean(false); @Override public void run() { while (!stopped.get() || queue.size() > 0) { try { long element = queue.take(); System.out.println("consume element : " + element); doWork(); } catch (InterruptedException e) { } } } private void doWork() { try { // 消费速度比生产速度稍慢,模拟积压情况 Thread.sleep(60); } catch (InterruptedException e) { } } public void setStopped() { stopped.compareAndSet(false, true); System.out.println("stop consumer."); } } public static void main(String[] args) { final ProduceTask producerTask = new ProduceTask(); final Thread producerThread = new Thread(producerTask); final ConsumeTask consumeTask = new ConsumeTask(); Thread consumeThread = new Thread(consumeTask); // 先启动消费 consumeThread.start(); // 再启动生产 producerThread.start(); Runtime.getRuntime().addShutdownHook(new Thread(() -> { System.out.println("try close..."); // 先关闭生产 producerTask.setStopped(); // 再关闭消费 consumeTask.setStopped(); try { System.out.println("close wait..."); Thread.sleep(5000); } catch (InterruptedException e) { } System.out.println("close finished..."); })); } } 复制代码
执行结果如下所示,可以看到服务关闭的时候钩子程序执行成功,在等待消息处理完成后才退出。
在使用ShutdownHook的时候,我们往往控制不了钩子的执行顺序。Java.Runtime.addShutdownHook是对外公开的API接口。在前述场景里面,假若是独立注册钩子,在更复杂的项目里面是不是就没办法保证执行的顺序呢?曾在实际场景中遇到过这样的问题,从kafka队列消费消息,交给内部线程池去处理,我们自定义了线程池的拒绝策略为一直等待(为了保证消息确实处理),然后就会偶尔出现服务无法关闭的问题。原因正是线程池先被关闭,kafka队列却还在消费消息,导致消费线程一直在等待。
Java同时提供了signal信号机制,我们的服务也可以接收到关闭信号。
使用Signal机制有以下原因:
这里核心的原因还是希望能完全保证服务关闭的顺序,避免出现问题。我们在服务内部按顺序维护关闭任务,上述代码调整后如下所示:
public class TermHelper { private static AtomicBoolean signalTriggered = new AtomicBoolean(false); private static AtomicBoolean stopping = new AtomicBoolean(false); private static AtomicBoolean registeredHolder = new AtomicBoolean(false); private static Deque<Runnable> terms = new ConcurrentLinkedDeque<>(); private static void tryRegisterOnlyOnce() { boolean previousRegistered = registeredHolder.getAndSet(true); if (!previousRegistered) { registerTermSignal(); } } private static void registerTermSignal() { Signal.handle(new Signal("TERM"), signal -> { boolean previous = signalTriggered.getAndSet(true); if (previous) { System.out.println("Term has been triggered."); return; } termAndExit(); }); } public static void addTerm(Runnable runnable) { tryRegisterOnlyOnce(); terms.addLast(runnable); } public static void addFirstTerm(Runnable runnable) { tryRegisterOnlyOnce(); terms.addFirst(runnable); } private static void termAndExit() { try { Thread current = Thread.currentThread(); current.setName(current.getName() + "(退出线程)"); System.out.println("do term cleanup...."); doTerm(); System.out.println("exit success."); System.exit(0); } catch (Throwable e) { e.printStackTrace(); System.exit(1); } } public static void doTerm() { boolean previousStopping = stopping.getAndSet(true); if (previousStopping) { System.out.println("Term routine already running, wait until done!"); return; } for (Runnable runnable : terms) { try { System.out.println("execute term runnable : " + runnable); runnable.run(); } catch (Throwable e) { e.printStackTrace(); } } try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } } 复制代码
TermHelper内部使用队列维护关闭任务,在服务关闭的时候串行执行相关任务,保证其顺序。我们也可以在此基础上维护关闭任务的优先级,实现按优先级高低依次执行关闭任务。
public class ShutDownTest { private static BlockingQueue<Long> queue = new ArrayBlockingQueue<Long>(50); private static AtomicLong taskId = new AtomicLong(0); // 生产任务 private static class ProduceTask implements Runnable { private AtomicBoolean stopped = new AtomicBoolean(false); @Override public void run() { while (!stopped.get()) { long element = taskId.incrementAndGet(); queue.add(element); System.out.println("add element : " + element); try { Thread.sleep(50); } catch (InterruptedException e) { } } } public void setStopped() { stopped.compareAndSet(false, true); System.out.println("stop producer."); } } // 消费任务 private static class ConsumeTask implements Runnable { private AtomicBoolean stopped = new AtomicBoolean(false); @Override public void run() { while (!stopped.get() || queue.size() > 0) { try { long element = queue.take(); System.out.println("consume element : " + element); doWork(); } catch (InterruptedException e) { } } } private void doWork() { try { // 消费速度比生产速度稍慢,模拟积压情况 Thread.sleep(60); } catch (InterruptedException e) { } } public void setStopped() { stopped.compareAndSet(false, true); System.out.println("stop consumer."); } } public static void main(String[] args) { final ProduceTask producerTask = new ProduceTask(); final Thread producerThread = new Thread(producerTask); final ConsumeTask consumeTask = new ConsumeTask(); Thread consumeThread = new Thread(consumeTask); // 先启动消费 consumeThread.start(); // 再启动生产 producerThread.start(); TermHelper.addFirstTerm(() -> { // 关闭生产 producerTask.setStopped(); }); TermHelper.addTerm(() -> { // 再关闭消费 consumeTask.setStopped(); }); Runtime.getRuntime().addShutdownHook(new Thread(() -> { System.out.println("shut down hook..."); })); } } 复制代码
执行结果如下所示。需要注意的是我们只注册了TERM信号,所以需要通过kill -TERM的方式关闭服务。从图中可以看到我们测试的生产者和消费者都正常退出了,内部的消息最后也处理完成。
若需要平滑停止服务,我们一般可以通过ShutdownHook和Signal来实现。ShutdownHook一般比较难保证关闭任务的执行顺序,这个时候可以考虑使用Signal机制来完全托管我们关闭服务的执行顺序。