epoll源码

epoll

内部监听的事件以epitem组织为一个红黑树

注册和事件添加到epoll的rdlist都是通过调用f_op->poll

注册时是通过将epoll添加到文件的wait_queue_head中

rdlist上的epitem通常是由被监听文件自己挂上去的

data struct

  • fs/eventpoll.c
  • epoll_create (SYSCALL_DEFINE1(epoll_create) -> do_epoll_create
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// 有 strc
struct eventpoll {
spinlock_t lock; // 保护对eventpoll的访问
struct mutex mtx; // ctl 和 事件收集时用到

/* Wait queue used by sys_epoll_wait() */
wait_queue_head_t wq;
/* Wait queue used by file->poll() */
wait_queue_head_t poll_wait;

struct file *file;

struct list_head rdllist; // 所有 ready 的 fd
struct rb_root_cached rbr; // 所有 add 进来的 fd -> epitem
struct epitem *ovflist; // 单项链表存储所有 epitem
};

struct epoll_event {
__poll_t events;
__u64 data;
} EPOLL_PACKED;

struct epoll_filefd {
struct file *file;
int fd;
} __packed;

struct eppoll_entry {
struct list_head llink; // 指向 epitem 中的 pwqlist,从而将 epitem 连成链表
struct epitem *base; // 指向所属的 epitem
wait_queue_entry_t wait; // 要加入被监听文件的wait_queue的回调函数
wait_queue_head_t *whead;
};

struct epitem {
struct eventpoll *ep; // 指向所属的 eventpoll
/* The file descriptor information this item refers to */
struct epoll_filefd ffd; // epitem 对应的 struct file 和 fd
struct list_head fllink; // epitem 中的 struct file 通过这个组织成一个链表
struct list_head pwqlist; // poll wait queues,指向 epoll_entry 中的 llink
struct epoll_event event;
};
  • 相关的数据结构
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    struct wait_queue_head {
    spinlock_t lock;
    struct list_head head;
    };
    typedef struct wait_queue_head wait_queue_head_t;

    struct mutex {
    atomic_long_t owner;
    spinlock_t wait_lock;
    struct optimistic_spin_queue osq; // 优化 mutex 的性能
    struct list_head wait_list;
    };

    /*
    * Leftmost-cached rbtrees.
    *
    * We do not cache the rightmost node based on footprint
    * size vs number of potential users that could benefit
    * from O(1) rb_last(). Just not worth it, users that want
    * this feature can always implement the logic explicitly.
    * Furthermore, users that want to cache both pointers may
    * find it a bit asymmetric, but that's ok.
    */
    struct rb_root_cached {
    struct rb_root rb_root;
    struct rb_node *rb_leftmost;
    };

    struct file {
    struct path f_path;
    struct inode *f_inode; /* cached value */
    const struct file_operations *f_op;
    spinlock_t f_lock;
    // 对于epollfd指向 struct event_poll
    void *private_data;
    #ifdef CONFIG_EPOLL
    /* Used by fs/eventpoll.c to link all the hooks to this file */
    struct list_head f_ep_links; // 所有监听此文件的 epitem 的链表
    struct list_head f_tfile_llink;
    #endif /* #ifdef CONFIG_EPOLL */
    // ....
    };

    struct callback_head {
    struct callback_head *next;
    void (*func)(struct callback_head *head);
    } __attribute__((aligned(sizeof(void *))));
    #define rcu_head callback_head

epoll_create

  • glibcepoll_create -> 系统调用的epoll_create -> do_epoll_create
    • ep_alloc(&ep) 创建一个 struct eventpollep,并初始化
    • fd = get_unused_fd_flags 获取一个可用的fd
    • file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,O_RDWR | (flags & O_CLOEXEC));
      • 创建一个名为[eventpoll]的匿名文件,文件的fop设置为eventpoll_fopsstaic全局变量)
        • file->private_data = ep
        • fd_install(fd, file); 使 task_struct->files->fdt->fd[fd] = file
        • eventpoll_fops->poll = ep_eventpoll_poll
          • poll函数用于驱动提供给应用程序探测设备文件是否可读或可写。当该函数指针为空时表示设备可对文件非阻塞的读写。调用poll函数的进程会sleep知道可读或可写。

epoll_ctl

根据参数分别调用ep_insertep_removeep_modify

  • ADD
    • epoll_ctlADD时会调用对监听fdf_op->poll,里面将调用poll_wait创建一个wait_queue_entry,内含一个回调函数(在poll_table中 ),将此wait_queue_entry挂到被监听fdwait_queue_head上,当设备对此fd进行读写时会唤醒被监听fdwait_queue_head上监听对应事件的进行,调用注册进来的回调函数。
    • epoll使用红黑树管理它监听的所有文件,它为每个监听的文件创建一个epitem,加入到eventpollrbr字段(红黑树root)中。
    • 如果要监听的是一个非epoll文件,内核会调用epfdf_op->poll回调,内核创建一个epoll_entry,通过调用该回调函数,将epoll_entry加入到文件内部的wait_queue_head中,以便文件有事件到达时通过该epoll_entry,将对应的epitem加入到eventpollrdlist中。
    • 如果要监听的是一个epoll文件,epfd2对应的eventpoll结构体中有一个wait_queue_head——poll_wait。因此,内核同样会创建一个epoll_entry,然后将该epoll_entry加入到poll_wait中,以便epfd2通epfd1

