ExecutorService抽象从java5一直持续到现在。我们在这里讨论2004,简单提醒一下:java5和java6将不会被支持,java7 won’t be in half a year 。我提出这个问题的原因是因为很多java程序员仍然不能完全理解ExecutorService的工作原理。有很多地方需要了解,今天我想分享一些鲜为人知的特性和实践。然而这篇文章针对中级程序员,没有特别牛逼的。
我不能强调这一点。在dump一个运行的jvm的所有线程或者调试的时候,默认的线程池命名模式是pool-N-thread-M,其中N代表线程池序列号(每次你创建一个新的线程池,全局N计数增加),M是线程池里面线程的序列号。例如pool-2-thread-3表示在jvm进程生命周期里面创建的第二个线程池的第三个线程。参阅:Executors.defaultThreadFactory()。不是很具描述性。JDK使正确命名线程变得稍微有的复杂因为命名策略隐藏在ThreadFactory。幸运的是Guava(google开源的一组工具类集合)工具包有个帮助类来做这件事情:
import com.google.common.util.concurrent.ThreadFactoryBuilder; //通过ThreadFactoryBuilder这个类来设置线程池的名称并返回一个ThreadFactory final ThreadFactory threadFactory = new ThreadFactoryBuilder() .setNameFormat("Orders-%d") .setDaemon(true) .build(); final ExecutorService executorService = Executors.newFixedThreadPool(10, threadFactory);
默认情况下线程池创建非守护线程,取决于你是否需要这种类型的线程。
这是一个我从 supercharged-jstack-how-to-debug-your-servers-at-100mph 学到的技巧,一旦我们记住线程的名称,我们可以在运行随时修改他们!这是有意义的因为线程dump显示了类和方法名称,而不是参数和本地变量。通过调整线程名称来保留一些基本的事物标识符,我们可以容易跟踪哪个消息/记录/查询等很慢或者引起了死锁。例如:
private void process(String messageId) { executorService.submit(() -> { final Thread currentThread = Thread.currentThread(); final String oldName = currentThread.getName(); currentThread.setName("Processing-" + messageId); try { //这里是业务逻辑... } finally { currentThread.setName(oldName); } }); }
在try-finally代码块里面当前线程命名为Processing-WHATEVER-MESSAGE-ID-IS。当追踪经过这个系统的消息的时候这可能会带来便利。
在客户线程(待提交运行的任务)和线程池(执行任务的线程)之间有个任务对列。当你的应用程序关闭的时候,你必须关心两间事情:排队的任务如何处理以及已经在线程池里面的任务怎么运行(稍后会详细介绍)。令人惊讶的是很多开发者并没有正确地或有意识地关闭线程池。有两种技术:让所有排队的任务执行(通过shutdown()这个方法)或者从队列删除他们(通过shutdownNow()这个方法)-完全取决于你的实际情况。例如如果我们想提交一堆任务并且想当它们都完成了才结束,这个情况可以使用shutdown():
private void sendAllEmails(List <string> emails) throws InterruptedException { emails.forEach(email -> executorService.submit(() -> sendEmail(email))); executorService.shutdown(); final boolean done = executorService.awaitTermination(1, TimeUnit.MINUTES); log.debug("All e-mails were sent so far? {}", done); </string>
在这个场景,我们发送一堆邮件,每一个邮件的发送在线程池里面作为一个单独的任务。在提交这些任务之后我们关闭线程池以至于它不再接收新的任务。这时候我们等待最长一分钟,直到这些任务都完成。然而一些任务仍然未结束,awaitTermination()会返回false。此外,未结束的任务会继续执行。我知道赶时髦的人这样做:
emails.parallelStream().forEach(this::sendEmail);
称我为老式的,但是我喜欢控制并发线程的数量。不要紧,一个优雅替代shutdown()的是shutdownNow():
final List <runnable> rejected = executorService.shutdownNow(); log.debug("Rejected tasks: {}", rejected.size()); </runnable>
这次所有排队任务会被丢弃返回。已经执行的任务任然可以继续执行。
Future接口的鲜为人知的功能是取消。查看之前的文章 InterruptedException and interrupting threads explained
大小不正确的线程池可能导致缓慢,不稳定以及内存溢出。如果你配置太少的线程,队列会堆积,耗费很多内存。另一方面太多的线程会减慢整个系统,因为过多的线程上下文切换 - 并导致和之前相当的症状。查看队列的深度并保持有界很重要,因此超载的线程暂时拒绝新的任务。
final BlockingQueue <runnable> queue = new ArrayBlockingQueue<>(100); executorService = new ThreadPoolExecutor(n, n, 0L, TimeUnit.MILLISECONDS, queue); </runnable>
上面的代码等同于Executors.newFixedThreadPool(n)。而不是使用默认的无界LinkedBlockingQueue我们使用容量大小为100的ArrayBlockingQueue。这意味着如果一百个任务已经在队列里面了(并且n个正在执行)新的任务会被拒绝返回RejectedExecutionException。此外,由于队列现在可以在外部使用,我们可以定期调用size()并将其放在logs / JMX /任何您使用的监视机制中。
下面代码片段的结果是?
executorService.submit(() -> { System.out.println(1 / 0); });
我被上面的代码坑过很多次了,它不会打印任何东西。没有 java.lang.ArithmeticException: / by zero 整个的标注,什么都没有。线程池只是吞掉整个异常,好像它从未发生过一样。如果它是一个从头开始创建的线程,UncaughtExceptionHandler 可以工作。但是使用线程池必须更加小心。如果你提交一个Runnable(像上面没有任何结果)你需要用try catche包括代码主体,并且打印日志。如果你提交一个Callable,确保你总是使用阻塞get()取消引用它来重新抛出异常:
final Future <integer> division = executorService.submit(() -> 1 / 0); //below will throw ExecutionException caused by ArithmeticException division.get(); </integer>
有趣的是即使spring框架使用@Async提交了整个bug,参阅:SPR-8995和SPR-12090。
监控工作队列深度是一方面。然而在追踪单个事物/任务问题的时候很有必要看下在提交任务和实际执行之间花了多少时间。该持续时间应该优先为0(当线程池里面有空闲线程的时候),然而它会增长当任务必须排队的时候。此外,如果线程池没有一个固定的线程数,运行新的任务需要创建新的线程,也会消耗短暂的时间。为了清楚的监控这个指标,用与此类似的东西包转原来的ExecutorService:
public class WaitTimeMonitoringExecutorService implements ExecutorService { private final ExecutorService target; public WaitTimeMonitoringExecutorService(ExecutorService target) { this.target = target; } @Override public <t> Future <t> submit(Callable <t> task) { final long startTime = System.currentTimeMillis(); return target.submit(() -> { final long queueDuration = System.currentTimeMillis() - startTime; log.debug("Task {} spent {}ms in queue", task, queueDuration); return task.call(); } ); } @Override public <t> Future <t> submit(Runnable task, T result) { return submit(() -> { task.run(); return result; }); } @Override public Future<?> submit(Runnable task) { return submit(new Callable <void> () { @Override public Void call() throws Exception { task.run(); return null; } }); } //... } </void> </t> </t> </t> </t> </t>
这不是一个完整的实现,但你得到了基本的思想。当我们像线程池提交任务的时候,我们立即测量开始时间,一旦任务被选择并执行我们就停止测量。不要被源代码中的startTime和queueDuration非常接近而迷惑。事实上,这两行是在不同的线程中进行评估的,可能是几毫秒甚至几秒,例如:
ask com.nurkiewicz.MyTask@7c7f3894 spent 9883ms in queue
如今,反应式编程似乎引起了很多关注。 Reactive manifesto, reactive streams, RxJava (just released 1.0!), Clojure agents, scala.rx… 他们都很好用,但堆栈跟踪不再是你的朋友,它们至多是无用的。例如,在提交给线程池的任务中发生异常:
java.lang.NullPointerException: null at com.nurkiewicz.MyTask.call(Main.java:76) ~[classes/:na] at com.nurkiewicz.MyTask.call(Main.java:72) ~[classes/:na] at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0] at java.lang.Thread.run(Thread.java:744) ~[na:1.8.0]
我们容易地发下在76行MyTask threw NPE,但是我们不知道谁提交了这个任务,因为堆栈跟踪只显示Thread和ThreadPoolExecutor。我们可以在技术上浏览源代码,希望找到一个创建MyTask的地方。但是没有线程(更不用说事件驱动,响应式, actor-ninja-programming)我们可以立即看到全貌。如果我们可以保留客户端代码的堆栈并且显示它,例如如果失败了?这个想法并不新鲜,例如Hazelcast 从所有者代码传递异常到客服端代码。这就是在出现故障时保持客户端堆栈跟踪的天真支持:
public class ExecutorServiceWithClientTrace implements ExecutorService { protected final ExecutorService target; public ExecutorServiceWithClientTrace(ExecutorService target) { this.target = target; } @Override public <t> Future <t> submit(Callable <t> task) { return target.submit(wrap(task, clientTrace(), Thread.currentThread().getName())); } private <t> Callable <t> wrap(final Callable <t> task, final Exception clientStack, String clientThreadName) { return () -> { try { return task.call(); } catch (Exception e) { log.error("Exception {} in task submitted from thrad {} here:", e, clientThreadName, clientStack); throw e; } }; } private Exception clientTrace() { return new Exception("Client stack trace"); } @Override public <t> List <future> <t> > invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { return tasks.stream().map(this::submit).collect(toList()); } //... } </t> </future> </t> </t> </t> </t> </t> </t> </t>
这次在出现故障的时候我们可以检索提交任务的地方所有的堆栈和线程的名称。与之前看到的标准异常相比,它更有价值:
Exception java.lang.NullPointerException in task submitted from thrad main here: java.lang.Exception: Client stack trace at com.nurkiewicz.ExecutorServiceWithClientTrace.clientTrace(ExecutorServiceWithClientTrace.java:43) ~[classes/:na] at com.nurkiewicz.ExecutorServiceWithClientTrace.submit(ExecutorServiceWithClientTrace.java:28) ~[classes/:na] at com.nurkiewicz.Main.main(Main.java:31) ~[classes/:na] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0] at java.lang.reflect.Method.invoke(Method.java:483) ~[na:1.8.0] at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134) ~[idea_rt.jar:na]
在Java 8中引入了更强大的CompletableFuture。请尽可能使用它。ExecutorService未扩展为支持这种增强的抽象,因此您必须自己处理它。代替:
final Future <bigdecimal> future = executorService.submit(this::calculate); </bigdecimal>
这样做:
final CompletableFuture <bigdecimal> future = CompletableFuture.supplyAsync(this::calculate, executorService); </bigdecimal>
CompletableFuture扩展了Future,所以一切都像以前一样工作。 但是,API的更高级消费者将真正欣赏CompletableFuture提供的扩展功能。
SynchronousQueue是一个有趣的BlockingQueue,它不是真正的队列。 它本身甚至都不是数据结构。 最好将其解释为容量为0的队列。引用JavaDoc:
”each insert operation must wait for a corresponding remove operation by another thread, and vice versa. A synchronous queue does not have any internal capacity, not even a capacity of one. You cannot peek at a synchronous queue because an element is only present when you try to remove it; you cannot insert an element (using any method) unless another thread is trying to remove it; you cannot iterate as there is nothing to iterate. […]
Synchronous queues are similar to rendezvous channels used in CSP and Ada.“
这个怎么和线程池关联上呢?尝试将SynchronousQueue与ThreadPoolExecutor一起使用:
BlockingQueue <runnable> queue = new SynchronousQueue<>(); ExecutorService executorService = new ThreadPoolExecutor(n, n, 0L, TimeUnit.MILLISECONDS, queue); </runnable>
我们创建了一个带有两个线程的线程池,并在它前面有一个SynchronousQueue。因为SynchronousQueue本质上是一个容量为0的队列,所以如果有可用的空闲线程,这样的ExecutorService将只接受新任务。 如果所有线程都忙,新任务将立即被拒绝,永远不会等待。 当在后台处理必须立即开始或被丢弃时,这个执行方式是被期待的。
就是这样,我希望你找到至少一个有趣的功能!