转载

Java多线程编程模式实战指南(三):Two-phase Termination模式

停止线程是一个目标简单而实现却不那么简单的任务。首先,Java没有提供直接的API用于停止线程。此外,停止线程时还有一些额外的细节需要考虑,如待停止的线程处于阻塞(等待锁)或者等待状态(等待其它线程)、尚有未处理完的任务等。本文介绍的Two-phase Termination模式提供了一种通用的用于优雅地停止线程的方法。

Two-phase Termination模式简介

Java并没有提供直接的API用于停止线程。Two-phase Termination模式通过将停止线程这个动作分解为准备阶段和执行阶段这两个阶段,以应对停止线程过程中可能存在的问题。

准备阶段。该阶段主要动作是“通知”目标线程(欲停止的线程)准备进行停止。这一步会设置一个标志变量用于指示目标线程可以准备停止了。但是,由于目标线程可能正处于阻塞状态(等待锁的获得)、等待状态(如调用Object.wait)或者I/O(如InputStream.read)等待等状态,即便设置了这个标志,目标线程也无法立即“看到”这个标志而做出相应动作。因此,这一阶段还需要通过调用目标线程的interrupt方法,以期望目标线程能够通过捕获相关的异常侦测到该方法调用,从而中断其阻塞状态、等待状态。对于能够对interrupt方法调用作出响应的方法(参见表1),目标线程代码可以通过捕获这些方法抛出的InterruptedException来侦测线程停止信号。但也有一些方法(如InputStream.read)并不对interrupt调用作出响应,此时需要我们手工处理,如同步的Socket I/O操作中通过关闭socket,使处于I/O等待的socket抛出java.net.SocketException。

表 1. 能够对Thread.interrupt作出响应的一些方法

方法

响应interrupt调用抛出的异常

Object.wait() 、 Object.wait(long timeout) 、Object.wait(long timeout, int nanos)

InterruptedException

Thread.sleep(long millis) 、Thread.sleep(long millis, int nanos)

InterruptedException

Thread.join()、Thread.join(long millis) 、Thread.join(long millis, int nanos)

InterruptedException

java.util.concurrent.BlockingQueue.take()

InterruptedException

java.util.concurrent.locks.Lock.lockInterruptibly()

InterruptedException

java.nio.channels.InterruptibleChannel

java.nio.channels.ClosedByInterruptException

执行阶段。该阶段的主要动作是检查准备阶段所设置的线程停止标志和信号,在此基础上决定线程停止的时机,并进行适当的“清理”操作。

Two-phase Termination模式的架构

Two-phase Termination模式的主要参与者有以下几种。其类图如图1所示。

图 1. Two-phase Termination模式的类图

Java多线程编程模式实战指南(三):Two-phase Termination模式

  • ThreadOwner :目标线程的拥有者。Java语言中,并没有线程的拥有者的概念,但是线程的背后是其要处理的任务或者其所提供的服务,因此我们不能在不清楚某个线程具体是做什么的情况下贸然将其停止。一般地,我们可以将目标线程的创建者视为该线程的拥有者,并假定其“知道”目标线程的工作内容,可以安全地停止目标线程。
  • TerminatableThread :可停止的线程。其主要方法及职责如下:
    • terminate :设置线程停止标志,并发送停止“信号”给目标线程。
    • doTerminate 留给子类实现线程停止时所需的一些额外操作,如目标线程代码中包含Socket I/O,子类可以在该方法中关闭Socket以达到快速停止线程,而不会使目标线程等待I/O完成才能侦测到线程停止标记。
    • doRun 留给子类实现线程的处理逻辑。相当于Thread.run,只不过该方法中无需关心停止线程的逻辑,因为这个逻辑已经被封装在TerminatableThread的run方法中了。
    • doCleanup 留给子类实现线程停止后可能需要的一些清理动作。
  • TerminationToken 线程停止标志。toShutdown用于指示目标线程可以停止了。reservations可用于反映目标线程还有多少数量未完成的任务,以支持等目标线程处理完其任务后再行停止。

准备阶段的序列图如图2所示:

图 2. 准备阶段的序列图

Java多线程编程模式实战指南(三):Two-phase Termination模式

1、客户端代码调用线程拥有者的shutdown方法。

2、shutdown方法调用目标线程的terminate方法。

3~4、terminate方法将terminationToken的toShutdown标志设置为true。

5、terminate方法调用由TerminatableThread子类实现的doTerminate方法,使得子类可以为停止目标线程做一些其它必要的操作。

6、若terminationToken的reservations属性值为0,则表示目标线程没有未处理完的任务或者ThreadOwner在停止线程时不关心其是否有未处理的任务。此时,terminate方法会调用目标线程的interrupt方法。

7、terminate方法调用结束。

