转载

Java 并发编程(二):线程池总结

概述

常规 new Thread 创建线程问题:

  • Thread线程 属于一个重量级的对象,通过 new Thread 创建一个线程,首先它是一个 Java对象 ,需要分配堆空间资源;同时 Thread 需要调用操作 系统内核API ,在 系统层面创建一个线程与此对应 ,操作系统还要为线程分配一系列资源。所以,创建线程的成本是很高的,应该 避免频繁创建和销毁线程

  • 另外,线程缺乏统一管理,可能无限制新建线程,相互之间竞争加剧,以及可能占用过多系统资源导致服务器宕机或 OOM 等。

如何去解决这个问题,就是采用经常使用到的 资源池 方案,比如数据库连接池等,将资源提前初始化后放入到池中进行管理,待需要使用时从池中获取一个空闲资源,使用完后再将资源放回到池中达到释放目的,这样其它任务就可以继续重复使用该资源,避免资源被不停创建、销毁。

由于 Thread API 在接口设计上的问题,线程池和一般的资源池在使用上是有些差异的,比如连接池:从连接池获取可用连接 --> 使用连接执行任务 --> 将连接放入到连接池。如果我们从线程池中获取到一个 Thread 对象,根本没法处理我们的任务,因为 Thread 线程在启动之前要么 重写 run() 、要么 传入 Runnable 方式将任务和 Thread 绑定在一起。所以, Java线程池 是没有提供 申请线程释放线程 的方法,而是采用一种 生产者/消费者模式 去构建线程池执行机制。

Java 线程 Thread 是被一对一映射到本地操作系统线程,即 Java 启动时会创建一个本地操作系统线程,当 Java 线程终止时,对应操作系统线程会被回收。

Executor体系

Java 5 之前,仅仅只能使用 ThreadRunnableThreadLocalsynchronized 等进行多线程开发,线程的使用及其简陋; Java 5 极大的改善并发编程,构建出了多线程开发 API 的基础体系,这些类主要位于 java.util.concurrent 包下,简称 J.U.C

Java 并发编程(二):线程池总结

Executor 就是 J.U.C 中比较重要的一块,用于构建多线程开发中使用最普遍的线程池:

  • Executor 【接口】:最顶层接口,该接口只定义了一个方法: void execute(Runnable command)
  • ExecutorService
    Executor
    Callable
    Future
    
  • ScheduledExecutorService 【接口】:继承了 ExecutorService ,增加了定时任务相关的方法
  • ThreadPoolExecutor 【实现类】: ExecutorService 的默认实现
  • ScheduledThreadPoolExecutor
    ThreadPoolExecutor
    ScheduledExecutorService
    ScheduledThreadPoolExecutor
    

ExecutorService

Java 并发编程(二):线程池总结

方法描述:

  • shutdown() :优雅关闭线程池,之前提交的任务将被执行,包括当前正在执行的和等待队列中的任务,但是线程池不会再接收新任务,提交新任务会抛出异常

  • shutdownNow() :调用 shutdownNow 方法后, 线程池就不会再接受新的任务了,并且会丢弃工作队列里面的任务,正在执行的任务会被中断, 该方法会立刻返回,返回值为这时候队列里面被丢弃的任务列表。

  • isShutdown() :如果此线程池关闭,则返回true

  • isTerminated() :如果关闭后所有任务都已完成,则返回true

  • awaitTermination() :当线程调用 awaitTermination 方法后,当前线程会被阻塞,直到线程池状态变为 TERMINATED 才返回, 或者等待时间超时才返回、或当前线程被中断

  • submit() 系列方法:

    • public Future<?> submit(Runnable task)
      Runnable
      Runnable
      submit
      Future
      Future.get()
      Runnable
      Future.get()
      null
      
    • public <T> Future<T> submit(Runnable task, T result)
      submit(Runnable task)
      Runnable
      Future.get()
      
    • public <T> Future<T> submit(Callable<T> task) :提交一个 Callable 任务,该任务是带有返回结果的
  • invokeAll() 系列方法:

    • invokeAll(Collection<? extends Callable<T>> tasks) :执行给定任务集合,执行完毕后返回结果
    • invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
      Future.get()
      CancellationException
      
  • invokeAny() 系列方法:参照 invokeAll() 方法

    • T invokeAny(Collection<? extends Callable<T>> tasks) :执行给定的任务集合,任意一个执行成功则返回结果,其他任务终止
    • T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) :执行给定的任务集合,任意一个执行成功或超时,则返回结果,其他任务终止

注意: invoke 方法是阻塞方法,即提交的任务执行完或超时才会返回结果,而 submit 系列方法是会立即返回 Future 结果。区分一个方法是不是阻塞方法可以通过方法签名是否抛出 InterruptedException

