转载

Java™ 教程(执行器)

执行器

在前面的所有示例中,由新的线程(由其 Runnable 对象定义)和线程本身(由 Thread 对象定义)完成的任务之间存在紧密的联系,这适用于小型应用程序,但在大型应用程序中,将线程管理和创建与应用程序的其余部分分开是有意义的,封装这些函数的对象称为执行器,以下小节详细描述了执行器。

  • 执行器接口定义三个执行器对象类型。
  • 线程池是最常见的执行器实现类型。
  • Fork/Join是一个利用多个处理器的框架(JDK 7中的新增功能)。

执行器接口

java.util.concurrent 包定义了三个执行器接口:

  • Executor ,一个支持启动新任务的简单接口。
  • ExecutorServiceExecutor 的子接口,它添加了有助于管理生命周期的功能,包括单个任务和执行器本身。
  • ScheduledExecutorServiceExecutorService 的子接口,支持将来和/或定期执行任务。

通常,引用执行器对象的变量被声明为这三种接口类型之一,而不是执行器类类型。

Executor接口

Executor 接口提供单个方法 execute ,旨在成为常见线程创建语法的替代方法,如果 rRunnable 对象,并且 eExecutor 对象,则可以替换

(new Thread(r)).start();

e.execute(r);

但是, execute 的定义不太具体,低级别语法创建一个新线程并立即启动它,根据 Executor 实现, execute 可能会做同样的事情,但更有可能使用现有的工作线程来运行 r ,或者将 r 放在队列中以等待工作线程变为可用(我们将在线程池的部分中描述工作线程)。

java.util.concurrent 中的执行器实现旨在充分利用更高级的 ExecutorServiceScheduledExecutorService 接口,尽管它们也可以与基本 Executor 接口一起使用。

ExecutorService接口

ExecutorService 接口使用类似但更通用的 submit 方法补充 execute ,与 execute 一样, submit 接受 Runnable 对象,但也接受 Callable 对象,这允许任务返回一个值。 submit 方法返回一个 Future 对象,该对象用于检索 Callable 返回值并管理 CallableRunnable 任务的状态。

ExecutorService 还提供了提交大量 Callable 对象的方法,最后, ExecutorService 提供了许多用于管理执行器关闭的方法,为了支持立即关闭,任务应该正确处理中断。

ScheduledExecutorService接口

ScheduledExecutorService 接口使用 schedule 补充其父级 ExecutorService 的方法,在指定的延迟后执行 RunnableCallable 任务,此外,接口定义了 scheduleAtFixedRatescheduleWithFixedDelay ,它们以定义的间隔重复执行指定的任务。

线程池

java.util.concurrent 中的大多数执行器实现都使用由工作线程组成的线程池,这种线程与它执行的 RunnableCallable 任务分开存在,通常用于执行多个任务。

使用工作线程可以最小化由于创建线程而带来的开销,线程对象使用大量内存,在大型应用程序中,分配和释放许多线程对象会产生大量的内存管理开销。

一种常见类型的线程池是固定线程池,这种类型的池始终具有指定数量的线程,如果一个线程在它仍在使用时以某种方式被终止,它将自动被一个新线程替换,任务通过内部队列提交到池中,当活动任务多于线程时,该队列将保存额外的任务。

固定线程池的一个重要优点是使用它的应用程序可以优雅地降级,要理解这一点,请考虑一个Web服务器应用程序,其中每个HTTP请求都由一个单独的线程处理。如果应用程序只是为每个新的HTTP请求创建一个新线程,并且系统接收的请求数量超过了可以立即处理的数量,当所有这些线程的开销超过系统容量时,应用程序将突然停止响应所有请求。由于可以创建的线程数量有限制,应用程序不会像HTTP请求进入时那样快地为它们提供服务,而是以系统能够承受的最快速度为它们提供服务。

创建使用固定线程池的执行器的一种简单方法是在 java.util.concurrent.Executors 中调用 newFixedThreadPool 工厂方法,该类还提供以下工厂方法:

  • newCachedThreadPool 方法使用可扩展线程池创建执行器,此执行器适用于启动许多短期任务的应用程序。
  • newSingleThreadExecutor 方法创建一次执行单个任务的执行器。
  • 有几个工厂方法是上述执行器的 ScheduledExecutorService 版本。

如果上述工厂方法提供的执行器均无法满足你的需求,构造 java.util.concurrent.ThreadPoolExecutor 或 java.util.concurrent.ScheduledThreadPoolExecutor 的实例将为你提供额外选项。

Fork/Join

fork/join框架是 ExecutorService 接口的一个实现,可帮助你利用多个处理器,它专为可以递归分解成小块的工作而设计,目标是使用所有可用的处理能力来增强应用程序的性能。

与任何 ExecutorService 实现一样,fork/join框架将任务分配给线程池中的工作线程,fork/join框架是不同的,因为它使用了工作窃取算法,没有事情可做的工作线程可以从仍然忙碌的其他线程中窃取任务。