8、shutdown调用返回,此时目标线程可能还仍然在运行。

执行阶段由目标线程的代码去检查terminationToken的toShutdown属性、reservations属性的值,并捕获由interrupt方法调用抛出的相关异常以决定是否停止线程。在线程停止前由TerminatableThread子类实现的doCleanup方法会被调用。

Two-phase Termination模式实战案例

某系统需要对接告警系统以实现告警功能。告警系统是一个C/S结构的系统,它提供了一套客户端API(AlarmAgent)用于与其对接的系统给其发送告警。该系统将告警功能封装在一个名为AlarmMgr的单件类(Singleton)中,系统中其它代码需要发送告警的只需要调用该类的sendAlarm方法。该方法将告警信息缓存入队列,由专门的告警发送线程负责调用AlarmAgent的相关方法将告警信息发送至告警服务器。

告警发送线程是一个用户线程(User Thread),因此在系统的停止过程中,该线程若未停止则会阻止JVM正常关闭。所以,在系统停止过程中我们必须主动去停止告警发送线程,而非依赖JVM。为了能够尽可能快的以优雅的方式将告警发送线程停止,我们需要处理以下两个问题:

  1. 当告警缓存队列非空时,需要将队列中已有的告警信息发送至告警服务器。
  2. 由于缓存告警信息的队列是一个阻塞队列(LinkedBlockingQueue),在该队列为空的情况下,告警发送线程会一直处于等待状态。这会导致其无法响应我们的关闭线程的请求。

上述问题可以通过使用Two-phase Termination模式来解决。

AlarmMgr相当于图1中的ThreadOwner参与者实例,它是告警发送线程的拥有者。系统停止过程中调用其shutdown方法(AlarmMgr.getInstance().shutdown())即可请求告警发送线程停止。其代码如清单1所示:

清单 1. AlarmMgr源码

