转载

Java多线程之Callable接口

Callable和Runnbale一样代表着任务,区别在于Callable有返回值并且可以抛出异常。

1. 创建线程的三种方式:

  • 继承Thread,重写run方法
  • 实现Runnable接口,重新run方法
  • 实现Callable接口,重写call方法

2. Callable接口实际上是属于Executor框架中的功能类,Callable接口与Runnable接口的功能类似,但提供了比Runnable更加强大的功能。

  • Callable可以在任务结束的时候提供一个返回值,Runnable无法提供这个功能
  • Callable的call方法分可以抛出异常,而Runnable的run方法不能抛出异常。

Callable和Future出现的原因

创建线程的2种方式,一种是直接继承Thread,另外一种就是实现Runnable接口。

这2种方式都有一个缺陷就是:在执行完任务之后无法获取执行结果。

如果需要获取执行结果,就必须通过共享变量或者使用线程通信的方式来达到效果,这样使用起来就比较麻烦。

而自从Java 1.5开始,就提供了Callable和Future,通过它们可以在任务执行完毕之后得到任务执行结果。

Callable和Future介绍

Callable接口代表一段可以调用并返回结果的代码;Future接口表示异步任务,是还没有完成的任务给出的未来结果。所以说Callable用于产生结果,Future用于获取结果。

Callable接口使用泛型去定义它的返回类型。Executors类提供了一些有用的方法在线程池中执行Callable内的任务。由于Callable任务是并行的(并行就是整体看上去是并行的,其实在某个时间点只有一个线程在执行),我们必须等待它返回的结果。

java.util.concurrent.Future对象为我们解决了这个问题。在线程池提交Callable任务后返回了一个Future对象,使用它可以知道Callable任务的状态和得到Callable返回的执行结果。Future提供了get()方法让我们可以等待Callable结束并获取它的执行结果。

package cn.iigrowing.threads.study.Callable;

import java.util.concurrent.Callable;

import java.util.concurrent.FutureTask;

/*

* 一、创建执行线程的方式三:实现 Callable 接口。 相较于实现 Runnable 接口的方式,方法可以有返回值,并且可以抛出异常。

*

* 二、执行 Callable 方式,需要 FutureTask 实现类的支持,用于接收运算结果。 FutureTask 是 Future 接口的实现类

*/

public class TestCallable {

public static void main(String[] args) {

ThreadDemo td = new ThreadDemo();

// 1.执行 Callable 方式,需要 FutureTask 实现类的支持,用于接收运算结果。

FutureTask<Integer> result = new FutureTask<Integer>(td);

new Thread(result).start();

// 2.接收线程运算后的结果

try {

Integer sum = result.get(); // FutureTask 可用于 闭锁

// 类似于CountDownLatch的作用,在所有的线程没有执行完成之后这里是不会执行的

System.out.println(sum);

System.out.println(“————————————“);

} catch (Exception e) {

e.printStackTrace();

}

}

}

class ThreadDemo implements Callable<Integer> {

// @Override

public Integer call() throws Exception {

int sum = 0;

for (int i = 0; i <= 100000; i++) {

sum += i;

}

return sum;

}

}

综上例子可以看到: Callable 和 Future接口的区别

(1)Callable规定的方法是call(),而Runnable规定的方法是run().

(2)Callable的任务执行后可返回值,而Runnable的任务是不能返回值的。

(3)call()方法可抛出异常,而run()方法是不能抛出异常的。

(4)运行Callable任务可拿到一个Future对象, Future表示异步计算的结果。

它提供了检查计算是否完成的方法,以等待计算的完成,并检索计算的结果。

通过Future对象可了解任务执行情况,可取消任务的执行,还可获取任务执行的结果。

Callable是类似于Runnable的接口,实现Callable接口的类和实现Runnable的类都是可被其它线程执行的任务。

———–

Callable异步执行,应该不会陌生,那么在java中是怎么用的呢?又是如何实现的?下面我们循序渐进,慢慢分析。

