epoll
内部监听的事件以
epitem
组织为一个红黑树注册和事件添加到epoll的rdlist都是通过调用f_op->poll
注册时是通过将epoll添加到文件的wait_queue_head中
rdlist上的epitem通常是由被监听文件自己挂上去的
data struct
fs/eventpoll.c
epoll_create (SYSCALL_DEFINE1(epoll_create) -> do_epoll_create
1 | // 有 strc |
- 相关的数据结构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48struct wait_queue_head {
spinlock_t lock;
struct list_head head;
};
typedef struct wait_queue_head wait_queue_head_t;
struct mutex {
atomic_long_t owner;
spinlock_t wait_lock;
struct optimistic_spin_queue osq; // 优化 mutex 的性能
struct list_head wait_list;
};
/*
* Leftmost-cached rbtrees.
*
* We do not cache the rightmost node based on footprint
* size vs number of potential users that could benefit
* from O(1) rb_last(). Just not worth it, users that want
* this feature can always implement the logic explicitly.
* Furthermore, users that want to cache both pointers may
* find it a bit asymmetric, but that's ok.
*/
struct rb_root_cached {
struct rb_root rb_root;
struct rb_node *rb_leftmost;
};
struct file {
struct path f_path;
struct inode *f_inode; /* cached value */
const struct file_operations *f_op;
spinlock_t f_lock;
// 对于epollfd指向 struct event_poll
void *private_data;
/* Used by fs/eventpoll.c to link all the hooks to this file */
struct list_head f_ep_links; // 所有监听此文件的 epitem 的链表
struct list_head f_tfile_llink;
// ....
};
struct callback_head {
struct callback_head *next;
void (*func)(struct callback_head *head);
} __attribute__((aligned(sizeof(void *))));
epoll_create
glibc
的epoll_create
-> 系统调用的epoll_create
->do_epoll_create
ep_alloc(&ep)
创建一个struct eventpoll
即ep
,并初始化fd = get_unused_fd_flags
获取一个可用的fd
file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,O_RDWR | (flags & O_CLOEXEC));
- 创建一个名为
[eventpoll]
的匿名文件,文件的fop
设置为eventpoll_fops
(staic
全局变量)file->private_data = ep
fd_install(fd, file);
使task_struct->files->fdt->fd[fd] = file
eventpoll_fops->poll = ep_eventpoll_poll
- poll函数用于驱动提供给应用程序探测设备文件是否可读或可写。当该函数指针为空时表示设备可对文件非阻塞的读写。调用poll函数的进程会
sleep
知道可读或可写。
- poll函数用于驱动提供给应用程序探测设备文件是否可读或可写。当该函数指针为空时表示设备可对文件非阻塞的读写。调用poll函数的进程会
- 创建一个名为
epoll_ctl
根据参数分别调用
ep_insert
,ep_remove
,ep_modify
。
ADD
:epoll_ctl
在ADD
时会调用对监听fd
的f_op->poll
,里面将调用poll_wait
创建一个wait_queue_entry
,内含一个回调函数(在poll_table
中 ),将此wait_queue_entry
挂到被监听fd
的wait_queue_head
上,当设备对此fd
进行读写时会唤醒被监听fd
的wait_queue_head
上监听对应事件的进行,调用注册进来的回调函数。epoll
使用红黑树管理它监听的所有文件,它为每个监听的文件创建一个epitem
,加入到eventpoll
的rbr
字段(红黑树root)中。- 如果要监听的是一个非
epoll
文件,内核会调用epfd
的f_op->poll
回调,内核创建一个epoll_entry
,通过调用该回调函数,将epoll_entry
加入到文件内部的wait_queue_head
中,以便文件有事件到达时通过该epoll_entry
,将对应的epitem
加入到eventpoll
的rdlist
中。 - 如果要监听的是一个
epoll
文件,epfd2
对应的eventpoll
结构体中有一个wait_queue_head——poll_wait
。因此,内核同样会创建一个epoll_entry
,然后将该epoll_entry
加入到poll_wait
中,以便epfd2通
知epfd1
。
epoll_wait
- 如果
eventpoll
的rdlist
中没有epollitem
,则说明监听的文件没有一个事件到达,这时需要将当前文件陷入阻塞(睡在eventpoll->wq
上),等待有任意事件到达。 - 如果有事件到达,则将
rdlist
中所有的epollitem
一口气取下来,放到一个临时链表中(避免争用rdlist
)。对于临时链表中的每一项epollitem
:- 将
epollitem
从链表中取下 - 调用
ep_item_poll
- 如果
epitem
对应的是一个非epoll
文件,调用它的f_op->poll
回调,获取该文件的事件,与epitem
记录的要等待的事件取与。若取与得到的值不为0,- 说明确实有要等待的事件发生,首先将该事件拷贝到用户缓冲区
- 然后,如果是
EPOLLONESHOT
,则在拷贝完事件后去掉对EPOLLIN/EPOLLOUT
等的监听,只保留EPOLLWAKEUP | EPOLLONESHOT | EPOLLET | EPOLLEXCLUSIVE
四个标志位的值。 - 然后,如果是水平触发模式,则将该
epollitem
再放到eventpoll
的尾部。
- 如果
epitem
对应的是一个epoll
文件(称为epfd2
),则以epfd2
为参数,递归进入第二项判断rdlist
。
- 如果
- 如果从临时链表中取出了用户要求的最大事件数,则停止循环
- 将
- 现在,将临时链表中剩下的没有处理完的那些事件重新加到
rdlist
中。 - 如果
rdlist
中有还没有处理完的数据,唤醒睡在eventpoll->wq
上的进程(处于epoll_wait
阻塞的进程),并通知eventpoll->poll_wait
上的进程(比如epfd1
监听epfd2
,那么epfd2
会通过自己的poll_wait
通知epfd1
,这会导致epfd1
的rdlist
中出现一项关于epfd2
的epollitem
)。 - 如果发现没有拿到任何事件(到达的事件和想要的事件不同,或者被其他进程提前一步拿走了),且还没有超过
epoll_wait
限制的时间,则跳转到第一步重新开始。 - 通过上面的流程描述,可以知道,水平触发模式的文件,在缓冲区有数据时,会一直待在
rdlist
中,直到它的缓冲区数据为空。 - 需要注意的是,
rdlist
上的epitem
通常是由被监听文件自己挂上去的。比如,当监听一个管道时,管道文件自己有一个wait_queue_head
,当我们将管道文件通过EPOLL_ADD
加入到epoll
中时,会调用一次管道文件的poll
回调。poll回调通过调用poll_wait
函数,将我们的epoll
注册到它自己的wait_queue_head
中。然后,当管道文件发现自己缓冲区可读可写时,它会通过自己的wait_queue_head
通知epoll
,将epitem
挂在epoll
的rdlist
里面。这样,当epoll_wait
被调用时,就会发现自己的rdlist
里面有数据,然后,它会遍历rdlist
中的每一项,对每一项调用poll
回调,这样,管道的poll
回调又一次被调用了。不过,这一次管道并不会将epoll
注册到自己的wait_queue_head
中去,它仅仅判断自己的缓冲区的可读可写事件,返回对于的POLLIN/POLLOUT
,然后,epoll_wait
根据返回的事件,与要监听的事件做与操作,来决定是否拷贝该事件。
select
1 | int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout); |
- 调用过程:
select -> kern_select -> core_sys_select
- 对于每个
fd
,如果在读/写/异常fd_set
中的任意一个fd_set
中置位了- 调用
fd
对于file
的f_op->poll
回调- 如果
poll_table
中有回调函数,它将负责创建一个wait_queue_entry
,并将该entry
挂在file
提供的wait_queue_head
中。然后,poll
回调函数还将返回文件的状态(POLLIN/POLLOUT/…
) - 如果
poll_table
没有回调函数,则poll
回调仅仅返回文件的状态
- 如果
- 根据
poll
回调返回的文件状态,判断返回状态是否为想要监听的状态。比如返回了POLLIN
,且fd
恰好在读的fd_set
中,则在返回给用户的读fd_set
中,标记该位,然后将poll_table
的回调置为NULL
。这一步很重要,因为它会导致后续对fd的f_op->poll
回调不再挂任何wait_queue_entry
到剩下的fd
的wait_queue_head
中。
- 调用
- 当对于所有的
fd
都判断完毕后- 如果得到了想要监听的事件,那么就取下那些之前遍历时挂上去的
wait_queue_entry
,然后将对应的事件拷贝给用户。 - 如果超时了,那么操作和上面一样
- 否则,说明没有任何想要的事件达到,而且我们已经将
wait_queue_entry
挂在了每个想要监听的文件的wait_queue_head
上。现在,根据select
传递进来的超时时间,陷入一定时间的睡眠,等待被超时唤醒,或者被监听文件唤醒,然后重复第一步。
- 如果得到了想要监听的事件,那么就取下那些之前遍历时挂上去的
- 对于每个
struct timeval *timeout
内的内容会经历从用户态拷贝到内核再拷贝回来的过程。select
只有一个系统调用,而不像epoll
拥有三个系统调用。因此,select
对事件的监听没有像epoll
那样,做到常驻内核。- 在使用
select
时,需要传递三个fd_set
,fd_set
实际上是关于fd
的位图,所有需要监听的fd
对于的位被置1。三个fd_set
分别对应读/写/异常事件的监听
select 和 epoll 对比
select
比epoll
慢的主要原因:- 对
fd_set
的拷贝开销:三个fd_set
,不管结果成功与否,都要进行拷贝。 - 没有常驻内核:每次调用
select
都需要重新挂wait_queue_entry
,离开时还需要给取下来。
- 对
- 另外,由于
select
是位图,所以允许监听的最大fd
数量是有限制的,而epoll
使用红黑树,它可以监听更多的fd
。
problems
spinlock_t
在4.2
之后内部实现改为了qspinlock
struct mutex
的trylock
调用了atomic_long_try_cmpxchg_acquire
+ `struct optimistic_spin_queue osq;`的作用
rcu
机制