转载

Java并发篇-全面解析Executor框架

成员分为四个部分:任务、任务执行、任务执行结果以及任务执行工具类

任务:实现 Callable 接口或 Runnable 接口

任务执行部分: ThreadPoolExecutor 以及 ScheduledThreadPoolExecutor

任务执行结果: Future 接口以及 FutureTask 实现类

任务执行工厂类: Executors

任务执行框架

ThreadPoolExecutor(核心)

ThreadPoolExecutor 通常使用 Executors 进行创建,其内部含有三种线程池:

  1. FixedThreadPool :含有 固定线程数 的线程池。

  2. SingleThreadExecutor单线程 的线程池,需要保证任务顺序执行时采用。

  3. CachedThreadPool大小无界 的线程池,只要需要线程就可以一直创建线程。

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor 通常使用 Executors 进行创建,其内部含有两种线程池:

  1. ScheduledThreadPoolExecutor :含有 固定线程数 的定时任务线程池
  2. SingleThreadScheduledExecutor :只 包含一个线程数 的定时任务线程池,需要保证任务顺序执行时采用。

任务执行结果

提交任务给任务执行框架执行后,任务执行框架会返回一个 Future 接口类型的对象,该对象内部包含了任务执行结果信息。

<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<> submit(Runnable task)
复制代码

在JDK1.8返回的是 FutureTask 对象,但不一定返回的就是 FutureTask 对象,只保证返回 Future 接口。

任务

实现 RunnableCallable 均可被任务执行框架执行,它们之间的区别是实现 Runnable 的任务执行完成后 不会返回结果 ,而实现 Callable 接口的任务 可以返回 结果。

返回结果为null的封装(没有什么意义)

public static Callable<Object> callable(Runnable task)
复制代码

拥有返回结果的封装

public static <T> Callable<T> callable(Runnable task, T result)
复制代码

ThreadPoolExecutor

上一篇讲过ThreadPoolExecutor构造函数中的参数含义,这里不再赘述,如果忘记了或者还没阅读过,可以先去了解一下,两篇文章是衔接的: juejin.im/post/5e9c45…

主要参数有4个:

corePolSize
maximumPoolSize
BlockingQueue
RejectedExecutionHandler

FixedThreadPool详解

fixedThreadPool 称为 可重用固定线程数 的线程池,下面是构造该线程池的方法源码:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}
复制代码

参数解读

  1. corePoolSizemaximumPoolSize 都设置为用户传入的 nThreads
  2. keepAliveTime 设置为 0L ,代表只要空闲线程闲下来就 马上被终止 ,如果设置为 SECONDS > 0 ,则代表线程空闲下来后等待新任务的最长时间 SECONDS
  3. TimeUnit 代表 keepAliveTime 的单位
  4. workQueue 设置阻塞队列

execute()执行流程

Java并发篇-全面解析Executor框架

curThread 代表核心线程池的当前线程数

  1. 如果 curThread < corPoolSize ,那么创建新线程执行任务

  2. 如果 curThread = corPoolSize ,那么将任务加入阻塞队列当中

  3. 线程池中的线程执行完任务后会空闲下来,然后会 一直从阻塞队列中获取任务执行

使用无界阻塞队列 LinkedBlockingQueue 的后果:

  1. curThread = corPoolSize 后,任务会一直加入到阻塞队列, 严重时阻塞队列的任务数过多会造成GC内存溢出
  2. 使用无界队列,那么参数 maximumPoolSize 是一个无效参数。只有 当任务无法加入到阻塞队列时 满足 curThread < maximumPoolSiize 才会在线程池中创建新的线程执行该任务。
  3. 由于第1点和第2点导致 keepAliveTime 参数无效
  4. 使用无界队列后,任务都可以加入到阻塞队列中,所以不会出现 curThread > maximumPoolSize ,也就 不会出现任务拒绝执行 的情况,不会调用任务拒绝方法。

GC内存溢出示例(个人理解,如果有错,欢迎指出!):

public class Task implements Runnable {    
    private String name;   
    public Task(String name) {
        this.name = name;
    }
    @Override
    public void run() {
        System.out.println(name);
    }
}
public class FixedThreadPoolTest {   
    private static final ExecutorService fixThreadPool = Executors.newFixedThreadPool(10);
    private static int i = 0;
    
    public static void main(String[] args) {
        // 不停地往线程池中提交任务
        for (;i < 100000;i++) {
            fixThreadPool.execute(new Task("任务" + i));
        }
        fixThreadPool.shutdown();
    }
}
复制代码

设置堆内存参数: -Xms2m -Xmx2m

Java并发篇-全面解析Executor框架

抛出OOM异常,原因在于阻塞队列中的任务数过多。

SingleThreadPool详解

Java并发篇-全面解析Executor框架

使用单个 Worker 线程的 Executor 。本质上是 fixedThreadPoolcorePoolSizemaximumPoolSize 都设置为 1 ,此外无其它区别。

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
复制代码

