java.util.concurrent.ThreadPoolExecutor
这个类。在搞清楚这个类怎么用之前,我们先要过一遍基本概念。
初学者可能会觉得这套机制有些复杂,所以最好多看几遍以加深理解。
当创建一个 ThreadPoolExecutor
对象时,其中的池和任务队列都会被创建起来。创建的时候,池和任务队列都会有一些配置项:
类型 | 配置项 |
---|---|
int | corePoolSize |
int | maximumPoolSize |
long | keepAliveTime |
TimeUnit | unit |
BlockingQueue<Runnable> | workQueue |
ThreadFactory | threadFactory |
RejectedExecutionHandler | handler |
这些配置项可以在 ThreadPoolExecutor
的构造方法中找到。下面分别介绍下。
maximumPoolSize
指的是池的最大可用线程数。如果池中的线程数达到这个最大值,就不会再增加新的线程了。
corePoolSize
可以理解为线程池“想要持有的线程数量”,什么意思呢?
keepAliveTime
和 unit
就是用来指定当一个线程空闲多长时间,池需要检查该不该销毁它。这个配置仅当实际线程数多于 corePoolSize
时有效,当实际线程数少于或等于 corePoolSize
时,不论线程空闲多长时间都不会被销毁。 threadFactory
用来指定一个创建线程的工厂对象。 ThreadPoolExecutor
有一个默认的线程工厂,同时允许我们通过这个参数来自己定义如何创建线程。
首先最大线程数应该是多少?这个取决于任务是 CPU 密集型还是 I/O 密集型。前者主要消耗 CPU,后者主要读写网络或磁盘或其他的流。如果任务是 CPU 密集型,那么线程池的 maximumPoolSize
超过主机 CPU 核数是没有意义的,一般设置为跟核数一样即可。如果任务是 I/O 密集型的,那么线程池可以设置的非常大,一个典型例子是 Tomcat 的线程池配置,设置为几百上千的都有。
其次是是否有必要自定义 threadFactory
参数?答案是有必要,我们需要给每一个线程起名字,这样在运维的时候我们就能直观的看到一个线程是做什么的。下面是一个简单的例子:
/** * 用于创建带名字的线程的线程池 */ public class NamedThreadFactory implements ThreadFactory { private String name; private AtomicInteger counter = new AtomicInteger(); public NamedThreadFactory(String name) { this.name = name; } @Override public Thread newThread(Runnable r) { return new Thread(r, name + "-" + counter.incrementAndGet()); } }
任务队列是一个 BlockingQueue
对象(我们通常选用 LinkedBlockingQueue
,它的结构使得操作队列头尾的效率最高)。 LinkedBlockingQueue
只有一个配置参数,就是队列长度。默认的队列长度为 Integer.MAX_VALUE
。
最后一个配置就是 ThreadPoolExecutor
的 handler
参数,类型是 RejectedExecutionHandler
,其含义是当队列满了(此时池中也必然没有空闲的线程)的时候,对于新的任务该如何处理。Java 默认提供下面几个 RejectedExecutionHandler
的子类:
CallerRunsPolicy
:让提交任务的线程自己去执行这个任务,也就意味着这个线程会因此而阻塞。 AbortPolicy
:拒绝执行这个任务,并且在提交任务的线程中抛出 RejectedExecutionException
异常。 DiscardPolicy
:同样拒绝执行这个任务,但是不抛出异常。 DiscardOldestPolicy
:从任务队列中把最早加入的任务丢弃,然后把当前任务加进任务队列。 首先不可使用默认的队列长度。如果这么做的话,在队列满掉之前,主机的内存就已经被撑爆,进程就挂掉了。队列长度直接影响了内存使用,因此要合理安排。
其次该选用哪个 RejectedExecutionHandler
,一般情况下都会选 CallerRunsPolicy
,因为只有它才不会把任务给丢了呀。
有了前面的说明,你应该知道这些参数怎么用了。下面是一个例子:
ThreadPoolExecutor executor = new ThreadPoolExecutor( 5, 10, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(10), new NamedThreadFactory("DEMO") // 这个类参考上面的代码示例 );
只要调用线程池的 execute(Runnable)
方法就能将任务提交给线程池执行。
注意任务内容当中一定要捕获所有的异常,线程池本身可不会对异常做任何处理,包括打印错误日志,这些必须你自己来做。
下面是一个例子:
Runnable task = () -> System.out.println( "Hello from thread " + Thread.currentThread().getName() ); // 向线程池提交 10 个任务 for (int i = 0; i < 10; i++) { executor.execute(task); }
当应用结束的时候,有两个要素要考虑:
首先我们在设计上应该保证,队列中未被处理的任务是可以随时丢弃的。下次启动线程池后,这些未处理的任务可以重新再提交给线程池,业务不受影响。
其次对于正在执行的任务,我们应该保证,一旦在执行任务过程中,进程被结束,那么有两种处理策略:
但不管怎么样,我们希望当进程结束时,应该尽可能等待正在处理的任务执行完成,以减少出错的可能性。 ThreadPoolExecutor
提供这样一种方式。下面是一个例子:
// 告诉线程池不再接受新的任务,也不再处理队列中的任务 executor.shutdownNow(); // 等待线程池中正在执行的任务都处理完毕,最多1小时 executor.awaitTermination(1, TimeUnit.HOURS);
以上就是对线程池的介绍。