多线程的设计方法确实可以最大限度的发挥多核处理器的计算能力,提高吞吐量和性能。但是如果不加控制随意使用线程,对系统的性能反而会产生不利。
和进程相比,线程虽然是一种轻量级的,但是创建和关闭依然需要花费时间,如果每一个小的任务都创建一个线程,则会很有可能出现创建和销毁线程占用的时间大于该线程任务所消耗的时间。其次线程本身也是需要占用内存空间的,大量的线程会抢占宝贵的内存资源。
因此线程的使用需要掌握一个度,再有限的范围内增加线程的数量可以提高系统的性能,一旦超过这个范围,大量的线程只会拖垮整个系统。
为了避免系统频繁的创建和销毁线程,我们可以让创建的线程复用。我们可以使用一个线程池维护一些线程,当你需要使用线程的时候,可以从池子中随便拿一个空闲线程,当完成工作时,并不急着关闭线程,而是将这些线程退回到线程池中,方便下次使用。
简而言之,再使用线程池后,创建线程编程了从线程池中获得空闲线程,关闭线程变为想线程池归还线程。
线程池的成员都在java.util.concurrent包中,是JDK并发包的核心。其中ThreadPoolExecutor表示一个线程池。Executors类则是一个线程工厂的角色,通过Executors可以取得一个拥有特定功能的线程池,通过Executors可以取得一个特定功能的线程池。
该方法返回一个固定线程数量的线程池。该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有则新的任务会暂存在一个任务队列中,待有线程空闲时,便处理任务队列中的队列。
该方法返回一个只有一个线程的线程池。若有多余的任务被提交到线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。
该方法返回一个可根据实际情况调整线程数量的线程池。线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程。若所有的线程都在工作,又有新的任务提交,则会创建新的线程处理任务。所有线程在当前任务执行完毕后,将返回线程池进行复用。
该方法返回一个ScheduledExecutorService对象,线程池大小为1。ScheduledExecutorService接口在ExecutorService接口上扩展了在给定时间执行某任务的功能,如在某个固定的延时后执行,或者周期性执行某个任务。
该方法会返回一个ScheduledExecutorService对象,但该线程池可以执行线程数量。
public class ThreadPoolThread { public static class MyTask implements Runnable{ @Override public void run() { System.out.println(System.currentTimeMillis() + ":Thread ID: " + Thread.currentThread().getId()); try{ Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) { MyTask myTask = new MyTask(); ExecutorService executorService = Executors.newFixedThreadPool(5); for (int i = 0;i< 10 ;i++){ executorService.submit(myTask); } } } } 复制代码
1562554721820:Thread ID: 12 1562554721820:Thread ID: 15 1562554721820:Thread ID: 16 1562554721820:Thread ID: 13 1562554721820:Thread ID: 14 1562554722821:Thread ID: 15 1562554722821:Thread ID: 16 1562554722821:Thread ID: 12 1562554722821:Thread ID: 13 1562554722821:Thread ID: 14 复制代码
newScheduledThreadPool()方法返回一个ScheduledExecutorService对象,可以根据时间需要对线程进行调度。主要方法如下
public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit); public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay, long period,TimeUnit unit); public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit); 复制代码
和其他线程不同,ScheduledExecutorService不一定会立即安排执行任务。他其实是起到了计划任务的作用,会在指定的时间对任务进行调度。
schedule()会在给定时间对任务进行一次调度。scheduleAtFixedRate()和scheduleWithFixedDelay()方法会对任务进行周期性调度,但是二者还是有区别的。scheduleAtFixedRate()方法的任务调度频率是一定的,它是以上一个任务开始执行的时间为起点,再在规定的时间调度下一次任务。而scheduleWithFixedDelay()方法是以上一个任务的结束后再经过规定时间进行任务调度。
public class ScheduleExecutorServiceDemo { public static void main(String[] args) { ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10); scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { Thread.sleep(1000); System.out.println(System.currentTimeMillis()); } catch (InterruptedException e) { e.printStackTrace(); } } },0,2, TimeUnit.SECONDS); } } 复制代码
1562555518798 1562555520798 1562555522798 1562555524799 1562555526800 复制代码
可以看出任务每两秒被调度一次。
将代码修改为8秒
Thread.sleep(8000); 复制代码
1562555680333 1562555688333 1562555696333 1562555704333 复制代码
对于几个核心的线程池,虽然看着创建的线程池有着不同的功能特点,但是其内部都是使用了ThreadPoolExecutor类。
看几个线程池的创建源码:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>()); } 复制代码
可以看出他们都是ThreadPoolExecutor类的封装,来看一下ThreadPoolExecutor类的构造:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } 复制代码
参数workQueue指被提交但未执行的任务队列,他是一个BlockingQueue接口的对象,仅用于存放Runnable对象。在ThreadPoolExecutor构造中可以使用一下几种BlockingQueue接口:
拒绝策略是当任务数量超过系统实际承载能力时执行的策略。拒绝策略可以说时系统超负荷运行时的补救措施。
JDK内置了四种拒绝策略:
上面的策略都实现了RejectedExecutionHandler接口,如果以上策略无法满足实际开发,可以自己扩展。
RejectedExecutionHandler接口构造:
public interface RejectedExecutionHandler { void rejectedExecution(Runnable r, ThreadPoolExecutor executor); } 复制代码
自定义拒绝策略:
//拒绝策略demo public class RejectThreadPoolDemo { public static class MyTask implements Runnable{ @Override public void run() { System.out.println(System.currentTimeMillis() + ": Thread ID : " + Thread.currentThread().getId()); try{ Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException { MyTask myTask = new MyTask(); ThreadPoolExecutor es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(10), Executors.privilegedThreadFactory(), new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println(r.toString() + "被拒绝"); } }); for (int i = 0;i<Integer.MAX_VALUE;i++){ es.submit(myTask); Thread.sleep(10); } } } 复制代码
1562575292467: Thread ID : 14 1562575292478: Thread ID : 15 1562575292489: Thread ID : 16 java.util.concurrent.FutureTask@b4c966a被拒绝 java.util.concurrent.FutureTask@2f4d3709被拒绝 java.util.concurrent.FutureTask@4e50df2e被拒绝 复制代码
ThreadFactory是一个接口,他只有一个用来创建线程的方法。
Thread newThread(Runnable r); 复制代码
通过自定义线程创建我们可以跟踪线程池在何时创建了多少线程,自定义线程名等。
public class ThreadFactoryDemo { static volatile int i = 0; public static class TestTask implements Runnable{ @Override public void run() { System.out.println(Thread.currentThread().getName()); } } public static void main(String[] args) { TestTask testTask = new TestTask(); ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>(), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r,"test--" + i); i++; return thread; } }); for (int i = 0;i<5;i++){ threadPoolExecutor.submit(testTask); } } } 复制代码
test--0 test--1 test--4 test--2 test--3 复制代码
虽然JDK已经帮我们实现了稳定的线程池,但如果我们想要对线程池进行一些扩展,比如监控任务执行的开始和结束时间怎么办呢。
ThreadPoolExecutor是一个可以扩展的线程池,它提供了beforExecutor(),afterExecutor()和terminated()三个接口来对其进行扩展。
public class ThreadFactoryDemo { static volatile int i = 0; public static class TestTask implements Runnable{ @Override public void run() { System.out.println(Thread.currentThread().getName()); } } public static void main(String[] args) { TestTask testTask = new TestTask(); ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>(), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r,"test--" + i); i++; return thread; } }){ @Override protected void beforeExecute(Thread t,Runnable r){ System.out.println("task-----准备执行"); } }; for (int i = 0;i<5;i++){ threadPoolExecutor.submit(testTask); } } } 复制代码
task-----准备执行 task-----准备执行 test--2 task-----准备执行 test--1 task-----准备执行 test--4 task-----准备执行 test--3 test--0 复制代码
execute提交的方式只能提交一个Runnable的对象,且该方法的返回值是void,也即是提交后如果线程运行后,和主线程就脱离了关系了,当然可以设置一些变量来获取到线程的运行结果。并且当线程的执行过程中抛出了异常通常来说主线程也无法获取到异常的信息的,只有通过ThreadFactory主动设置线程的异常处理类才能感知到提交的线程中的异常。
sumbit()方法有三种形式:
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } /** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; } /** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; } 复制代码
sumbit方法会返回一个Future对象,这个Future对象代表这线程的执行结果,当主线程调用Future的get方法的时候会获取到从线程中返回的结果数据。如果在线程的执行过程中发生了异常,get会获取到异常的信息。