参数解释

  1. corePoolSizeMaximumPoolSize 的参数都设置为 1 ,其它参数含义和默认值都和 fixedThreadPool 相同。
  2. 无界阻塞队列对线程池的影响和 fixedThreadPool 相同。

execute()执行流程

Worker

CachedThreadPool详解

Java并发篇-全面解析Executor框架

根据需要创建新线程的线程池

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
复制代码

参数解释

  1. corePoolSize 设置为0,即 corePool 为空
  2. maximumPoolSize 设置为 Integer.MAX_VALUE ,所以 maximumPool 是几乎无界的,可容纳足够多的线程
  3. keepAliveTime 设置为60L,每个空闲线程可以等待新任务的时间是60秒
  4. TimeUnit 单位设置为 SECONDS 代表以秒为单位计算
  5. 阻塞队列采用无界的 SynchronousQueue ,每一次的 offer 操作都要对应一个 poll 操作

execute()执行流程

offer
poll
poll
Java并发篇-全面解析Executor框架

ScheduledThreadPoolExecutor

由于 JDK1.6与JDK1.8中延时队列的实现不同 ,所以在阅读本小节时不要死记硬背其实现,掌握其思想。图的组件均会用中文表示,对思想的掌握有帮助,实现的思想是一致的。

该类用于 指定特定的延迟时间后运行任务,或者定期执行任务Timer 是单线程的,该类是多线程的,而且该类功能更加强大。

运行机制

Java并发篇-全面解析Executor框架

ScheduledThreadPoolExecutor将任务提交到延时队列当中存储,当任务到达执行时间时, Worker 工作线程会拉取任务进行执行,空闲的线程会一直阻塞,不断尝试获取延时队列中的任务。

ScheduledThreadPoolExecutor 本质上是 ThreadPoolExecutor 的改造:

  1. 使用延时队列作为任务队列
  2. 获取任务的方式不同,空闲线程将一直阻塞获取任务
  3. 执行周期任务后,增加了额外的处理操作

实现方法

调用线程把待调度的任务( ScheduledFutureTask )放入一个延时队列当中, ScheduledFutureTask 的主要成员变量有三个:

  1. time :该任务要被执行的具体时间
  2. sequenceNumber :该任务被添加到 ScheduledThreadPoolExecutor 的序号,标记任务添加到队列的时刻
  3. period :任务执行的间隔周期

延时队列中采用优先级队列进行排序。排序时, time 越小排在越前面, time 相同时, sequenceNumber 越小排在越前面。

ScheduledThreadPoolExecutor 执行任务的流程如下图:

Java并发篇-全面解析Executor框架
time
time

FutureTask

表示异步计算的结果,实现了Runnable、Future接口。

FutureTask的几种状态

  1. 未启动。FutureTask没有执行run()方法之前处于未启动状态
  2. 已启动。FutureTask.run()方法被执行的过程中
  3. 已完成。FutureTask.run()方法执行完后正常结束、或被取消、或执行方法时抛出异常而异常结束

FutureTask的状态转换图如下所示

Java并发篇-全面解析Executor框架

FutureTask的核心方法

FutureTask.run()

当任务处于未启动状态时,调用FutureTask.run()会使任务执行。

FutureTask.get()

FutureTask处于 未启动或已启动 状态时,执行FutureTask.get()会导致 线程阻塞

当FutureTask处于 已完成 状态时,调用FutureTask.get()会立即 返回结果或者抛出异常

FutureTask.cancel()

当任务处于 未启动状态 时,调用FutureTask.cancel()会导致此任务 永远不会被执行

当任务处于 已启动状态 时,执行FutureTask.cancel( true )会 中断任务 ,执行FutureTask.cancel( false )方法将不会对正在执行此任务的线程产生任何影响;

当FutureTask处于 完成状态 时,执行FutureTask.cancel(...)方法将返回 false

get()和cancel()的状态转换图:

Java并发篇-全面解析Executor框架

FutureTask的使用

当一个线程需要等待另一个线程把某个任务执行完后才能继续执行,可以使用FutureTask。这种用法并不是很常见,只要知道有这种用法就可以了。

总结

这一章主要和上一章的线程池有很大联系,阅读到这里非常不容易,收获也应该很多,留下几道面试题来检验自己是否真的掌握了这些内容吧。

  1. Executor框架主要有哪些成员?
  2. 任务执行框架主要有哪几种线程池?
  3. 线程池有哪些参数?它们的含义是什么?
  4. 每种线程池的工作流程是如何的?简单介绍一下(越详细越好,使用了什么队列、线程池大小······)
  5. 任务执行完成后返回的结果如何接收?
  6. FutureTask有哪几种状态?有哪些核心方法?调用方法会导致状态如何变化?(灵魂三问)

如果这篇文章能给你带来一点点的小收获,你的一个小小的点赞我会开心一整天!感谢你的阅读!

原文  https://juejin.im/post/5e9e62a4f265da47e64930d8
正文到此结束
Loading...