先看一个例子,实现Callable接口,进行异步计算:

package com.demo;

import java.util.concurrent.*;

public class Demo {

public static void main(String[] args) throws ExecutionException, InterruptedException {

ExecutorService executor = Executors.newCachedThreadPool();

Future<String> future = executor.submit(new Callable<String>() {

@Override

public String call() throws Exception {

System.out.println(“call”);

TimeUnit.SECONDS.sleep(1);

return “str”;

}

});

System.out.println(future.get());

}

}

这段代码是很简单的一种方式利用Callable进行异步操作,结果自己可以执行下。

如何实现异步

在不阻塞当前线程的情况下计算,那么必然需要另外的线程去执行具体的业务逻辑,上面代码中可以看到,是把Callable放入了线程池中,等待执行,并且立刻返回futrue。可以猜想下,需要从Future中得到Callable的结果,那么Future的引用必然会被两个线程共享,一个线程执行完成后改变Future的状态位并唤醒挂起在get上的线程,到底是不是这样呢?

首先我们从任务提交开始,在AbstractExecutorService中的源码如下:

public <T> Future<T> submit(Callable<T> task) {

if (task == null) throw new NullPointerException();

RunnableFuture<T> ftask = newTaskFor(task);

execute(ftask);

return ftask;

}

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {

return new FutureTask<T>(runnable, value);

}

public FutureTask(Callable<V> callable) {

if (callable == null)

throw new NullPointerException();

this.callable = callable;

this.state = NEW; // ensure visibility of callable

}

可以看到Callable任务被包装成了RunnableFuture对象,通过了线程池的execute方法提交任务并且立刻返回对象本身,而线程池是接受Runnable,必然RunnableFuture继承了Runnable,我们看下其继承结构。

这里写图片描述

从继承中可以清楚的看到,FutureTask是Runnable和Future的综合。

到这里我们应该有些头绪了,关键点应该在FutureTask对象上,线程池不过是提供一个线程运行FutureTask中的run方法罢了。

Java多线程之Callable接口

FutureTask

从上面的分析,FutureTask被生产者和消费者共享,生产者运行run方法计算结果,消费者通过get方法获取结果,那么必然就需要通知,如何通知呢,肯定是状态位变化,并唤醒线程。

FutureTask状态

//FutureTask类中用来保存状态的变量,下面常量就是具体的状态表示

private volatile int state;

private static final int NEW = 0;

private static final int COMPLETING = 1;

private static final int NORMAL = 2;

private static final int EXCEPTIONAL = 3;

private static final int CANCELLED = 4;

private static final int INTERRUPTING = 5;

private static final int INTERRUPTED = 6;

run方法

//我修剪后的代码,可以看出其逻辑,执行Callable的call方法获取结果

public void run() {

Callable<V> c = callable;

if (c != null && state == NEW) {

V result = c.call();

set(result);

}

}

//把结果保存到属性字段中,finishCompletion是最后的操作,唤醒等待结果的线程

protected void set(V v) {

if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {

outcome = v;

UNSAFE.putOrderedInt(this, stateOffset, NORMAL);//正常结束设置状态为NORMAL

finishCompletion();

}

}

//waiters是FutureTask类的等待线程包装类,以链表的形式连接多个,WaitNode对象是在调用get方法时生成,并挂起get的调用者线程

