Java1.5加入的JUC并发包,就像一把好用的瑞士军刀,极大的丰富了Java处理并发的手段,但JUC并不简单,有一定的学习成本,我曾经也断断续续看过一些JUC的实现源码,但是既不系统也不够深入,这次决定重新出发,重新拜读大师Doug Lea的神作,所以自己也是抱着以学代练的心态,记录自己的学习心得,难免有理解不到位的地方,大家轻喷哈。
不知道你有没有这样的感觉,在使用JUC中提供的工具类处理并发时,有一种死记硬背的感觉,比如LockSupport应该怎么用,CountDownLatch能干嘛,但并不清楚其实现原理,只知道how不知道why,这种状态有二个比较大的问题。
那要深入,最直接有效的办法就是阅读源码!
我们知道JUC看似有很多类,结构错综复杂,但是如果要从中挑出最重要的一个类,那一定是队列同步器AbstractQueuedSynchronizer, 而AbstractQueuedSynchronizer又是利用LockSupport来控制线程的状态,从而达到线程在等待唤醒之间切换的目的。而我们处理并发,重点就是管理线程的状态,所以理解LockSupport是很重要的一个基础。
先来看一个简单的例子
public static void main(String[] args) { Thread worker = new Thread(() -> { LockSupport.park(); System.out.println("start work"); }); worker.start(); System.out.println("main thread sleep"); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } LockSupport.unpark(worker); try { worker.join(); } catch (InterruptedException e) { e.printStackTrace(); } } // 最终控制台输出结果 main thread sleep start work 复制代码
启动一个worker线程,主线程先sleep 500ms,worker线程因为调用了LockSupport的park,会等待,直到主线程sleep结束,调用unpark唤醒worker线程。那么在JUC之前,我们常用的让线程等待的方法如下
Object monitor = new Object(); synchronized (monitor) { try { monitor.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } 复制代码
主要有三点区别
前面只是铺垫,现在来到我们的主菜,解读LockSupport的park和unpark方法,当然还有一些其他类似的重载方法,如parkUntil,parkNanos,它们的大体原理类似,感兴趣大家可以自行查阅源码。
为什么先讲unpark方法,因为unpark代码量少一些,相对简单,柿子先捡软的捏-。-
//java.util.concurrent.locks.LockSupport.java public static void unpark(Thread thread) { if (thread != null) UNSAFE.unpark(thread); } // UNSAFE = sun.misc.Unsafe.getUnsafe(); 复制代码
参数thread是我们要唤醒的目标线程,先判空,然后调用UNSAFE.unpark,UNSAFE是Unsafe对象,不要被这个名字吓到,这个类提供了很多有用的方法,之前的文章也有提到过,比如获取类对象中属性的内存偏移地址,还有 CAS操作等。但是这个Unsafe对象必须使用反射得到然后才能正常使用,因为getUnsafe方法有判断当前类加载器是不是BootStrapClassLoader。我们继续查看Unsafe类unpark的实现。
// Unsafe.java public native void unpark(Object thread); 复制代码
可以看到unpark是一个native方法,它的native实现是在 hotspot/src/share/vm/prims/unsafe.cpp 看下代码实现,
UNSAFE_ENTRY(void, Unsafe_Unpark(JNIEnv *env, jobject unsafe, jobject jthread)) UnsafeWrapper("Unsafe_Unpark"); // 声明一个Parker对象p,它是真正干活的对象 Parker* p = NULL; if (jthread != NULL) { // 根据传入的jthread对象,来获取native层的oopDesc*对象,oop是oopDesc* 的宏定义 oop java_thread = JNIHandles::resolve_non_null(jthread); if (java_thread != NULL) { // 获取java_thread对象中_park_event_offset的值,该值就是Parker对象的地址 jlong lp = java_lang_Thread::park_event(java_thread); if (lp != 0) { // 如果地址有效,直接转为Parker指针 p = (Parker*)addr_from_java(lp); } else { // 如果地址无效 MutexLocker mu(Threads_lock); java_thread = JNIHandles::resolve_non_null(jthread); if (java_thread != NULL) { // 转为native层的JavaThread对象 JavaThread* thr = java_lang_Thread::thread(java_thread); if (thr != NULL) { // 将JavaThread的成员变量_parker赋值给p p = thr->parker(); if (p != NULL) { // Bind to Java thread for next time. // 将p的地址赋值给_park_event_offset,下次获取时可用 java_lang_Thread::set_park_event(java_thread, addr_to_java(p)); } } } } } } if (p != NULL) { // 这个USDT2的宏,暂时我也不清楚是干啥的,不过不影响我们的分析,我们先忽略 #ifndef USDT2 HS_DTRACE_PROBE1(hotspot, thread__unpark, p); #else /* USDT2 */ HOTSPOT_THREAD_UNPARK( (uintptr_t) p); #endif /* USDT2 */ // 真正干货的方法,调用了Parker的unpark方法 p->unpark(); } 复制代码
根据上面的代码,我们需要知道二个native层的类,JavaThread类和Parker类
class JavaThread: public Thread { private: JavaThread* _next; // The next thread in the Threads list oop _threadObj; // The Java level thread object // 省略代码... private: Parker* _parker; public: Parker* parker() { return _parker; } // 省略代码... 复制代码
JavaThread类很长,这里只列出几个成员变量,现在只需要知道它是native层的Thread,成员变量_threadObj是Java层的thread对象,通过它native层可以调用Java层的代码。我们继续重点看下Parker类的实现。
class Parker : public os::PlatformParker { private: volatile int _counter ; // 省略代码... } 复制代码
我们重点关注_counter字段,可以简单理解为_counter字段 > 0时,可以通行,即park方法会直接返回,另外park方法返回后,_counter会被赋值为0,unpark方法可以将_counter置为1,并且唤醒当前等待的线程。
可以看到Parker的父类是os::PlatformParker,那这个类又是干嘛的呢?这里先插个题外话, 我们都知道,Java是跨平台的,我们在应用层定义的Thread肯定依赖于具体的平台,不同的平台有不同实现,比如Linux是一套代码,Windows是另外一套,那我们就能理解了,PlatformParker根据平台有不同的实现。在OpenJdk8的实现中支持5个平台
我们知道Linux是现在使用比较广泛的操作系统,比如熟知的Android是基于Linux内核,所以这里我们就挑选Linux来分析吧。对应的文件路径hotspot/src/os/linux/vm/os_linux.cpp
void Parker::unpark() { int s, status ; // 先进入_mutex的临界区,声明如下 // pthread_mutex_t _mutex [1] ; // pthread_cond_t _cond [2] ; status = pthread_mutex_lock(_mutex); assert (status == 0, "invariant") ; s = _counter; // 将_counter置为1 _counter = 1; // s记录的是unpark之前的_counter数,如果s < 1,说明有可能该线程在等待状态,需要唤醒。 if (s < 1) { // thread might be parked // _cur_index代表被使用cond的index if (_cur_index != -1) { // thread is definitely parked // 根据虚拟机参数WorkAroundNPTLTimedWaitHang来做不同的处理,默认该参数是1 if (WorkAroundNPTLTimedWaitHang) { // 先唤醒目标等待的线程 status = pthread_cond_signal (&_cond[_cur_index]); assert (status == 0, "invariant"); // 释放互斥锁,先唤醒后释放锁,可能会导致线程被唤醒后获取不到锁,再次进入等待状态,我的理解是效率可能会低一丢丢 status = pthread_mutex_unlock(_mutex); assert (status == 0, "invariant"); } else { // 先释放锁 status = pthread_mutex_unlock(_mutex); assert (status == 0, "invariant"); // 后发信号唤醒线程,唤醒操作在互斥代码块外部,感觉这里可能会有风险,暂时还GET不到。。。 status = pthread_cond_signal (&_cond[_cur_index]); assert (status == 0, "invariant"); } } else { // 如果线程没有在等待,直接返回 pthread_mutex_unlock(_mutex); assert (status == 0, "invariant") ; } } else { // 如果线程没有在等待,直接返回 pthread_mutex_unlock(_mutex); assert (status == 0, "invariant") ; } } 复制代码
代码不多,都加了注释,总体来说就是根据Park类的成员变量_counter来做加锁解锁和唤醒操作,在Linux平台, 加锁用的pthread_mutex_lock,解锁是pthread_mutex_unlock,唤醒是pthread_cond_signal 。接下来解析LockSupport的park方法。
先看下Java层park方法的实现
public static void park() { UNSAFE.park(false, 0L); } 复制代码
Unsafe中的实现
public native void park(boolean var1, long var2); 复制代码
仍然是一个native方法,我们继续跟进去看下
UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time)) // 省略代码... thread->parker()->park(isAbsolute != 0, time); // 省略代码... 复制代码
省略了非关键代码,重点是park方法,这个thread我们前面已经遇到过,就是native层的JavaThread对象,然后调用Parker的park方法,继续跟进去linux平台的os_linux.cpp的实现
void Parker::park(bool isAbsolute, jlong time) { // 先原子的将_counter的值设为0,并返回_counter的原值,如果原值>0说明有通行证,直接返回 if (Atomic::xchg(0, &_counter) > 0) return; Thread* thread = Thread::current(); assert(thread->is_Java_thread(), "Must be JavaThread"); JavaThread *jt = (JavaThread *)thread; // 判断线程是否已经被中断 if (Thread::is_interrupted(thread, false)) { return; } // Next, demultiplex/decode time arguments timespec absTime; // park方法的传参是isAbsolute = false, time = 0,所以会继续往下走 if (time < 0 || (isAbsolute && time == 0) ) { // don't wait at all return; } // 这里time为0,如果调用的是parkNanos或者parkUtil,这里time就会>0, if (time > 0) { // 如果time > 0,unpackTime计算absTime的时间 unpackTime(&absTime, isAbsolute, time); } ThreadBlockInVM tbivm(jt); // 再次判断线程是否被中断,如果没有被中断,尝试获得互斥锁,如果获取失败,直接返回 if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) { return; } int status ; // 如果_counter > 0, 不需要等待,这里再次检查_counter的值 if (_counter > 0) { // no wait needed _counter = 0; status = pthread_mutex_unlock(_mutex); assert (status == 0, "invariant") ; // 插入一个写内存屏障,保证可见性,具体实现见下方 OrderAccess::fence(); return; } // 省略assert代码 OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */); // 设置JavaThread的_suspend_equivalent为true,表示线程被暂停 jt->set_suspend_equivalent(); // cleared by handle_special_suspend_equivalent_condition() or java_suspend_self() assert(_cur_index == -1, "invariant"); if (time == 0) { _cur_index = REL_INDEX; // arbitrary choice when not timed // 让线程等待_cond[_cur_index]信号,到这里线程进入等待状态 status = pthread_cond_wait (&_cond[_cur_index], _mutex) ; } else { _cur_index = isAbsolute ? ABS_INDEX : REL_INDEX; // 线程进入有超时时间的等待,内部实现调用了pthread_cond_timedwait系统调用 status = os::Linux::safe_cond_timedwait (&_cond[_cur_index], _mutex, &absTime) ; if (status != 0 && WorkAroundNPTLTimedWaitHang) { pthread_cond_destroy (&_cond[_cur_index]) ; pthread_cond_init (&_cond[_cur_index], isAbsolute ? NULL : os::Linux::condAttr()); } } _cur_index = -1; // 省略assert代码 // _counter重新设置为0 _counter = 0 ; // 释放互斥锁 status = pthread_mutex_unlock(_mutex) ; assert_status(status == 0, status, "invariant") ; // 插入写屏障 OrderAccess::fence(); // 省略额外检查代码 } // OrderAccess::fence 在linux平台的实现 inline void OrderAccess::fence() { // 如果是多核cpu if (os::is_MP()) { // always use locked addl since mfence is sometimes expensive // 判断是否AMD64 CPU,汇编代码实现写屏障 #ifdef AMD64 __asm__ volatile ("lock; addl $0,0(%%rsp)" : : : "cc", "memory"); #else __asm__ volatile ("lock; addl $0,0(%%esp)" : : : "cc", "memory"); #endif } } 复制代码
上面分析了park的实现原理,有了前面unpark方法分析的知识铺垫,park方法应该很容易看懂。
通过LockSupport的源码阅读,可以总结出一下几点
最后,还是想提一下Java层关于线程状态的小知识,可能有些同学会不是特别清楚,所以还是做个总结。 Java线程状态有以下6种。
关于并发,我们软件工程师要做的,就是控制线程在这几个状态间正确转换,所谓“工欲善其事,必先利其器”,JDK提供的各种并发工具类,我们只有深入了解它们,才能灵活高效的运用,这也是我记录"啃透Java并发"系列文章的初心,与君共勉!