Messages are stored in a message queue, and the message-loop thread loops over this queue until the thread exits. If the queue contains messages, the loop thread takes them out and dispatches each one to its Handler for processing; if the queue is empty, the thread goes into an idle wait until the next message arrives. In an Android application, heavy tasks are usually moved to a child thread so that they do not block the main (UI) thread and trigger an ANR. There are two kinds of child thread to choose from: one without a message loop, created from a plain Thread object, and one with a message loop. A thread with a message loop can itself be built in two ways: use the HandlerThread class that Android provides, or start a message loop inside the thread's own run() method by calling Looper.prepare() followed by Looper.loop().
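The idea of a thread that loops over a queue and idles when it is empty can be sketched in plain Java, independent of Android. This is only an illustration under stated assumptions, not Android's implementation: MiniLoopThread, post, quit and demo are hypothetical names, and a LinkedBlockingQueue stands in for the MessageQueue.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical worker thread that drains a queue until it is told to quit. */
final class MiniLoopThread extends Thread {
    // Sentinel task: when the loop takes it out of the queue, the loop exits.
    private static final Runnable QUIT = () -> { };
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();

    /** Enqueue a task; it runs later on this thread. */
    void post(Runnable task) {
        queue.add(task);
    }

    /** Ask the loop to exit once the tasks already queued have run. */
    void quit() {
        queue.add(QUIT);
    }

    @Override
    public void run() {
        try {
            for (;;) {
                Runnable task = queue.take(); // blocks (idle wait) while the queue is empty
                if (task == QUIT) {
                    return;
                }
                task.run(); // "dispatch" the message
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Posts three tasks and returns what they recorded, in execution order. */
    static String demo() {
        StringBuffer log = new StringBuffer();
        MiniLoopThread t = new MiniLoopThread();
        t.start();
        t.post(() -> log.append("a"));
        t.post(() -> log.append("b"));
        t.post(() -> log.append("c"));
        t.quit();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log.toString();
    }
}
```

Tasks run one at a time on the loop thread, in the order they were posted, which is the property the rest of this article relies on.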
I. Message Mechanism Usage
Using the message mechanism generally involves a message-loop thread and a Handler. Here is how PowerManagerService sets up its Handler:
mHandlerThread = new ServiceThread(TAG,
        Process.THREAD_PRIORITY_DISPLAY, false /*allowIo*/);
mHandlerThread.start();
mHandler = new PowerManagerHandler(mHandlerThread.getLooper());
ServiceThread is a subclass of HandlerThread. When creating a Handler, pass in the HandlerThread's Looper; otherwise the Handler uses the Looper of the current thread by default.
The Handler itself looks roughly like this:
private final class PowerManagerHandler extends Handler {
    public PowerManagerHandler(Looper looper) {
        super(looper, null, true /*async*/);
    }

    @Override
    public void handleMessage(Message msg) {
        switch (msg.what) {
            case MSG_USER_ACTIVITY_TIMEOUT:
                handleUserActivityTimeout();
                break;
            case MSG_SANDMAN:
                handleSandman();
                break;
            case MSG_SCREEN_BRIGHTNESS_BOOST_TIMEOUT:
                handleScreenBrightnessBoostTimeout();
                break;
            case MSG_CHECK_WAKE_LOCK_ACQUIRE_TIMEOUT:
                checkWakeLockAquireTooLong();
                Message m = mHandler.obtainMessage(MSG_CHECK_WAKE_LOCK_ACQUIRE_TIMEOUT);
                m.setAsynchronous(true);
                mHandler.sendMessageDelayed(m, WAKE_LOCK_ACQUIRE_TOO_LONG_TIMEOUT);
                break;
        }
    }
}
II. Message Mechanism Principle
Let's start with the run() method of HandlerThread:
public void run() {
    mTid = Process.myTid();
    Looper.prepare();
    synchronized (this) {
        mLooper = Looper.myLooper();
        // notifyAll() after the assignment wakes any thread blocked in
        // getLooper(), which returns mLooper.
        notifyAll();
    }
    Process.setThreadPriority(mPriority);
    onLooperPrepared();
    Looper.loop();
    mTid = -1;
}
Next, Looper's prepare() method. It creates a Looper object and stores it in thread-local storage (sThreadLocal), so each thread can have at most one Looper.
public static void prepare() {
    prepare(true);
}

private static void prepare(boolean quitAllowed) {
    if (sThreadLocal.get() != null) {
        throw new RuntimeException("Only one Looper may be created per thread");
    }
    sThreadLocal.set(new Looper(quitAllowed));
}
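The one-Looper-per-thread rule follows directly from the ThreadLocal check. A minimal plain-Java sketch of the same pattern, assuming a hypothetical MiniLooper class (it is not the Android Looper, and secondPrepareFails is a helper added only for illustration):

```java
/** Hypothetical stand-in for Looper that keeps one instance per thread. */
final class MiniLooper {
    private static final ThreadLocal<MiniLooper> sThreadLocal = new ThreadLocal<>();

    private MiniLooper() { }

    /** Creates the calling thread's instance; a second call on the same thread throws. */
    static void prepare() {
        if (sThreadLocal.get() != null) {
            throw new RuntimeException("Only one Looper may be created per thread");
        }
        sThreadLocal.set(new MiniLooper());
    }

    /** Returns the calling thread's instance, or null if prepare() was never called on it. */
    static MiniLooper myLooper() {
        return sThreadLocal.get();
    }

    /** Returns true if a second prepare() on the same thread is rejected. */
    static boolean secondPrepareFails() {
        final boolean[] failed = {false};
        Thread t = new Thread(() -> {
            prepare();
            try {
                prepare();
            } catch (RuntimeException e) {
                failed[0] = true;
            }
        });
        t.start();
        try {
            t.join(); // join() makes the write to failed[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return failed[0];
    }
}
```

A thread that never called prepare() sees null from myLooper(), which mirrors the "No Looper" check at the top of Looper.loop().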
The Looper constructor creates a MessageQueue:
private Looper(boolean quitAllowed) {
    mQueue = new MessageQueue(quitAllowed);
    mThread = Thread.currentThread();
}
In the MessageQueue constructor, nativeInit is a native method. Its return value is stored in mPtr, a long that holds a pointer to a native object.
MessageQueue(boolean quitAllowed) {
    mQuitAllowed = quitAllowed;
    mPtr = nativeInit();
}
The native function creates a NativeMessageQueue object and returns its address as a jlong.
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
    NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
    if (!nativeMessageQueue) {
        jniThrowRuntimeException(env, "Unable to allocate native queue");
        return 0;
    }

    nativeMessageQueue->incStrong(env);
    return reinterpret_cast<jlong>(nativeMessageQueue);
}
The NativeMessageQueue constructor fetches the native Looper of the current thread and creates a new one if none exists yet.
NativeMessageQueue::NativeMessageQueue() :
        mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
    mLooper = Looper::getForThread();
    if (mLooper == NULL) {
        mLooper = new Looper(false);
        Looper::setForThread(mLooper);
    }
}
The native Looper constructor calls eventfd to create a file descriptor. An eventfd is a lightweight notification mechanism between processes or threads; here it is used to wake the loop thread.
Looper::Looper(bool allowNonCallbacks) :
        mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
        mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),
        mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
    mWakeEventFd = eventfd(0, EFD_NONBLOCK);
    LOG_ALWAYS_FATAL_IF(mWakeEventFd < 0, "Could not make wake event fd. errno=%d", errno);

    AutoMutex _l(mLock);
    rebuildEpollLocked();
}
2.1 The C layer creates epoll
rebuildEpollLocked creates the epoll instance, registers mWakeEventFd with it, and then registers every fd already recorded in mRequests.
void Looper::rebuildEpollLocked() {
    // Close old epoll instance if we have one.
    if (mEpollFd >= 0) {
#if DEBUG_CALLBACKS
        ALOGD("%p ~ rebuildEpollLocked - Rebuilding epoll set", this);
#endif
        close(mEpollFd);
    }

    // Allocate the new epoll instance and register the wake pipe.
    mEpollFd = epoll_create(EPOLL_SIZE_HINT);
    LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance. errno=%d", errno);

    struct epoll_event eventItem;
    memset(&eventItem, 0, sizeof(epoll_event)); // Zero out unused members of data field union
    eventItem.events = EPOLLIN;
    eventItem.data.fd = mWakeEventFd;
    int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, &eventItem);
    LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance. errno=%d",
            errno);

    for (size_t i = 0; i < mRequests.size(); i++) {
        const Request& request = mRequests.valueAt(i);
        struct epoll_event eventItem;
        request.initEventItem(&eventItem);

        int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, &eventItem);
        if (epollResult < 0) {
            ALOGE("Error adding epoll events for fd %d while rebuilding epoll set, errno=%d",
                    request.fd, errno);
        }
    }
}
Back in HandlerThread's run() method: once Looper.prepare() has finished and the thread priority is set, the thread calls Looper.loop(). Let's look at Looper's loop() method:
public static void loop() {
    final Looper me = myLooper();
    if (me == null) {
        throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
    }
    final MessageQueue queue = me.mQueue; // get the Looper's mQueue

    // Make sure the identity of this thread is that of the local process,
    // and keep track of what that identity token actually is.
    Binder.clearCallingIdentity();
    final long ident = Binder.clearCallingIdentity();

    for (;;) {
        Message msg = queue.next(); // might block; the blocking happens in epoll_wait
        if (msg == null) {
            // No message indicates that the message queue is quitting.
            return;
        }

        // This must be in a local variable, in case a UI event sets the logger
        Printer logging = me.mLogging; // optional logging hook set by the caller
        if (logging != null) {
            logging.println(">>>>> Dispatching to " + msg.target + " " +
                    msg.callback + ": " + msg.what);
        }

        msg.target.dispatchMessage(msg);

        if (logging != null) {
            logging.println("<<<<< Finished to " + msg.target + " " + msg.callback);
        }

        // Make sure that during the course of dispatching the
        // identity of the thread wasn't corrupted.
        final long newIdent = Binder.clearCallingIdentity();
        if (ident != newIdent) {
            Log.wtf(TAG, "Thread identity changed from 0x"
                    + Long.toHexString(ident) + " to 0x"
                    + Long.toHexString(newIdent) + " while dispatching to "
                    + msg.target.getClass().getName() + " "
                    + msg.callback + " what=" + msg.what);
        }

        msg.recycleUnchecked();
    }
}
MessageQueue's next() method first calls nativePollOnce and then takes a Message out of the queue. The excerpt below stops at the nativePollOnce call.
Message next() {
    // Return here if the message loop has already quit and been disposed.
    // This can happen if the application tries to restart a looper after quit
    // which is not supported.
    final long ptr = mPtr; // the native pointer saved earlier
    if (ptr == 0) {
        return null;
    }

    int pendingIdleHandlerCount = -1; // -1 only during the first iteration
    int nextPollTimeoutMillis = 0;
    for (;;) {
        if (nextPollTimeoutMillis != 0) {
            Binder.flushPendingCommands();
        }

        nativePollOnce(ptr, nextPollTimeoutMillis);
        // (the rest of next() is not shown here)
The native nativePollOnce function casts the saved pointer back to a NativeMessageQueue and calls its pollOnce function.
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
        jlong ptr, jint timeoutMillis) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}
2.2 The C layer blocks in epoll_wait
NativeMessageQueue::pollOnce in turn calls Looper::pollOnce. The while loop at the top of Looper::pollOnce only handles responses whose ident is greater than or equal to 0, which normally does not happen on this path, so we can go straight to pollInner.
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
    int result = 0;
    for (;;) {
        while (mResponseIndex < mResponses.size()) {
            const Response& response = mResponses.itemAt(mResponseIndex++);
            int ident = response.request.ident;
            if (ident >= 0) {
                int fd = response.request.fd;
                int events = response.events;
                void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE
                ALOGD("%p ~ pollOnce - returning signalled identifier %d: "
                        "fd=%d, events=0x%x, data=%p",
                        this, ident, fd, events, data);
#endif
                if (outFd != NULL) *outFd = fd;
                if (outEvents != NULL) *outEvents = events;
                if (outData != NULL) *outData = data;
                return ident;
            }
        }

        if (result != 0) {
#if DEBUG_POLL_AND_WAKE
            ALOGD("%p ~ pollOnce - returning result %d", this, result);
#endif
            if (outFd != NULL) *outFd = 0;
            if (outEvents != NULL) *outEvents = 0;
            if (outData != NULL) *outData = NULL;
            return result;
        }

        result = pollInner(timeoutMillis);
    }
}
pollInner blocks in epoll_wait. The Java layer computes how long each wait may last and passes that timeout down to the C layer. epoll_wait returns when the timeout expires, or when mWakeEventFd or one of the previously registered fds has an event.
int Looper::pollInner(int timeoutMillis) {
#if DEBUG_POLL_AND_WAKE
    ALOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis);
#endif

    // Adjust the timeout based on when the next message is due.
    if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
        if (messageTimeoutMillis >= 0
                && (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
            timeoutMillis = messageTimeoutMillis;
        }
#if DEBUG_POLL_AND_WAKE
        ALOGD("%p ~ pollOnce - next message in %" PRId64 "ns, adjusted timeout: timeoutMillis=%d",
                this, mNextMessageUptime - now, timeoutMillis);
#endif
    }

    // Poll.
    int result = POLL_WAKE;
    mResponses.clear(); // clear mResponses
    mResponseIndex = 0;

    // We are about to idle.
    mPolling = true;

    struct epoll_event eventItems[EPOLL_MAX_EVENTS];
    // The thread blocks here in epoll_wait; the timeout comes from the Java layer.
    int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);

    // No longer idling.
    mPolling = false;

    // Acquire lock.
    mLock.lock();

    // Rebuild epoll set if needed.
    if (mEpollRebuildRequired) {
        mEpollRebuildRequired = false;
        rebuildEpollLocked();
        goto Done;
    }

    // Check for poll error.
    if (eventCount < 0) {
        if (errno == EINTR) {
            goto Done;
        }
        ALOGW("Poll failed with an unexpected error, errno=%d", errno);
        result = POLL_ERROR;
        goto Done;
    }

    // Check for poll timeout.
    if (eventCount == 0) {
#if DEBUG_POLL_AND_WAKE
        ALOGD("%p ~ pollOnce - timeout", this);
#endif
        result = POLL_TIMEOUT;
        goto Done;
    }

    // Handle all events.
#if DEBUG_POLL_AND_WAKE
    ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
#endif

    for (int i = 0; i < eventCount; i++) {
        int fd = eventItems[i].data.fd;
        uint32_t epollEvents = eventItems[i].events;
        if (fd == mWakeEventFd) { // the event that wakes the thread
            if (epollEvents & EPOLLIN) {
                awoken();
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
            }
        } else {
            ssize_t requestIndex = mRequests.indexOfKey(fd); // an fd registered earlier via addFd
            if (requestIndex >= 0) {
                int events = 0;
                if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
                if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
                if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
                if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
                pushResponse(events, mRequests.valueAt(requestIndex)); // stored in mResponses
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
                        "no longer registered.", epollEvents, fd);
            }
        }
    }
Done: ;

    // Invoke pending message callbacks.
    mNextMessageUptime = LLONG_MAX;
    // This part handles C-layer messages; the Java layer manages its own messages.
    while (mMessageEnvelopes.size() != 0) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
        if (messageEnvelope.uptime <= now) {
            // Remove the envelope from the list.
            // We keep a strong reference to the handler until the call to handleMessage
            // finishes.  Then we drop it so that the handler can be deleted *before*
            // we reacquire our lock.
            { // obtain handler
                sp<MessageHandler> handler = messageEnvelope.handler;
                Message message = messageEnvelope.message;
                mMessageEnvelopes.removeAt(0);
                mSendingMessage = true;
                mLock.unlock();

#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
                ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",
                        this, handler.get(), message.what);
#endif
                handler->handleMessage(message);
            } // release handler

            mLock.lock();
            mSendingMessage = false;
            result = POLL_CALLBACK;
        } else {
            // The last message left at the head of the queue determines the next wakeup time.
            mNextMessageUptime = messageEnvelope.uptime;
            break;
        }
    }

    // Release lock.
    mLock.unlock();

    // Invoke all response callbacks.
    // This handles events on fds registered via addFd: walk mResponses and call each callback.
    for (size_t i = 0; i < mResponses.size(); i++) {
        Response& response = mResponses.editItemAt(i);
        if (response.request.ident == POLL_CALLBACK) {
            int fd = response.request.fd;
            int events = response.events;
            void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
            ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",
                    this, response.request.callback.get(), fd, events, data);
#endif
            // Invoke the callback.  Note that the file descriptor may be closed by
            // the callback (and potentially even reused) before the function returns so
            // we need to be a little careful when removing the file descriptor afterwards.
            int callbackResult = response.request.callback->handleEvent(fd, events, data);
            if (callbackResult == 0) {
                removeFd(fd, response.request.seq);
            }

            // Clear the callback reference in the response structure promptly because we
            // will not clear the response vector itself until the next poll.
            response.request.callback.clear();
            result = POLL_CALLBACK;
        }
    }
    return result;
}
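The timeout adjustment at the top of pollInner is easy to misread, so here is a small Java sketch of the same decision. It is a simplification under stated assumptions: PollTimeout, adjust and NO_MESSAGE are hypothetical names, and all times are in milliseconds, whereas the native code works in nanoseconds and converts through toMillisecondTimeoutDelay.

```java
/** Hypothetical helper mirroring the timeout adjustment at the top of pollInner. */
final class PollTimeout {
    /** Plays the role of LLONG_MAX: no native message is pending. */
    static final long NO_MESSAGE = Long.MAX_VALUE;

    /**
     * @param timeoutMillis requested wait: 0 = do not block, negative = wait indefinitely
     * @param nowMillis current time
     * @param nextMessageUptimeMillis due time of the next native message, or NO_MESSAGE
     * @return the timeout that would be handed to epoll_wait
     */
    static int adjust(int timeoutMillis, long nowMillis, long nextMessageUptimeMillis) {
        if (timeoutMillis == 0 || nextMessageUptimeMillis == NO_MESSAGE) {
            return timeoutMillis; // nothing to adjust
        }
        // Time until the next message is due, never negative.
        long delay = Math.max(0L, nextMessageUptimeMillis - nowMillis);
        int messageTimeout = (int) Math.min(delay, Integer.MAX_VALUE);
        // Wake up for the message if it is due before the requested timeout.
        if (timeoutMillis < 0 || messageTimeout < timeoutMillis) {
            return messageTimeout;
        }
        return timeoutMillis;
    }
}
```

For example, an indefinite wait (-1) with a message due in 250 ms becomes a 250 ms wait, while a 100 ms request with the same pending message stays at 100 ms.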
Continuing with Looper's loop() method: you can plug in your own logging to debug the code, and after the first log line the loop calls dispatchMessage on the Message's target to dispatch the message.
    for (;;) {
        Message msg = queue.next(); // might block
        if (msg == null) {
            // No message indicates that the message queue is quitting.
            return;
        }

        // This must be in a local variable, in case a UI event sets the logger
        Printer logging = me.mLogging; // optional logging hook set by the caller
        if (logging != null) {
            logging.println(">>>>> Dispatching to " + msg.target + " " +
                    msg.callback + ": " + msg.what);
        }

        msg.target.dispatchMessage(msg);

        if (logging != null) {
            logging.println("<<<<< Finished to " + msg.target + " " + msg.callback);
        }

        // Make sure that during the course of dispatching the
        // identity of the thread wasn't corrupted.
        final long newIdent = Binder.clearCallingIdentity();
        if (ident != newIdent) {
            Log.wtf(TAG, "Thread identity changed from 0x"
                    + Long.toHexString(ident) + " to 0x"
                    + Long.toHexString(newIdent) + " while dispatching to "
                    + msg.target.getClass().getName() + " "
                    + msg.callback + " what=" + msg.what);
        }

        msg.recycleUnchecked();
    }
}
2.3 Adding debug logging
The logging hook used above is installed through Looper's setMessageLogging method:
public void setMessageLogging(@Nullable Printer printer) {
    mLogging = printer;
}
Printer is an interface:
public interface Printer {
    /**
     * Write a line of text to the output.  There is no need to terminate
     * the given string with a newline.
     */
    void println(String x);
}
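One common use of this hook is to time each dispatch by pairing the ">>>>> Dispatching" and "<<<<< Finished" lines. The sketch below is plain Java so that it runs anywhere: SlowDispatchPrinter is a hypothetical class that only mirrors the shape of Printer.println(String), and the clock is injected so the example is deterministic. On Android, a class like this would additionally implement android.util.Printer before being passed to setMessageLogging.

```java
import java.util.function.LongSupplier;

/** Hypothetical logger that counts dispatches slower than a threshold. */
final class SlowDispatchPrinter {
    private final LongSupplier clockMillis; // injected clock, in milliseconds
    private final long thresholdMillis;
    private long startMillis = -1;          // -1 means no dispatch in progress
    private int slowCount = 0;

    SlowDispatchPrinter(LongSupplier clockMillis, long thresholdMillis) {
        this.clockMillis = clockMillis;
        this.thresholdMillis = thresholdMillis;
    }

    /** Same signature shape as Printer.println(String). */
    void println(String line) {
        if (line.startsWith(">>>>> Dispatching")) {
            startMillis = clockMillis.getAsLong();
        } else if (line.startsWith("<<<<< Finished") && startMillis >= 0) {
            if (clockMillis.getAsLong() - startMillis > thresholdMillis) {
                slowCount++;
            }
            startMillis = -1;
        }
    }

    int slowCount() {
        return slowCount;
    }

    /** Feeds two fake dispatches (50 ms and 350 ms) through a 100 ms threshold. */
    static int demo() {
        long[] now = {0};
        SlowDispatchPrinter p = new SlowDispatchPrinter(() -> now[0], 100);
        p.println(">>>>> Dispatching to H1 null: 1");
        now[0] = 50;
        p.println("<<<<< Finished to H1 null");
        p.println(">>>>> Dispatching to H2 null: 2");
        now[0] = 400;
        p.println("<<<<< Finished to H2 null");
        return p.slowCount();
    }
}
```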
2.4 Java layer message distribution processing
Before looking at dispatch, recall how a message is created and sent. The code first calls the Handler's obtainMessage method:
Message msg = mHandler.obtainMessage(MSG_CHECK_WAKE_LOCK_ACQUIRE_TIMEOUT);
msg.setAsynchronous(true);
mHandler.sendMessageDelayed(msg, WAKE_LOCK_ACQUIRE_TOO_LONG_TIMEOUT);
obtainMessage simply calls Message's obtain method:
public final Message obtainMessage(int what) {
    return Message.obtain(this, what);
}
Message.obtain gets a Message (reusing one from the recycle pool when possible, otherwise creating a new one) and sets its target to the Handler:
public static Message obtain(Handler h, int what) {
    Message m = obtain(); // take a Message from the pool, or create a new one
    m.target = h;
    m.what = what;
    return m;
}
This ties back to the dispatch call in loop():
msg.target.dispatchMessage(msg);
Because target is the Handler, this calls the Handler's dispatchMessage method, which handles the message differently depending on how it was sent.
public void dispatchMessage(Message msg) {
    if (msg.callback != null) {
        handleCallback(msg); // sent with post(): the message carries a Runnable
    } else {
        if (mCallback != null) { // the Handler was constructed with a Callback
            if (mCallback.handleMessage(msg)) {
                return;
            }
        }
        handleMessage(msg); // finally, the subclass's own handleMessage
    }
}
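The precedence among the three paths can be checked with a small plain-Java sketch. MiniDispatch and its nested Callback interface are hypothetical stand-ins for Handler and Handler.Callback; the method returns the name of the path that handled the message instead of actually handling it.

```java
/** Hypothetical model of the three-way choice made in Handler.dispatchMessage. */
final class MiniDispatch {
    /** Stand-in for Handler.Callback: return true to consume the message. */
    interface Callback {
        boolean handleMessage(String msg);
    }

    /**
     * @param msgCallback Runnable attached to the message (the post() case), or null
     * @param handlerCallback callback given to the handler at construction, or null
     * @return which path handled the message
     */
    static String dispatch(Runnable msgCallback, Callback handlerCallback, String msg) {
        if (msgCallback != null) {
            msgCallback.run();
            return "message.callback";
        }
        if (handlerCallback != null && handlerCallback.handleMessage(msg)) {
            return "Handler.mCallback";
        }
        return "handleMessage"; // fall through to the subclass override
    }
}
```

A Runnable on the message always wins; the handler-level callback only short-circuits handleMessage when it returns true.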
2.3 Java layer message sending
On the Java layer, messages are sent through the Handler's sendMessage, post, and related methods, all of which end up in the following method:
public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
    MessageQueue queue = mQueue;
    if (queue == null) {
        RuntimeException e = new RuntimeException(
                this + " sendMessageAtTime() called with no mQueue");
        Log.w("Looper", e.getMessage(), e);
        return false;
    }
    return enqueueMessage(queue, msg, uptimeMillis);
}
sendMessageAtTime in turn calls the Handler's enqueueMessage method, which sets the target and forwards to the MessageQueue:
private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
    msg.target = this;
    if (mAsynchronous) {
        msg.setAsynchronous(true);
    }
    return queue.enqueueMessage(msg, uptimeMillis);
}
Finally, MessageQueue's enqueueMessage inserts the message into the queue, ordered by its when time, and calls the native nativeWake function if the loop thread needs to be woken.
boolean enqueueMessage(Message msg, long when) {
    if (msg.target == null) {
        throw new IllegalArgumentException("Message must have a target.");
    }
    if (msg.isInUse()) {
        throw new IllegalStateException(msg + " This message is already in use.");
    }

    synchronized (this) {
        if (mQuitting) {
            IllegalStateException e = new IllegalStateException(
                    msg.target + " sending message to a Handler on a dead thread");
            Log.w(TAG, e.getMessage(), e);
            msg.recycle();
            return false;
        }

        msg.markInUse();
        msg.when = when;
        Message p = mMessages;
        boolean needWake;
        if (p == null || when == 0 || when < p.when) {
            // New head, wake up the event queue if blocked.
            msg.next = p;
            mMessages = msg;
            needWake = mBlocked;
        } else {
            // Inserted within the middle of the queue.  Usually we don't have to wake
            // up the event queue unless there is a barrier at the head of the queue
            // and the message is the earliest asynchronous message in the queue.
            needWake = mBlocked && p.target == null && msg.isAsynchronous();
            Message prev;
            for (;;) {
                prev = p;
                p = p.next;
                if (p == null || when < p.when) {
                    break;
                }
                if (needWake && p.isAsynchronous()) {
                    needWake = false;
                }
            }
            msg.next = p; // invariant: p == prev.next
            prev.next = msg;
        }

        // We can assume mPtr != 0 because mQuitting is false.
        if (needWake) {
            nativeWake(mPtr);
        }
    }
    return true;
}
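The ordering rule is worth seeing in isolation: a message becomes the new head only if it is due strictly earlier than the current head, and among equal due times the earlier arrival stays in front. The following plain-Java sketch uses hypothetical names (MiniQueue, Msg, enqueue, order), assumes the loop thread is always blocked, and leaves out the barrier and asynchronous-message case.

```java
/** Hypothetical time-ordered singly linked queue, modelled on MessageQueue.enqueueMessage. */
final class MiniQueue {
    static final class Msg {
        final String name;
        final long when; // due time
        Msg next;

        Msg(String name, long when) {
            this.name = name;
            this.when = when;
        }
    }

    private Msg head;
    private final boolean blocked = true; // assumption: the loop thread is waiting

    /** Inserts msg in due-time order; returns true if the waiting thread must be woken. */
    boolean enqueue(Msg msg) {
        Msg p = head;
        if (p == null || msg.when == 0 || msg.when < p.when) {
            // New head: the loop must wake up to recompute its timeout.
            msg.next = p;
            head = msg;
            return blocked;
        }
        // Walk to the first message that is due strictly later.
        Msg prev;
        for (;;) {
            prev = p;
            p = p.next;
            if (p == null || msg.when < p.when) {
                break;
            }
        }
        msg.next = p;
        prev.next = msg;
        return false; // head unchanged, the current wait is still valid
    }

    /** Names of the queued messages from head to tail, separated by spaces. */
    String order() {
        StringBuilder sb = new StringBuilder();
        for (Msg m = head; m != null; m = m.next) {
            if (sb.length() > 0) {
                sb.append(' ');
            }
            sb.append(m.name);
        }
        return sb.toString();
    }
}
```

Only an insertion at the head changes when the loop next has work to do, which is why only that case asks for a wake-up.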
The native method ends up calling the Looper's wake function:
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->wake();
}

void NativeMessageQueue::wake() {
    mLooper->wake();
}
Looper's wake function simply writes a value to mWakeEventFd. This fd carries no data of interest; like a pipe, it is only a notification. The write makes epoll_wait return, so the thread stops blocking. It then delivers pending C-layer messages first, handles events on the fds registered earlier, and finally returns to the Java layer to process Java messages.
void Looper::wake() {
#if DEBUG_POLL_AND_WAKE
    ALOGD("%p ~ wake", this);
#endif

    uint64_t inc = 1;
    ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
    if (nWrite != sizeof(uint64_t)) {
        if (errno != EAGAIN) {
            ALOGW("Could not write wake signal, errno=%d", errno);
        }
    }
}
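The block-then-wake pattern has a close analogue in standard Java: java.nio's Selector.select(timeout) blocks the way epoll_wait does, and Selector.wakeup() from another thread makes it return early. The sketch below illustrates only the pattern; it does not use Android's Looper, and WakeDemo and blockAndWake are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

/** Hypothetical demo: one thread blocks in select(), another wakes it. */
final class WakeDemo {
    /** Returns how many milliseconds the caller stayed blocked before being woken. */
    static long blockAndWake() {
        try (Selector selector = Selector.open()) {
            Thread waker = new Thread(() -> {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                selector.wakeup(); // plays the role of Looper::wake()
            });
            long start = System.nanoTime();
            waker.start();
            selector.select(10_000); // plays the role of epoll_wait with a 10 s timeout
            long elapsedMillis = (System.nanoTime() - start) / 1_000_000;
            waker.join();
            return elapsedMillis;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

The call returns long before the 10 second timeout because the wake-up cuts the wait short, just as a write to mWakeEventFd does for the Looper.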
2.4 C layer message sending
Messages can also be sent from the C layer by calling the Looper's sendMessageAtTime function, which takes a handler as the callback. The message is placed in mMessageEnvelopes, ordered by uptime.
void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,
        const Message& message) {
#if DEBUG_CALLBACKS
    ALOGD("%p ~ sendMessageAtTime - uptime=%" PRId64 ", handler=%p, what=%d",
            this, uptime, handler.get(), message.what);
#endif

    size_t i = 0;
    { // acquire lock
        AutoMutex _l(mLock);

        size_t messageCount = mMessageEnvelopes.size();
        while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {
            i += 1;
        }

        MessageEnvelope messageEnvelope(uptime, handler, message);
        mMessageEnvelopes.insertAt(messageEnvelope, i, 1);

        // Optimization: If the Looper is currently sending a message, then we can skip
        // the call to wake() because the next thing the Looper will do after processing
        // messages is to decide when the next wakeup time should be.  In fact, it does
        // not even matter whether this code is running on the Looper thread.
        if (mSendingMessage) {
            return;
        }
    } // release lock

    // Wake the poll loop only when we enqueue a new message at the head.
    if (i == 0) {
        wake();
    }
}
In pollInner, after epoll_wait returns, the Looper walks mMessageEnvelopes and calls handleMessage on the handler of every message that is due:
while (mMessageEnvelopes.size() != 0) {
    nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
    const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
    if (messageEnvelope.uptime <= now) {
        // Remove the envelope from the list.
        // We keep a strong reference to the handler until the call to handleMessage
        // finishes.  Then we drop it so that the handler can be deleted *before*
        // we reacquire our lock.
        { // obtain handler
            sp<MessageHandler> handler = messageEnvelope.handler;
            Message message = messageEnvelope.message;
            mMessageEnvelopes.removeAt(0);
            mSendingMessage = true;
            mLock.unlock();

#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
            ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",
                    this, handler.get(), message.what);
#endif
            handler->handleMessage(message);
        } // release handler

        mLock.lock();
        mSendingMessage = false;
        result = POLL_CALLBACK;
    } else {
        // The last message left at the head of the queue determines the next wakeup time.
        mNextMessageUptime = messageEnvelope.uptime;
        break;
    }
}
The file Looper_test.cpp demonstrates many ways of using Looper. Here is one of them:
sp<StubMessageHandler> handler = new StubMessageHandler();
mLooper->sendMessageAtTime(now + ms2ns(100), handler, Message(MSG_TEST1));
StubMessageHandler extends MessageHandler, so it must implement the handleMessage method:
class StubMessageHandler : public MessageHandler {
public:
    Vector<Message> messages;

    virtual void handleMessage(const Message& message) {
        messages.push(message);
    }
};
While we are here, let's also look at the Message and MessageHandler classes:
struct Message {
    Message() : what(0) { }
    Message(int what) : what(what) { }

    /* The message type. (interpretation is left up to the handler) */
    int what;
};

/**
 * Interface for a Looper message handler.
 *
 * The Looper holds a strong reference to the message handler whenever it has
 * a message to deliver to it.  Make sure to call Looper::removeMessages
 * to remove any pending messages destined for the handler so that the handler
 * can be destroyed.
 */
class MessageHandler : public virtual RefBase {
protected:
    virtual ~MessageHandler() { }

public:
    /**
     * Handles a message.
     */
    virtual void handleMessage(const Message& message) = 0;
};
2.5 C layer addFd
Looper.cpp's addFd registers an additional fd with the Looper's epoll instance, so that the loop thread can process data whenever that fd becomes ready. Note the callback parameter of addFd:
int Looper::addFd(int fd, int ident, int events, Looper_callbackFunc callback, void* data) {
    return addFd(fd, ident, events, callback ? new SimpleLooperCallback(callback) : NULL, data);
}

int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) {
#if DEBUG_CALLBACKS
    ALOGD("%p ~ addFd - fd=%d, ident=%d, events=0x%x, callback=%p, data=%p", this, fd, ident,
            events, callback.get(), data);
#endif

    if (!callback.get()) {
        if (!mAllowNonCallbacks) {
            ALOGE("Invalid attempt to set NULL callback but not allowed for this looper.");
            return -1;
        }

        if (ident < 0) {
            ALOGE("Invalid attempt to set NULL callback with ident < 0.");
            return -1;
        }
    } else {
        ident = POLL_CALLBACK;
    }

    { // acquire lock
        AutoMutex _l(mLock);

        Request request;
        request.fd = fd;
        request.ident = ident;
        request.events = events;
        request.seq = mNextRequestSeq++;
        request.callback = callback;
        request.data = data;
        if (mNextRequestSeq == -1) mNextRequestSeq = 0; // reserve sequence number -1

        struct epoll_event eventItem;
        request.initEventItem(&eventItem);

        ssize_t requestIndex = mRequests.indexOfKey(fd);
        if (requestIndex < 0) {
            int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, &eventItem); // add to epoll
            if (epollResult < 0) {
                ALOGE("Error adding epoll events for fd %d, errno=%d", fd, errno);
                return -1;
            }
            mRequests.add(fd, request); // record in mRequests
        } else {
            int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd, &eventItem); // update
            if (epollResult < 0) {
                if (errno == ENOENT) {
                    // Tolerate ENOENT because it means that an older file descriptor was
                    // closed before its callback was unregistered and meanwhile a new
                    // file descriptor with the same number has been created and is now
                    // being registered for the first time.  This error may occur naturally
                    // when a callback has the side-effect of closing the file descriptor
                    // before returning and unregistering itself.  Callback sequence number
                    // checks further ensure that the race is benign.
                    //
                    // Unfortunately due to kernel limitations we need to rebuild the epoll
                    // set from scratch because it may contain an old file handle that we are
                    // now unable to remove since its file descriptor is no longer valid.
                    // No such problem would have occurred if we were using the poll system
                    // call instead, but that approach carries others disadvantages.
#if DEBUG_CALLBACKS
                    ALOGD("%p ~ addFd - EPOLL_CTL_MOD failed due to file descriptor "
                            "being recycled, falling back on EPOLL_CTL_ADD, errno=%d",
                            this, errno);
#endif
                    epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, &eventItem);
                    if (epollResult < 0) {
                        ALOGE("Error modifying or adding epoll events for fd %d, errno=%d",
                                fd, errno);
                        return -1;
                    }
                    scheduleEpollRebuildLocked();
                } else {
                    ALOGE("Error modifying epoll events for fd %d, errno=%d", fd, errno);
                    return -1;
                }
            }
            mRequests.replaceValueAt(requestIndex, request);
        }
    } // release lock
    return 1;
}
In pollInner, when an event arrives on such an fd, the Looper first looks up the matching Request in mRequests and then, in pushResponse, creates a new Response that pairs the events with that Request.
} else {
    ssize_t requestIndex = mRequests.indexOfKey(fd);
    if (requestIndex >= 0) {
        int events = 0;
        if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
        if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
        if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
        if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
        pushResponse(events, mRequests.valueAt(requestIndex));
    } else {
        ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
                "no longer registered.", epollEvents, fd);
    }
}
Further down, pollInner iterates over mResponses and invokes the callback stored in each Response's request. A callback that returns 0 is unregistered through removeFd.
for (size_t i = 0; i < mResponses.size(); i++) {
    Response& response = mResponses.editItemAt(i);
    if (response.request.ident == POLL_CALLBACK) {
        int fd = response.request.fd;
        int events = response.events;
        void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
        ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",
                this, response.request.callback.get(), fd, events, data);
#endif
        // Invoke the callback.  Note that the file descriptor may be closed by
        // the callback (and potentially even reused) before the function returns so
        // we need to be a little careful when removing the file descriptor afterwards.
        int callbackResult = response.request.callback->handleEvent(fd, events, data);
        if (callbackResult == 0) {
            removeFd(fd, response.request.seq);
        }

        // Clear the callback reference in the response structure promptly because we
        // will not clear the response vector itself until the next poll.
        response.request.callback.clear();
        result = POLL_CALLBACK;
    }
}
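The return-value contract of the callback (non-zero keeps the registration, 0 removes it) can be sketched in a few lines of plain Java. MiniFdCallbacks, FdCallback, deliver and isRegistered are hypothetical names, fds are plain integers here, and the sequence-number check that the native removeFd performs is omitted.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical registry modelling "return 0 to unregister" fd callbacks. */
final class MiniFdCallbacks {
    /** Stand-in for LooperCallback: return 0 to be unregistered, non-zero to stay. */
    interface FdCallback {
        int handleEvent(int fd, int events);
    }

    private final Map<Integer, FdCallback> requests = new HashMap<>();

    void addFd(int fd, FdCallback callback) {
        requests.put(fd, callback);
    }

    boolean isRegistered(int fd) {
        return requests.containsKey(fd);
    }

    /** Delivers events for fd and removes the registration if the callback returns 0. */
    void deliver(int fd, int events) {
        FdCallback callback = requests.get(fd);
        if (callback == null) {
            return; // events on an fd that is no longer registered are ignored
        }
        if (callback.handleEvent(fd, events) == 0) {
            requests.remove(fd);
        }
    }
}
```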
Again, let's see how Looper_test.cpp uses this:
Pipe pipe;
StubCallbackHandler handler(true);
handler.setCallback(mLooper, pipe.receiveFd, Looper::EVENT_INPUT);
Here is the handler's setCallback function:
class CallbackHandler {
public:
    void setCallback(const sp<Looper>& looper, int fd, int events) {
        // Calls the looper's addFd with staticHandler as the callback and this as data
        looper->addFd(fd, 0, events, staticHandler, this);
    }

protected:
    virtual ~CallbackHandler() { }

    virtual int handler(int fd, int events) = 0;

private:
    static int staticHandler(int fd, int events, void* data) { // the callback function
        return static_cast<CallbackHandler*>(data)->handler(fd, events);
    }
};

class StubCallbackHandler : public CallbackHandler {
public:
    int nextResult;
    int callbackCount;

    int fd;
    int events;

    StubCallbackHandler(int nextResult) : nextResult(nextResult),
            callbackCount(0), fd(-1), events(-1) {
    }

protected:
    virtual int handler(int fd, int events) { // staticHandler ends up calling this
        callbackCount += 1;
        this->fd = fd;
        this->events = events;
        return nextResult;
    }
};
Looking at this together with Looper's addFd: when a callback function is supplied, addFd wraps it in a new SimpleLooperCallback.
int Looper::addFd(int fd, int ident, int events, Looper_callbackFunc callback, void* data) {
    return addFd(fd, ident, events, callback ? new SimpleLooperCallback(callback) : NULL, data);
}
Looper_callbackFunc is a typedef for a function pointer:
typedef int (*Looper_callbackFunc)(int fd, int events, void* data);
And here is SimpleLooperCallback:
class SimpleLooperCallback : public LooperCallback {
protected:
    virtual ~SimpleLooperCallback();

public:
    SimpleLooperCallback(Looper_callbackFunc callback);
    virtual int handleEvent(int fd, int events, void* data);

private:
    Looper_callbackFunc mCallback;
};

SimpleLooperCallback::SimpleLooperCallback(Looper_callbackFunc callback) :
        mCallback(callback) {
}

SimpleLooperCallback::~SimpleLooperCallback() {
}

int SimpleLooperCallback::handleEvent(int fd, int events, void* data) {
    return mCallback(fd, events, data);
}
So when the Looper calls callback->handleEvent(fd, events, data), callback is the SimpleLooperCallback, and data is the CallbackHandler pointer that was passed to addFd earlier.
handleEvent therefore calls staticHandler, which casts data back to a CallbackHandler* and calls its handler(). Because handler() is virtual, the call lands in StubCallbackHandler's handler function.
We can also use the second addFd overload directly. In that case we define our own class that implements LooperCallback, which is simpler because no function-pointer wrapper is needed.
int addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data);
2.6 addFd in the Java layer
I always thought that an fd could only be added to the Looper in the native (C++) layer, but it turns out that the same capability is also exposed to the Java layer through JNI.
We can use it by registering an OnFileDescriptorEventListener with the MessageQueue:
public void addOnFileDescriptorEventListener(@NonNull FileDescriptor fd,
        @OnFileDescriptorEventListener.Events int events,
        @NonNull OnFileDescriptorEventListener listener) {
    if (fd == null) {
        throw new IllegalArgumentException("fd must not be null");
    }
    if (listener == null) {
        throw new IllegalArgumentException("listener must not be null");
    }
    synchronized (this) {
        updateOnFileDescriptorEventListenerLocked(fd, events, listener);
    }
}
Now let's look at the OnFileDescriptorEventListener callback interface:
public interface OnFileDescriptorEventListener {
    public static final int EVENT_INPUT = 1 << 0;
    public static final int EVENT_OUTPUT = 1 << 1;
    public static final int EVENT_ERROR = 1 << 2;

    /** @hide */
    @Retention(RetentionPolicy.SOURCE)
    @IntDef(flag=true, value={EVENT_INPUT, EVENT_OUTPUT, EVENT_ERROR})
    public @interface Events {}

    @Events int onFileDescriptorEvents(@NonNull FileDescriptor fd, @Events int events);
}
addOnFileDescriptorEventListener then calls the updateOnFileDescriptorEventListenerLocked function:
private void updateOnFileDescriptorEventListenerLocked(FileDescriptor fd, int events,
        OnFileDescriptorEventListener listener) {
    final int fdNum = fd.getInt$();

    int index = -1;
    FileDescriptorRecord record = null;
    if (mFileDescriptorRecords != null) {
        index = mFileDescriptorRecords.indexOfKey(fdNum);
        if (index >= 0) {
            record = mFileDescriptorRecords.valueAt(index);
            if (record != null && record.mEvents == events) {
                return;
            }
        }
    }

    if (events != 0) {
        events |= OnFileDescriptorEventListener.EVENT_ERROR;
        if (record == null) {
            if (mFileDescriptorRecords == null) {
                mFileDescriptorRecords = new SparseArray<FileDescriptorRecord>();
            }
            // The fd is stored in a FileDescriptorRecord object...
            record = new FileDescriptorRecord(fd, events, listener);
            // ...and the record is stored in mFileDescriptorRecords, keyed by fdNum.
            mFileDescriptorRecords.put(fdNum, record);
        } else {
            record.mListener = listener;
            record.mEvents = events;
            record.mSeq += 1;
        }
        // Call the native function.
        nativeSetFileDescriptorEvents(mPtr, fdNum, events);
    } else if (record != null) {
        record.mEvents = 0;
        mFileDescriptorRecords.removeAt(index);
    }
}
The native method eventually calls the setFileDescriptorEvents function of NativeMessageQueue:
static void android_os_MessageQueue_nativeSetFileDescriptorEvents(JNIEnv* env, jclass clazz,
        jlong ptr, jint fd, jint events) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->setFileDescriptorEvents(fd, events);
}
Here is the setFileDescriptorEvents function. The addFd it calls is the second addFd overload, so we can be sure that NativeMessageQueue inherits from LooperCallback:
void NativeMessageQueue::setFileDescriptorEvents(int fd, int events) {
    if (events) {
        int looperEvents = 0;
        if (events & CALLBACK_EVENT_INPUT) {
            looperEvents |= Looper::EVENT_INPUT;
        }
        if (events & CALLBACK_EVENT_OUTPUT) {
            looperEvents |= Looper::EVENT_OUTPUT;
        }
        mLooper->addFd(fd, Looper::POLL_CALLBACK, looperEvents, this,
                reinterpret_cast<void*>(events));
    } else {
        mLooper->removeFd(fd);
    }
}
Sure enough, it does, which means it has to implement the handleEvent function:
class NativeMessageQueue : public MessageQueue, public LooperCallback {
public:
    NativeMessageQueue();
    virtual ~NativeMessageQueue();

    virtual void raiseException(JNIEnv* env, const char* msg, jthrowable exceptionObj);

    void pollOnce(JNIEnv* env, jobject obj, int timeoutMillis);
    void wake();
    void setFileDescriptorEvents(int fd, int events);

    virtual int handleEvent(int fd, int events, void* data);
handleEvent is called after epoll_wait returns in Looper: when an fd that we added has pending events, this function is invoked:
int NativeMessageQueue::handleEvent(int fd, int looperEvents, void* data) {
    int events = 0;
    if (looperEvents & Looper::EVENT_INPUT) {
        events |= CALLBACK_EVENT_INPUT;
    }
    if (looperEvents & Looper::EVENT_OUTPUT) {
        events |= CALLBACK_EVENT_OUTPUT;
    }
    if (looperEvents & (Looper::EVENT_ERROR | Looper::EVENT_HANGUP | Looper::EVENT_INVALID)) {
        events |= CALLBACK_EVENT_ERROR;
    }
    int oldWatchedEvents = reinterpret_cast<intptr_t>(data);
    // Call back into the Java layer.
    int newWatchedEvents = mPollEnv->CallIntMethod(mPollObj,
            gMessageQueueClassInfo.dispatchEvents, fd, events);
    if (!newWatchedEvents) {
        return 0; // unregister the fd
    }
    if (newWatchedEvents != oldWatchedEvents) {
        setFileDescriptorEvents(fd, newWatchedEvents);
    }
    return 1;
}
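The two translation steps (Java-side event bits to Looper bits in setFileDescriptorEvents, and Looper bits back to Java-side bits in handleEvent) can be isolated as two small pure functions. This is just a sketch: the constant names are hypothetical, the kCallback* values follow the EVENT_* constants of OnFileDescriptorEventListener shown earlier, and the kLooper* values are assumptions for illustration rather than values read from Looper.h.

```cpp
#include <cassert>

// Assumed stand-ins for CALLBACK_EVENT_*; they mirror the Java constants
// EVENT_INPUT, EVENT_OUTPUT and EVENT_ERROR quoted above.
constexpr int kCallbackInput = 1 << 0;
constexpr int kCallbackOutput = 1 << 1;
constexpr int kCallbackError = 1 << 2;

// Assumed stand-ins for Looper::EVENT_*; the numeric values are illustrative.
constexpr int kLooperInput = 1 << 0;
constexpr int kLooperOutput = 1 << 1;
constexpr int kLooperError = 1 << 2;
constexpr int kLooperHangup = 1 << 3;
constexpr int kLooperInvalid = 1 << 4;

// Direction used when registering: only input and output are requested.
// The error bit is not forwarded, as in setFileDescriptorEvents above.
constexpr int toLooperEvents(int callbackEvents) {
    return ((callbackEvents & kCallbackInput) ? kLooperInput : 0)
         | ((callbackEvents & kCallbackOutput) ? kLooperOutput : 0);
}

// Direction used when dispatching: error, hangup and invalid all collapse
// into the single error bit that the listener sees.
constexpr int toCallbackEvents(int looperEvents) {
    return ((looperEvents & kLooperInput) ? kCallbackInput : 0)
         | ((looperEvents & kLooperOutput) ? kCallbackOutput : 0)
         | ((looperEvents & (kLooperError | kLooperHangup | kLooperInvalid))
                    ? kCallbackError : 0);
}

static_assert(toLooperEvents(kCallbackInput | kCallbackError) == kLooperInput,
              "the error bit is not requested explicitly");
static_assert(toCallbackEvents(kLooperHangup) == kCallbackError,
              "a hangup is reported to the listener as an error");
```

Keeping the two sets of constants separate, even when their values happen to coincide, means either side can change its bit layout without silently breaking the other.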
Finally, the JNI layer calls back into dispatchEvents in the Java MessageQueue, which in turn invokes the listener that was registered earlier:
// Called from native code.
private int dispatchEvents(int fd, int events) {
    // Get the file descriptor record and any state that might change.
    final FileDescriptorRecord record;
    final int oldWatchedEvents;
    final OnFileDescriptorEventListener listener;
    final int seq;
    synchronized (this) {
        // Look up the FileDescriptorRecord by fd.
        record = mFileDescriptorRecords.get(fd);
        if (record == null) {
            return 0; // spurious, no listener registered
        }

        oldWatchedEvents = record.mEvents;
        events &= oldWatchedEvents; // filter events based on current watched set
        if (events == 0) {
            return oldWatchedEvents; // spurious, watched events changed
        }

        listener = record.mListener;
        seq = record.mSeq;
    }

    // Invoke the listener outside of the lock (this is the listener callback).
    int newWatchedEvents = listener.onFileDescriptorEvents(
            record.mDescriptor, events);
    if (newWatchedEvents != 0) {
        newWatchedEvents |= OnFileDescriptorEventListener.EVENT_ERROR;
    }

    // Update the file descriptor record if the listener changed the set of
    // events to watch and the listener itself hasn't been updated since.
    if (newWatchedEvents != oldWatchedEvents) {
        synchronized (this) {
            int index = mFileDescriptorRecords.indexOfKey(fd);
            if (index >= 0 && mFileDescriptorRecords.valueAt(index) == record
                    && record.mSeq == seq) {
                record.mEvents = newWatchedEvents;
                if (newWatchedEvents == 0) {
                    mFileDescriptorRecords.removeAt(index);
                }
            }
        }
    }

    // Return the new set of events to watch for native code to take care of.
    return newWatchedEvents;
}
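The interesting part of dispatchEvents is the pattern of copying state under the lock, calling the listener outside the lock, and applying the listener's return value only if the record was not changed in the meantime. Below is a rough C++ sketch of that pattern. FdRegistry and its members are hypothetical names; as a simplification it omits the early return for unchanged events and, instead of the object-identity check plus per-record counter used by the Java code, stamps every record with a registry-wide sequence number.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <mutex>
#include <utility>

constexpr int kEventError = 1 << 2;  // mirrors EVENT_ERROR from the article

class FdRegistry {
public:
    using Listener = std::function<int(int fd, int events)>;

    // Register, replace or (with events == 0) remove the listener for fd.
    void update(int fd, int events, Listener listener) {
        std::lock_guard<std::mutex> lock(mMutex);
        if (events == 0) {
            mRecords.erase(fd);
            return;
        }
        // Every change gets a fresh sequence number (simplifying assumption).
        mRecords[fd] = Record{events | kEventError, std::move(listener), mNextSeq++};
    }

    // Deliver events to the listener; returns the new watched-event mask.
    int dispatch(int fd, int events) {
        Listener listener;
        int oldWatched = 0;
        long seq = 0;
        {
            std::lock_guard<std::mutex> lock(mMutex);
            auto it = mRecords.find(fd);
            if (it == mRecords.end()) return 0;   // no listener registered
            oldWatched = it->second.events;
            events &= oldWatched;                 // keep only watched events
            if (events == 0) return oldWatched;   // spurious wakeup
            listener = it->second.listener;       // copy, so it outlives the lock
            seq = it->second.seq;
        }
        int newWatched = listener(fd, events);    // invoked outside the lock
        if (newWatched != 0) newWatched |= kEventError;
        if (newWatched != oldWatched) {
            std::lock_guard<std::mutex> lock(mMutex);
            auto it = mRecords.find(fd);
            // Apply the result only if nobody touched the record meanwhile.
            if (it != mRecords.end() && it->second.seq == seq) {
                if (newWatched == 0) {
                    mRecords.erase(it);
                } else {
                    it->second.events = newWatched;
                }
            }
        }
        return newWatched;
    }

    // Returns the watched-event mask for fd, or 0 if fd is not registered.
    int watchedEvents(int fd) {
        std::lock_guard<std::mutex> lock(mMutex);
        auto it = mRecords.find(fd);
        return it == mRecords.end() ? 0 : it->second.events;
    }

private:
    struct Record {
        int events;
        Listener listener;
        long seq;
    };
    std::mutex mMutex;
    std::map<int, Record> mRecords;
    long mNextSeq = 0;
};
```

Calling the listener outside the lock lets it call update on the same registry without deadlocking, and the sequence check keeps a stale return value from overwriting a registration the listener has just replaced.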