private void finishCompletion() {

for (WaitNode q; (q = waiters) != null;) {

if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {

for (;;) {

Thread t = q.thread;

if (t != null) {

q.thread = null;

LockSupport.unpark(t); //唤醒get上等待的线程

if (next == null)

break;

}

break;

}

}

}

等待的线程

为了清除的看到如何挂起get的线程,我们可以分析下get的源码

public V get() throws InterruptedException, ExecutionException {

int s = state;

if (s <= COMPLETING)

s = awaitDone(false, 0L); //会一直挂起知道处理业务的线程完成,唤醒等待线程

return report(s);

}

public V get(long timeout, TimeUnit unit)

throws InterruptedException, ExecutionException, TimeoutException {

if (unit == null)

throw new NullPointerException();

int s = state;

if (s <= COMPLETING &&

(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)//通过判断返回的状态是否为已完成做不同的处理

throw new TimeoutException();

return report(s);

}

private int awaitDone(boolean timed, long nanos)

throws InterruptedException {

final long deadline = timed ? System.nanoTime() + nanos : 0L;

WaitNode q = null;

boolean queued = false;

for (;;) {

int s = state;

if (s > COMPLETING) {

if (q != null)

q.thread = null;

return s;

}

else if (s == COMPLETING) // cannot time out yet

Thread.yield();

else if (q == null)

q = new WaitNode();

else if (!queued)

queued = UNSAFE.compareAndSwapObject(this, waitersOffset,

q.next = waiters, q);

else if (timed) { //如果是超时的get那么会挂起一段时间

nanos = deadline – System.nanoTime();

if (nanos <= 0L) {//等待时间过后则会移除等待线程返回当前futureTask状态

removeWaiter(q);

return state;

}

LockSupport.parkNanos(this, nanos);

}

else

LockSupport.park(this);

}

}

如果想搞明白可以自行研究下,这种经过优化的并发代码确实可读性差,基本原理就是生产者与消费者模型。

——————–

package cn.iigrowing.threads.study.Callable;

import java.util.concurrent.Callable;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.Future;

public class CallableAndFuture {

static class MyThread implements Callable<String> {

// @Override

public String call() throws Exception {

System.out

.println(“——————-CallableAndFuture -MyThread “);

return “Hello world”;

}

}

static class MyThread2 implements Runnable {

// @Override

public void run() {

System.out

.println(“–!!!!!!!!!!!!!! CallableAndFuture -MyThread2 “);

}

}

public static void main(String[] args) {

ExecutorService threadPool = Executors.newSingleThreadExecutor();

Future<String> future = threadPool.submit(new MyThread());

threadPool.submit(new MyThread2());

try {

System.out.println(future.get());

} catch (Exception e) {

} finally {

threadPool.shutdown();

}

}

}

——–

Callable与Runnable

java.lang.Runnable吧,它是一个接口,在它里面只声明了一个run()方法:

public interface Runnable {

public abstract void run();

}

由于run()方法返回值为void类型,所以在执行完任务之后无法返回任何结果。

Callable位于java.util.concurrent包下,它也是一个接口,在它里面也只声明了一个方法,只不过这个方法叫做call():

public interface Callable<V> {

/**

* Computes a result, or throws an exception if unable to do so.

*

* @return computed result

* @throws Exception if unable to compute a result

*/

V call() throws Exception;

}

可以看到,这是一个泛型接口,call()函数返回的类型就是传递进来的V类型。

那么怎么使用Callable呢?

一般情况下是配合ExecutorService来使用的,在ExecutorService接口中声明了若干个submit方法的重载版本:

<T> Future<T> submit(Callable<T> task);

<T> Future<T> submit(Runnable task, T result);

Future<?> submit(Runnable task);

第一个submit方法里面的参数类型就是Callable。

暂时只需要知道Callable一般是和ExecutorService配合来使用的,具体的使用方法讲在后面讲述。

一般情况下我们使用第一个submit方法和第三个submit方法,第二个submit方法很少使用。

Future

Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。

Future类位于java.util.concurrent包下,它是一个接口:

public interface Future<V> {

boolean cancel(boolean mayInterruptIfRunning);

boolean isCancelled();

boolean isDone();

V get() throws InterruptedException, ExecutionException;

V get(long timeout, TimeUnit unit)

throws InterruptedException, ExecutionException, TimeoutException;

}

在Future接口中声明了5个方法,下面依次解释每个方法的作用:

cancel方法用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false。参数mayInterruptIfRunning表示是否允许取消正在执行却没有执行完毕的任务,如果设置true,则表示可以取消正在执行过程中的任务。如果任务已经完成,则无论mayInterruptIfRunning为true还是false,此方法肯定返回false,即如果取消已经完成的任务会返回false;如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false;如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,肯定返回true。

isCancelled方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。

isDone方法表示任务是否已经完成,若任务完成,则返回true;

get()方法用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回;

get(long timeout, TimeUnit unit)用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null。

也就是说Future提供了三种功能:

1)判断任务是否完成;

2)能够中断任务;