public class AlarmMgr {  private final BlockingQueue<AlarmInfo> alarms = new LinkedBlockingQueue<AlarmInfo>();  //告警系统客户端API  private final AlarmAgent alarmAgent = new AlarmAgent();  //告警发送线程  private final AbstractTerminatableThread alarmSendingThread;   private boolean shutdownRequested = false;   private static final AlarmMgr INSTANCE = new AlarmMgr();   private AlarmMgr() {   alarmSendingThread = new AbstractTerminatableThread() {    @Override    protected void doRun() throws Exception {     if (alarmAgent.waitUntilConnected()) {      AlarmInfo alarm;      alarm = alarms.take();      terminationToken.reservations.decrementAndGet();      try {       alarmAgent.sendAlarm(alarm);      } catch (Exception e) {       e.printStackTrace();      }     }    }     @Override    protected void doCleanup(Exception exp) {     if (null != exp) {      exp.printStackTrace();     }     alarmAgent.disconnect();    }    };    alarmAgent.init();  }   public static AlarmMgr getInstance() {   return INSTANCE;  }   public void sendAlarm(AlarmType type, String id, String extraInfo) {   final TerminationToken terminationToken = alarmSendingThread.terminationToken;   if (terminationToken.isToShutdown()) {    // log the alarm    System.err.println("rejected alarm:" + id + "," + extraInfo);    return;    }   try {    AlarmInfo alarm = new AlarmInfo(id, type);    alarm.setExtraInfo(extraInfo);    terminationToken.reservations.incrementAndGet();    alarms.add(alarm);   } catch (Throwable t) {    t.printStackTrace();   }  }   public void init() {   alarmSendingThread.start();  }   public synchronized void shutdown() {   if (shutdownRequested) {    throw new IllegalStateException("shutdown already requested!");   }      alarmSendingThread.terminate();   shutdownRequested = true;  }   public int pendingAlarms() {   return alarmSendingThread.terminationToken.reservations.get();  }  }  class AlarmAgent {  // 省略其它代码  private volatile boolean connectedToServer = false;   public void sendAlarm(AlarmInfo alarm) throws Exception {   // 省略其它代码   System.out.println("Sending " + alarm);   try {    Thread.sleep(50);   } catch (Exception e) {    }  }   public void init() {   // 省略其它代码   connectedToServer = true;  }   public void disconnect() {   // 省略其它代码   System.out.println("disconnected from alarm server.");  }   public boolean waitUntilConnected() {   // 省略其它代码   return connectedToServer;  } }

从上面的代码可以看出,AlarmMgr每接受一个告警信息放入缓存队列便将terminationToken的reservations值增加1,而告警发送线程每发送一个告警到告警服务器则将terminationToken的reservations值减少1。这为我们可以在停止告警发送线程前确保队列中现有的告警信息会被处理完毕提供了线索:AbstractTerminatableThread的run方法会根据terminationToken的reservations是否为0来判断待停止的线程已无未处理的任务,或者无需关心其是否有待处理的任务。

AbstractTerminatableThread的源码见清单2:

清单 2. AbstractTerminatableThread源码

public abstract class AbstractTerminatableThread extends Thread       implements Terminatable {  public final TerminationToken terminationToken;  public AbstractTerminatableThread() {   super();   this.terminationToken = new TerminationToken();  }       /**   *    * @param terminationToken 线程间共享的线程终止标志实例  */  public AbstractTerminatableThread(TerminationToken terminationToken) {   super();   this.terminationToken = terminationToken;  }  protected abstract void doRun() throws Exception;  protected void doCleanup(Exception cause) {}  protected void doTerminiate() {}  @Override  public void run() {   Exception ex = null;   try {    while (true) {  /*      * 在执行线程的处理逻辑前先判断线程停止的标志。      */     if (terminationToken.isToShutdown()         && terminationToken.reservations.get() <= 0) {      break;     }     doRun();    }   } catch (Exception e) {    // Allow the thread to terminate in response of a interrupt invocation    ex = e;   } finally {    doCleanup(ex);   }  }  @Override  public void interrupt() {   terminate();  }  @Override  public void terminate() {   terminationToken.setToShutdown(true);   try {    doTerminiate();   } finally {    // 若无待处理的任务,则试图强制终止线程    if (terminationToken.reservations.get() <= 0) {     super.interrupt();    }   }  } } 

AbstractTerminatableThread是一个可复用的TerminatableThread参与者实例。其terminate方法完成了线程停止的准备阶段。该方法首先将terminationToken的toShutdown变量设置为true,指示目标线程可以准备停止了。但是,此时目标线程可能处于一些阻塞(Blocking)方法的调用,如调用Object.sleep、InputStream.read等,无法侦测到该变量。调用目标线程的interrupt方法可以使一些阻塞方法(参见表1)通过抛出异常从而使目标线程停止。但也有些阻塞方法如InputStream.read并不对interrupt方法调用作出响应,此时需要由TerminatableThread的子类实现doTerminiate方法,在该方法中实现一些关闭目标线程所需的额外操作。例如,在Socket同步I/O中通过关闭socket使得使用该socket的线程若处于I/O等待会抛出SocketException。因此,terminate方法下一步调用doTerminate方法。接着,若terminationToken.reservations的值为非正数(表示目标线程无待处理任务、或者我们不关心其是否有待处理任务),则terminate方法会调用目标线程的interrupt方法,强制目标线程的阻塞方法中断,从而强制终止目标线程。

执行阶段在AbstractTerminatableThread的run方法中完成。该方法通过对TerminationToken的toShutdown属性和reservations属性的判断或者通过捕获由interrupt方法调用而抛出的异常来终止线程。并在线程终止前调用由TerminatableThread子类实现的doCleanup方法用于执行一些清理动作。

在执行阶段,由于AbstractTerminatableThread.run方法每次执行线程处理逻辑(通过调用doRun方法实现)前都先判断下toShutdown属性和reservations属性的值,在目标线程处理完其待处理的任务后(此时reservations属性的值为非正数)目标线程run方法也就退出了while循环。因此,线程的处理逻辑代码(doRun方法)将不再被调用,从而使本案例在不使用Two-phase Termination模式的情况下停止目标线程存在的两个问题得以解决(目标线程停止前可以保证其处理完待处理的任务——发送队列中现有的告警信息到服务器)和规避(目标线程发送完队列中现有的告警信息后,doRun方法不再被调用,从而避免了队列为空时BlockingQueue.take调用导致的阻塞)。

从上可知,准备阶段、执行阶段需要通过TerminationToken作为“中介”来协调二者的动作。TerminationToken的源码如清单3所示:

清单 3. TerminationToken源码

public class TerminationToken {  //使用volatile修饰,以保证无需显示锁的情况下该变量的内存可见性  protected volatile boolean toShutdown = false;  public final AtomicInteger reservations = new AtomicInteger(0);  public boolean isToShutdown() {   return toShutdown;  }  protected void setToShutdown(boolean toShutdown) {   this.toShutdown = true;  } } 

Two-phase Termination模式的评价与实现考量

Two-phase Termination模式使得我们可以对各种形式的目标线程进行优雅的停止。如目标线程调用了能够对interrupt方法调用作出响应的阻塞方法、目标线程调用了不能对interrupt方法调用作出响应的阻塞方法、目标线程作为消费者处理其它线程生产的“产品”在其停止前需要处理完现有“产品”等。Two-phase Termination模式实现的线程停止可能出现延迟,即客户端代码调用完ThreadOwner.shutdown后,该线程可能仍在运行。

本文案例展示了一个可复用的Two-phase Termination模式实现代码。读者若要自行实现该模式,可能需要注意以下几个问题。

线程停止标志

本文案例使用了TerminationToken作为目标线程可以准备停止的标志。从清单3的代码我们可以看到,TerminationToken使用了toShutdown这个boolean变量作为主要的停止标志,而非使用Thread.isInterrupted()。这是因为,调用目标线程的interrupt方法无法保证目标线程的isInterrupted()方法返回值为true:目标线程可能调用一些能够捕获InterruptedException而不保留线程中断状态的代码。另外,toShutdown这个变量为了保证内存可见性而又能避免使用显式锁的开销,采用了volatile修饰。这点也很重要,笔者曾经见过一些采用boolean变量作为线程停止标志的代码,只是这些变量没有用volatile修饰,对其访问也没有加锁,这就可能无法停止目标线程。

生产者——消费者问题中的线程停止

在多线程编程中,许多问题和一些多线程编程模式都可以看作生产者——消费者问题。停止处于生产者——消费者问题中的线程,需要考虑更多的问题:需要注意线程的停止顺序,如果消费者线程比生产者线程先停止则会导致生产者生产的新”产品“无法被处理,而如果先停止生产者线程又可能使消费者线程处于空等待(如生产者消费者采用阻塞队列中转”产品“)。并且,停止消费者线程前是否考虑要等待其处理完所有待处理的任务或者将这些任务做个备份也是个问题。本文案例部分地展示生产者——消费者问题中线程停止的处理,其核心就是通过使用TerminationToken的reservations变量:生产者每”生产“一个产品,Two-phase Termination模式的调用方代码要使reservations变量值增加1(terminationToken.reservations.incrementAndGet());消费者线程每处理一个产品,Two-phase Termination模式的调用方代码要使reservations变量值减少1(terminationToken.reservations.decrementAndGet())。当然,在停止消费者线程时如果我们不关心其待处理的任务,Two-phase Termination模式的调用方代码可以忽略对reservations变量的操作。清单4展示了一个完整的停止生产者——消费者问题中的线程的例子:

清单 4. 停止生产者——消费者问题中的线程的例子

public class ProducerConsumerStop {  class SampleConsumer<P> {   private final BlockingQueue<P> queue = new LinkedBlockingQueue<P>();    private AbstractTerminatableThread workThread      = new AbstractTerminatableThread() {    @Override    protected void doRun() throws Exception {     terminationToken.reservations.decrementAndGet();     P product = queue.take();     // ...     System.out.println(product);    }    };    public void placeProduct(P product) {    if (workThread.terminationToken.isToShutdown()) {     throw new IllegalStateException("Thread shutdown");    }    try {     queue.put(product);     workThread.terminationToken.reservations.incrementAndGet();    } catch (InterruptedException e) {     }   }    public void shutdown() {    workThread.terminate();   }    public void start() {    workThread.start();   }  }   public void test() {   final SampleConsumer<String> aConsumer = new SampleConsumer<String>();    AbstractTerminatableThread aProducer = new AbstractTerminatableThread() {    private int i = 0;     @Override    protected void doRun() throws Exception {     aConsumer.placeProduct(String.valueOf(i));    }     @Override    protected void doCleanup(Exception cause) {     // 生产者线程停止完毕后再请求停止消费者线程     aConsumer.shutdown();    }    };    aProducer.start();   aConsumer.start();  } }

隐藏而非暴露可停止的线程

为了保证可停止的线程不被其它代码误停止,一般我们将可停止线程隐藏在线程拥有者背后,而使系统中其它代码无法直接访问该线程,正如本案例代码(见清单1)所展示:AlarmMgr定义了一个private字段alarmSendingThread用于引用告警发送线程(可停止的线程),系统中的其它代码只能通过调用AlarmMgr的shutdown方法来请求该线程停止,而非通过引用该线程对象自身来停止它。

总结

本文介绍了Two-phase Termination模式的意图及架构。并结合笔者工作经历提供了一个实际的案例用于展示一个可复用的Two-phase Termination模式实现代码,在此基础上对该模式进行了评价并分享在实际运用该模式时需要注意的事项。

参考资源

  • 本文的源代码在线阅读:https://github.com/Viscent/JavaConcurrencyPattern/
  • Brian Göetz et al., Java Concurrency In Practice
  • Mark Grand, Patterns in Java ,Volume 1, 2nd Edition

感谢郭蕾对本文的审校。

给InfoQ中文站投稿或者参与内容翻译工作,请邮件至editors@cn.infoq.com。也欢迎大家通过新浪微博(@InfoQ)或者腾讯微博(@InfoQ)关注我们,并与我们的编辑和其他读者朋友交流。

正文到此结束
Loading...