fork/join框架的中心是 ForkJoinPool 类,它是 AbstractExecutorService 类的扩展, ForkJoinPool 实现了核心工作窃取算法,可以执行 ForkJoinTask 进程。

基础用法

使用fork/join框架的第一步是编写执行工作片段的代码,你的代码应类似于以下伪代码:

if (我的工作部分足够小)
  直接做这项工作
else
  把我的工作分成两块
  调用这两块并等待结果

将此代码包装在 ForkJoinTask 子类中,通常使用其更专业的类型之一, RecursiveTask (可以返回结果)或 RecursiveAction 。

ForkJoinTask 子类准备就绪后,创建表示要完成的所有工作的对象,并将其传递给 ForkJoinPool 实例的 invoke() 方法。

模糊清晰度

为了帮助你了解fork/join框架的工作原理,请考虑以下示例,假设你想模糊图像,原始源图像由整数数组表示,其中每个整数包含单个像素的颜色值,模糊的目标图像也由与源相同大小的整数数组表示。

通过一次一个像素地处理源数组来完成模糊,将每个像素与其周围像素进行平均(对红色、绿色和蓝色组件进行平均),并将结果放置在目标数组中,由于图像是大型数组,因此此过程可能需要很长时间,通过使用fork/join框架实现的算法,你可以利用多处理器系统上的并发处理,这是一个可能的实现:

public class ForkBlur extends RecursiveAction {
    private int[] mSource;
    private int mStart;
    private int mLength;
    private int[] mDestination;
  
    // Processing window size; should be odd.
    private int mBlurWidth = 15;
  
    public ForkBlur(int[] src, int start, int length, int[] dst) {
        mSource = src;
        mStart = start;
        mLength = length;
        mDestination = dst;
    }

    protected void computeDirectly() {
        int sidePixels = (mBlurWidth - 1) / 2;
        for (int index = mStart; index < mStart + mLength; index++) {
            // Calculate average.
            float rt = 0, gt = 0, bt = 0;
            for (int mi = -sidePixels; mi <= sidePixels; mi++) {
                int mindex = Math.min(Math.max(mi + index, 0),
                                    mSource.length - 1);
                int pixel = mSource[mindex];
                rt += (float)((pixel & 0x00ff0000) >> 16)
                      / mBlurWidth;
                gt += (float)((pixel & 0x0000ff00) >>  8)
                      / mBlurWidth;
                bt += (float)((pixel & 0x000000ff) >>  0)
                      / mBlurWidth;
            }
          
            // Reassemble destination pixel.
            int dpixel = (0xff000000     ) |
                   (((int)rt) << 16) |
                   (((int)gt) <<  8) |
                   (((int)bt) <<  0);
            mDestination[index] = dpixel;
        }
    }
  
  ...

现在,你实现抽象的 compute() 方法,该方法可以直接执行模糊或将其拆分为两个较小的任务,简单的数组长度阈值有助于确定是执行还是拆分工作。

protected static int sThreshold = 100000;

protected void compute() {
    if (mLength < sThreshold) {
        computeDirectly();
        return;
    }
    
    int split = mLength / 2;
    
    invokeAll(new ForkBlur(mSource, mStart, split, mDestination),
              new ForkBlur(mSource, mStart + split, mLength - split,
                           mDestination));
}

如果以前的方法在 RecursiveAction 类的子类中,那么将任务设置为在 ForkJoinPool 中运行是很简单的,涉及以下步骤:

  1. 创建一个代表要完成的所有工作的任务。

    // source image pixels are in src
    // destination image pixels are in dst
    ForkBlur fb = new ForkBlur(src, 0, src.length, dst);
  2. 创建将运行任务的 ForkJoinPool

    ForkJoinPool pool = new ForkJoinPool();
  3. 运行任务。

    pool.invoke(fb);

有关完整源代码(包括创建目标图像文件的一些额外代码),请参阅 ForkBlur 示例。

标准实现

除了使用fork/join框架来实现在多处理器系统上同时执行任务的自定义算法(例如 ForkBlur.java 示例),Java SE中已经使用fork/join框架实现了一些通常有用的功能,在Java SE 8中引入的一种这样的实现被 java.util.Arrays 类用于其 parallelSort() 方法,这些方法类似于 sort() ,但通过fork/join框架利用并发性。在多处理器系统上运行时,大型数组的并行排序比顺序排序更快,但是,这些方法如何利用fork/join框架超出了Java教程的范围,有关此信息,请参阅Java API文档。

fork/join框架的另一个实现由 java.util.streams 包中的方法使用,这是 Project Lambda 计划用于Java SE 8版本的一部分,有关更多信息,请参阅Lambda表达式部分。

上一篇:Lock对象

原文  https://segmentfault.com/a/1190000017937812
正文到此结束
Loading...