另外推荐一篇原创: 终极推荐!可能是最适合你的Java学习路线+方法+网站+书籍推荐!
synchronized关键字解决的是多个线程之间访问资源的同步性,synchronized关键字可以保证被它修饰的方法或者代码块在任意时刻只能有一个线程执行。
另外,在 Java 早期版本中,synchronized属于重量级锁,效率低下,因为监视器锁(monitor)是依赖于底层的操作系统的 Mutex Lock 来实现的,Java 的线程是映射到操作系统的原生线程之上的。如果要挂起或者唤醒一个线程,都需要操作系统帮忙完成,而操作系统实现线程之间的切换时需要从用户态转换到内核态,这个状态之间的转换需要相对比较长的时间,时间成本相对较高,这也是为什么早期的 synchronized 效率低的原因。庆幸的是在 Java 6 之后 Java 官方对从 JVM 层面对synchronized 较大优化,所以现在的 synchronized 锁效率也优化得很不错了。JDK1.6对锁的实现引入了大量的优化,如自旋锁、适应性自旋锁、锁消除、锁粗化、偏向锁、轻量级锁等技术来减少锁操作的开销。
总结:synchronized 关键字加到 static 静态方法和 synchronized(class)代码块上都是是给 Class 类上锁。synchronized 关键字加到实例方法上是给对象实例上锁。尽量不要使用 synchronized(String a) 因为JVM中,字符串常量池具有缓存功能!
下面我以一个常见的面试题为例讲解一下 synchronized 关键字的具体使用。
面试中面试官经常会说:“单例模式了解吗?来给我手写一下!给我解释一下双重检验锁方式实现单例模式的原理呗!”
public class Singleton { private volatile static Singleton uniqueInstance; private Singleton() { } public static Singleton getUniqueInstance() { //先判断对象是否已经实例过,没有实例化过才进入加锁代码 if (uniqueInstance == null) { //类对象加锁 synchronized (Singleton.class) { if (uniqueInstance == null) { uniqueInstance = new Singleton(); } } } return uniqueInstance; } }
另外,需要注意 uniqueInstance 采用 volatile 关键字修饰也是很有必要。
uniqueInstance 采用 volatile 关键字修饰也是很有必要的, uniqueInstance = new Singleton(); 这段代码其实是分为三步执行:
但是由于 JVM 具有指令重排的特性,执行顺序有可能变成 1->3->2。指令重排在单线程环境下不会出现问题,但是在多线程环境下会导致一个线程获得还没有初始化的实例。例如,线程 T1 执行了 1 和 3,此时 T2 调用 getUniqueInstance() 后发现 uniqueInstance 不为空,因此返回 uniqueInstance,但此时 uniqueInstance 还未被初始化。
使用 volatile 可以禁止 JVM 的指令重排,保证在多线程环境下也能正常运行。
public class SynchronizedDemo { public void method() { synchronized (this) { System.out.println("synchronized 代码块"); } } }
通过 JDK 自带的 javap 命令查看 SynchronizedDemo 类的相关字节码信息:首先切换到类的对应目录执行 javac SynchronizedDemo.java
命令生成编译后的 .class 文件,然后执行 javap -c -s -v -l SynchronizedDemo.class
。
从上面我们可以看出:
synchronized 同步语句块的实现使用的是 monitorenter 和 monitorexit 指令,其中 monitorenter 指令指向同步代码块的开始位置,monitorexit 指令则指明同步代码块的结束位置。当执行 monitorenter 指令时,线程试图获取锁也就是获取 monitor(monitor对象存在于每个Java对象的对象头中,synchronized 锁便是通过这种方式获取锁的,也是为什么Java中任意对象可以作为锁的原因) 的持有权。当计数器为0则可以成功获取,获取后将锁计数器设为1也就是加1。相应的在执行 monitorexit 指令后,将锁计数器设为0,表明锁被释放。如果获取对象锁失败,那当前线程就要阻塞等待,直到锁被另外一个线程释放为止。
public class SynchronizedDemo2 { public synchronized void method() { System.out.println("synchronized 方法"); } }
synchronized 修饰的方法并没有 monitorenter 指令和 monitorexit 指令,取得代之的确实是 ACC_SYNCHRONIZED 标识,该标识指明了该方法是一个同步方法,JVM 通过该 ACC_SYNCHRONIZED 访问标志来辨别一个方法是否声明为同步方法,从而执行相应的同步调用。
JDK1.6 对锁的实现引入了大量的优化,如偏向锁、轻量级锁、自旋锁、适应性自旋锁、锁消除、锁粗化等技术来减少锁操作的开销。
锁主要存在四种状态,依次是:无锁状态、偏向锁状态、轻量级锁状态、重量级锁状态,他们会随着竞争的激烈而逐渐升级。注意锁可以升级不可降级,这种策略是为了提高获得锁和释放锁的效率。
关于这几种优化的详细信息可以查看笔主的这篇文章: https://gitee.com/SnailClimb/...
两者都是可重入锁。“可重入锁”概念是:自己可以再次获取自己的内部锁。比如一个线程获得了某个对象的锁,此时这个对象锁还没有释放,当其再次想要获取这个对象的锁的时候还是可以获取的,如果不可锁重入的话,就会造成死锁。同一个线程每次获取锁,锁的计数器都自增1,所以要等到锁的计数器下降为0时才能释放锁。
synchronized 是依赖于 JVM 实现的,前面我们也讲到了 虚拟机团队在 JDK1.6 为 synchronized 关键字进行了很多优化,但是这些优化都是在虚拟机层面实现的,并没有直接暴露给我们。ReentrantLock 是 JDK 层面实现的(也就是 API 层面,需要 lock() 和 unlock() 方法配合 try/finally 语句块来完成),所以我们可以通过查看它的源代码,来看它是如何实现的。
相比synchronized,ReentrantLock增加了一些高级功能。主要来说主要有三点: ①等待可中断;②可实现公平锁;③可实现选择性通知(锁可以绑定多个条件)
ReentrantLock(boolean fair)
构造方法来制定是否是公平的。 如果你想使用上述功能,那么选择ReentrantLock是一个不错的选择。
在 JDK1.2 之前,Java的内存模型实现总是从 主存 (即共享内存)读取变量,是不需要进行特别的注意的。而在当前的 Java 内存模型下,线程可以把变量保存 本地内存 (比如机器的寄存器)中,而不是直接在主存中进行读写。这就可能造成一个线程在主存中修改了一个变量的值,而另外一个线程还继续使用它在寄存器中的变量值的拷贝,造成 数据的不一致 。
要解决这个问题,就需要把变量声明为 volatile ,这就指示 JVM,这个变量是不稳定的,每次使用它都到主存中进行读取。
说白了, volatile 关键字的主要作用就是保证变量的可见性然后还有一个作用是防止指令重排序。
synchronized关键字和volatile关键字比较
通常情况下,我们创建的变量是可以被任何一个线程访问并修改的。 如果想实现每一个线程都有自己的专属本地变量该如何解决呢? JDK中提供的 ThreadLocal
类正是为了解决这样的问题。 ThreadLocal
类主要解决的就是让每个线程绑定自己的值,可以将 ThreadLocal
类形象的比喻成存放数据的盒子,盒子中可以存储每个线程的私有数据。
ThreadLocal
变量,那么访问这个变量的每个线程都会有这个变量的本地副本,这也是 ThreadLocal
变量名的由来。他们可以使用 get()
和 set()
方法来获取默认值或将其值更改为当前线程所存的副本的值,从而避免了线程安全问题。 再举个简单的例子:
比如有两个人去宝屋收集宝物,这两个共用一个袋子的话肯定会产生争执,但是给他们两个人每个人分配一个袋子的话就不会出现这样的问题。如果把这两个人比作线程的话,那么ThreadLocal就是用来避免这两个线程竞争的。
相信看了上面的解释,大家已经搞懂 ThreadLocal 类是个什么东西了。
import java.text.SimpleDateFormat; import java.util.Random; public class ThreadLocalExample implements Runnable{ // SimpleDateFormat 不是线程安全的,所以每个线程都要有自己独立的副本 private static final ThreadLocal<SimpleDateFormat> formatter = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyyMMdd HHmm")); public static void main(String[] args) throws InterruptedException { ThreadLocalExample obj = new ThreadLocalExample(); for(int i=0 ; i<10; i++){ Thread t = new Thread(obj, ""+i); Thread.sleep(new Random().nextInt(1000)); t.start(); } } @Override public void run() { System.out.println("Thread Name= "+Thread.currentThread().getName()+" default Formatter = "+formatter.get().toPattern()); try { Thread.sleep(new Random().nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } //formatter pattern is changed here by thread, but it won't reflect to other threads formatter.set(new SimpleDateFormat()); System.out.println("Thread Name= "+Thread.currentThread().getName()+" formatter = "+formatter.get().toPattern()); } }
Output:
Thread Name= 0 default Formatter = yyyyMMdd HHmm Thread Name= 0 formatter = yy-M-d ah:mm Thread Name= 1 default Formatter = yyyyMMdd HHmm Thread Name= 2 default Formatter = yyyyMMdd HHmm Thread Name= 1 formatter = yy-M-d ah:mm Thread Name= 3 default Formatter = yyyyMMdd HHmm Thread Name= 2 formatter = yy-M-d ah:mm Thread Name= 4 default Formatter = yyyyMMdd HHmm Thread Name= 3 formatter = yy-M-d ah:mm Thread Name= 4 formatter = yy-M-d ah:mm Thread Name= 5 default Formatter = yyyyMMdd HHmm Thread Name= 5 formatter = yy-M-d ah:mm Thread Name= 6 default Formatter = yyyyMMdd HHmm Thread Name= 6 formatter = yy-M-d ah:mm Thread Name= 7 default Formatter = yyyyMMdd HHmm Thread Name= 7 formatter = yy-M-d ah:mm Thread Name= 8 default Formatter = yyyyMMdd HHmm Thread Name= 9 default Formatter = yyyyMMdd HHmm Thread Name= 8 formatter = yy-M-d ah:mm Thread Name= 9 formatter = yy-M-d ah:mm
从输出中可以看出,Thread-0已经改变了formatter的值,但仍然是thread-2默认格式化程序与初始化值相同,其他线程也一样。
上面有一段代码用到了创建 ThreadLocal
变量的那段代码用到了 Java8 的知识,它等于下面这段代码,如果你写了下面这段代码的话,IDEA会提示你转换为Java8的格式(IDEA真的不错!)。因为ThreadLocal类在Java 8中扩展,使用一个新的方法 withInitial()
,将Supplier功能接口作为参数。
private static final ThreadLocal<SimpleDateFormat> formatter = new ThreadLocal<SimpleDateFormat>(){ @Override protected SimpleDateFormat initialValue() { return new SimpleDateFormat("yyyyMMdd HHmm"); } };
从 Thread
类源代码入手。
public class Thread implements Runnable { ...... //与此线程有关的ThreadLocal值。由ThreadLocal类维护 ThreadLocal.ThreadLocalMap threadLocals = null; //与此线程有关的InheritableThreadLocal值。由InheritableThreadLocal类维护 ThreadLocal.ThreadLocalMap inheritableThreadLocals = null; ...... }
从上面 Thread
类 源代码可以看出 Thread
类中有一个 threadLocals
和 一个 inheritableThreadLocals
变量,它们都是 ThreadLocalMap
类型的变量,我们可以把 ThreadLocalMap
理解为 ThreadLocal
类实现的定制化的 HashMap
。默认情况下这两个变量都是null,只有当前线程调用 ThreadLocal
类的 set
或 get
方法时才创建它们,实际上调用这两个方法的时候,我们调用的是 ThreadLocalMap
类对应的 get()
、 set()
方法。
ThreadLocal
类的 set()
方法
public void set(T value) { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) map.set(this, value); else createMap(t, value); } ThreadLocalMap getMap(Thread t) { return t.threadLocals; }
通过上面这些内容,我们足以通过猜测得出结论: 最终的变量是放在了当前线程的 ThreadLocalMap
中,并不是存在 ThreadLocal
上, ThreadLocal
可以理解为只是 ThreadLocalMap
的封装,传递了变量值。 ThrealLocal
类中可以通过 Thread.currentThread()
获取到当前线程对象后,直接通过 getMap(Thread t)
可以访问到该线程的 ThreadLocalMap
对象。
每个 Thread
中都具备一个 ThreadLocalMap
,而 ThreadLocalMap
可以存储以 ThreadLocal
为key的键值对。 比如我们在同一个线程中声明了两个 ThreadLocal
对象的话,会使用 Thread
内部都是使用仅有那个 ThreadLocalMap
存放数据的, ThreadLocalMap
的 key 就是 ThreadLocal
对象,value 就是 ThreadLocal
对象调用 set
方法设置的值。 ThreadLocal
是 map结构是为了让每个线程可以关联多个 ThreadLocal
变量。这也就解释了 ThreadLocal 声明的变量为什么在每一个线程都有自己的专属本地变量。
ThreadLocalMap
是 ThreadLocal
的静态内部类。
ThreadLocalMap
中使用的 key 为 ThreadLocal
的弱引用,而 value 是强引用。所以,如果 ThreadLocal
没有被外部强引用的情况下,在垃圾回收的时候,key 会被清理掉,而 value 不会被清理掉。这样一来, ThreadLocalMap
中就会出现key为null的Entry。假如我们不做任何措施的话,value 永远无法被GC 回收,这个时候就可能会产生内存泄露。ThreadLocalMap实现中已经考虑了这种情况,在调用 set()
、 get()
、 remove()
方法的时候,会清理掉 key 为 null 的记录。使用完 ThreadLocal
方法后 最好手动调用 remove()
方法
static class Entry extends WeakReference<ThreadLocal<?>> { /** The value associated with this ThreadLocal. */ Object value; Entry(ThreadLocal<?> k, Object v) { super(k); value = v; } }
。弱引用与软引用的区别在于:只具有弱引用的对象拥有更短暂的生命周期。在垃圾回收器线程扫描它 所管辖的内存区域的过程中,一旦发现了只具有弱引用的对象,不管当前内存空间足够与否,都会回收它的内存。不过,由于垃圾回收器是一个优先级很低的线程, 因此不一定会很快发现那些只具有弱引用的对象。
弱引用可以和一个引用队列(ReferenceQueue)联合使用,如果弱引用所引用的对象被垃圾回收,Java虚拟机就会把这个弱引用加入到与之关联的引用队列中。
池化技术相比大家已经屡见不鲜了,线程池、数据库连接池、Http 连接池等等都是对这个思想的应用。池化技术的思想主要是为了减少每次获取资源的消耗,提高对资源的利用率。
线程池提供了一种限制和管理资源(包括执行一个任务)。 每个 线程池 还维护一些基本统计信息,例如已完成任务的数量。
这里借用《Java 并发编程的艺术》提到的来说一下 使用线程池的好处 :
Runnable
自Java 1.0以来一直存在,但 Callable
仅在Java 1.5中引入,目的就是为了来处理 Runnable
不支持的用例。 Runnable
接口 不会返回结果或抛出检查异常,但是 Callable
接口 可以。所以,如果任务不需要返回结果或抛出异常推荐使用 Runnable
接口 ,这样代码看起来会更加简洁。
工具类 Executors
可以实现 Runnable
对象和 Callable
对象之间的相互转换。( Executors.callable(Runnable task
)或 Executors.callable(Runnable task,Object resule)
)。
Runnable.java
@FunctionalInterface public interface Runnable { /** * 被线程执行,没有返回值也无法抛出异常 */ public abstract void run(); }
Callable.java
@FunctionalInterface public interface Callable<V> { /** * 计算结果,或在无法这样做时抛出异常。 * @return 计算得出的结果 * @throws 如果无法计算结果,则抛出异常 */ V call() throws Exception; }
execute()
方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功与否; submit()
方法用于提交需要返回值的任务。线程池会返回一个 Future
类型的对象,通过这个 Future
对象可以判断任务是否执行成功 ,并且可以通过 Future
的 get()
方法来获取返回值, get()
方法会阻塞当前线程直到任务完成,而使用 get(long timeout,TimeUnit unit)
方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。 我们以 AbstractExecutorService
接口中的一个 submit
方法为例子来看看源代码:
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; }
上面方法调用的 newTaskFor
方法返回了一个 FutureTask
对象。
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); }
我们再来看看 execute()
方法:
public void execute(Runnable command) { ... }
《阿里巴巴Java开发手册》中强制线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险
Executors 返回线程池对象的弊端如下:
方式一:通过构造方法实现
方式二:通过Executor 框架的工具类Executors来实现
我们可以创建三种类型的ThreadPoolExecutor:
对应Executors工具类中的方法如图所示:
ThreadPoolExecutor
类中提供的四个构造方法。我们来看最长的那个,其余三个都是在这个构造方法的基础上产生(其他几个构造方法说白点都是给定某些默认参数的构造方法比如默认制定拒绝策略是什么),这里就不贴代码讲了,比较简单。
/** * 用给定的初始参数创建一个新的ThreadPoolExecutor。 */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
ThreadPoolExecutor
构造函数重要参数分析 ThreadPoolExecutor
3 个最重要的参数: corePoolSize
: 核心线程数线程数定义了最小可以同时运行的线程数量。 maximumPoolSize
: 当队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。 workQueue
: 当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,信任就会被存放在队列中。 ThreadPoolExecutor
其他常见参数:
keepAliveTime
:当线程池中的线程数量大于 corePoolSize
的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime
才会被回收销毁; unit
: keepAliveTime
参数的时间单位。 threadFactory
:executor 创建新线程的时候会用到。 handler
:饱和策略。关于饱和策略下面单独介绍一下。 ThreadPoolExecutor
饱和策略 ThreadPoolExecutor
饱和策略定义: 如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任时, ThreadPoolTaskExecutor
定义一些策略:
ThreadPoolExecutor.AbortPolicy
:抛出 RejectedExecutionException
来拒绝新任务的处理。 ThreadPoolExecutor.CallerRunsPolicy
:调用执行自己的线程运行任务。您不会任务请求。但是这种策略会降低对于新任务提交速度,影响程序的整体性能。另外,这个策略喜欢增加队列容量。如果您的应用程序可以承受此延迟并且你不能任务丢弃任何一个任务请求的话,你可以选择这个策略。 ThreadPoolExecutor.DiscardPolicy
: 不处理新任务,直接丢弃掉。 ThreadPoolExecutor.DiscardOldestPolicy
: 此策略将丢弃最早的未处理的任务请求。 举个例子: Spring 通过 ThreadPoolTaskExecutor
或者我们直接通过 ThreadPoolExecutor
的构造函数创建线程池的时候,当我们不指定 RejectedExecutionHandler
饱和策略的话来配置线程池的时候默认使用的是 ThreadPoolExecutor.AbortPolicy
。在默认情况下, ThreadPoolExecutor
将抛出 RejectedExecutionException
来拒绝新来的任务 ,这代表你将丢失对这个任务的处理。 对于可伸缩的应用程序,建议使用 ThreadPoolExecutor.CallerRunsPolicy
。当最大池被填满时,此策略为我们提供可伸缩队列。(这个直接查看 ThreadPoolExecutor
的构造函数源码就可以看出,比较简单的原因,这里就不贴代码了)
Runnable
+ ThreadPoolExecutor
为了让大家更清楚上面的面试题中的一些概念,我写了一个简单的线程池 Demo。
首先创建一个 Runnable
接口的实现类(当然也可以是 Callable
接口,我们上面也说了两者的区别。)
MyRunnable.java
import java.util.Date; /** * 这是一个简单的Runnable类,需要大约5秒钟来执行其任务。 * @author shuang.kou */ public class MyRunnable implements Runnable { private String command; public MyRunnable(String s) { this.command = s; } @Override public void run() { System.out.println(Thread.currentThread().getName() + " Start. Time = " + new Date()); processCommand(); System.out.println(Thread.currentThread().getName() + " End. Time = " + new Date()); } private void processCommand() { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public String toString() { return this.command; } }
编写测试程序,我们这里以阿里巴巴推荐的使用 ThreadPoolExecutor
构造函数自定义参数的方式来创建线程池。
ThreadPoolExecutorDemo.java
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class ThreadPoolExecutorDemo { private static final int CORE_POOL_SIZE = 5; private static final int MAX_POOL_SIZE = 10; private static final int QUEUE_CAPACITY = 100; private static final Long KEEP_ALIVE_TIME = 1L; public static void main(String[] args) { //使用阿里巴巴推荐的创建线程池的方式 //通过ThreadPoolExecutor构造函数自定义参数创建 ThreadPoolExecutor executor = new ThreadPoolExecutor( CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new ArrayBlockingQueue<>(QUEUE_CAPACITY), new ThreadPoolExecutor.CallerRunsPolicy()); for (int i = 0; i < 10; i++) { //创建WorkerThread对象(WorkerThread类实现了Runnable 接口) Runnable worker = new MyRunnable("" + i); //执行Runnable executor.execute(worker); } //终止线程池 executor.shutdown(); while (!executor.isTerminated()) { } System.out.println("Finished all threads"); } }
可以看到我们上面的代码指定了:
corePoolSize
: 核心线程数为 5。 maximumPoolSize
:最大线程数 10 keepAliveTime
: 等待时间为 1L。 unit
: 等待时间的单位为 TimeUnit.SECONDS。 workQueue
:任务队列为 ArrayBlockingQueue
,并且容量为 100; handler
:饱和策略为 CallerRunsPolicy
。 pool-1-thread-2 Start. Time = Tue Nov 12 20:59:44 CST 2019 pool-1-thread-5 Start. Time = Tue Nov 12 20:59:44 CST 2019 pool-1-thread-4 Start. Time = Tue Nov 12 20:59:44 CST 2019 pool-1-thread-1 Start. Time = Tue Nov 12 20:59:44 CST 2019 pool-1-thread-3 Start. Time = Tue Nov 12 20:59:44 CST 2019 pool-1-thread-5 End. Time = Tue Nov 12 20:59:49 CST 2019 pool-1-thread-3 End. Time = Tue Nov 12 20:59:49 CST 2019 pool-1-thread-2 End. Time = Tue Nov 12 20:59:49 CST 2019 pool-1-thread-4 End. Time = Tue Nov 12 20:59:49 CST 2019 pool-1-thread-1 End. Time = Tue Nov 12 20:59:49 CST 2019 pool-1-thread-2 Start. Time = Tue Nov 12 20:59:49 CST 2019 pool-1-thread-1 Start. Time = Tue Nov 12 20:59:49 CST 2019 pool-1-thread-4 Start. Time = Tue Nov 12 20:59:49 CST 2019 pool-1-thread-3 Start. Time = Tue Nov 12 20:59:49 CST 2019 pool-1-thread-5 Start. Time = Tue Nov 12 20:59:49 CST 2019 pool-1-thread-2 End. Time = Tue Nov 12 20:59:54 CST 2019 pool-1-thread-3 End. Time = Tue Nov 12 20:59:54 CST 2019 pool-1-thread-4 End. Time = Tue Nov 12 20:59:54 CST 2019 pool-1-thread-5 End. Time = Tue Nov 12 20:59:54 CST 2019 pool-1-thread-1 End. Time = Tue Nov 12 20:59:54 CST 2019
承接 4.6 节,我们通过代码输出结果可以看出: 线程池每次会同时执行 5 个任务,这 5 个任务执行完之后,剩余的 5 个任务才会被执行。 大家可以先通过上面讲解的内容,分析一下到底是咋回事?(自己独立思考一会)
现在,我们就分析上面的输出内容来简单分析一下线程池原理。
为了搞懂线程池的原理,我们需要首先分析一下 execute
方法。 在 4.6 节中的 Demo 中我们使用 executor.execute(worker)
来提交一个任务到线程池中去,这个方法非常重要,下面我们来看看它的源码:
// 存放线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount) private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static int workerCountOf(int c) { return c & CAPACITY; } private final BlockingQueue<Runnable> workQueue; public void execute(Runnable command) { // 如果任务为null,则抛出异常。 if (command == null) throw new NullPointerException(); // ctl 中保存的线程池当前的一些状态信息 int c = ctl.get(); // 下面会涉及到 3 步 操作 // 1.首先判断当前线程池中之行的任务数量是否小于 corePoolSize // 如果小于的话,通过addWorker(command, true)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 2.如果当前之行的任务数量大于等于 corePoolSize 的时候就会走到这里 // 通过 isRunning 方法判断线程池状态,线程池处于 RUNNING 状态才会被并且队列可以加入任务,该任务才会被加入进去 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 再次获取线程池状态,如果线程池状态不是 RUNNING 状态就需要从任务队列中移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。 if (!isRunning(recheck) && remove(command)) reject(command); // 如果当前线程池为空就新创建一个线程并执行。 else if (workerCountOf(recheck) == 0) addWorker(null, false); } //3. 通过addWorker(command, false)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。 //如果addWorker(command, false)执行失败,则通过reject()执行相应的拒绝策略的内容。 else if (!addWorker(command, false)) reject(command); }
通过下图可以更好的对上面这 3 步做一个展示,下图是我为了省事直接从网上找到,原地址不明。
现在,让我们在回到 4.6 节我们写的 Demo, 现在应该是不是很容易就可以搞懂它的原理了呢?
没搞懂的话,也没关系,可以看看我的分析:
我们在代码中模拟了 10 个任务,我们配置的核心线程数为 5 、等待队列容量为 100 ,所以每次只可能存在 5 个任务同时执行,剩下的 5 个任务会被放到等待队列中去。当前的 5 个任务之行完成后,才会之行剩下的 5 个任务。
Atomic 翻译成中文是原子的意思。在化学上,我们知道原子是构成一般物质的最小单位,在化学反应中是不可分割的。在我们这里 Atomic 是指一个操作是不可中断的。即使是在多个线程一起执行的时候,一个操作一旦开始,就不会被其他线程干扰。
所以,所谓原子类说简单点就是具有原子/原子操作特征的类。
并发包 java.util.concurrent
的原子类都存放在 java.util.concurrent.atomic
下,如下图所示。
使用原子的方式更新基本类型
使用原子的方式更新数组里的某个元素
public final int get() //获取当前的值 public final int getAndSet(int newValue)//获取当前的值,并设置新的值 public final int getAndIncrement()//获取当前的值,并自增 public final int getAndDecrement() //获取当前的值,并自减 public final int getAndAdd(int delta) //获取当前的值,并加上预期的值 boolean compareAndSet(int expect, int update) //如果输入的数值等于预期值,则以原子方式将该值设置为输入值(update) public final void lazySet(int newValue)//最终设置为newValue,使用 lazySet 设置之后可能导致其他线程在之后的一小段时间内还是可以读到旧的值。
使用 AtomicInteger 之后,不用对 increment() 方法加锁也可以保证线程安全。
class AtomicIntegerTest { private AtomicInteger count = new AtomicInteger(); //使用AtomicInteger之后,不需要对该方法加锁,也可以实现线程安全。 public void increment() { count.incrementAndGet(); } public int getCount() { return count.get(); } }
AtomicInteger 线程安全原理简单分析
AtomicInteger 类的部分源码:
// setup to use Unsafe.compareAndSwapInt for updates(更新操作时提供“比较并替换”的作用) private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long valueOffset; static { try { valueOffset = unsafe.objectFieldOffset (AtomicInteger.class.getDeclaredField("value")); } catch (Exception ex) { throw new Error(ex); } } private volatile int value;
AtomicInteger 类主要利用 CAS (compare and swap) + volatile 和 native 方法来保证原子操作,从而避免 synchronized 的高开销,执行效率大为提升。
CAS的原理是拿期望的值和原本的一个值作比较,如果相同则更新成新的值。UnSafe 类的 objectFieldOffset() 方法是一个本地方法,这个方法是用来拿到“原来的值”的内存地址,返回值是 valueOffset。另外 value 是一个volatile变量,在内存中可见,因此 JVM 可以保证任何时刻任何线程总能拿到该变量的最新值。
关于 Atomic 原子类这部分更多内容可以查看我的这篇文章:并发编程面试必备: JUC 中的 Atomic 原子类总结
AQS的全称为(AbstractQueuedSynchronizer),这个类在java.util.concurrent.locks包下面。
AQS是一个用来构建锁和同步器的框架,使用AQS能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的ReentrantLock,Semaphore,其他的诸如ReentrantReadWriteLock,SynchronousQueue,FutureTask等等皆是基于AQS的。当然,我们自己也能利用AQS非常轻松容易地构造出符合我们自己需求的同步器。
AQS 原理这部分参考了部分博客,在5.2节末尾放了链接。
在面试中被问到并发知识的时候,大多都会被问到“请你说一下自己对于AQS原理的理解”。下面给大家一个示例供大家参加,面试不是背题,大家一定要加入自己的思想,即使加入不了自己的思想也要保证自己能够通俗的讲出来而不是背出来。
下面大部分内容其实在AQS类注释上已经给出了,不过是英语看着比较吃力一点,感兴趣的话可以看看源码。
CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS是将每条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node)来实现锁的分配。
看个AQS(AbstractQueuedSynchronizer)原理图:
AQS使用一个int成员变量来表示同步状态,通过内置的FIFO队列来完成获取资源线程的排队工作。AQS使用CAS对该同步状态进行原子操作实现对其值的修改。
private volatile int state;//共享变量,使用volatile修饰保证线程可见性
状态信息通过protected类型的getState,setState,compareAndSetState进行操作
//返回同步状态的当前值 protected final int getState() { return state; } // 设置同步状态的值 protected final void setState(int newState) { state = newState; } //原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect(期望值) protected final boolean compareAndSetState(int expect, int update) { return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
Exclusive(独占):只有一个线程能执行,如ReentrantLock。又可分为公平锁和非公平锁:
ReentrantReadWriteLock 可以看成是组合式,因为ReentrantReadWriteLock也就是读写锁允许多个线程同时对某一资源进行读。
不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源 state 的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。
同步器的设计是基于模板方法模式的,如果需要自定义同步器一般的方式是这样(模板方法模式很经典的一个应用):
这和我们以往通过实现接口的方式有很大区别,这是模板方法模式很经典的一个运用。
isHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。 tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。 tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。 tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。 tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回false。
默认情况下,每个方法都抛出 UnsupportedOperationException
。 这些方法的实现必须是内部线程安全的,并且通常应该简短而不是阻塞。AQS类中的其他方法都是final ,所以无法被其他类使用,只有这几个方法可以被其他类使用。
以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。
再以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS(Compare and Swap)减1。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后余动作。
一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现 tryAcquire-tryRelease
、 tryAcquireShared-tryReleaseShared
中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如 ReentrantReadWriteLock
。
推荐两篇 AQS 原理和相关源码分析的文章:
作者的其他开源项目推荐: