从libuv源码看清nodejs事件循环几个钻牛角尖的问题
前言
本文是在开始学习nodejs事件循环时,结合官方文档和其他资料来解答自己理解不够清晰的问题
1.poll阶段不阻塞(阻塞时间timeout为0)、无限阻塞(阻塞时间timout为-1),到底会不会执行poll回调队列的回调函数
1.1 I/O是什么,文件描述符又是什么?
【从libuv源码看清nodejs事件循环几个钻牛角尖的问题】I/O,就是输入(input)和输出(output)的简写。Linux系统中,把一切都看做是文件。文件(常规文件、socket、FIFO、管道、终端……)就是一串二进制流,当信息交换中,我们对这些流进行数据的收发操作,就是I/O操作。当进程打开现有的文件或者创建新文件时,内核向进程返回一个文件操作符。文件操作符是一个索引,它就是一个整数,指向系统级文件描述表,它包含了文件操作、文件类型、访问权限等等信息。所有执行I/O操作的系统调用都会通过文件描述符。
文章图片
1.2 epoll是什么
epoll是Linux内核的可扩展I/O事件通知机制。在浏览器环境,当想监听鼠标事件时,我们会element.addEventListener('click', cbFn)
,这就是浏览器的事件通知机制。而类似地,libuv调用epoll相关的api来实现I/O事件通知。Observer(观察者)注册到被观察者(Subject),当被观察者(Subject)发生某种变化,会通知已注册的Observer(观察者)执行回调。
1.3 epoll工作流程
epoll有三个步骤:
- epoll_create,在epoll文件系统建立了个file节点,并开辟epoll自己的内核高速cache区,建立红黑树,分配好想要的size的内存对象,建立一个list链表,用于存储准备就绪的事件。
- epoll_ctl,把要监听的文件放到对应的红黑树上,给内核中断处理程序注册一个回调函数,通知内核,如果这个句柄的数据到了,就把它放到就绪列表。
- epoll_wait,观察就绪列表里面有没有数据,并进行提取和清空就绪列表。
void uv__io_poll(uv_loop_t* loop, int timeout) {
// ...
// 如果没有任何观察者,直接返回
if (loop->nfds == 0) {
assert(QUEUE_EMPTY(&loop->watcher_queue));
return;
}
memset(&e, 0, sizeof(e));
// 向epoll系统注册所有I/O观察者
while (!QUEUE_EMPTY(&loop->watcher_queue)) {
// 获取队列头部,并将队列从loop->watcher_queue移除
q = QUEUE_HEAD(&loop->watcher_queue);
QUEUE_REMOVE(q);
QUEUE_INIT(q);
// 获取I/O观察者结构
w = QUEUE_DATA(q, uv__io_t, watcher_queue);
assert(w->pevents != 0);
assert(w->fd >= 0);
assert(w->fd < (int) loop->nwatchers);
e.events = w->pevents;
e.data.fd = w->fd;
if (w->events == 0)
op = EPOLL_CTL_ADD;
else
op = EPOLL_CTL_MOD;
// epoll_ctl操作,向epoll注册文件描述符及需要监控的I/O事件
if (epoll_ctl(loop->backend_fd, op, w->fd, &e)) {
if (errno != EEXIST)
abort();
assert(op == EPOLL_CTL_ADD);
// loop->backend_fd通过epoll_create创建
if (epoll_ctl(loop->backend_fd, EPOLL_CTL_MOD, w->fd, &e))
abort();
}
w->events = w->pevents;
}
// ...
// 记录当前时间,以便计算到达时间之后跳出下面的循环
base = loop->time;
// count减少到0,下面的循环跳出
count = 48;
/* Benchmarks suggest this gives the best throughput. */
real_timeout = timeout;
/*
进入epoll_pwait轮询I/O事件
以下循环主要由timeout和count控制是否跳出,符合整个事件循环
*/
for (;
;
) {
// ...
// nfds表示产生I/O事件的文件描述符的数量,0为没有事件发生,可能因为超时时间到了,或者timeout=0
// events保存了从内核得到的事件集合
nfds = epoll_pwait(loop->backend_fd,
events,
ARRAY_SIZE(events),
timeout,
psigset);
// ...
// 没有I/O事件
if (nfds == 0) {
// ...
// 如果timeout为-1则继续循环
if (timeout == -1)
continue;
// 如果timeout为0函数直接返回
if (timeout == 0)
return;
// 更新下次epoll_pwait的timeout时间
goto update_timeout;
}
// epoll_pwait返回错误
if (nfds == -1) {
if (errno != EINTR)
abort();
// 如果timeout为-1则继续循环
if (timeout == -1)
continue;
// 如果timeout为0函数直接返回
if (timeout == 0)
return;
// 更新下次epoll_pwait的timeout时间
goto update_timeout;
}
// ...
// 获取I/O观察者,调用关联的回调函数
for (i = 0;
i < nfds;
i++) {
pe = events + i;
fd = pe->data.fd;
// ...
// 如果存在有效事件
if (pe->events != 0) {
if (w == &loop->signal_IOWatcher)
have_signals = 1;
else
// 执行回调
w->cb(loop, w, pe->events);
nevents++;
}
}
// ...
if (nevents != 0) {
// 如果所有文件描述符上都有事件产生,且count不为0,再循环一次
if (nfds == ARRAY_SIZE(events) && --count != 0) {
/* Poll for more events but don't block this time. */
timeout = 0;
continue;
}
return;
}
// 如果timeout为0函数直接返回
if (timeout == 0)
return;
// 如果timeout为-1则继续循环
if (timeout == -1)
continue;
// 重新计算timeout
update_timeout:
assert(timeout > 0);
real_timeout -= (loop->time - base);
if (real_timeout <= 0)
return;
// 剩余timeout
timeout = real_timeout;
}
}
- epoll注册I/O观察者
- 调用epoll_ctl,注册文件描述符以及需要监控的I/O事件
- 进入循环,调用epoll_pwait轮询I/O事件
3.1 如果没有I/O事件,timeout为0,则直接退出轮询,timeout为-1,则继续轮询
3.2 如果epoll_pwait返回错误,timeout为0,则直接退出轮询,timeout为-1,则继续轮询
3.3 有I/O事件,则调用关联的回调函数
3.4 如果timeout为0,则直接退出轮询
3.5 如果timeout为-1,则继续轮询
2.pending callbacks在什么时候注册回调队列的
static int uv__run_pending(uv_loop_t* loop) {
QUEUE* q;
QUEUE pq;
uv__io_t* w;
if (QUEUE_EMPTY(&loop->pending_queue))
return 0;
QUEUE_MOVE(&loop->pending_queue, &pq);
while (!QUEUE_EMPTY(&pq)) {
q = QUEUE_HEAD(&pq);
QUEUE_REMOVE(q);
QUEUE_INIT(q);
w = QUEUE_DATA(q, uv__io_t, pending_queue);
w->cb(loop, w, POLLOUT);
}return 1;
}
该函数遍历
loop->pending_queue
队列节点,取得I/O观察者后调用cb。经搜索,只有uv__io_feed
中存在向loop->pending_queue
队列插入节点的代码,如下void uv__io_feed(uv_loop_t* loop, uv__io_t* w) {
if (QUEUE_EMPTY(&w->pending_queue))
QUEUE_INSERT_TAIL(&loop->pending_queue, &w->pending_queue);
}
继续搜索
uv__io_feed
,调用的地方如下// src/unix/pipe.c
void uv_pipe_connect(uv_connect_t* req,
uv_pipe_t* handle,
const char* name,
uv_connect_cb cb) {
// ...
if (err)
uv__io_feed(handle->loop, &handle->io_watcher);
}
// src/unix/stream.c
static void uv__write_req_finish(uv_write_t* req) {
// ...
uv__io_feed(stream->loop, &stream->io_watcher);
}
// src/unix/tpc.c
int uv__tcp_connect(uv_connect_t* req,
uv_tcp_t* handle,
const struct sockaddr* addr,
unsigned int addrlen,
uv_connect_cb cb) {
// ...
if (handle->delayed_error)
uv__io_feed(handle->loop, &handle->io_watcher);
// ...
}
还有三处在
src/unix/udp.c
处调用所以,pending callbacks阶段,分别在以下场景注册回调:
- pipe连接出错时
- stream流写请求完成时
- tcp连接有延迟错误时
- udp的几个场景
timer阶段源码解读可以看这里:传送门
TL; DR; 流程如下:
- setTimeout/setInterval是通过内置类Timeout实现的,它的时间阈值为1 ~ 231-1 ms,且为整数。所以
setTimeout(callback, 0)
会转换为setTimeout(callback, 1)
- 进入tick之后,会获取这一tick开始的时间,通过
uv__hrtime
函数调用系统时间,过程中可能会受到其他应用的影响 - libuv所有计时器都是以执行时间节点构成的二叉最小堆结构来存储。二叉最小堆,特点是父节点始终比子节点小,所以根节点是最小的
- 计时器回调的执行时间节点=注册回调时的tick开始时间time+计时器阈值timeout
- 当
二叉最小堆的根节点计时器回调的执行时间节点 <= 当前时间循环tick的开始时间
,表示至少有一个过期的定时器,循环迭代二叉最小堆的根节点,并调用该计时器所应的回调函数。 - 当
二叉最小堆的根节点计时器回调的执行时间节点 > 当前时间循环tick的开始时间
,表示还没有到执行时机,根据二叉最小堆的特点,根节点的时间都不能满足执行时机的话,那么后面的节点也没有过期。此时,退出timer阶段的回调函数执行,进入下一个阶段 - 执行pending callbacks、idel、prepare的回调函数
- 计算poll阻塞当前tick的时间p,如果pending callbacks、idel、close callbacks回调队列非空,则为0,尽快进入下个tick执行对应的回调;如果有超时的计时器,则为0,尽快进入下个tick执行超时计时器的回调;如果有未超时的计时器,则
阻塞时间 = 二叉最小堆的根节点计时器回调的执行时间节点 - 当前时间循环tick的开始时间
;如果没有计时器,则为-1,无限阻塞 - 执行check、close callbacks的回调函数
- 系统时间调用,过程中可能会受到其他应用的影响
- poll阻塞的时候线程会挂起,CPU会调度去做其他事,CPU接回来处理的时间不可控制
- 各阶段回调执行时间不可控制
Reference
epoll的作用和原理介绍
从 libuv 看 nodejs 事件循环
libuv 源码分析(五)IO 观察者(io_watcher)
文件描述符(File Descriptor)简介
I/O的内核原理与5种I/O模型
推荐阅读
- Docker应用:容器间通信与Mariadb数据库主从复制
- 一个人的碎碎念
- 我从来不做坏事
- 从蓦然回首到花开在眼前,都是为了更好的明天。
- 西湖游
- 改变自己,先从自我反思开始
- leetcode|leetcode 92. 反转链表 II
- 从我的第一张健身卡谈传统健身房
- 自媒体形势分析
- 操作系统|[译]从内部了解现代浏览器(1)