最近遇到了一个OOM的问题,提示的是无法创建更多的线程,定位问题,发现是类似下面的这段代码出现了问题,用JConsole监测,发现某一时段线程数量忽然飙升,由此引发了下面的思考
public class DemoController { private ExecutorService executorService = Executors.newWorkStealingPool(20); @RequestMapping("/test") public String test() { ExecutorService forkJoinPool = Executors.newWorkStealingPool(10); CompletableFuture[] completableFutures = new CompletableFuture[600]; for (int i = 0; i < 600; i++) { int j = i; completableFutures[i] = CompletableFuture.runAsync(() -> { getAssociatedInfo(forkJoinPool); }, forkJoinPool); } CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(completableFutures); voidCompletableFuture.join(); return "OK"; } public String getAssociatedInfo(ExecutorService service) { CompletableFuture<String> trialAssociatedInfoCompletableFuture = CompletableFuture.supplyAsync(() -> { try { System.out.println("按理说你已在运行,不是吗"); TimeUnit.SECONDS.sleep(100); System.out.println("你已经完成了"); } catch (InterruptedException e) { e.printStackTrace(); } return "a"; }, executorService); CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(trialAssociatedInfoCompletableFuture); voidCompletableFuture.join(); return "ok"; } } 复制代码
在这段代码中,http线程启600个任务,使用自定义的线程池并发数量为10个。每一个任务启一个子任务,使用类定义的线程池,并发数量为20个。在我的理解中,按理说最多多三十多个线程数量才对,但是短时间内居然飙升好几百,那么这几百个线程到底是如何产生的呢?在研究了源代码之后,终于理解了其中的奥秘。
completableFutures[i] = CompletableFuture.runAsync(() -> { getAssociatedInfo(forkJoinPool); }, forkJoinPool); 复制代码
这一句的作用是启异步任务,交由forkJoinPool线程池管理,当线程池数量不足10个时,启动一个线程,立即执行,当超过10个时,加入任务队列。
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(completableFutures); 复制代码
allOf的作用是递归地构造完成树,汇总并返回成一个总任务,如下图所示:
这里的任务1、任务2等就是我们前面定义的任务,每个任务对象存储着一个结果,当最终任务有结果时,必须要下面的汇总任务都有结果,进而每一个定义的任务都要有结果,通俗来说,就是对voidCompletableFuture的管理即为对所有定义任务的管理。
// 从多线程的角度,若任务未完成,会阻塞 voidCompletableFuture.join(); return "OK"; CompletableFuture->join(): return reportJoin((r = result) == null ? waitingGet(false) : r); CompletableFuture->waitingGet(): Signaller q = null; boolean queued = false; int spins = -1; Object r; // 当返回任务不为空,循环结束 while ((r = result) == null) { if (spins < 0) spins = (Runtime.getRuntime().availableProcessors() > 1) ? 1 << 8 : 0; // Use brief spin-wait on multiprocessors else if (spins > 0) { if (ThreadLocalRandom.nextSecondarySeed() >= 0) --spins; } else if (q == null) // 实例化一个信号量 --1 q = new Signaller(interruptible, 0L, 0L); else if (!queued) queued = tryPushStack(q); else if (interruptible && q.interruptControl < 0) { q.thread = null; cleanStack(); return null; } else if (q.thread != null && result == null) { try { // 若迟迟没有返回结果,最终会走到这个方法中,下面是ForkJoinPool对信号量的管理 ForkJoinPool.managedBlock(q); } catch (InterruptedException ie) { q.interruptControl = -1; } } } ForkJoinPool->managedBlock(): Thread t = Thread.currentThread(); if ((t instanceof ForkJoinWorkerThread) && (p = (wt = (ForkJoinWorkerThread)t).pool) != null) { WorkQueue w = wt.workQueue; while (!blocker.isReleasable()) { // if (p.tryCompensate(w)) { // --2 try { do {} while (!blocker.isReleasable() && !blocker.block()); } finally { U.getAndAddLong(p, CTL, AC_UNIT); } break; } } } else { do {} while (!blocker.isReleasable() && !blocker.block()); } ForkJoinPool->tryCompensate(): // --2 canBlock = add && createWorker(); // throws on exception 复制代码
CompletableFuture->Signaller->Signaller(): // --1 Signaller(boolean interruptible, long nanos, long deadline) { // thread变量是当前线程 this.thread = Thread.currentThread(); this.interruptControl = interruptible ? 1 : 0; this.nanos = nanos; this.deadline = deadline; } 复制代码
到第一个voidCompletableFuture.join(),该线程是http线程,由forkJoinPool线程池管理,最多10个线程并行,然后到waitingGet(),由于其不是forkJoin线程,因此走的是else方法
到第二个voidCompletableFuture.join(),该线程是forkJoinPool执行的任务,每一个任务都会执行一次getAssociatedInfo方法,由executorService线程池管理,最多20个线程并行,然后到waitingGet(),由于它是forkJoin线程,所以会新建一个线程,帮助执行forkJoinPool线程池里的任务,然而受到executorService线程池数量的制约,即使线程数多了,也不能加快执行,随着越来越多getAssociatedInfo方法的Join,导致了线程数量的飙升,又不能即时释放,最终导致了OOM的发生
猜想:将http线程的任务与forkJoinPool线程池的任务放在同一线程池,这样每当forkJoinPool线程池新产生一个线程时,都能窃取到任务从而执行,并且随着线程数量的上升,越来越多的任务被执行,这样就减少了线程创建的数量。最终的结果果然如此