消息机制是Android中重要的线程间通信手段。
它的存在可以让一个线程通知另一个线程去工作。那么一个线程为什么会有让另一个线程工作的需求呢?
可以看一个常见的应用场景——UI更新。Google官方有一句话解释了UI更新的规则:The Android UI toolkit is not thread-safe and the view must always be manipulated on the UI thread。 因为UI更新并不是线程安全的,所以Android为了规避开发者可能的不安全操作,干脆将所有UI更新都放在了主线程中进行。在这种场景下,就会出现其他线程请求主线程来帮忙更新UI的需求。
除了UI更新,某些设计模式的实现也离不开消息机制。
下图便是消息机制最基本的工作方式。A线程发送消息到B线程的消息队列中,B线程不断从消息队列中取出新的消息进行处理。
线程A在这里表现的就像是一个甩手掌柜,只负责发送消息,却从不干活。而线程B就像是一个苦力,不断地处理到来的消息。
下图便是消息机制的详细过程,主要分为两个部分:
消息通过Handler发送到另一个线程的MessageQueue中。另一个线程通过Looper不断轮询消息队列,取出其中的消息,并交给当初发送它的Handler进行处理。
上述详细过程有一个前提假设,也即线程B中存在Looper和MessageQueue。事实上,这两样东西并不是天生存在的。所以真正完整的详细过程包含以下三个部分:
在Android应用中,主线程自带Looper和MessageQueue,其他线程如果想具备消息机制的功能,则必须首先调用Looper.prepare()。
主线程为什么会自带Looper和MessageQueue呢?
所有Android应用的主线程都对应一个ActivityThread,正是由于所有Activity的回调方法都运行在主线程,所以Google便用ActivityThread来对应主线程。
ActivityThread的main方法是每个Android应用启动时的入口。通过6642行代码可知,主线程并非自带了Looper和MessageQueue,而是在ActivityThread的main方法中提前为我们创建好了而已。6642行创建了主线程的Looper和MessageQueue(下文有详述),6669行便开始了Looper的循环工作:不断从MessageQueue中取出消息并执行,消息队列为空时就将所在线程挂起休息,有新的消息到来时再起来继续工作。周而复始,永不停歇。
以上就是Android主线程的基本工作模型。至于我们所熟知的onCreate、onDestroy,其实背后也都是消息机制在起作用(当然还有Binder的身影)。
/frameworks/base/core/java/android/app/ActivityThread.java
6623 public static void main(String[] args) { 6624 Trace.traceBegin(Trace.TRACE_TAG_ACTIVITY_MANAGER, "ActivityThreadMain"); 6625 6626 // CloseGuard defaults to true and can be quite spammy. We 6627 // disable it here, but selectively enable it later (via 6628 // StrictMode) on debug builds, but using DropBox, not logs. 6629 CloseGuard.setEnabled(false); 6630 6631 Environment.initForCurrentUser(); 6632 6633 // Set the reporter for event logging in libcore 6634 EventLogger.setReporter(new EventLoggingReporter()); 6635 6636 // Make sure TrustedCertificateStore looks in the right place for CA certificates 6637 final File configDir = Environment.getUserConfigDirectory(UserHandle.myUserId()); 6638 TrustedCertificateStore.setDefaultUserDirectory(configDir); 6639 6640 Process.setArgV0("<pre-initialized>"); 6641 6642 Looper.prepareMainLooper(); 6643 6644 // Find the value for {@link #PROC_START_SEQ_IDENT} if provided on the command line. 6645 // It will be in the format "seq=114" 6646 long startSeq = 0; 6647 if (args != null) { 6648 for (int i = args.length - 1; i >= 0; --i) { 6649 if (args[i] != null && args[i].startsWith(PROC_START_SEQ_IDENT)) { 6650 startSeq = Long.parseLong( 6651 args[i].substring(PROC_START_SEQ_IDENT.length())); 6652 } 6653 } 6654 } 6655 ActivityThread thread = new ActivityThread(); 6656 thread.attach(false, startSeq); 6657 6658 if (sMainThreadHandler == null) { 6659 sMainThreadHandler = thread.getHandler(); 6660 } 6661 6662 if (false) { 6663 Looper.myLooper().setMessageLogging(new 6664 LogPrinter(Log.DEBUG, "ActivityThread")); 6665 } 6666 6667 // End of event ActivityThreadMain. 6668 Trace.traceEnd(Trace.TRACE_TAG_ACTIVITY_MANAGER); 6669 Looper.loop(); 6670 6671 throw new RuntimeException("Main thread loop unexpectedly exited"); 6672 } 复制代码
Looper.prepareMainLooper是一个静态方法,它的作用是为主线程创建一个Looper和MessageQueue。其最终调用了prepare方法,创建了一个新的Looper并将它写入sThreadLocal字段。
sThreadLocal字段是一个静态变量,按照常理它应该在内存中独一份,且各个线程均可访问的。但这里sThreadLocal利用了TLS(ThreadLocalStorage)的机制,每个线程访问到的sThreadLocal是相互独立的,并不是同一个。所以,主线程调用prepareMainLooper方法,相当于创建了一个线程独有的Looper,并且将这个Looper赋值给名为sMainLooper的静态变量(方便其他线程获取主线程的Looper)。
/frameworks/base/core/java/android/os/Looper.java
114 public static void prepareMainLooper() { 115 prepare(false); 116 synchronized (Looper.class) { 117 if (sMainLooper != null) { 118 throw new IllegalStateException("The main Looper has already been prepared."); 119 } 120 sMainLooper = myLooper(); 121 } 122 } 复制代码
/frameworks/base/core/java/android/os/Looper.java
97 public static void prepare() { 98 prepare(true); 99 } 100 101 private static void prepare(boolean quitAllowed) { 102 if (sThreadLocal.get() != null) { 103 throw new RuntimeException("Only one Looper may be created per thread"); 104 } 105 sThreadLocal.set(new Looper(quitAllowed)); 106 } 复制代码
Looper的构造方法中会创建一个MessageQueue,所以调用Looper.prepare方法便会创建与线程唯一对应的Looper和MessageQueue。
/frameworks/base/core/java/android/os/Looper.java
267 private Looper(boolean quitAllowed) { 268 mQueue = new MessageQueue(quitAllowed); 269 mThread = Thread.currentThread(); 270 } 复制代码
MessageQueue的构造方法如下,它会调用nativeInit方法在native层做一些初始化的工作。
/frameworks/base/core/java/android/os/MessageQueue.java
70 MessageQueue(boolean quitAllowed) { 71 mQuitAllowed = quitAllowed; 72 mPtr = nativeInit(); 73 } 复制代码
63 private native static long nativeInit(); 复制代码
nativeInit对应的JNI方法为android_os_MessageQueue_nativeInit,其中创建了一个NativeMessageQueue对象,并将该对象的指针转化为long型传递给java层。在Android的世界中,存在大量java层对象和native层对象一一映射的关系,通常都是在java层对象中设立一个long型的字段,用于记录native对象的指针值。
/frameworks/base/core/jni/android_os_MessageQueue.cpp
172static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) { 173 NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue(); 174 if (!nativeMessageQueue) { 175 jniThrowRuntimeException(env, "Unable to allocate native queue"); 176 return 0; 177 } 178 179 nativeMessageQueue->incStrong(env); 180 return reinterpret_cast<jlong>(nativeMessageQueue); 181} 复制代码
在NativeMessageQueue的构造函数中创建一个native层的Looper,并通过TLS的机制和线程绑定。
/frameworks/base/core/jni/android_os_MessageQueue.cpp
78NativeMessageQueue::NativeMessageQueue() : 79 mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) { 80 mLooper = Looper::getForThread(); 81 if (mLooper == NULL) { 82 mLooper = new Looper(false); 83 Looper::setForThread(mLooper); 84 } 85} 复制代码
在native层Looper的构造过程中,67行的代码非常关键。它用于mWakeEventFd的初始化,创建出来的eventfd将会在rebuildEpollLocked函数中被epoll监听(151行)。Epoll机制是Linux内核中一种事件触发的机制,可以同时监听多个文件描述符。在调用epoll_wait将线程挂起的时候,如果有被监测的事件产生,则线程从挂起状态恢复,重新恢复运行。这其实是一种中断式的wait/notify机制。如果想了解这个机制的详细内容,可以参考这两篇博客: 博客1 和 博客2 。博客1中对epoll的基本概念讲述较多,博客2对epoll中的Level Trigger和Edge Trigger讲的非常清楚。
我们以149行到151行的代码为例,EPOLLIN表示监测mWakeEventFd上的可读事件,当该线程调用epoll_wait时,如果mWakeEventFd上有可读事件,则线程直接返回,否则挂起。在该线程挂起的时候,如果有其他线程往mWakeEventFd上写入新的数据,则该线程会接收到事件,并从挂起状态恢复为运行状态。
/system/core/libutils/Looper.cpp
63Looper::Looper(bool allowNonCallbacks) : 64 mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false), 65 mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false), 66 mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) { 67 mWakeEventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 68 LOG_ALWAYS_FATAL_IF(mWakeEventFd < 0, "Could not make wake event fd: %s", 69 strerror(errno)); 70 71 AutoMutex _l(mLock); 72 rebuildEpollLocked(); 73} 复制代码
/system/core/libutils/Looper.cpp
134void Looper::rebuildEpollLocked() { 135 // Close old epoll instance if we have one. 136 if (mEpollFd >= 0) { 137#if DEBUG_CALLBACKS 138 ALOGD("%p ~ rebuildEpollLocked - rebuilding epoll set", this); 139#endif 140 close(mEpollFd); 141 } 142 143 // Allocate the new epoll instance and register the wake pipe. 144 mEpollFd = epoll_create(EPOLL_SIZE_HINT); 145 LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance: %s", strerror(errno)); 146 147 struct epoll_event eventItem; 148 memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union 149 eventItem.events = EPOLLIN; 150 eventItem.data.fd = mWakeEventFd; 151 int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem); 152 LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance: %s", 153 strerror(errno)); 154 155 for (size_t i = 0; i < mRequests.size(); i++) { 156 const Request& request = mRequests.valueAt(i); 157 struct epoll_event eventItem; 158 request.initEventItem(&eventItem); 159 160 int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem); 161 if (epollResult < 0) { 162 ALOGE("Error adding epoll events for fd %d while rebuilding epoll set: %s", 163 request.fd, strerror(errno)); 164 } 165 } 166} 复制代码
综上所述,一个可以处理消息的线程,必然会有一个唯一的Looper和唯一的MessageQueue。
消息通过Handler进行发送。
通过调用Handler类的sendMessage方法,我们可以发送一个消息。sendMessage最终调用的是sendMessageAtTime方法。参数uptimeMillis表示希望消息发送的时间点距离开机时间点的毫秒数,譬如手机15:00:00开机,消息发送者希望这条消息15:00:01准时发送,那么传入的uptimeMillis就是1000。
/frameworks/base/core/java/android/os/Handler.java
602 public final boolean sendMessage(Message msg) 603 { 604 return sendMessageDelayed(msg, 0); 605 } 复制代码
662 public final boolean sendMessageDelayed(Message msg, long delayMillis) 663 { 664 if (delayMillis < 0) { 665 delayMillis = 0; 666 } 667 return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis); 668 } 复制代码
689 public boolean sendMessageAtTime(Message msg, long uptimeMillis) { 690 MessageQueue queue = mQueue; 691 if (queue == null) { 692 RuntimeException e = new RuntimeException( 693 this + " sendMessageAtTime() called with no mQueue"); 694 Log.w("Looper", e.getMessage(), e); 695 return false; 696 } 697 return enqueueMessage(queue, msg, uptimeMillis); 698 } 复制代码
sendMessageAtTime方法取出Handler的mQueue字段,并调用enqueueMessage方法。enqueueMessage的作用就是将消息加入到消息队列中。首先,将消息的target字段设置为发送时的Handler,表明这个消息被接收后依然由此Handler进行处理。其后根据Handler是否异步来决定发送的消息是否异步。最后调用MessageQueue的enqueueMessage方法。
/frameworks/base/core/java/android/os/Handler.java
740 private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) { 741 msg.target = this; 742 if (mAsynchronous) { 743 msg.setAsynchronous(true); 744 } 745 return queue.enqueueMessage(msg, uptimeMillis); 746 } 复制代码
745行的queue是从Handler的sendMessageAtTime方法中传递过来的,它是Handler对象的mQueue字段,在Handler对象的构造方法中被赋值。为了搞清楚mQueue的来及,我们有必要看一看Handler的构造方法。
Handler的构造方法被重载了很多个,但底层其实都是这两个:
/frameworks/base/core/java/android/os/Handler.java
192 public Handler(Callback callback, boolean async) { 193 if (FIND_POTENTIAL_LEAKS) { 194 final Class<? extends Handler> klass = getClass(); 195 if ((klass.isAnonymousClass() || klass.isMemberClass() || klass.isLocalClass()) && 196 (klass.getModifiers() & Modifier.STATIC) == 0) { 197 Log.w(TAG, "The following Handler class should be static or leaks might occur: " + 198 klass.getCanonicalName()); 199 } 200 } 201 202 mLooper = Looper.myLooper(); 203 if (mLooper == null) { 204 throw new RuntimeException( 205 "Can't create handler inside thread " + Thread.currentThread() 206 + " that has not called Looper.prepare()"); 207 } 208 mQueue = mLooper.mQueue; 209 mCallback = callback; 210 mAsynchronous = async; 211 } 复制代码
232 public Handler(Looper looper, Callback callback, boolean async) { 233 mLooper = looper; 234 mQueue = looper.mQueue; 235 mCallback = callback; 236 mAsynchronous = async; 237 } 复制代码
二者最大的区别就在于,一个传入了Looper,另一个没有传入Looper。
传入Looper的话,Handler对象的mQueue就等于looper.mQueue。假设Handler对象在线程A中创建,其构造时传入的是线程B的Looper,那么通过这个Handler发送的消息就将由线程B来处理。
没有传入Looper的话,Handler对象的mQueue就等于其创建线程的Looper。依然假设Handler对象在线程A中创建,此时构造Handler时没有传入Looper,那么通过这个Handler发送的消息就将由线程A来处理。
请仔细体会上述两种情况的区别。
针对没有传入Looper的情况,这里还要多提几句。Handler对象创建之后,由于它存在于Java堆上,所以可以被任何线程访问、使用。任何线程通过它发送的消息,最终都将汇总到其创建线程的MessageQueue中,包括在它的创建线程中发送消息。
下面看看MessageQueue的enqueueMessage方法做了哪些工作。
/frameworks/base/core/java/android/os/MessageQueue.java
536 boolean enqueueMessage(Message msg, long when) { 537 if (msg.target == null) { 538 throw new IllegalArgumentException("Message must have a target."); 539 } 540 if (msg.isInUse()) { 541 throw new IllegalStateException(msg + " This message is already in use."); 542 } 543 544 synchronized (this) { 545 if (mQuitting) { 546 IllegalStateException e = new IllegalStateException( 547 msg.target + " sending message to a Handler on a dead thread"); 548 Log.w(TAG, e.getMessage(), e); 549 msg.recycle(); 550 return false; 551 } 552 553 msg.markInUse(); 554 msg.when = when; 555 Message p = mMessages; 556 boolean needWake; 557 if (p == null || when == 0 || when < p.when) { 558 // New head, wake up the event queue if blocked. 559 msg.next = p; 560 mMessages = msg; 561 needWake = mBlocked; 562 } else { 563 // Inserted within the middle of the queue. Usually we don't have to wake 564 // up the event queue unless there is a barrier at the head of the queue 565 // and the message is the earliest asynchronous message in the queue. 566 needWake = mBlocked && p.target == null && msg.isAsynchronous(); 567 Message prev; 568 for (;;) { 569 prev = p; 570 p = p.next; 571 if (p == null || when < p.when) { 572 break; 573 } 574 if (needWake && p.isAsynchronous()) { 575 needWake = false; 576 } 577 } 578 msg.next = p; // invariant: p == prev.next 579 prev.next = msg; 580 } 581 582 // We can assume mPtr != 0 because mQuitting is false. 583 if (needWake) { 584 nativeWake(mPtr); 585 } 586 } 587 return true; 588 } 复制代码
跳过enqueueMessage方法中的异常判断,其核心的作用只有一个:将新消息加入MessageQueue中的消息链表中。MessageQueue中的Message通过链表的方式进行管理,其中的消息按照发送时间的先后顺序排列。在管理链表的过程中,只需持有头部对象就可以遍历所有的对象。因此MessageQueue只用了一个字段(mMessages)来记录消息链表的头部消息。
557行和562行分别表示对新消息的两种处理方式,第一种是将新消息插入到链表头部,第二种是将新消息插入到链表中间(或尾部)。
先分析插入链表头部的情况。
除了插入到头部的三种情况外,其他情况下消息都将插入到链表中间(或尾部)。568行的for循环其实就是遍历消息链表,根据发送时间的先后顺序将消息插入到链表中。
除了需要将新消息插入到链表的合适位置,enqueueMessage还要决定是否唤醒MessageQueue所在的线程。MessageQueue的mBlocked字段记录了其所属线程是否已经发生阻塞(被挂起),该字段在消息处理的过程中被赋值。
当新消息插入到链表头部时,needWake = mBlocked:
当新消息插入到链表中间(或尾部)时,needWake的赋值变得复杂起来。这主要是由于异步消息和同步屏障的存在。
同步屏障像是一个守卫,当消息链表的头部是一个同步屏障时,后续的同步消息都无法被放行,即便这些消息已经满足发送的时间要求。此时,链表上的异步消息却不受影响,它们照常按照发送时间的逻辑,顺利地被处理。
同步屏障是一种特殊的Message,它的target为null,表明这个消息是不需要被处理的,而普通消息的target都是最终来处理该消息的Handler。通过MessageQueue的postSyncBarrier方法可以放置同步屏障,只不过这个方法是hide的,而且从Android P开始,反射调用非 SDK 的接口被限制了。虽然网上有一些手段可以绕开这种限制,但Google的本意应该是不想让开发者再使用同步屏障了。与之对应,撤除同步屏障的方法是removeSyncBarrier。
/frameworks/base/core/java/android/os/MessageQueue.java
461 public int postSyncBarrier() { 462 return postSyncBarrier(SystemClock.uptimeMillis()); 463 } 复制代码
465 private int postSyncBarrier(long when) { 466 // Enqueue a new sync barrier token. 467 // We don't need to wake the queue because the purpose of a barrier is to stall it. 468 synchronized (this) { 469 final int token = mNextBarrierToken++; 470 final Message msg = Message.obtain(); 471 msg.markInUse(); 472 msg.when = when; 473 msg.arg1 = token; 474 475 Message prev = null; 476 Message p = mMessages; 477 if (when != 0) { 478 while (p != null && p.when <= when) { 479 prev = p; 480 p = p.next; 481 } 482 } 483 if (prev != null) { // invariant: p == prev.next 484 msg.next = p; 485 prev.next = msg; 486 } else { 487 msg.next = p; 488 mMessages = msg; 489 } 490 return token; 491 } 492 } 复制代码
同步消息和异步消息的唯一差异在于Message的flag是否被置上 FLAG_ASYNCHRONOUS
标志位。这个标志位只在setAsynchronous方法中被改变。如果Handler的mAsynchronous为true,则通过该Handler发送的消息默认都是异步;反之,默认都是同步。除此以外,我们也可以通过消息的setAsynchronous方法来单独地给某个方法设置是否异步。
/frameworks/base/core/java/android/os/Message.java
447 public boolean isAsynchronous() { 448 return (flags & FLAG_ASYNCHRONOUS) != 0; 449 } 复制代码
477 public void setAsynchronous(boolean async) { 478 if (async) { 479 flags |= FLAG_ASYNCHRONOUS; 480 } else { 481 flags &= ~FLAG_ASYNCHRONOUS; 482 } 483 } 复制代码
回到新消息插入到链表中间(或尾部)时needWake的赋值,needWake在遍历之初被赋值如下:
/frameworks/base/core/java/android/os/MessageQueue.java
566 needWake = mBlocked && p.target == null && msg.isAsynchronous(); 复制代码
只有当MessageQueue所在的线程阻塞,链表头部为同步屏障,且新消息为异步消息时,needWake才为true。三者缺一不可。
另外在遍历的过程中,如果发现新消息的前面有另一个消息为异步消息,则needWake重新置为false。这种情况表明原有的异步消息为线程设置了有超时的阻塞,当下时间未达到异步消息的发送时间,所以mBlocked为true。但由于此次阻塞设有超时,所以并不需要外不唤醒。
线程的阻塞相当于人类的睡眠,从阻塞状态中恢复有两种可能,一种是超时唤醒,另一个是外部唤醒。类比到人类的睡眠,人从睡梦中被叫醒也有两种可能,一种是自己定闹钟,闹钟响后将自己叫醒,另一种是被别人拍醒(不考虑自然醒,因为自然醒本质也是闹钟叫醒,只不过这个闹钟是生物钟)。
上面介绍了是否应该主动唤醒线程,如果回答“需要”的话,那我们又该怎样去唤醒线程呢?
/frameworks/base/core/java/android/os/MessageQueue.java
584 nativeWake(mPtr); 复制代码
通过nativeWake的native方法,我们就可以实现唤醒MessageQueue所在线程的目的。它对应的JNI方法是android_os_MessageQueue_nativeWake。传入的mPtr实际上是native对象的指针,它被存在一个Java的字段中,用于Java层和native层的互动。
mPtr被转换成NativeMessageQueue对象(c++对象)的指针,紧接着调用NativeMessageQueue对象的wake方法。
/frameworks/base/core/jni/android_os_MessageQueue.cpp
194static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) { 195 NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr); 196 nativeMessageQueue->wake(); 197} 复制代码
再追踪下去,发现调用的是NativeMessageQueue中mLooper变量的wake方法。最终只干了一件事:往native层的Looper对象的mWakeEventFd中写一个“1”。结合2.1中对Epoll机制的描述,mWakeEventFd上有可读数据时,epfd将会监测到该事件,并将线程从挂起状态恢复为运行状态。
/frameworks/base/core/jni/android_os_MessageQueue.cpp
121void NativeMessageQueue::wake() { 122 mLooper->wake(); 123} 复制代码
/system/core/libutils/Looper.cpp
398void Looper::wake() { 399#if DEBUG_POLL_AND_WAKE 400 ALOGD("%p ~ wake", this); 401#endif 402 403 uint64_t inc = 1; 404 ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t))); 405 if (nWrite != sizeof(uint64_t)) { 406 if (errno != EAGAIN) { 407 LOG_ALWAYS_FATAL("Could not write wake signal to fd %d: %s", 408 mWakeEventFd, strerror(errno)); 409 } 410 } 411} 复制代码
对于想要运行消息机制的线程而言,除了需要通过Looper.prepare来创建属于自己的Looper和MessageQueue,还需要调用Looper.loop来真正的轮询、处理消息。
/frameworks/base/core/java/android/os/Looper.java
127 public static Looper getMainLooper() { 128 synchronized (Looper.class) { 129 return sMainLooper; 130 } 131 } 132 133 /** 134 * Run the message queue in this thread. Be sure to call 135 * {@link #quit()} to end the loop. 136 */ 137 public static void loop() { 138 final Looper me = myLooper(); 139 if (me == null) { 140 throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread."); 141 } 142 final MessageQueue queue = me.mQueue; 143 144 // Make sure the identity of this thread is that of the local process, 145 // and keep track of what that identity token actually is. 146 Binder.clearCallingIdentity(); 147 final long ident = Binder.clearCallingIdentity(); 148 149 // Allow overriding a threshold with a system prop. e.g. 150 // adb shell 'setprop log.looper.1000.main.slow 1 && stop && start' 151 final int thresholdOverride = 152 SystemProperties.getInt("log.looper." 153 + Process.myUid() + "." 154 + Thread.currentThread().getName() 155 + ".slow", 0); 156 157 boolean slowDeliveryDetected = false; 158 159 for (;;) { 160 Message msg = queue.next(); // might block 161 if (msg == null) { 162 // No message indicates that the message queue is quitting. 163 return; 164 } 165 166 // This must be in a local variable, in case a UI event sets the logger 167 final Printer logging = me.mLogging; 168 if (logging != null) { 169 logging.println(">>>>> Dispatching to " + msg.target + " " + 170 msg.callback + ": " + msg.what); 171 } 172 173 final long traceTag = me.mTraceTag; 174 long slowDispatchThresholdMs = me.mSlowDispatchThresholdMs; 175 long slowDeliveryThresholdMs = me.mSlowDeliveryThresholdMs; 176 if (thresholdOverride > 0) { 177 slowDispatchThresholdMs = thresholdOverride; 178 slowDeliveryThresholdMs = thresholdOverride; 179 } 180 final boolean logSlowDelivery = (slowDeliveryThresholdMs > 0) && (msg.when > 0); 181 final boolean logSlowDispatch = (slowDispatchThresholdMs > 0); 182 183 final boolean needStartTime = logSlowDelivery || logSlowDispatch; 184 final boolean needEndTime = logSlowDispatch; 185 186 if (traceTag != 0 && Trace.isTagEnabled(traceTag)) { 187 Trace.traceBegin(traceTag, msg.target.getTraceName(msg)); 188 } 189 190 final long dispatchStart = needStartTime ? SystemClock.uptimeMillis() : 0; 191 final long dispatchEnd; 192 try { 193 msg.target.dispatchMessage(msg); 194 dispatchEnd = needEndTime ? SystemClock.uptimeMillis() : 0; 195 } finally { 196 if (traceTag != 0) { 197 Trace.traceEnd(traceTag); 198 } 199 } 200 if (logSlowDelivery) { 201 if (slowDeliveryDetected) { 202 if ((dispatchStart - msg.when) <= 10) { 203 Slog.w(TAG, "Drained"); 204 slowDeliveryDetected = false; 205 } 206 } else { 207 if (showSlowLog(slowDeliveryThresholdMs, msg.when, dispatchStart, "delivery", 208 msg)) { 209 // Once we write a slow delivery log, suppress until the queue drains. 210 slowDeliveryDetected = true; 211 } 212 } 213 } 214 if (logSlowDispatch) { 215 showSlowLog(slowDispatchThresholdMs, dispatchStart, dispatchEnd, "dispatch", msg); 216 } 217 218 if (logging != null) { 219 logging.println("<<<<< Finished to " + msg.target + " " + msg.callback); 220 } 221 222 // Make sure that during the course of dispatching the 223 // identity of the thread wasn't corrupted. 224 final long newIdent = Binder.clearCallingIdentity(); 225 if (ident != newIdent) { 226 Log.wtf(TAG, "Thread identity changed from 0x" 227 + Long.toHexString(ident) + " to 0x" 228 + Long.toHexString(newIdent) + " while dispatching to " 229 + msg.target.getClass().getName() + " " 230 + msg.callback + " what=" + msg.what); 231 } 232 233 msg.recycleUnchecked(); 234 } 235 } 复制代码
159行开始的for循环在正常状态下永远不会退出,除非调用Looper或MessageQueue的quit方法。在每一次循环的过程中,都做了以下三件事:
在这三个步骤中,需要详细分析的是1和2。1需要较多的篇幅来阐述,因此我们先分析2的过程。
待发送的消息通常都有一个预设的发送时间点,也即message的when字段。当这个消息从消息链表中被取出时,记录另一个时间点,称之为dispatchStart。正常情况下,dispatchStart和msg.when相同,表明消息按照预设的时间点被取出。非正常情况下,如果前面消息处理时间过长,将会延误后续消息的发送(因为消息链表是串行发送的)。这个道理和排队的情形很相似。
DelieveryTime = dispatchStart - msg.when,表示消息被取出的时间点和预设的时间点之间的差距。差值较小,表明消息基本是按照预设的时间来取出的。差值较大,则表明消息队列有些拥堵,可能是前面的消息过多,也可能是前面某个消息的处理耗时过长。总之,当前这个消息并没有按照预设的时间被取出,而是有些滞后了。
消息的处理时间,也即消息所对应Handler的dispatchMessage方法的运行时间。每个消息都有属于自己的处理方法,其中可能包含某些耗时操作。因此记录下dispatch time,当这个时间超过某个阈值时给出相应的警告,可以帮助开发者了解程序的性能以及运行时的压力。
消息处理会调用Handler的dispatchMessage方法来对消息进行处理。在这个方法内部,我们可以看出一个消息会有三种处理方式。三种处理方式并非随机选择,而是具有一定的优先级的。
/frameworks/base/core/java/android/os/Handler.java
97 public void dispatchMessage(Message msg) { 98 if (msg.callback != null) { 99 handleCallback(msg); 100 } else { 101 if (mCallback != null) { 102 if (mCallback.handleMessage(msg)) { 103 return; 104 } 105 } 106 handleMessage(msg); 107 } 108 } 复制代码
以下分别列举满足3种处理方式的例子:
/frameworks/base/core/java/android/speech/tts/TextToSpeechService.java
579 Runnable runnable = new Runnable() { 580 @Override 581 public void run() { 582 if (setCurrentSpeechItem(speechItem)) { 583 speechItem.play(); 584 removeCurrentSpeechItem(); 585 } else { 586 // The item is alreadly flushed. Stopping. 587 speechItem.stop(); 588 } 589 } 590 }; 591 Message msg = Message.obtain(this, runnable); 复制代码
/frameworks/base/services/core/java/com/android/server/GraphicsStatsService.java
110 mWriteOutHandler = new Handler(bgthread.getLooper(), new Handler.Callback() { 111 @Override 112 public boolean handleMessage(Message msg) { 113 switch (msg.what) { 114 case SAVE_BUFFER: 115 saveBuffer((HistoricalBuffer) msg.obj); 116 break; 117 case DELETE_OLD: 118 deleteOldBuffers(); 119 break; 120 } 121 return true; 122 } 123 }); 复制代码
/frameworks/base/services/core/java/com/android/server/pm/ProcessLoggingHandler.java
35public final class ProcessLoggingHandler extends Handler { ...... ...... 47 @Override 48 public void handleMessage(Message msg) { 49 switch (msg.what) { 50 case LOG_APP_PROCESS_START_MSG: { 51 Bundle bundle = msg.getData(); 52 String processName = bundle.getString("processName"); 53 int uid = bundle.getInt("uid"); 54 String seinfo = bundle.getString("seinfo"); 55 String apkFile = bundle.getString("apkFile"); 56 int pid = bundle.getInt("pid"); 57 long startTimestamp = bundle.getLong("startTimestamp"); 58 String apkHash = computeStringHashOfApk(apkFile); 59 SecurityLog.writeEvent(SecurityLog.TAG_APP_PROCESS_START, processName, 60 startTimestamp, uid, pid, seinfo, apkHash); 61 break; 62 } 63 case INVALIDATE_BASE_APK_HASH_MSG: { 64 Bundle bundle = msg.getData(); 65 mProcessLoggingBaseApkHashes.remove(bundle.getString("apkFile")); 66 break; 67 } 68 } 69 } 复制代码
开发者定义的都是Handler的子类(譬如上面的ProcessingLoggingHandler),如果需要最终由Handler类的handleMessage来对消息进行处理,则子类中必须覆盖父类的handleMessage方法。否则将不会对消息进行处理,因为父类(Handler)的handleMessage方法是一个空方法。
这种阶梯式处理消息的设计,可以给予开发者更大的自由度。
接下来重点讲述如何取出消息链表中可被处理的头部消息。让我们走进MessageQueue的next方法。
/frameworks/base/core/java/android/os/Looper.java
160 Message msg = queue.next(); // might block 复制代码
/frameworks/base/core/java/android/os/MessageQueue.java
310 Message next() { 311 // Return here if the message loop has already quit and been disposed. 312 // This can happen if the application tries to restart a looper after quit 313 // which is not supported. 314 final long ptr = mPtr; 315 if (ptr == 0) { 316 return null; 317 } 318 319 int pendingIdleHandlerCount = -1; // -1 only during first iteration 320 int nextPollTimeoutMillis = 0; 321 for (;;) { 322 if (nextPollTimeoutMillis != 0) { 323 Binder.flushPendingCommands(); 324 } 325 326 nativePollOnce(ptr, nextPollTimeoutMillis); 327 328 synchronized (this) { 329 // Try to retrieve the next message. Return if found. 330 final long now = SystemClock.uptimeMillis(); 331 Message prevMsg = null; 332 Message msg = mMessages; 333 if (msg != null && msg.target == null) { 334 // Stalled by a barrier. Find the next asynchronous message in the queue. 335 do { 336 prevMsg = msg; 337 msg = msg.next; 338 } while (msg != null && !msg.isAsynchronous()); 339 } 340 if (msg != null) { 341 if (now < msg.when) { 342 // Next message is not ready. Set a timeout to wake up when it is ready. 343 nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE); 344 } else { 345 // Got a message. 346 mBlocked = false; 347 if (prevMsg != null) { 348 prevMsg.next = msg.next; 349 } else { 350 mMessages = msg.next; 351 } 352 msg.next = null; 353 if (DEBUG) Log.v(TAG, "Returning message: " + msg); 354 msg.markInUse(); 355 return msg; 356 } 357 } else { 358 // No more messages. 359 nextPollTimeoutMillis = -1; 360 } 361 362 // Process the quit message now that all pending messages have been handled. 363 if (mQuitting) { 364 dispose(); 365 return null; 366 } 367 368 // If first time idle, then get the number of idlers to run. 369 // Idle handles only run if the queue is empty or if the first message 370 // in the queue (possibly a barrier) is due to be handled in the future. 371 if (pendingIdleHandlerCount < 0 372 && (mMessages == null || now < mMessages.when)) { 373 pendingIdleHandlerCount = mIdleHandlers.size(); 374 } 375 if (pendingIdleHandlerCount <= 0) { 376 // No idle handlers to run. Loop and wait some more. 377 mBlocked = true; 378 continue; 379 } 380 381 if (mPendingIdleHandlers == null) { 382 mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)]; 383 } 384 mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers); 385 } 386 387 // Run the idle handlers. 388 // We only ever reach this code block during the first iteration. 389 for (int i = 0; i < pendingIdleHandlerCount; i++) { 390 final IdleHandler idler = mPendingIdleHandlers[i]; 391 mPendingIdleHandlers[i] = null; // release the reference to the handler 392 393 boolean keep = false; 394 try { 395 keep = idler.queueIdle(); 396 } catch (Throwable t) { 397 Log.wtf(TAG, "IdleHandler threw exception", t); 398 } 399 400 if (!keep) { 401 synchronized (this) { 402 mIdleHandlers.remove(idler); 403 } 404 } 405 } 406 407 // Reset the idle handler count to 0 so we do not run them again. 408 pendingIdleHandlerCount = 0; 409 410 // While calling an idle handler, a new message could have been delivered 411 // so go back and look again for a pending message without waiting. 412 nextPollTimeoutMillis = 0; 413 } 414 } 复制代码
首先分析326行的nativePollOnce方法,它的作用是设定下一次发送的时间或挂起线程。其对应的JNI方法为android_os_MessageQueue_nativePollOnce。内部调用NativeMessageQueue的pollOnce函数。
/frameworks/base/core/jni/android_os_MessageQueue.cpp
188static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj, 189 jlong ptr, jint timeoutMillis) { 190 NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr); 191 nativeMessageQueue->pollOnce(env, obj, timeoutMillis); 192} 复制代码
NativeMessageQueue的pollOnce函数进一步调用Looper的pollOnce函数,并传入timeoutMills参数。
/frameworks/base/core/jni/android_os_MessageQueue.cpp
107void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) { 108 mPollEnv = env; 109 mPollObj = pollObj; 110 mLooper->pollOnce(timeoutMillis); 111 mPollObj = NULL; 112 mPollEnv = NULL; 113 114 if (mExceptionObj) { 115 env->Throw(mExceptionObj); 116 env->DeleteLocalRef(mExceptionObj); 117 mExceptionObj = NULL; 118 } 119} 复制代码
一层层往下走,发现最终调用的是Looper的pollInner函数,最终通过系统调用epoll_wait陷入内核态。
/system/core/libutils/Looper.cpp
242 int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis); 复制代码
传入epoll_wait的timeoutMillis参数将直接决定epoll的行为。这里可以分为三种情况:
通过320行可知,nextPollTimeoutMillis在第一次循环时被设置为0,意味着第一次循环将跳过epoll_wait的等待,直接去检查消息链表的状态。
/frameworks/base/core/java/android/os/MessageQueue.java
320 int nextPollTimeoutMillis = 0; 321 for (;;) { 322 if (nextPollTimeoutMillis != 0) { 323 Binder.flushPendingCommands(); 324 } 325 326 nativePollOnce(ptr, nextPollTimeoutMillis); 复制代码
330-339行的主要工作是取出链表中第一个可被处理的消息。上文提到,MessageQueue只用了一个字段(mMessages)来记录消息链表的头部消息,所以通过332行便可以取到头部消息。如果链表头部是同步屏障,那么就要遍历去寻找链表中第一个异步消息。
/frameworks/base/core/java/android/os/MessageQueue.java
330 final long now = SystemClock.uptimeMillis(); 331 Message prevMsg = null; 332 Message msg = mMessages; 333 if (msg != null && msg.target == null) { 334 // Stalled by a barrier. Find the next asynchronous message in the queue. 335 do { 336 prevMsg = msg; 337 msg = msg.next; 338 } while (msg != null && !msg.isAsynchronous()); 339 } 复制代码
当取出的可处理消息为null时,意味着链表中暂时没有消息可以被处理,所以将nextPollTimeoutMillis置为-1,让next下一次轮询的时候直接通过epoll_wait将线程挂起休息。
反之则需要有进一步的处理,分两种情况讨论:
/frameworks/base/core/java/android/os/MessageQueue.java
340 if (msg != null) { 341 if (now < msg.when) { 342 // Next message is not ready. Set a timeout to wake up when it is ready. 343 nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE); 344 } else { 345 // Got a message. 346 mBlocked = false; 347 if (prevMsg != null) { 348 prevMsg.next = msg.next; 349 } else { 350 mMessages = msg.next; 351 } 352 msg.next = null; 353 if (DEBUG) Log.v(TAG, "Returning message: " + msg); 354 msg.markInUse(); 355 return msg; 356 } 357 } else { 358 // No more messages. 359 nextPollTimeoutMillis = -1; 360 } 复制代码
在MessageQueue的next方法中,还会对IdleHandler进行处理。IdleHandler,顾名思义,表示线程空闲时才需要去执行的一些操作。如果此时链表头部的消息为空或尚未到达发送时间,则表明线程空闲,因此可以去处理一些杂事(IdleHandler里的工作)。
通过319行可知,pendingIdleHandlerCount最初始被赋值为-1。
/frameworks/base/core/java/android/os/MessageQueue.java
319 int pendingIdleHandlerCount = -1; // -1 only during first iteration 复制代码
所以第一次运行到371行时,pendingIdleHandlerCount必定小于0。通过373行到384行,将mIdleHandlers(类型为ArrayList)中的元素赋值给mPendingIdleHandlers(类型为数组)。之所以不直接使用mIdleHandlers来进行遍历,是因为遍历处理mIdleHandles时无需持有MessageQueue的monitor lock,于是干脆将锁释放,让其他线程可以在处理mPendingIdleHandlers中的元素时,同时往mIdleHandlers中插入新的元素。
如果不需要对IdleHandler处理,或者mIdleHandlers中没有需要处理的对象,则设置mBlocked为true(377行),在下一轮循环的过程中会通过epoll_wait将本线程挂起。需要注意的一点是,如果此次next()方法能够取出有效消息进行处理,代码是不会执行到371行及以下的位置,它会在355行直接返回。
接下来便是遍历mIdleHandlers中的元素,并执行它们的queueIdle方法的过程。如果queueIdle返回false,表明该IdleHandler只会执行一次,执行完之后就从mIdleHandlers列表中删除。
/frameworks/base/core/java/android/os/MessageQueue.java
371 if (pendingIdleHandlerCount < 0 372 && (mMessages == null || now < mMessages.when)) { 373 pendingIdleHandlerCount = mIdleHandlers.size(); 374 } 375 if (pendingIdleHandlerCount <= 0) { 376 // No idle handlers to run. Loop and wait some more. 377 mBlocked = true; 378 continue; 379 } 380 381 if (mPendingIdleHandlers == null) { 382 mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)]; 383 } 384 mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers); 385 } 386 387 // Run the idle handlers. 388 // We only ever reach this code block during the first iteration. 389 for (int i = 0; i < pendingIdleHandlerCount; i++) { 390 final IdleHandler idler = mPendingIdleHandlers[i]; 391 mPendingIdleHandlers[i] = null; // release the reference to the handler 392 393 boolean keep = false; 394 try { 395 keep = idler.queueIdle(); 396 } catch (Throwable t) { 397 Log.wtf(TAG, "IdleHandler threw exception", t); 398 } 399 400 if (!keep) { 401 synchronized (this) { 402 mIdleHandlers.remove(idler); 403 } 404 } 405 } 406 407 // Reset the idle handler count to 0 so we do not run them again. 408 pendingIdleHandlerCount = 0; 409 410 // While calling an idle handler, a new message could have been delivered 411 // so go back and look again for a pending message without waiting. 412 nextPollTimeoutMillis = 0; 413 } 414 } 复制代码