掌握线程池是后端程序员的基本要求,相信大家求职面试过程中,几乎都会被问到有关于线程池的问题。我在网上搜集了几道经典的线程池面试题,并以此为切入点,谈谈我对线程池的理解。如果有哪里理解不正确,非常希望大家指出,接下来大家一起分析学习吧。
线程池:简单理解,它就是一个管理线程的池子。
线程池可以通过ThreadPoolExecutor来创建,我们来看一下它的构造函数:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 复制代码
几个核心参数的作用:
好的,到这里。 面试问题1->Java的线程池说一下,各个参数的作用,如何进行的? 是否已经迎刃而解啦, 我觉得这个问题,回答: 线程池构造函数的corePoolSize,maximumPoolSize等参数,并且能描述清楚线程池的执行流程 就差不多啦。
在使用线程池处理任务的时候,任务代码可能抛出RuntimeException,抛出异常后,线程池可能捕获它,也可能创建一个新的线程来代替异常的线程,我们可能无法感知任务出现了异常,因此我们需要考虑线程池异常情况。
我们先来看一段代码:
ExecutorService threadPool = Executors.newFixedThreadPool(5); for (int i = 0; i < 5; i++) { threadPool.submit(() -> { System.out.println("current thread name" + Thread.currentThread().getName()); Object object = null; System.out.print("result## "+object.toString()); }); } 复制代码
显然,这段代码会有异常,我们再来看看执行结果
虽然没有结果输出,但是没有抛出异常,所以我们无法感知任务出现了异常,所以需要添加try/catch。 如下图:
OK,线程的异常处理, 我们可以直接try...catch捕获。通过debug上面有异常的submit方法( 建议大家也去debug看一下,图上的每个方法内部是我打断点的地方 ),处理有异常submit方法的主要执行流程图:
//构造feature对象 /** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); } public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable } public static <T> Callable<T> callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); return new RunnableAdapter<T>(task, result); } //线程池执行 public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); } //捕获异常 public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } 复制代码
通过以上分析, submit执行的任务,可以通过Future对象的get方法接收抛出的异常,再进行处理。 我们再通过一个demo,看一下Future对象的get方法处理异常的姿势,如下图:
除了以上 1.在任务代码try/catch捕获异常,2.通过Future对象的get方法接收抛出的异常,再处理 两种方案外,还有以上两种方案:
我们直接看这样实现的正确姿势:
ExecutorService threadPool = Executors.newFixedThreadPool(1, r -> { Thread t = new Thread(r); t.setUncaughtExceptionHandler( (t1, e) -> { System.out.println(t1.getName() + "线程抛出的异常"+e); }); return t; }); threadPool.execute(()->{ Object object = null; System.out.print("result## " + object.toString()); }); 复制代码
运行结果:
这是jdk文档的一个demo:
class ExtendedExecutor extends ThreadPoolExecutor { // 这可是jdk文档里面给的例子。。 protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); if (t == null && r instanceof Future<?>) { try { Object result = ((Future<?>) r).get(); } catch (CancellationException ce) { t = ce; } catch (ExecutionException ee) { t = ee.getCause(); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); // ignore/reset } } if (t != null) System.out.println(t); } }} 复制代码
。
ArrayBlockingQueue(有界队列)是一个用数组实现的有界阻塞队列,按FIFO排序量。
LinkedBlockingQueue(可设置容量队列)基于链表结构的阻塞队列,按FIFO排序任务,容量可以选择进行设置,不设置的话,将是一个无边界的阻塞队列,最大长度为Integer.MAX_VALUE,吞吐量通常要高于ArrayBlockingQuene;newFixedThreadPool线程池使用了这个队列
DelayQueue(延迟队列)是一个任务定时周期的延迟执行的队列。根据指定的执行时间从小到大排序,否则根据插入到队列的先后排序。newScheduledThreadPool线程池使用了这个队列。
PriorityBlockingQueue(优先级队列)是具有优先级的无界阻塞队列;
SynchronousQueue(同步队列)一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene,newCachedThreadPool线程池使用了这个队列。
针对面试题: 线程池都有哪几种工作队列? 我觉得, 回答以上几种ArrayBlockingQueue,LinkedBlockingQueue,SynchronousQueue等,说出它们的特点,并结合使用到对应队列的常用线程池(如newFixedThreadPool线程池使用LinkedBlockingQueue),进行展开阐述, 就可以啦。
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); } 复制代码
ExecutorService executor = Executors.newFixedThreadPool(10); for (int i = 0; i < Integer.MAX_VALUE; i++) { executor.execute(()->{ try { Thread.sleep(10000); } catch (InterruptedException e) { //do nothing } }); 复制代码
IDE指定JVM参数:-Xmx8m -Xms8m :
run以上代码,会抛出OOM:
答案 :会的,newFixedThreadPool使用了无界的阻塞队列LinkedBlockingQueue,如果线程获取一个任务后,任务的执行时间比较长(比如,上面demo设置了10秒),会导致队列的任务越积越多,导致机器内存使用不停飙升, 最终导致OOM。
FixedThreadPool 适用于处理CPU密集型的任务,确保CPU在长期被工作线程使用的情况下,尽可能的少的分配线程,即适用执行长期的任务。
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory); } 复制代码
当提交任务的速度大于处理任务的速度时,每次提交一个任务,就必然会创建一个线程。极端情况下会创建过多的线程,耗尽 CPU 和内存资源。由于空闲 60 秒的线程会被终止,长时间保持空闲的 CachedThreadPool 不会占用任何资源。
ExecutorService executor = Executors.newCachedThreadPool(); for (int i = 0; i < 5; i++) { executor.execute(() -> { System.out.println(Thread.currentThread().getName()+"正在执行"); }); } 复制代码
运行结果:
用于并发执行大量短期的小任务。
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory)); } 复制代码
ExecutorService executor = Executors.newSingleThreadExecutor(); for (int i = 0; i < 5; i++) { executor.execute(() -> { System.out.println(Thread.currentThread().getName()+"正在执行"); }); } 复制代码
运行结果:
适用于串行执行任务的场景,一个任务一个任务地执行。
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); } 复制代码
/** 创建一个给定初始延迟的间隔性的任务,之后的下次执行时间是上一次任务从执行到结束所需要的时间+* 给定的间隔时间 */ ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); scheduledExecutorService.scheduleWithFixedDelay(()->{ System.out.println("current Time" + System.currentTimeMillis()); System.out.println(Thread.currentThread().getName()+"正在执行"); }, 1, 3, TimeUnit.SECONDS); 复制代码
运行结果:
/** 创建一个给定初始延迟的间隔性的任务,之后的每次任务执行时间为 初始延迟 + N * delay(间隔) */ ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); scheduledExecutorService.scheduleAtFixedRate(()->{ System.out.println("current Time" + System.currentTimeMillis()); System.out.println(Thread.currentThread().getName()+"正在执行"); }, 1, 3, TimeUnit.SECONDS);; 复制代码
周期性执行任务的场景,需要限制线程数量的场景
回到面试题: 说说几种常见的线程池及使用场景?
回答这四种经典线程池 :newFixedThreadPool,newSingleThreadExecutor,newCachedThreadPool,newScheduledThreadPool,分线程池特点,工作机制,使用场景分开描述,再分析可能存在的问题,比如newFixedThreadPool内存飙升问题 即可
线程池有这几个状态:RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED。
//线程池状态 private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; 复制代码
TERMINATED该状态表示线程池彻底终止