public Future<?> submit(Runnable task)
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException

ScheduledExecutorService

创建并执行一个一次性任务,可以指定延迟时间:

schedule(Runnable command, long delay, TimeUnit unit)

schedule(Callable<V> callable, long delay, TimeUnit unit)

创建并执行一个周期性任务:

scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) :周期是以两个任务开始时间间隔为周期,初始延迟时间后会触发第一次执行,执行过程中发生异常,那么任务就停止,一次任务执行时长超过周期时间,那下一次任务会等到该任务执行结束后,立即执行。

scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) :周期是以上一个任务结束到下一个任务开始时间间隔为周期,指的是 以固定的延时 执行, initialDelay 初次延迟时间, delay 指的是上一次执行终止和下一次执行开始之间的延迟。

线程池状态

  • RUNNING :接收新任务并处理排队任务;
  • SHUTDOWN :不接收新任务,但处理排队任务,调用 shutdown() 会处于该状态;
  • STOP :不接收新任务,也不处理排队任务,并中断正在运行的任务,调用 shutdownNow() 会处于该状态;
  • TIDYING
    workerCount
    TIDYING
    terminate()
    
  • TERMINATEDterminate() 运行完成;

参见 ThreadPoolExecutor

private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

线程池状态具体可参加线程池终止一节中源码分析。

线程池模型

Java 并发编程(二):线程池总结

线程池中主要有:

  • AtomicInteger ctl :二进制高3位表示线程池状态,后29位记录工作线程数量,所以线程池所能够支持的最大线程数: 2^29 - 1 = 536870911

线程池状态高3位表示:

RUNNING    = -1 << COUNT_BITS;//111
SHUTDOWN   =  0 << COUNT_BITS;//000
STOP       =  1 << COUNT_BITS;//010
TIDYING    =  2 << COUNT_BITS;//100
TERMINATED =  3 << COUNT_BITS;//110
  • Worker
    Worker
    Worker
    Thread
    Worker
    firstTask
    Worker
    getTask
    

ThreadPoolExecutor 定义:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) 

核心参数:

  • corePoolSize :核心线程数,线程池在完成初始化后,默认情况下,线程池中并没有任何线程,线程池会等待有任务到来时,再创建新线程去执行任务
  • maximumPoolSize :最大线程数,线程池有可能会在核心线程数的基础上,额外增加一些线程,但是这些新增加的线程数有一个上限,这就是最大量 maxPoolSize
  • keepAliveTime
    工作线程数 > 核心线程数
    线程空闲超时
    allowCoreThreadTimeOut(true)
    工作线程数
    核心线程数
    

等待队列 workQueue

  • 有界队列 ArrayBlockingQueue :等待队列有固定大小;
  • LinkedBlockingQueue
    corePoolSize
    corePoolSize
    maximumPoolSize
    corePoolSize
    
  • SynchronousQueue
    Executors
    newCachedThreadPool()
    

拒绝策略 handler :如若使用拒绝策略,等待队列一定要设置成有界队列才行;若等待队列已满,则在总线程数不大于 maximumPoolSize 的前提下,创建新的线程;若线程数大于 maximumPoolSize ,则执行拒绝策略

  • AbortPolicy :拒绝任务,并且抛出异常

  • CallerRunsPolicy :只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务

  • DiscardOldestPolicy :丢弃最老的一个请求,然后把当前任务重新加入到队列中

  • DiscardPolicy :丢弃无法处理的任务,没有异常信息

  • 如果需要自定义拒绝策略可以实现 RejectedExecutionHandler 接口

增减线程时机:

  • 如果 工作线程数 < corePoolSize ,即使其它工作线程处于空闲状态,也会创建一个新线程来运行新任务;
  • 如果 工作线程数 >= corePoolSize ,但 工作线程数 < maximumPoolSize ,则提交的任务会被放入到等待队列中;
  • 如果 等待队列已满 ,并且 工作线程数 < maxPoolSize ,则创建一个新线程来运行新任务,这个机制可能很多人都会搞错,这里就容易出现之前提交的任务还在等待队列中阻塞,但是新提交任务却被执行情况,千万注意;
  • 如果队列已满,并且 工作线程数 >= maxPoolSize ,这时提交的新任务采用拒绝策略处理;
  • 等待队列
    corePoolSize
    LinkedBlockingQueue
    等待队列
    工作线程数
    corePoolSize
    maximumPoolSize
    
  • 线程池有个线程超时机制,超时的线程会被销毁回收,详见 keepAliveTime 参数;
  • shutdownshutdownNow 方法终止线程池时,最终会把所有的工作线程都销毁,具体参见后续源码分析;

Executors

