new Thread
创建线程问题: Thread线程
属于一个重量级的对象,通过 new Thread
创建一个线程,首先它是一个 Java对象
,需要分配堆空间资源;同时 Thread
需要调用操作 系统内核API
,在 系统层面创建一个线程与此对应 ,操作系统还要为线程分配一系列资源。所以,创建线程的成本是很高的,应该 避免频繁创建和销毁线程 。
另外,线程缺乏统一管理,可能无限制新建线程,相互之间竞争加剧,以及可能占用过多系统资源导致服务器宕机或 OOM
等。
如何去解决这个问题,就是采用经常使用到的 资源池 方案,比如数据库连接池等,将资源提前初始化后放入到池中进行管理,待需要使用时从池中获取一个空闲资源,使用完后再将资源放回到池中达到释放目的,这样其它任务就可以继续重复使用该资源,避免资源被不停创建、销毁。
由于 Thread API
在接口设计上的问题,线程池和一般的资源池在使用上是有些差异的,比如连接池:从连接池获取可用连接 --> 使用连接执行任务 --> 将连接放入到连接池。如果我们从线程池中获取到一个 Thread
对象,根本没法处理我们的任务,因为 Thread
线程在启动之前要么 重写 run()
、要么 传入 Runnable
方式将任务和 Thread
绑定在一起。所以, Java线程池
是没有提供 申请线程
和 释放线程
的方法,而是采用一种 生产者/消费者模式 去构建线程池执行机制。
Java
线程 Thread
是被一对一映射到本地操作系统线程,即 Java
启动时会创建一个本地操作系统线程,当 Java
线程终止时,对应操作系统线程会被回收。
Java 5
之前,仅仅只能使用 Thread
、 Runnable
、 ThreadLocal
、 synchronized
等进行多线程开发,线程的使用及其简陋; Java 5
极大的改善并发编程,构建出了多线程开发 API
的基础体系,这些类主要位于 java.util.concurrent
包下,简称 J.U.C
。
Executor
就是 J.U.C
中比较重要的一块,用于构建多线程开发中使用最普遍的线程池:
Executor
【接口】:最顶层接口,该接口只定义了一个方法:
void execute(Runnable command)
ExecutorService Executor Callable Future
ScheduledExecutorService
【接口】:继承了
ExecutorService
,增加了定时任务相关的方法
ThreadPoolExecutor
【实现类】:
ExecutorService
的默认实现
ScheduledThreadPoolExecutor ThreadPoolExecutor ScheduledExecutorService ScheduledThreadPoolExecutor
ExecutorService
方法描述:
shutdown()
:优雅关闭线程池,之前提交的任务将被执行,包括当前正在执行的和等待队列中的任务,但是线程池不会再接收新任务,提交新任务会抛出异常
shutdownNow()
:调用 shutdownNow
方法后, 线程池就不会再接受新的任务了,并且会丢弃工作队列里面的任务,正在执行的任务会被中断, 该方法会立刻返回,返回值为这时候队列里面被丢弃的任务列表。
isShutdown()
:如果此线程池关闭,则返回true
isTerminated()
:如果关闭后所有任务都已完成,则返回true
awaitTermination()
:当线程调用 awaitTermination
方法后,当前线程会被阻塞,直到线程池状态变为 TERMINATED
才返回, 或者等待时间超时才返回、或当前线程被中断
submit()
系列方法:
public Future<?> submit(Runnable task) Runnable Runnable submit Future Future.get() Runnable Future.get() null
public <T> Future<T> submit(Runnable task, T result) submit(Runnable task) Runnable Future.get()
public <T> Future<T> submit(Callable<T> task)
:提交一个
Callable
任务,该任务是带有返回结果的
invokeAll()
系列方法:
invokeAll(Collection<? extends Callable<T>> tasks)
:执行给定任务集合,执行完毕后返回结果
invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) Future.get() CancellationException
invokeAny()
系列方法:参照 invokeAll()
方法
T invokeAny(Collection<? extends Callable<T>> tasks)
:执行给定的任务集合,任意一个执行成功则返回结果,其他任务终止
T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
:执行给定的任务集合,任意一个执行成功或超时,则返回结果,其他任务终止
注意: invoke
方法是阻塞方法,即提交的任务执行完或超时才会返回结果,而 submit
系列方法是会立即返回 Future
结果。区分一个方法是不是阻塞方法可以通过方法签名是否抛出 InterruptedException
:
public Future<?> submit(Runnable task) public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException
ScheduledExecutorService
schedule(Runnable command, long delay, TimeUnit unit)
schedule(Callable<V> callable, long delay, TimeUnit unit)
scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
:周期是以两个任务开始时间间隔为周期,初始延迟时间后会触发第一次执行,执行过程中发生异常,那么任务就停止,一次任务执行时长超过周期时间,那下一次任务会等到该任务执行结束后,立即执行。
scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
:周期是以上一个任务结束到下一个任务开始时间间隔为周期,指的是 以固定的延时 执行, initialDelay
初次延迟时间, delay
指的是上一次执行终止和下一次执行开始之间的延迟。
RUNNING
:接收新任务并处理排队任务;
SHUTDOWN
:不接收新任务,但处理排队任务,调用
shutdown()
会处于该状态;
STOP
:不接收新任务,也不处理排队任务,并中断正在运行的任务,调用
shutdownNow()
会处于该状态;
TIDYING workerCount TIDYING terminate()
TERMINATED
:
terminate()
运行完成;
参见 ThreadPoolExecutor
:
private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
线程池状态具体可参加线程池终止一节中源码分析。
线程池中主要有:
AtomicInteger ctl
:二进制高3位表示线程池状态,后29位记录工作线程数量,所以线程池所能够支持的最大线程数:
2^29 - 1 = 536870911
;
线程池状态高3位表示:
RUNNING = -1 << COUNT_BITS;//111 SHUTDOWN = 0 << COUNT_BITS;//000 STOP = 1 << COUNT_BITS;//010 TIDYING = 2 << COUNT_BITS;//100 TERMINATED = 3 << COUNT_BITS;//110
Worker Worker Worker Thread Worker firstTask Worker getTask
ThreadPoolExecutor
定义:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)
corePoolSize
:核心线程数,线程池在完成初始化后,默认情况下,线程池中并没有任何线程,线程池会等待有任务到来时,再创建新线程去执行任务
maximumPoolSize
:最大线程数,线程池有可能会在核心线程数的基础上,额外增加一些线程,但是这些新增加的线程数有一个上限,这就是最大量
maxPoolSize
keepAliveTime 工作线程数 > 核心线程数 线程空闲超时 allowCoreThreadTimeOut(true) 工作线程数 核心线程数
workQueue
: ArrayBlockingQueue
:等待队列有固定大小;
LinkedBlockingQueue corePoolSize corePoolSize maximumPoolSize corePoolSize
SynchronousQueue Executors newCachedThreadPool()
拒绝策略 handler
:如若使用拒绝策略,等待队列一定要设置成有界队列才行;若等待队列已满,则在总线程数不大于 maximumPoolSize
的前提下,创建新的线程;若线程数大于 maximumPoolSize
,则执行拒绝策略
AbortPolicy
:拒绝任务,并且抛出异常
CallerRunsPolicy
:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务
DiscardOldestPolicy
:丢弃最老的一个请求,然后把当前任务重新加入到队列中
DiscardPolicy
:丢弃无法处理的任务,没有异常信息
如果需要自定义拒绝策略可以实现 RejectedExecutionHandler
接口
工作线程数 < corePoolSize
,即使其它工作线程处于空闲状态,也会创建一个新线程来运行新任务;
工作线程数 >= corePoolSize
,但
工作线程数 < maximumPoolSize
,则提交的任务会被放入到等待队列中;
等待队列已满
,并且
工作线程数 < maxPoolSize
,则创建一个新线程来运行新任务,这个机制可能很多人都会搞错,这里就容易出现之前提交的任务还在等待队列中阻塞,但是新提交任务却被执行情况,千万注意;
工作线程数 >= maxPoolSize
,这时提交的新任务采用拒绝策略处理;
等待队列 corePoolSize LinkedBlockingQueue 等待队列 工作线程数 corePoolSize maximumPoolSize
keepAliveTime
参数;
shutdown
和
shutdownNow
方法终止线程池时,最终会把所有的工作线程都销毁,具体参见后续源码分析;
Executors
类似于 Collection
和 Collections
关系, Executors
是 Executor
的工具类,提供了一些创建线程池方法,主要如下:
ExecutorService newFixedThreadPool(int nThreads)
:创建一个固定大小、任务队列容量无界的线程池;
ExecutorService newCachedThreadPool() Integer.MAX_VALUE SynchronousQueue
ExecutorService newSingleThreadExecutor()
:只有一个线程来执行无界任务队列的单一线程池,该线程池确保任务按照加入的顺序一个一个依次执行,当唯一的线程因任务异常终止时,将创建一个新的线程来继续执行后续的任务;
ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
:能定时执行任务的线程池,线程池中核心线程数由参数指定,最大线程数=
Integer.MAX_VALUE
;
FixedThreadPool
和 SingleThreadExecutor
的 Queue
是 LinkedBlockingQueue
,因为固定线程,线程满的话需要等待队列去缓存任务;
CachedThreadPool
使用的 Queue
是 SynchronousQueue
,不需要缓存任务,因为最大线程数没有限制;
ScheduledThreadPool
来说,它使用的是延迟队列, DelayedWorkQueue
;
JDK1.8
新加入了一种线程池: workStealingPool
Executors
工具类方式创建线程池不建议在生产开发中直接使用,因为创建出来的线程池比较粗糙,或多或少都存在一些问题,一般只在测试用例中使用,阿里开发手册中也是规定不能在代码中直接通过 Executors
方式创建线程池。 CompletionService
ExecutorService
+ Future
在处理批量提交异步任务并获取结果时可能会存在一些问题,如下:
public void executorServiceTest() throws InterruptedException { List<Future<Integer>> futures = new ArrayList<>(); for(int i=0;i<10;i++){ Future<Integer> future = executor.submit(new Task()); futures.add(future); } //Future.get获取处理结果时,按照顺序获取 for (int i = 0, size = futures.size(); i < size; i++) { Future<Integer> f = futures.get(i); try { Integer ret = f.get(); log.info("ret:{}", ret); } catch (InterruptedException | ExecutionException e) { } } } class Task implements Callable<Integer>{ private Random random = new Random(); @Override public Integer call() throws Exception { int v = random.nextInt(500); TimeUnit.MICROSECONDS.sleep(v); return v; } }
如上述案例:通过 executor.submit()
批量提交 10个任务
,然后获取这些任务的结果,只能顺序通过 Future.get()
获取,因为无法知道哪些任务先完成,这就造成即使有些任务先完成,由于前面任务没有完成依然被阻塞。
CompletionService
对 ExecutorService
进行包装,将结果集 按照完成时间的顺序放入到阻塞队列中,获取结果时只需要从阻塞队列中获取即可,即实现:先完成的任务先获取结果 。
CompletionService
逻辑如上图,其实现代码也很简单:
1、将任务封装成 FutureTask
,然后提交到内部线程池中执行任务;
2、 FutureTask#done()
方法会在任务被完成时进行回调的方法, CompletionService
就是重写该方法,在 FutureTask
任务完成时添加到阻塞队列中去,这样就可以从阻塞队列中获取已完成的任务,见下:
protected void done() { completionQueue.add(task); }
Tips : ExecutorCompletionService
类中使用 LinkedBlockingQueue
无界队列存储结果集,所以,一定要及时去取结果,不然完成的任务结果不停的堆积到阻塞队列中,可能会撑爆内存空间。
在生产开发中建议更多的使用线程池技术,除去 性能 方面考虑外,从 软件设计 上线程池比之前介绍的创建线程方式更好:
Runnable
、 Callable
中,通过 submit()
、 invoke()
等方式提交给线程池;而多线程如何创建、运行等执行机制是由 Executor
框架提供,一般情况下对于开发者是不需要关心的。 Thread
( JDK1.0
就存在)可能是由于出现的比较早,本身在接口设计上有那么点不是很完善的地方,比如我们的任务封装到 Runnable
中需要和 Thread
绑定完成后才能使用 start
启动线程执行任务。执行完成这个线程就废弃了,而不能通过重新绑定 Runnable
实现执行新任务,这就导致 Thread
类接口设计上就将任务和执行很紧密的耦合在一起了,即使 Runnable
、 Callable
接口的出现在代码层面将任务逻辑代码部分剥离出来,但是在执行时依然需要先绑定才能启动线程,而不能做到像线程池在运行时有新任务随时提交。