3)能够获取任务执行结果。

因为Future只是一个接口,所以是无法直接用来创建对象使用的,因此就有了下面的FutureTask。

FutureTask

FutureTask实现了RunnableFuture接口,这个接口的定义如下:

public interface RunnableFuture<V> extends Runnable, Future<V> {

void run();

}

可以看到这个接口实现了Runnable和Future接口,接口中的具体实现由FutureTask来实现。这个类的两个构造方法如下 :

public FutureTask(Callable<V> callable) {

if (callable == null)

throw new NullPointerException();

sync = new Sync(callable);

}

public FutureTask(Runnable runnable, V result) {

sync = new Sync(Executors.callable(runnable, result));

}

如上提供了两个构造函数,一个以Callable为参数,另外一个以Runnable为参数。这些类之间的关联对于任务建模的办法非常灵活,允许你基于FutureTask的Runnable特性(因为它实现了Runnable接口),把任务写成Callable,然后封装进一个由执行者调度并在必要时可以取消的FutureTask。

FutureTask可以由执行者调度,这一点很关键。它对外提供的方法基本上就是Future和Runnable接口的组合:get()、cancel、isDone()、isCancelled()和run(),而run()方法通常都是由执行者调用,我们基本上不需要直接调用它。

一个FutureTask的例子

public class MyCallable implements Callable<String> {

private long waitTime;

public MyCallable(int timeInMillis){

this.waitTime=timeInMillis;

}

@Override

public String call() throws Exception {

Thread.sleep(waitTime);

//return the thread name executing this callable task

return Thread.currentThread().getName();

}

}

public class FutureTaskExample {

public static void main(String[] args) {

MyCallable callable1 = new MyCallable(1000); // 要执行的任务

MyCallable callable2 = new MyCallable(2000);

FutureTask<String> futureTask1 = new FutureTask<String>(callable1);// 将Callable写的任务封装到一个由执行者调度的FutureTask对象

FutureTask<String> futureTask2 = new FutureTask<String>(callable2);

ExecutorService executor = Executors.newFixedThreadPool(2); // 创建线程池并返回ExecutorService实例

executor.execute(futureTask1); // 执行任务

executor.execute(futureTask2);

while (true) {

try {

if(futureTask1.isDone() && futureTask2.isDone()){// 两个任务都完成

System.out.println(“Done”);

executor.shutdown(); // 关闭线程池和服务

return;

}

if(!futureTask1.isDone()){ // 任务1没有完成,会等待,直到任务完成

System.out.println(“FutureTask1 output=”+futureTask1.get());

}

System.out.println(“Waiting for FutureTask2 to complete”);

String s = futureTask2.get(200L, TimeUnit.MILLISECONDS);

if(s !=null){

System.out.println(“FutureTask2 output=”+s);

}

} catch (InterruptedException | ExecutionException e) {

e.printStackTrace();

}catch(TimeoutException e){

//do nothing

}

}

}

}

运行如上程序后,可以看到一段时间内没有输出,因为get()方法等待任务执行完成然后才输出内容.

输出结果如下:

FutureTask1 output=pool-1-thread-1

Waiting for FutureTask2 to complete

Waiting for FutureTask2 to complete

Waiting for FutureTask2 to complete

Waiting for FutureTask2 to complete

Waiting for FutureTask2 to complete

FutureTask2 output=pool-1-thread-2

Done

原文  http://www.iigrowing.cn/java_duo_xian_cheng_zhi_callable_jie_kou.html
正文到此结束
Loading...