privatestaticvoid prepare(boolean quitAllowed) { //判断sThreadLocal是否为null,不为空则直接跑出异常 可以保证一个线程只可以调用一次prepare方法 if (sThreadLocal.get() != null) { throw new RuntimeException("Only one Looper may be created per thread"); } sThreadLocal.set(new Looper(quitAllowed)); }
//为主线程创建一个Looper对象 该方法会在主线程创建时自动调用 publicstaticvoid prepareMainLooper() { prepare(false); synchronized (Looper.class) { if (sMainLooper != null) { throw new IllegalStateException("The main Looper has already been prepared."); } sMainLooper = myLooper(); } }
//源码位置:.../core/java/android/os/Handler.java //Handler默认构造方法 public Handler() { this(null, false); }
/**
 * Binds this handler to the Looper of the thread that constructs it.
 *
 * @param callback stored in {@code mCallback}; the default constructor passes {@code null}
 * @param async stored in {@code mAsynchronous}; selects whether messages are marked asynchronous
 * @throws RuntimeException if {@code Looper.myLooper()} returns {@code null} for this thread
 */
public Handler(Callback callback, boolean async) {
    if (FIND_POTENTIAL_LEAKS) {
        // Warn about non-static anonymous/member/local Handler subclasses.
        final Class<? extends Handler> klass = getClass();
        if ((klass.isAnonymousClass() || klass.isMemberClass() || klass.isLocalClass())
                && (klass.getModifiers() & Modifier.STATIC) == 0) {
            Log.w(TAG, "The following Handler class should be static or leaks might occur: "
                    + klass.getCanonicalName());
        }
    }

    // Fetch the Looper that belongs to the current thread.
    mLooper = Looper.myLooper();
    if (mLooper == null) {
        throw new RuntimeException(
                "Can't create handler inside thread that has not called Looper.prepare()");
    }
    // Use that Looper's message queue.
    mQueue = mLooper.mQueue;
    mCallback = callback;
    // Remember whether messages sent through this handler are asynchronous.
    mAsynchronous = async;
}
//源码位置 .../core/java/android/os/Message.java /** Constructor (but the preferred way to get a Message is to call {@link #obtain() Message.obtain()}). */ //new Message 方法 public Message() { }
privatestaticfinal Object sPoolSync = new Object(); //维护一个Message池,用于复用Message对象 privatestatic Message sPool; //最多维护50个 privatestaticfinalint MAX_POOL_SIZE = 50; //obtain方法 直接从池内获取Message对象,避免new占用内存 publicstatic Message obtain() { synchronized (sPoolSync) { if (sPool != null) { Message m = sPool; sPool = m.next; m.next = null; m.flags = 0; // clear in-use flag sPoolSize--; //直接从池中取出 return m; } } //无可复用对象,则重新new获取 return new Message(); }
//源码位置:..core/java/android/os/MessageQueue.java //内部是一个单链表有序序列,由 Message.when 作为排序依据,该值为一个相对时间。 boolean enqueueMessage(Message msg, long when) { ... synchronized (this) { //正在退出 回收Message if (mQuitting) { IllegalStateException e = new IllegalStateException( msg.target + " sending message to a Handler on a dead thread"); Log.w(TAG, e.getMessage(), e); msg.recycle(); return false; }
msg.markInUse(); msg.when = when; Message p = mMessages; boolean needWake; // p == null判断当前队列中是否有消息,插入消息作为队列头 // when == 0||when < p.when 队列当前处于等待状态 唤醒队列 if (p == null || when == 0 || when < p.when) { // New head, wake up the event queue if blocked. msg.next = p; mMessages = msg; needWake = mBlocked; } else { // Inserted within the middle of the queue. Usually we don't have to wake // up the event queue unless there is a barrier at the head of the queue // and the message is the earliest asynchronous message in the queue. //当前队列有消息,按照消息创建时间插入到队列中 needWake = mBlocked && p.target == null && msg.isAsynchronous(); Message prev; //从对列头部开始遍历 for (;;) { prev = p; p = p.next; //循环到队列尾部或者出现一个when小于当前Message的when if (p == null || when < p.when) { break; } //如果是异步消息 且 存在同步屏障 if (needWake && p.isAsynchronous()) { needWake = false;//不需要唤醒 队列 } } msg.next = p; // invariant: p == prev.next prev.next = msg; }
// We can assume mPtr != 0 because mQuitting is false. if (needWake) { nativeWake(mPtr); } } return true; }
publicvoid recycle() { //正在使用 无法回收 if (isInUse()) { if (gCheckRecycle) { throw new IllegalStateException("This message cannot be recycled because it " + "is still in use."); } return; } recycleUnchecked(); }
void recycleUnchecked() { // Mark the message as in use while it remains in the recycled object pool. // Clear out all other details. //置为使用标记 flags = FLAG_IN_USE; what = 0; arg1 = 0; arg2 = 0; obj = null; replyTo = null; sendingUid = -1; when = 0; target = null; callback = null; data = null;
//将Message放在了列表里,缓存的对象由obtain()拿出来复用 synchronized (sPoolSync) { if (sPoolSize < MAX_POOL_SIZE) { next = sPool; sPool = this; sPoolSize++; } } }
//源码位置 .../core/java/android/os/MessageQueue.java void quit(boolean safe) { if (!mQuitAllowed) { throw new IllegalStateException("Main thread not allowed to quit."); }
/**
 * Binds this handler to the Looper of the thread that constructs it.
 *
 * @param callback stored in {@code mCallback}
 * @param async whether this handler is asynchronous; stored in {@code mAsynchronous}
 * @throws RuntimeException if {@code Looper.myLooper()} returns {@code null} for this thread;
 *     the message names the current thread
 */
public Handler(Callback callback, boolean async) {
    if (FIND_POTENTIAL_LEAKS) {
        // Warn about non-static anonymous/member/local Handler subclasses.
        final Class<? extends Handler> klass = getClass();
        if ((klass.isAnonymousClass() || klass.isMemberClass() || klass.isLocalClass())
                && (klass.getModifiers() & Modifier.STATIC) == 0) {
            Log.w(TAG, "The following Handler class should be static or leaks might occur: "
                    + klass.getCanonicalName());
        }
    }

    mLooper = Looper.myLooper();
    if (mLooper == null) {
        throw new RuntimeException(
                "Can't create handler inside thread " + Thread.currentThread()
                        + " that has not called Looper.prepare()");
    }
    mQueue = mLooper.mQueue;
    mCallback = callback;
    // Record the asynchronous flag.
    mAsynchronous = async;
}
mAsynchronous 异步标志默认为 false,在以下代码中使用:
1 2 3 4 5 6 7
privateboolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) { msg.target = this; if (mAsynchronous) { msg.setAsynchronous(true);//设置异步消息 } return queue.enqueueMessage(msg, uptimeMillis); }
设置消息为异步消息有两种方式:
new Handler(true):所有发出去的消息都会被 setAsynchronous(true);对应方法都为 @hide,不推荐使用。
privateint postSyncBarrier(long when) { // Enqueue a new sync barrier token. // We don't need to wake the queue because the purpose of a barrier is to stall it. synchronized (this) { finalint token = mNextBarrierToken++; //没有赋值target,后面需要通过判断target == null,判断是否为同步屏障消息 final Message msg = Message.obtain(); msg.markInUse(); msg.when = when; msg.arg1 = token;
//MessageQueue.java Message next() { // Return here if the message loop has already quit and been disposed. // This can happen if the application tries to restart a looper after quit // which is not supported. finallong ptr = mPtr; if (ptr == 0) { return null; }
int pendingIdleHandlerCount = -1; // -1 only during first iteration int nextPollTimeoutMillis = 0; for (;;) { if (nextPollTimeoutMillis != 0) { Binder.flushPendingCommands(); } //唤醒队列 nativePollOnce(ptr, nextPollTimeoutMillis);
synchronized (this) { // Try to retrieve the next message. Return if found. finallong now = SystemClock.uptimeMillis(); Message prevMsg = null; Message msg = mMessages; if (msg != null && msg.target == null) { //同步屏障,找到下一个 异步消息 // Stalled by a barrier. Find the next asynchronous message in the queue. do { prevMsg = msg; msg = msg.next; } while (msg != null && !msg.isAsynchronous()); } if (msg != null) { if (now < msg.when) { // Next message is not ready. Set a timeout to wake up when it is ready. nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE); } else { // Got a message. mBlocked = false; if (prevMsg != null) { //取出异步消息 prevMsg.next = msg.next; } else { mMessages = msg.next; } msg.next = null; if (DEBUG) Log.v(TAG, "Returning message: " + msg); msg.markInUse(); return msg; } } else { // No more messages. nextPollTimeoutMillis = -1; }
// Process the quit message now that all pending messages have been handled. if (mQuitting) { dispose(); return null; } ... } ... } }
//MessageQueue.java publicvoid removeSyncBarrier(int token) { // Remove a sync barrier token from the queue. // If the queue is no longer stalled by a barrier then wake it. synchronized (this) { Message prev = null; Message p = mMessages; //根据token找到对应的 同步屏障消息,并移除 while (p != null && (p.target != null || p.arg1 != token)) { prev = p; p = p.next; } if (p == null) { throw new IllegalStateException("The specified message queue synchronization " + " barrier token has not been posted or has already been removed."); } finalboolean needWake; if (prev != null) { prev.next = p.next; needWake = false; } else { mMessages = p.next; needWake = mMessages == null || mMessages.target != null; } p.recycleUnchecked();
// If the loop is quitting then it is already awake. // We can assume mPtr != 0 when mQuitting is false. if (needWake && !mQuitting) { nativeWake(mPtr); } } }
struct epoll_event eventItem; memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union eventItem.events = EPOLLIN;//可读事件 eventItem.data.fd = mWakeEventFd; //添加唤醒事件到 epoll实例中 int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem); LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance: %s", strerror(errno));
for (size_t i = 0; i < mRequests.size(); i++) { const Request& request = mRequests.valueAt(i); struct epoll_event eventItem; request.initEventItem(&eventItem); //将各种事件如键盘、鼠标等事件添加到 mEpollFd中,进行监听 int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem); if (epollResult < 0) { ALOGE("Error adding epoll events for fd %d while rebuilding epoll set: %s", request.fd, strerror(errno)); } } }
... //处理带有callback的response事件 for (size_t i = 0; i < mResponses.size(); i++) { Response& response = mResponses.editItemAt(i); if (response.request.ident == POLL_CALLBACK) { int fd = response.request.fd; int events = response.events; void* data = response.request.data;
// Invoke the callback. Note that the file descriptor may be closed by // the callback (and potentially even reused) before the function returns so // we need to be a little careful when removing the file descriptor afterwards. int callbackResult = response.request.callback->handleEvent(fd, events, data); if (callbackResult == 0) { removeFd(fd, response.request.seq); }
// Clear the callback reference in the response structure promptly because we // will not clear the response vector itself until the next poll. response.request.callback.clear(); result = POLL_CALLBACK; } }
//Handler.java publicfinalboolean runWithScissors(final Runnable r, long timeout) { if (r == null) { throw new IllegalArgumentException("runnable must not be null"); } if (timeout < 0) { throw new IllegalArgumentException("timeout must be non-negative"); }
if (Looper.myLooper() == mLooper) { r.run(); return true; }