epoll_wait

  • 如果eventpollrdlist中没有epollitem,则说明监听的文件没有一个事件到达,这时需要将当前文件陷入阻塞(睡在eventpoll->wq上),等待有任意事件到达。
  • 如果有事件到达,则将rdlist中所有的epollitem一口气取下来,放到一个临时链表中(避免争用rdlist)。对于临时链表中的每一项epollitem
    • epollitem从链表中取下
    • 调用ep_item_poll
      • 如果epitem对应的是一个非epoll文件,调用它的f_op->poll回调,获取该文件的事件,与epitem记录的要等待的事件取与。若取与得到的值不为0,
        • 说明确实有要等待的事件发生,首先将该事件拷贝到用户缓冲区
        • 然后,如果是EPOLLONESHOT,则在拷贝完事件后去掉对EPOLLIN/EPOLLOUT等的监听,只保留EPOLLWAKEUP | EPOLLONESHOT | EPOLLET | EPOLLEXCLUSIVE四个标志位的值。
        • 然后,如果是水平触发模式,则将该epollitem再放到eventpoll的尾部。
      • 如果epitem对应的是一个epoll文件(称为epfd2),则以epfd2为参数,递归进入第二项判断rdlist
    • 如果从临时链表中取出了用户要求的最大事件数,则停止循环
  • 现在,将临时链表中剩下的没有处理完的那些事件重新加到rdlist中。
  • 如果rdlist中有还没有处理完的数据,唤醒睡在eventpoll->wq上的进程(处于epoll_wait阻塞的进程),并通知eventpoll->poll_wait上的进程(比如epfd1监听epfd2,那么epfd2会通过自己的poll_wait通知epfd1,这会导致epfd1rdlist中出现一项关于epfd2epollitem)。
  • 如果发现没有拿到任何事件(到达的事件和想要的事件不同,或者被其他进程提前一步拿走了),且还没有超过epoll_wait限制的时间,则跳转到第一步重新开始。
  • 通过上面的流程描述,可以知道,水平触发模式的文件,在缓冲区有数据时,会一直待在rdlist中,直到它的缓冲区数据为空。
  • 需要注意的是,rdlist上的epitem通常是由被监听文件自己挂上去的。比如,当监听一个管道时,管道文件自己有一个wait_queue_head,当我们将管道文件通过EPOLL_ADD加入到epoll中时,会调用一次管道文件的poll回调。poll回调通过调用poll_wait函数,将我们的epoll注册到它自己的wait_queue_head中。然后,当管道文件发现自己缓冲区可读可写时,它会通过自己的wait_queue_head通知epoll,将epitem挂在epollrdlist里面。这样,当epoll_wait被调用时,就会发现自己的rdlist里面有数据,然后,它会遍历rdlist中的每一项,对每一项调用poll回调,这样,管道的poll回调又一次被调用了。不过,这一次管道并不会将epoll注册到自己的wait_queue_head中去,它仅仅判断自己的缓冲区的可读可写事件,返回对于的POLLIN/POLLOUT,然后,epoll_wait根据返回的事件,与要监听的事件做与操作,来决定是否拷贝该事件。

select

1
2
3
4
5
6
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
// 默认 #define __FD_SETSIZE 1024
void FD_CLR(int fd, fd_set *set);
int FD_ISSET(int fd, fd_set *set);
void FD_SET(int fd, fd_set *set);
void FD_ZERO(fd_set *set);
  • 调用过程:select -> kern_select -> core_sys_select
    • 对于每个fd,如果在读/写/异常fd_set中的任意一个fd_set中置位了
      • 调用fd对于filef_op->poll回调
        • 如果poll_table中有回调函数,它将负责创建一个wait_queue_entry,并将该entry挂在file提供的wait_queue_head中。然后,poll回调函数还将返回文件的状态(POLLIN/POLLOUT/…
        • 如果poll_table没有回调函数,则poll回调仅仅返回文件的状态
      • 根据poll回调返回的文件状态,判断返回状态是否为想要监听的状态。比如返回了POLLIN,且fd恰好在读的fd_set中,则在返回给用户的读fd_set中,标记该位,然后将poll_table的回调置为NULL。这一步很重要,因为它会导致后续对fd的f_op->poll回调不再挂任何wait_queue_entry到剩下的fdwait_queue_head中。
    • 当对于所有的fd都判断完毕后
      • 如果得到了想要监听的事件,那么就取下那些之前遍历时挂上去的wait_queue_entry,然后将对应的事件拷贝给用户。
      • 如果超时了,那么操作和上面一样
      • 否则,说明没有任何想要的事件达到,而且我们已经将wait_queue_entry挂在了每个想要监听的文件的wait_queue_head上。现在,根据select传递进来的超时时间,陷入一定时间的睡眠,等待被超时唤醒,或者被监听文件唤醒,然后重复第一步。
  • struct timeval *timeout内的内容会经历从用户态拷贝到内核再拷贝回来的过程。
  • select只有一个系统调用,而不像epoll拥有三个系统调用。因此,select对事件的监听没有像epoll那样,做到常驻内核。
  • 在使用select时,需要传递三个fd_setfd_set实际上是关于fd的位图,所有需要监听的fd对于的位被置1。三个fd_set分别对应读/写/异常事件的监听

select 和 epoll 对比

  • selectepoll慢的主要原因:
    • fd_set的拷贝开销:三个fd_set,不管结果成功与否,都要进行拷贝。
    • 没有常驻内核:每次调用select都需要重新挂wait_queue_entry,离开时还需要给取下来。
  • 另外,由于select是位图,所以允许监听的最大fd数量是有限制的,而epoll使用红黑树,它可以监听更多的fd

problems

  • spinlock_t4.2之后内部实现改为了qspinlock
  • struct mutextrylock调用了atomic_long_try_cmpxchg_acquire
    + `struct optimistic_spin_queue osq;`的作用
    
  • rcu机制