在执行并发任务时,我们可以把任务传递给一个线程池,来替代为每个并发执行的任务都启动一个新的线程,只要池里有空闲的线程,任务就会分配一个线程执行。在线程池的内部,任务被插入一个阻塞队列(BlockingQueue),线程池里的线程会去取这个队列里的任务。
利用线程池有三个好处:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
创建一个线程池需要的几个参数:
1、ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
2、LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
3、SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
4、PriorityBlockingQueue:一个具有优先级得无限阻塞队列。
此外,我们还可以通过调用Ececutors中的某个静态工厂方法来创建一个线程池(它们的内部实现原理都是相同的,仅仅是使用了不同的工作队列或线程池大小):
newFixedThreadPool:创建一个定长的线程池,每当提交一个任务就创建一个线程,直到达到池的最大长度,这时线程池会保持长度不在变化
newCachedThreadPool:创建一个可缓存的线程池,如果当前的线程池的长度超过了处理的需要时,它可以灵活的回收空闲的线程,当需求增加时,它可以灵活的添加新的线程,并不会对池的长度做任何限制
newSingleThreadPool:创建一个单线程化的executor,它只会创建唯一的工作者线程来执行任务
newScheduledThreadPool:创建一个定长的线程池,而且支持定时的以及周期性的任务执行,类似于Timer
可以使用execute向线程池提交任务:
public class Test2 { public static void main(String[] args) { BlockingQueue<Runnable> workQueue=new LinkedBlockingDeque<Runnable>(); ThreadPoolExecutor poolExecutor=new ThreadPoolExecutor(2, 3, 60, TimeUnit.SECONDS, workQueue); poolExecutor.execute(new Task1()); poolExecutor.execute(new Task2()); poolExecutor.shutdown(); } } class Task1 implements Runnable { public void run() { System.out.println("执行任务1"); } } class Task2 implements Runnable { public void run() { System.out.println("执行任务2"); } }
也可以使用submit方法来提交任务,它会返回一个future,我们可以通过这个future来判断任务是否执行成功,通过future的get方法获取返回值,get方法会阻塞直到任务完成。
public class Test3 { public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService executorService = Executors.newCachedThreadPool(); List<Future<String>> resultList = new ArrayList<Future<String>>(); // 创建10个任务并执行 for (int i = 0; i < 10; i++) { // 使用ExecutorService执行Callable类型的任务,并将结果保存在future变量中 Future<String> future = executorService.submit(new TaskWithResult(i)); resultList.add(future); } for (Future<String> future : resultList) { while (!future.isDone());// Future返回如果没有完成,则一直循环等待,直到Future返回完成 { System.out.println(future.get()); // 打印各个线程(任务)执行的结果 } } executorService.shutdown(); } } class TaskWithResult implements Callable<String> { private int id; public TaskWithResult(int id) { this.id = id; } public String call() throws Exception { return "执行结果"+id; } }
可以通过调用线程池的shutdown或shutdownNow方法来关闭线程池,但是它们的实现原理不同,shutdown的原理是只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程。shutdownNow的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止。shutdownNow会首先将线程池的状态设置成STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表。
只要调用了这两个关闭方法的其中一个,isShutdown方法就会返回true。当所有的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true。至于我们应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用shutdown来关闭线程池,如果任务不一定要执行完,则可以调用shutdownNow。
整个ThreadExecutor的任务处理经过下面4个步骤,如下图所示:
1、如果当前的线程数<corePoolSize,提交的Runnable任务,会直接作为new Thread的参数,立即执行,当提交的任务数超过了corePoolSize,就进入第二部操作
2、将当前的任务提交到BlockingQueue阻塞队列中,如果Block Queue是个有界队列,当队列满了之后就进入第三步
3、如果poolSize < maximumPoolsize时,会尝试new 一个Thread的进行救急处理,执行对应的runnable任务
4、如果第三步也无法处理,就会用RejectedExecutionHandler来做拒绝处理
Timer工具管理任务的定时以及周期性执行。示例代码如下:
public class TimerTest { final static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); public static void main(String[] args) { TimerTask timerTask1=new TimerTask() { @Override public void run() { System.out.println("任务1执行时间:"+sdf.format(new Date())); try { Thread.sleep(3000);//模拟任务1执行时间为3秒 } catch (InterruptedException e) { // TODO 自动生成的 catch 块 e.printStackTrace(); } } }; System.out.println("当前时间:"+sdf.format(new Date())); Timer timer=new Timer(); timer.schedule(timerTask1, new Date(),4000); //间隔4秒周期性执行 } }
执行结果:
可以看到上述任务1以4秒为间隔周期性执行。但是Timer存在一些缺陷,主要是下面两个方面的问题:
缺陷1 : Timer只创建唯一的线程的来执行所有的Timer任务,如果一个time任务的执行很耗时,会导致其他的TimeTask的时效准确性出问题。看下面的例子:
public class TimerTest { final static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); public static void main(String[] args) { TimerTask timerTask1=new TimerTask() { @Override public void run() { System.out.println("任务1执行时间:"+sdf.format(new Date())); try { Thread.sleep(10000); } catch (InterruptedException e) { // TODO 自动生成的 catch 块 e.printStackTrace(); } } }; TimerTask timerTask2=new TimerTask() { @Override public void run() { System.out.println("任务2执行时间:"+sdf.format(new Date())); } }; System.out.println("当前时间:"+sdf.format(new Date())); Timer timer=new Timer(); timer.schedule(timerTask1, new Date(),1000); //间隔1秒周期性执行 timer.schedule(timerTask2, new Date(),4000); //间隔4秒周期性执行 } }
执行结果:
缺陷2 :如果TimeTask抛出未检查的异常,Timer将产生无法预料的行为。Timer线程并不捕获线程,所有TimerTask抛出的未检查的异常会终止timer线程。看下面的代码:
public class TimerTest2 { final static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); public static void main(String[] args) { TimerTask timerTask1=new TimerTask() { @Override public void run() { System.out.println("任务1执行时间:"+sdf.format(new Date())); throw new RuntimeException(); } }; TimerTask timerTask2=new TimerTask() { @Override public void run() { System.out.println("任务2执行时间:"+sdf.format(new Date())); } }; System.out.println("当前时间:"+sdf.format(new Date())); Timer timer=new Timer(); timer.schedule(timerTask1, new Date(),1000); //周期1秒执行任务1 timer.schedule(timerTask2, new Date() ,3000); //周期3秒执行任务2 } }
执行结果为:
针对上述的两个问题,我们可以使用ScheduledThreadPoolExecutor来作为Timer的替代。
针对问题1,有下面代码:
public class ScheduledThreadPoolExecutorTest { final static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); public static void main(String[] args) { TimerTask timerTask1 = new TimerTask() { @Override public void run() { System.out.println("任务1执行时间:" + sdf.format(new Date())); try { Thread.sleep(10000); } catch (InterruptedException e) { // TODO 自动生成的 catch 块 e.printStackTrace(); } } }; TimerTask timerTask2 = new TimerTask() { @Override public void run() { System.out.println("任务2执行时间:" + sdf.format(new Date())); } }; System.out.println("当前时间:" + sdf.format(new Date())); ScheduledThreadPoolExecutor poolExecutor=new ScheduledThreadPoolExecutor(2); poolExecutor.scheduleAtFixedRate(timerTask1, 0, 1000,TimeUnit.MILLISECONDS); poolExecutor.scheduleAtFixedRate(timerTask2, 0, 4000,TimeUnit.MILLISECONDS); } }
执行的结果为:
针对问题2,有下面代码:
public class ScheduledThreadPoolExecutorTest2 { final static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); public static void main(String[] args) { TimerTask timerTask1 = new TimerTask() { @Override public void run() { System.out.println("任务1执行时间:" + sdf.format(new Date())); throw new RuntimeException(); } }; TimerTask timerTask2 = new TimerTask() { @Override public void run() { System.out.println("任务2执行时间:" + sdf.format(new Date())); } }; System.out.println("当前时间:" + sdf.format(new Date())); ScheduledThreadPoolExecutor poolExecutor=new ScheduledThreadPoolExecutor(2); poolExecutor.scheduleAtFixedRate(timerTask1, 0, 1000, TimeUnit.MILLISECONDS); poolExecutor.scheduleAtFixedRate(timerTask2, 0, 2000, TimeUnit.MILLISECONDS); } }
执行结果为:
1、Java并发编程实践