类似于 CollectionCollections 关系, ExecutorsExecutor 的工具类,提供了一些创建线程池方法,主要如下:

  • ExecutorService newFixedThreadPool(int nThreads) :创建一个固定大小、任务队列容量无界的线程池;
  • ExecutorService newCachedThreadPool()
    Integer.MAX_VALUE
    SynchronousQueue
    
  • ExecutorService newSingleThreadExecutor() :只有一个线程来执行无界任务队列的单一线程池,该线程池确保任务按照加入的顺序一个一个依次执行,当唯一的线程因任务异常终止时,将创建一个新的线程来继续执行后续的任务;
  • ScheduledExecutorService newScheduledThreadPool(int corePoolSize) :能定时执行任务的线程池,线程池中核心线程数由参数指定,最大线程数= Integer.MAX_VALUE

FixedThreadPoolSingleThreadExecutorQueueLinkedBlockingQueue ,因为固定线程,线程满的话需要等待队列去缓存任务;

CachedThreadPool 使用的 QueueSynchronousQueue ,不需要缓存任务,因为最大线程数没有限制;

ScheduledThreadPool 来说,它使用的是延迟队列, DelayedWorkQueue

JDK1.8 新加入了一种线程池: workStealingPool

Executors 工具类方式创建线程池不建议在生产开发中直接使用,因为创建出来的线程池比较粗糙,或多或少都存在一些问题,一般只在测试用例中使用,阿里开发手册中也是规定不能在代码中直接通过 Executors 方式创建线程池。

CompletionService

ExecutorService + Future 在处理批量提交异步任务并获取结果时可能会存在一些问题,如下:

public void executorServiceTest() throws InterruptedException {
    List<Future<Integer>> futures = new ArrayList<>();
    for(int i=0;i<10;i++){
        Future<Integer> future = executor.submit(new Task());
        futures.add(future);
    }
    //Future.get获取处理结果时,按照顺序获取
    for (int i = 0, size = futures.size(); i < size; i++) {
        Future<Integer> f = futures.get(i);
        try {
            Integer ret = f.get();
            log.info("ret:{}", ret);
        } catch (InterruptedException | ExecutionException e) {
        }
    }
}

class Task implements Callable<Integer>{
    private Random random = new Random();

    @Override
    public Integer call() throws Exception {
        int v = random.nextInt(500);
        TimeUnit.MICROSECONDS.sleep(v);
        return v;
    }
}

如上述案例:通过 executor.submit() 批量提交 10个任务 ,然后获取这些任务的结果,只能顺序通过 Future.get() 获取,因为无法知道哪些任务先完成,这就造成即使有些任务先完成,由于前面任务没有完成依然被阻塞。

CompletionServiceExecutorService 进行包装,将结果集 按照完成时间的顺序放入到阻塞队列中,获取结果时只需要从阻塞队列中获取即可,即实现:先完成的任务先获取结果

Java 并发编程(二):线程池总结

CompletionService 逻辑如上图,其实现代码也很简单:

1、将任务封装成 FutureTask ,然后提交到内部线程池中执行任务;

2、 FutureTask#done() 方法会在任务被完成时进行回调的方法, CompletionService 就是重写该方法,在 FutureTask 任务完成时添加到阻塞队列中去,这样就可以从阻塞队列中获取已完成的任务,见下:

protected void done() { completionQueue.add(task); }

TipsExecutorCompletionService 类中使用 LinkedBlockingQueue 无界队列存储结果集,所以,一定要及时去取结果,不然完成的任务结果不停的堆积到阻塞队列中,可能会撑爆内存空间。

总结

在生产开发中建议更多的使用线程池技术,除去 性能 方面考虑外,从 软件设计 上线程池比之前介绍的创建线程方式更好:

首先,线程池更好的体现了把任务单元与执行机制分离开的思想,开发者更多关注的是任务单元,只需要把需要执行的任务封装到 RunnableCallable 中,通过 submit()invoke() 等方式提交给线程池;而多线程如何创建、运行等执行机制是由 Executor 框架提供,一般情况下对于开发者是不需要关心的。

另一点 Thread ( JDK1.0 就存在)可能是由于出现的比较早,本身在接口设计上有那么点不是很完善的地方,比如我们的任务封装到 Runnable 中需要和 Thread 绑定完成后才能使用 start 启动线程执行任务。执行完成这个线程就废弃了,而不能通过重新绑定 Runnable 实现执行新任务,这就导致 Thread 类接口设计上就将任务和执行很紧密的耦合在一起了,即使 RunnableCallable 接口的出现在代码层面将任务逻辑代码部分剥离出来,但是在执行时依然需要先绑定才能启动线程,而不能做到像线程池在运行时有新任务随时提交。

Java 并发编程(二):线程池总结

原文  https://mp.weixin.qq.com/s/7wnhs7GPp8UFTFENWZaQAQ
正文到此结束
Loading...