JSR,全称 Java Specification Requests, 即Java规范提案, 主要是用于向JCP(Java Community Process)提出新增标准化技术规范的正式请求。每次JAVA版本更新都会有对应的JSR更新,比如在Java 8版本中,其新特性Lambda表达式对应的是 JSR 335 ,新的日期和时间API对应的是 JSR 310 。
当然,本文的关注点仅仅是 JSR 166 ,它是一个关于Java并发编程的规范提案,在JDK中,该规范由java.util.concurrent包实现,是在JDK 5.0的时候被引入的;
另外JDK6引入Deques、Navigable collections,对应的是JSR 166x,JDK7引入fork-join框架,用于并行执行任务,对应的是JSR 166y。
即java.util.concurrent的缩写,该包参考自EDU.oswego.cs.dl.util.concurrent,是JSR 166标准规范的一个实现;
那么,JSR 166以及J.U.C包的作者是谁呢,没错,就是Doug Lea大神,挺牛逼的,大神级别任务,贴张照片膜拜下。。。
简单的说,是一个任务的执行和调度框架(线程池),将任务的提交过程和执行过程分离开来,涉及的类如下图所示:
其中,最顶层是Executor接口,它的定义很简单,一个用于执行任务的execute方法,如下所示:
public interface Executor { void execute(Runnable command); }
另外,我们还可以看到一个Executors类,它是一个工具类(有点类似集合框架的Collections类),用于创建 ExecutorService
、 ScheduledExecutorService
、 ThreadFactory
和 Callable对象。
定义好任务(如Callable对象),把它提交给ExecutorService(如线程池)去执行,得到Future对象,然后调用Future的get方法等待执行结果即可。
ExecutorService executor = Executors.newCachedThreadPool();//创建线程池 Task task = new Task(); //创建Callable任务 Future<Integer> result = executor.submit(task);//提交任务给线程池执行 result.get();//等待执行结果; 可以传入等待时间参数,指定时间内没返回的话,直接结束
实现Callable接口或Runnable接口的类,其实例就可以成为一个任务提交给ExecutorService去执行;
其中Callable任务可以返回执行结果,Runnable任务无返回结果;
线程池提供了一种限制和管理资源(线程)的方式,每个线程池还维护了一些基本的统计信息,例如:已完成的任务数量
1.降低资源消耗 : 重复利用已创建的线程降低了线程的新建,销毁造成的消耗
2.提高响应速度 : 任务需要执行时,不需要等待线程的创建,提高了响应速度
3.提高线程的可管理性 : 线程是稀缺资源,如果无限制创建,不仅会消耗系统资源而且还会降低系统的稳定性,使用线程池可以对多个线程进行统一分配,调优和监控
方式一:通过ThreadPoolExecutor的构造方法实现
1.corePoolSize(线程池的基本大小): 线程池基本的线程创建数量, 任务数<基本大小,即便有现成可用空闲线程也会创建新线程执行,大于等于时才利用空闲线程
2.maximumPoolSize(线程池最大数量) : 有界队列中,无空闲线程执行任务,且线程数已到达基本大小 时任务会被放置于任务队列之中,当任务队列已满时,任务仍然无线程可执行,就会新创建线程,而此参数即是限制此线程数量
3.keepAliveTime(线程活动保持时间) : 线程空闲时能够继续存活的时间
4.TimeUnit(线程活动保持时间的单位) :单位有天,时,分,秒,毫秒,微秒,纳秒
5. workQueue(任务队列) : 用以保存等待线程执行的任务的阻塞队列
6.threadFactory(创建线程的线程工厂) : 可以通过自定义线程工厂来自定义池中的线程
7. RejectExecutionHandler(饱和策略) : 任务队列和线程池都满的情况下,仍有任务需要执行时采取的应对策略,默认AbortPolicy直接抛出异常
饱和策略:
1. AbortPolicy:直接抛出异常。
2. CallerRunsPolicy:只用调用者所在线程来运行任务。
3. DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
4. DiscardPolicy:不处理,丢弃掉。
当然,也可以根据应用场景需要来实现RejectedExecutionHandler 接口自定义 策略。如记录日志或持久化存储不能处理的任务。
方式二:通过Executor框架的工具类Executors来实现
《阿里巴巴Java开发手册》中强制线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
Executors 创建线程池对象的弊端如下:
FixedThreadPool 和 SingleThreadExecutor :允许请求的队列长度Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。CachedThreadPool 和 ScheduledThreadPool : 允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。
1.线程池中线程执行任务的性质: 任务为IO密集型还是计算密集型,IO型主要消耗在IO流的等待中,cpu使用率较低,自然线程数量设置较少,同样计算密集型较多
2.CPU使用率:可观察CPU使用率来判定线程数量,并不是越多越好
当线程数量大时,第一线程的频繁创建会消耗大量的资源.第二,线程数量多,并发处理任务会出现任务集中在很短的时间内全部完成,任务不会在任务队列中阻塞,而完成之后CPU便处于空闲状态,此时 CPU的使用率会呈现锯齿状态,短时间飙高,而正确的使用率应该处于一个平缓的状态,让部分任务阻塞在队列之中等待执行,既复用了线程,又使得CPU使用率呈现平缓状态
3.内存使用率: 线程数量和队列大小都会影响内存使用率,队列大小应该通过计算任务的数量进行合理设置
4.下游系统抗并发能力: 考虑下游业务逻辑代码承受线程并发的能力
方式一:首先定义任务集合,然后定义Future集合用于存放执行结果,执行任务,最后遍历Future集合获取结果;
方式二:首先定义任务集合,通过CompletionService包装ExecutorService,执行任务,然后调用其take()方法去取Future对象
这里稍微解释下,在方式一中,从集合中遍历的每个Future对象并不一定处于完成状态,这时调用get()方法就会被阻塞住,所以后面的任务即使已完成也不能得到结果;而方式二中,CompletionService的实现是维护一个保存Future对象的BlockingQueue,只有当这个Future对象状态是结束的时候,才会加入到这个Queue中,所以调用take()1能从阻塞队列中拿到最新的已完成任务的结果;
AQS框架是J.U.C中实现锁及同步机制的基础,其底层是通过调用 LockSupport .unpark()和 LockSupport .park()实现线程的阻塞和唤醒。
AbstractQueuedSynchronizer是一个抽象类,主要是维护了一个int类型的state属性和一个非阻塞、先进先出的线程等待队列;其中state是用volatile修饰的,保证线程之间的可见性,队列的入队和出对操作都是无锁操作,基于自旋锁和CAS实现;另外AQS分为两种模式:独占模式和共享模式,像ReentrantLock是基于独占模式模式实现的,CountDownLatch、CyclicBarrier等是基于共享模式。
非公平锁的lock方法的实现:
final void lock() { //CAS操作,如果State为0(表示当前没有其它线程占有该锁),则将它设置为1 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
首先是不管先后顺序,直接尝试获取锁(非公平的体现),成功的话,直接独占访问;
如果获取锁失败,则调用AQS的acquire方法,在该方法内部会调用tryAcquire方法再次尝试获取锁以及是否可重入判断,如果失败,则挂起当前线程并加入到等待队列;
具体可查看ReentrantLock.NonfairSync类和AbstractQueuedSynchronizer类对应的源码。
ASQ是抽象的同步器队列,其中的重点是并发同步器执行所需要的三个重要组成部分