线程池类为 java.util.concurrent.ThreadPoolExecutor,常用构造方法为:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
这里是7个参数(我们在开发中用的更多的是5个参数的构造方法),OK,那我们来看看这里七个参数的含义:
corePoolSize 线程池中核心线程的数量
maximumPoolSize 线程池中最大线程数量
keepAliveTime 非核心线程的超时时长,当系统中非核心线程闲置时间超过keepAliveTime之后,则会被回收。如果ThreadPoolExecutor的allowCoreThreadTimeOut属性设置为true,则该参数也表示核心线程的超时时长
unit 第三个参数的单位,有纳秒、微秒、毫秒、秒、分、时、天等
workQueue 线程池中的任务队列,该队列主要用来存储已经被提交但是尚未执行的任务。存储在这里的任务是由ThreadPoolExecutor的execute方法提交来的。
threadFactory 为线程池提供创建新线程的功能,这个我们一般使用默认即可
handler 拒绝策略,当线程无法执行新任务时(一般是由于线程池中的线程数量已经达到最大数或者线程池关闭导致的),默认情况下,当线程池无法处理新线程时,会抛出一个RejectedExecutionException。
通过构造方法可以看出:
maximumPoolSize < corePoolSize的时候会抛出异常;
maximumPoolSize(最大线程数) = corePoolSize(核心线程数) + noCorePoolSize(非核心线程数);
当currentSize<corePoolSize时,没什么好说的,直接启动一个核心线程并执行任务。
当currentSize>=corePoolSize、并且workQueue未满时,添加进来的任务会被安排到workQueue中等待执行。
当workQueue已满,但是currentSize<maximumPoolSize时,会立即开
启一个非核心线程来执行任务。
一个线程从被提交(submit)到执行共经历以下流程:
线程池在执行execute方法时,主要有以下四种情况
线程池采取上述的流程进行设计是为了减少获取全局锁的次数。在线程池完成预热(当前运行的线程数大于或等于corePoolSize)之后,几乎所有的excute方法调用都执行步骤2。
Executors工厂类可以创建四种类型的线程池,通过Executors.newXXX即可创建。
public static ExecutorService newFixedThreadPool(int nThreads){ return new ThreadPoolExecutor(nThreads,nThreads,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()); }
public class MyThreadextends Thread{ @Override public void run(){ super.run(); System.out.println(new Date().toString()+" "+Thread.currentThread().getName()+" MyThread is Running"); } }
Thread thread1 = new MyThread(); Thread thread2 = new MyThread(); Thread thread3 = new MyThread(); Thread thread4 = new MyThread(); Thread thread5 = new MyThread(); ExecutorService pool = Executors.newFixedThreadPool(2); pool.execute(thread1); pool.execute(thread2); pool.execute(thread3); pool.execute(thread4); pool.execute(thread5);
运行结果:(1/2运行的顺序是无序的)
Wed Mar 13 15:37:38 CST 2019 pool-1-thread-2 MyThread is Running Wed Mar 13 15:37:38 CST 2019 pool-1-thread-1 MyThread is Running Wed Mar 13 15:37:38 CST 2019 pool-1-thread-2 MyThread is Running Wed Mar 13 15:37:38 CST 2019 pool-1-thread-1 MyThread is Running Wed Mar 13 15:37:38 CST 2019 pool-1-thread-2 MyThread is Running
public static ExecutorService newCachedThreadPool(){ return new ThreadPoolExecutor(0,Integer.MAX_VALUE,60L,TimeUnit.MILLISECONDS,new SynchronousQueue<Runnable>()); }
Thread thread1 = new MyThread(); Thread thread2 = new MyThread(); Thread thread3 = new MyThread(); Thread thread4 = new MyThread(); Thread thread5 = new MyThread(); ExecutorService pool = Executors.newCachedThreadPool(); pool.execute(thread1); pool.execute(thread2); pool.execute(thread3); pool.execute(thread4); pool.execute(thread5);
运行结果:
Wed Mar 13 15:40:03 CST 2019 pool-1-thread-4 MyThread is Running Wed Mar 13 15:40:03 CST 2019 pool-1-thread-1 MyThread is Running Wed Mar 13 15:40:03 CST 2019 pool-1-thread-2 MyThread is Running Wed Mar 13 15:40:03 CST 2019 pool-1-thread-3 MyThread is Running Wed Mar 13 15:40:03 CST 2019 pool-1-thread-5 MyThread is Running
public static ExecutorService newSingleThreadExecutor(){ return new ThreadPoolExecutor(1,1,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()); }
Thread thread1 = new MyThread(); Thread thread2 = new MyThread(); Thread thread3 = new MyThread(); Thread thread4 = new MyThread(); Thread thread5 = new MyThread(); ExecutorService pool = Executors.newSingleThreadExecutor(); pool.execute(thread1); pool.execute(thread2); pool.execute(thread3); pool.execute(thread4); pool.execute(thread5);
运行结果:
Wed Mar 13 15:40:58 CST 2019 pool-1-thread-1 MyThread is Running Wed Mar 13 15:40:58 CST 2019 pool-1-thread-1 MyThread is Running Wed Mar 13 15:40:58 CST 2019 pool-1-thread-1 MyThread is Running Wed Mar 13 15:40:58 CST 2019 pool-1-thread-1 MyThread is Running Wed Mar 13 15:40:58 CST 2019 pool-1-thread-1 MyThread is Running
它用来处理延时任务或定时任务。
SchduledFutureTask接收的参数:
它采用DelayQueue存储等待的任务
工作线程的执行过程:
public class MyThreadextends Thread{ private String name; public MyThread(String name){ this.name = name; } @Override public void run(){ super.run(); System.out.println(new Date().toString()+" "+Thread.currentThread().getName()+" "+name); } }
Thread thread1 = new MyThread("MyThread1"); Thread thread2 = new MyThread("MyThread2"); ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(2); exec.scheduleAtFixedRate(thread1, 0, 3000, TimeUnit.MILLISECONDS); exec.scheduleAtFixedRate(thread2, 0, 2000, TimeUnit.MILLISECONDS);
运行结果:
Wed Mar 13 15:44:07 CST 2019 pool-1-thread-2 MyThread2 Wed Mar 13 15:44:07 CST 2019 pool-1-thread-1 MyThread1 Wed Mar 13 15:44:09 CST 2019 pool-1-thread-2 MyThread2 Wed Mar 13 15:44:10 CST 2019 pool-1-thread-1 MyThread1 Wed Mar 13 15:44:11 CST 2019 pool-1-thread-2 MyThread2 Wed Mar 13 15:44:13 CST 2019 pool-1-thread-1 MyThread1 Wed Mar 13 15:44:13 CST 2019 pool-1-thread-2 MyThread2 Wed Mar 13 15:44:15 CST 2019 pool-1-thread-2 MyThread2 Wed Mar 13 15:44:16 CST 2019 pool-1-thread-1 MyThread1 Wed Mar 13 15:44:17 CST 2019 pool-1-thread-2 MyThread2
由于上面的构造方法涉及到了阻塞队列,所以补充一些阻塞队列的知识。
阻塞队列:我的理解是,生产者——消费者,生产者往队列里放元素,消费者取,如果队列里没有元素,消费者线程取则阻塞,如果队列里元素满了,则生产者线程阻塞。
常见的阻塞队列有下列7种:
ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。 LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。 PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。 DelayQueue:一个使用优先级队列实现的无界阻塞队列。 SynchronousQueue:一个不存储元素的阻塞队列。 LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。 LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
当队列和线程池都满了,说明线程池处于饱和状态,那么必须对新提交的任务采用一种特殊的策略来进行处理。这个策略默认配置是AbortPolicy,表示无法处理新的任务而抛出异常。JAVA提供了4中策略:
当工作队列满了,不同策略的处理方式为:
1.Abort策略:默认策略,新任务提交时直接抛出未检查的异常RejectedExecutionException,该异常 可由 调用者捕获。
2.CallerRuns策略:为调节机制,既不抛弃任务也不抛出异常,而是将某些任务回退到调用者。不会在线程池的线程中执行新的任务,而是在调用exector的线程中运行新的任务。
3.Discard策略:新提交的任务被抛弃。
4.DiscardOldest策略:队列的是“队头”的任务,然后尝试提交新的任务。(不适合工作队列为优先队列场景)
问:shutdown()有什么功能?
答:阻止新来的任务提交,对已经提交了的任务不会产生任何影响。当已经提交的任务执行完后,它会将那些闲置的线程(idleWorks)进行中断,这个过程是异步的。
问:如何阻止新来的任务提交?
答:通过将线程池的状态改成SHUTDOWN,当再将执行execute提交任务时,如果测试到状态不为RUNNING,则抛出rejectedExecution,从而达到阻止新任务提交的目的。
问:为何对提交的任务不产生任何影响?
答:在调用中断任务的方法时,它会检测workers中的任务,如果worker对应的任务没有中断,并且是空闲线程,它才会去中断。另外的话,workQueue中的值,还是按照一定的逻辑顺序不断的往works中进行输送的,这样一来,就可以保证提交的任务按照线程本身的逻辑执行,不受到影响。
问:shutdownNow()有什么功能?
答:阻止新来的任务提交,同时会中断当前正在运行的线程,即workers中的线程。另外它还将workQueue中的任务给移除,并将这些任务添加到列表中进行返回。
问:如何阻止新来的任务提交?
答:通过将线程池的状态改成STOP,当再将执行execute提交任务时,如果测试到状态不为RUNNING,则抛出rejectedExecution,从而达到阻止新任务提交的目的.
问:如果我提交的任务代码块中,正在等待某个资源,而这个资源没到,但此时执行shutdownNow(),会出现什么情况?
答:当执行shutdownNow()方法时,如遇已经激活的任务,并且处于阻塞状态时,shutdownNow()会执行1次中断阻塞的操作,此时对应的线程报InterruptedException,如果后续还要等待某个资源,则按正常逻辑等待某个资源的到达。例如,一个线程正在sleep状态中,此时执行shutdownNow(),它向该线程发起interrupt()请求,而sleep()方法遇到有interrupt()请求时,会抛出InterruptedException(),并继续往下执行。在这里要提醒注意的是,在激活的任务中,如果有多个sleep(),该方法只会中断第一个sleep(),而后面的仍然按照正常的执行逻辑进行。