打算输出一系列 Netty
源码分析与实践的文章,也作为后端开发学习过程中的沉淀。写作风格会遵循目标导向,关注核心,抽离出知识的Pattern,无价值细节决不花时间。此文章为第一篇,从操作系统底层的IO讲起,为 Netty
的出场做下知识准备。
文件描述符在形式上是一个非负整数。实际上,它是一个索引值,指向为每一个进程所维护的该进程打开文件的记录表。当程序打开一个现有文件或者创建一个新文件时,内核向进程返回一个文件描述符。读写文件也需要使用文件描述符来指定待读写的文件。
根据UNIX网络编程对I/O模型的分类,UNIX提供了5种IO模型:
在进程空间中调用recvfrom,其系统调用直到数据包到达且被复制到应用进程的缓冲区中或者发生错误时才返回,在此期间会一直被阻塞
recvfrom从应用层到内核的时候,如果该缓冲区没有数据的话,就直接返回一个EWOULDBLOCK错误, 一般都对非阻塞I/O模型进行轮询检查这个状态,看内核是不是有数据到来。
将一个或者多个fd通过select/poll调用, 然后阻塞在select上,一个进程用来监控多个fd,即所谓的I/O复用,细节后面会描述。
首先开启套接字信号驱动I/O功能,并通过系统调用sigaction执行一个信号处理函数并立即返回(非阻塞)。当数据就绪时,就为该进程生成一个SIGIO信号,通过信号回调通知应用程序调用recvfrom来读取数据。
告知内核启动某个操作, 内核会将数据从内核态自己copy到用户态缓冲区,然后通知我们。其和信号驱动模型的主要区别是: 信号驱动I/O由内核通知我们何时可以开始一个I/O操作;异步I/O模型由内核通知我们I/O操作何时已经完成。
Linux通过 socket
的睡眠队列( sleep_list
)来管理所有等待 socket
的某个事件的进程( task
), select
、 poll
、 epoll_wait
函数操作会陷入内核,判断监控的socket是否有关心的事件发生了,如果没,则为当前 task
构建一个 wait_entry
节点,然后插入到每个socket的 sleep_list
里,直到超时或事件发生,同时通过 wakeup
机制来异步唤醒整个睡眠队列上等待事件的 task
,通知 task
相关事件发生,每一个 sleep_list
上的 wait_entry
都拥有一个 callback
, wakeup
逻辑在唤醒睡眠队列时,会遍历该队列链表上的每一个 wait_entry
,直到完成队列的遍历或遇到某个 wait_entry
节点是排他的才停止,调用每一个 wait_entry
的 callback
,并将当前 task
的 wait_entry
节点从socket的 sleep_list
中删除。
select
是一种同步IO,函数签名如下:
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout); 复制代码
使用示例:
#include <stdio.h> #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <wait.h> #include <signal.h> #include <errno.h> #include <sys/select.h> #include <sys/time.h> #include <unistd.h> #define MAXBUF 256 void child_process(void) { sleep(2); char msg[MAXBUF]; struct sockaddr_in addr = {0}; int n, sockfd,num=1; srandom(getpid()); /* Create socket and connect to server */ sockfd = socket(AF_INET, SOCK_STREAM, 0); addr.sin_family = AF_INET; addr.sin_port = htons(2000); addr.sin_addr.s_addr = inet_addr("127.0.0.1"); connect(sockfd, (struct sockaddr*)&addr, sizeof(addr)); printf("child {%d} connected /n", getpid()); while(1){ int sl = (random() % 10 ) + 1; num++; sleep(sl); sprintf (msg, "Test message %d from client %d", num, getpid()); n = write(sockfd, msg, strlen(msg)); /* Send message */ } } int main() { char buffer[MAXBUF]; int fds[5]; struct sockaddr_in addr; struct sockaddr_in client; int addrlen, n,i,max=0;; int sockfd, commfd; fd_set rset; //创建了5个子进程, 每个进程都向server发送了数据 for(i=0;i<5;i++) { if(fork() == 0) { child_process(); exit(0); } } sockfd = socket(AF_INET, SOCK_STREAM, 0); memset(&addr, 0, sizeof (addr)); addr.sin_family = AF_INET; addr.sin_port = htons(2000); addr.sin_addr.s_addr = INADDR_ANY; bind(sockfd,(struct sockaddr*)&addr ,sizeof(addr)); listen (sockfd, 5); ///告诉内核服务端的一些信息 连接队列个数为5,大于5个socket连接,会出现延时 for (i=0;i<5;i++) { memset(&client, 0, sizeof (client)); addrlen = sizeof(client); fds[i] = accept(sockfd,(struct sockaddr*)&client, &addrlen); //保留最大的 文件描述符值 if(fds[i] > max) max = fds[i]; } while(1){ //将文件描述符数组每一位全都置为0 FD_ZERO(&rset); //每次while循环都要重新设置要监控的socket for (i = 0; i< 5; i++ ) { FD_SET(fds[i],&rset); } puts("round again"); //一直阻塞直到有读事件已ready select(max+1, &rset, NULL, NULL, NULL); for(i=0;i<5;i++) { //循环判断是哪个socket可读 if (FD_ISSET(fds[i], &rset)){ memset(buffer,0,MAXBUF); read(fds[i], buffer, MAXBUF); puts(buffer); } } } return 0; } 复制代码
为了要高效的处理网络IO数据,不可能为每个 socket
创建一个进程 task
,进程创建是一种高昂的性能损耗,所以采用一个 task
来监控多个 socket
,但这一个 task
不可能去阻塞式的监控某一个socket的事件发生,我们应该block在关心的N个socket中一个或多个socket有数据可读的事件,意味着当block解除的时候,我们一定可以找到一个或多个socket上有可读的数据(至少一个可读), select
将这个 task
放到每个 socket的 sleep_list
,等待任意一个socket可读事件发生而被唤醒,当task被唤醒的时候,其 callback
里面应该有个逻辑去检查具体哪些socket可读了。然后把这些事件反馈给用户程序, select
为每个socket引入一个 poll
逻辑,该逻辑用于收集socket发生的事件。对于可读事件来说,简单伪码如下:
private int sk_event; void poll() { //其他逻辑... when (receive queue is not empty) { sk_event |= POLL_IN; } //其他逻辑... } 复制代码
当 receive queue
不为空的时候,我们就给这个socket的 sk_event
添加一个 POLL_IN
事件,用来表示当前这个socket可读。将来task遍历到这个socket,发现其 sk_event
包含 POLL_IN
的时候,就说明这个socket已是可读的。当用户task调用select的时候,select会将需要监控的readfds集合拷贝到内核空间,然后遍历自己监控的socket,挨个调用socket的poll逻辑以便检查该socket是否有可读事件。
遍历完所有的socket后,如果没有任何一个sk可读,那么select会调用schedule,使得task进入睡眠。如果在timeout时间内某个socket上有数据可读了,或者等待timeout了,则调用select的task会被唤醒。唤醒后select就是遍历监控的socket集合,挨个收集可读事件并返回给用户了,相应的伪码如下:
for (socket in readfds) { sk_event.evt = socket.poll(); sk_event.sk = socket; return_event_for_process; } 复制代码
就像示例代码一样while循环内的for循环,在select返回后,task需要遍历已ready的描述符集合,循环的次数就是之前记录的fd值。
poll
实际上在Unix系统是不支持的,不像select使用bitmap集合来存储fd值,它通过一个大小为nfds的pollfd结构来表示需要监控的fd set,函数签名如下:
int poll (struct pollfd *fds, unsigned int nfds, int timeout); 复制代码
pollfd的结构如下, 每个fd都有对应的监听事件events,和就绪返回的事件revents,现在fd的大小是int最大值了。
struct pollfd { int fd; short events; short revents; }; 复制代码
代码示例:
for (i=0;i<5;i++) { memset(&client, 0, sizeof (client)); addrlen = sizeof(client); pollfds[i].fd = accept(sockfd,(struct sockaddr*)&client, &addrlen); pollfds[i].events = POLLIN; } sleep(1); while(1){ puts("round again"); poll(pollfds, 5, 50000); for(i=0;i<5;i++) { if (pollfds[i].revents & POLLIN){ pollfds[i].revents = 0; memset(buffer,0,MAXBUF); read(pollfds[i].fd, buffer, MAXBUF); puts(buffer); } } } 复制代码
细看 select
和 poll
的函数原型,我们会发现,每次调用 select
或 poll
都在重复地准备整个需要监控的fds集合。我们需要监控三个socket,就要准备一个 readfds
,然后新增监控一个socket,就要再准备一个 readfds
(包含旧的和新的socket的 readfds
)。然而对于频繁调用的 select
或 poll
而言,fds集合的变化频率要低得多,我们没必要每次都重新准备整个fds集合。
于是, epoll
引入了 epoll_ctl
系统调用,将高频调用的 epoll_wait
和低频的 epoll_ctl
隔离开。 epoll_ctl
是 epoll
的事件注册函数,它不同与 select()
是在监听事件时,告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型。到了有变化才变更,将 select
或 poll
高频、大块内存拷贝变成 epoll_ctl
的低频、小块内存的拷贝,避免了大量的内存拷贝。
同时,对于高频 epoll_wait
的可读就绪的fd集合返回的拷贝问题, epoll
通过内核与用户空间 mmap
同一块内存来解决。 mmap
将用户空间的一块地址和内核空间的一块地址同时映射到相同的一块物理内存地址(不管是用户空间还是内核空间都是虚拟地址,最终要通过地址映射映射到物理地址),使得这块物理内存对内核和对用户均可见,减少用户态和内核态之间的数据交换。
另外, epoll
通过 epoll_ctl
来对监控的fds集合来进行增、删、改,那么必须涉及到fd的快速查找问题。于是在linux 2.6.8以后的内核中采用了红黑树的结构来组织fds。 示例代码:
struct epoll_event events[5]; int epfd = epoll_create(10); ... ... for (i=0;i<5;i++) { static struct epoll_event ev; memset(&client, 0, sizeof (client)); addrlen = sizeof(client); ev.data.fd = accept(sockfd,(struct sockaddr*)&client, &addrlen); ev.events = EPOLLIN; epoll_ctl(epfd, EPOLL_CTL_ADD, ev.data.fd, &ev); } while(1){ puts("round again"); nfds = epoll_wait(epfd, events, 5, 10000); for(i=0;i<nfds;i++) { memset(buffer,0,MAXBUF); read(events[i].data.fd, buffer, MAXBUF); puts(buffer); } } 复制代码
遍历就绪的fds集合通过上面的socket的睡眠队列唤醒逻辑我们知道,socket唤醒睡眠在其睡眠队列的 wait_entry
的时候会调用 wait_entry
的回调函数 callback
,并且,我们可以在 callback
中做任何事情。为了做到只遍历就绪的fd,我们需要有个地方来组织那些已经就绪的fd。
为此, epoll
引入了一个中间层,一个双向链表 ready_list
,一个单独的睡眠队列 single_epoll_wait_list
,并且,与 select
或 poll
不同的是, epoll
的task不需要同时插入到多路复用的socket集合的所有睡眠队列中,相反task只是插入到中间层的 epoll
的单独睡眠队列中(即 single_epoll_wait_list
),task睡眠在 epoll
的单独队列上,等待事件的发生。同时,引入一个中间的 wait_entry_sk
,它与某个socket密切相关, wait_entry_sk
睡眠在socket的睡眠队列上,其 callback
函数逻辑是将当前socket排入到 epoll
的 ready_list
中,并唤醒 epoll
的 single_epoll_wait_list
。而 single_epoll_wait_list
上睡眠的task的回调函数就明朗了:遍历 ready_list
上的所有socket,挨个调用socket的 poll
函数收集事件,然后唤醒task从 epoll_wait
返回。
额,epoll模型太常用了,碉碉的。。。。 上一张大佬画的图: