前言
有天晚上问朋友八股,突然问道epoll,让我解释什么是IO多路复用,我一时也没法解释清楚,归根结底还是自己没去理解,所以写一篇关于IO多路复用的文章来加强理解
IO多路复用
什么是IO多路复用
多路复用
我个人理解是这样的,”复用”代表着重复使用,多路复用的话我可以理解成多条道路重复使用,就像城市A和城市B之间原本有几条公路,但是随着时间发展,政府打算开辟几条新的道路,那么原本的旧路就直接被舍弃了吗?这样就很浪费资源了,那样不如将旧的道路进行休整成新的道路,这样就不会浪费原本的资源了,做到了资源利用最大化
IO多路复用就是将几个进行IO的线程进行复用,而不是说线程进行IO操作之后就销毁,一句话来解释IO多路复用就是:单线程或单进程同时检测多个文件描述符是否可以执行IO操作的能力
为什么会有IO多路复用
这里我们需要介绍两种模型
如果我们的服务器采用单线程的模型,那么就需要不断地while去接受新的网络连接,但是当我们服务器阻塞在recv或者send的时候,就没办法去接受其他的网络连接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| (服务端)
socket();
bind();
listen();
while(1) { int fd = accept();
recv(fd) }
|
上面是一个伪代码,但是我们能明显地发现,如果服务器是单线程的情况下,BIO模型就会导致服务器如果阻塞在recv或者send函数的时候,就没办法去调用accept()去接收其他的客户端的连接请求,当然,这样做也是有好处的,这样就不会占用CPU宝贵的时间片,一旦阻塞,CPU就会去执行其他操作,但是缺点也是显而易见的,就是同一时刻只能处理一个操作,效率极低
解决方案
当然,我们会很容易想到,将服务器设置为多线程的情况,专门拿一个线程来进行accept,每次accept一个连接后,就新建一个线程来进行recv或者send
1 2 3 4 5
| while(1) { int fd = accept(); pthread_create(); }
|
但是我们需要知道,虽然线程是轻量级的,但是也是需要消耗资源的,一旦并发量很大,那么就会消耗很大的系统资源,显然这不是我们想要看到的
非阻塞模型的情况下,我们的服务端可以每次accept()一个请求后,就将返回的文件描述符放入一个数组,然后每次都轮询一边数组里面的fd的recv函数,如果没有数据就返回错误
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
|
SetNonBlock(listen_fd); while(1) { int fd = accept(listen_fd); if(fd!=NULL) { fds.append(fd); SetNonBlock(fd); } else { } for(fd int fds) { if(len=recv(fd)&&len>0) { } else { } } }
|
这样的操作,确实不需要服务器创建多个线程,同时也不会因为recv和send等阻塞函数的影响而导致无法对新的请求进行accept()
但是,这样做会导致有的文件描述符recv是没有数据的,而每次都轮询一遍,等到文件描述符数组很大的时候,就很浪费CPU资源了,假设数组里面有10000个文件描述符,但只有一个文件描述符recv能返回数据,但是我们每次都要查询10000个,这样就很浪费CPU资源了
解决方案
如果我们能够知道,每次是哪一个文件描述符能够读了,也就是这个文件描述符的recv成功了,那么我们就不用每次都把所有数组里的文件描述符查询一遍了,所以为了解决这个问题,科学家们就发明了IO多路复用
IO多路复用有三种:
- select
- poll
- epoll
Select
select是IO多路复用最早的一种方法
他的主旨思想就是:
- 构造一个关于文件描述符的列表,将要监听的文件描述符添加到该列表中
- 调用一个系统函数,监听该列表中的文件描述符,直到这些列表中的一个或者多个文件描述符进行IO操作时,该函数才返回(这个函数是阻塞的,并且函数对文件描述符的检测是由内核来进行的)
- 在返回时,函数会告诉进程有哪些文件描述符要进行IO操作
Select API
在Linux下,包含头文件 <sys/time.h> <sys/types.h> <unistd.h>
1 2 3 4 5 6 7 8 9 10 11 12
| int select (int nfds,fd_set *readfds,fd_set *writefds,fd_set *exceptfds,struct timeval *timeout);
参数: 1. nfds:委托内核检测的最大文件描述符的值+1 2. readfds:要检测的读的文件描述符集合,委托内核检测哪些文件描述符的读的属性,一般检测读操作(检测读缓冲区); 3. writefds:要检测的写的文件描述符集合,委托内核检检测写缓冲区是否可以写数据(不满就可以写) 4. exceptfds:检测发生异常的文件描述符的集合 5. timeout:设置的超时时间(阻塞对应的时间),如果设置为NULL,就表示永久阻塞,直到检测到文件描述符有变化 返回值: 失败: 返回-1 成功: 返回n(n>0),表示集合中有n个文件描述符发生了改变
|
1 2 3 4 5 6
| void FD_CLR(int fd,fd_set *set)
参数: fd: 需要置为0的文件描述符 set: readfds或者writefds或者exceptfds里面的一个集合
|
1 2 3 4
| int FD_ISSET(int fd,fd_set *set)
返回值: fd对应的标志位的值
|
1 2
| void FD_SET(int fd,fd_set *set);
|
1 2
| void FD_ZERO(fd_set *set);
|
select的大概使用流程
- 创建需要监听的类型的文件描述符集合
- 将要监听的文件描述符在对应的文件描述符集合置为1,调用FD_SET函数
- 循环调用select(),每次有文件描述符发生改变都可以通过FD_ISSET来判断当前文件描述符是否发生改变,是的话就进行业务逻辑
Select的缺点
- 每次调用select(),都需要把fd集合从用户态拷贝到内核态,这个开销在fd集合很大的时候会很大
- 每次调用select()都需要在内核遍历所有fd,这个开销在fd集合很大的时候也很大
- 只要有一个文件描述符发生改变,都会将整个集合从内核态拷贝到用户态,而且用户需要自己去遍历整个集合去判断是哪个或者哪几个文件描述符发生了改变,开销都是很大的
- select支持的最大文件描述符为1024,太小了
- 文件描述符集合不能复用,每次用户遍历完都需要重置
服务端采用Select的实例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
|
#include <iostream> using namespace std; #include <unistd.h> #include <arpa/inet.h> #include <sys/time.h> #include <sys/types.h> #include <stdio.h> #include <string.h> int main() { int sockfd=socket(AF_INET,SOCK_STREAM,0);
sockaddr_in addr; addr.sin_port=htons(7777); addr.sin_family=AF_INET; addr.sin_addr.s_addr=INADDR_ANY; int ret=bind(sockfd,(struct sockaddr *)&addr,sizeof(addr)); if(ret==-1) { perror("bind"); } listen(sockfd,5);
fd_set readset,temp; FD_ZERO(&readset); FD_SET(sockfd,&readset); int maxfd=sockfd; while(1) { temp=readset; ret=select(maxfd+1,&temp,NULL,NULL,NULL); if(ret==-1) { perror("select"); break; } else if(ret>0) {
if(FD_ISSET(sockfd,&temp)==1) { sockaddr_in addr2; socklen_t len=sizeof(addr2); int newsockfd=accept(sockfd,(struct sockaddr *)&addr2,&len); char ip[16]; inet_ntop(AF_INET,(void *)&addr2.sin_addr.s_addr,ip,sizeof(ip)); cout<<"客户端的IP地址为:"<<ip<<endl; cout<<"客户端的端口为:"<<ntohs(addr2.sin_port)<<endl;
FD_SET(newsockfd,&readset); maxfd=maxfd>newsockfd?maxfd:newsockfd; } else { int i=0; while(i<=maxfd) { if(i==sockfd) { i++; continue; } else if(FD_ISSET(i,&temp)==1) { char buf[1024]; int len_r=read(i,buf,sizeof(buf)); if(len_r==-1) { perror("read"); exit(0); } else if(len_r==0) { cout<<"客户端断开了连接"<<endl; close(i); FD_CLR(i,&readset); } else if(len_r>0) { cout<<"接收到数据:"<<buf<<endl; write(i,buf,strlen(buf)+1); } } i++; } } } } close(sockfd); return 0; }
|
Poll
Poll和Select是很像的,只不过Poll解决了Select只能最多有1024个文件描述符的问题,但是Poll还是需要有一个从用户态拷贝到内核态的过程,所以一旦文件描述符数量多了,它的开销还是会很大,而且Poll也依旧需要用户遍历来寻找发生改变的文件描述符
Poll API
Poll的API都是包含在头文件<poll.h>里面的,poll也不再使用文件描述符集合来保存文件描述符,而是改用一个结构体
1 2 3 4 5 6 7 8 9
| struct pollfd{ int fd; short events; short revent; }
fd: 委托内核检测的文件描述符 events: 委托内核检测的文件描述符的什么事件,例如 写事件(POLLOUT)、读事件(POLLIN) revents: 文件描述符实际发生的事情
|
1 2 3 4 5 6 7 8 9 10 11 12 13
| int poll(struct pollfd *fds,nfds_t nfds,int timeout)
参数: fds: 一个struct pollfd结构体数组,这是一个需要检测的文件描述符集合 nfds: 这是第一个参数数组中最后一个有效元素的下标+1 timeout: 阻塞时长 0: 不阻塞 -1: 阻塞,当检测到需要检测的文件描述符发生变化,解除阻塞 >0: 阻塞的时长 返回值: -1: 失败 n(n>0): 成功,检测到数组中有n个文件描述符发生变化
|
Poll的缺点
Poll只解决了Select文件描述符集合大小的限制,但是性能方面依旧没有解决,所以为了解决这个问题,就发明了至今都还在使用的一种IO多路复用,Epoll
服务端采用Poll的示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
|
#include <iostream> using namespace std; #include <unistd.h> #include <arpa/inet.h> #include <sys/time.h> #include <sys/types.h> #include <stdio.h> #include <string.h> #include <poll.h> int main() { int sockfd=socket(AF_INET,SOCK_STREAM,0);
sockaddr_in addr; addr.sin_port=htons(6666); addr.sin_family=AF_INET; addr.sin_addr.s_addr=INADDR_ANY; int ret=bind(sockfd,(struct sockaddr *)&addr,sizeof(addr)); if(ret==-1) { perror("bind"); } listen(sockfd,5);
struct pollfd fds[1024]; for(int i=0;i++;i<1024) { fds[i].fd=-1; } fds[0].fd=sockfd; fds[0].events=POLLIN; int maxfd=0;
while(1) { ret=poll(fds,maxfd+1,-1); if(ret==-1) { perror("poll"); break; } else if(ret==0) { continue; } else if(ret>0) {
if(fds[0].revents & POLLIN) { sockaddr_in addr2; socklen_t len=sizeof(addr2); int newsockfd=accept(sockfd,(struct sockaddr *)&addr2,&len); char ip[16]; inet_ntop(AF_INET,(void *)&addr2.sin_addr.s_addr,ip,sizeof(ip)); cout<<"客户端的IP地址为:"<<ip<<endl; cout<<"客户端的端口为:"<<ntohs(addr2.sin_port)<<endl;
for(int j=1;j<1024;j++) { if(fds[j].fd==-1) { fds[j].fd=newsockfd; fds[j].events=POLLIN; maxfd=maxfd>j?maxfd:j; break; } } } else { int i=1; while(i<=maxfd) { if(fds[i].revents & POLLIN) { char buf[1024]; int len_r=read(fds[i].fd,buf,sizeof(buf)); if(len_r==-1) { perror("read"); exit(0); } else if(len_r==0) { cout<<"客户端断开了连接"<<endl; close(fds[i].fd); fds[i].fd=-1; } else if(len_r>0) { cout<<"接收到数据:"<<buf<<endl; write(fds[i].fd,buf,strlen(buf)+1); } } i++; } } } } close(sockfd); return 0; }
|
Epoll
Epoll是目前为止都在使用的一种IO多路复用,其不光解决了select限制文件描述符大小的问题,也解决了Select和Poll的性能问题
Epoll API
包含头文件<sys/epoll.h>
1 2 3 4 5 6 7 8
| int epoll_create(int size)
参数: size: 目前无异议,只需>0就好 返回值 -1: 失败 n(n>0): 成功,返回一个文件描述符,须知epoll实例也需要用一个文件描述符来标志
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| int epoll_ctl(int epfd,int op,int fd,struct epoll_event *event)
参数: epfd: 通过epoll_create返回的文件描述符,标志创建的epoll实例 op: 对文件描述符的操作 EPOLL_CTL_ADD:添加(添加在实例中的红黑树) EPOLL_CTL_MOD:修改 EPOLL_CTL_DEL:删除(从实例中的红黑树删除) fd: 要检测的文件描述符 event: 检测文件描述符的事情 struct epoll_event{ uint32_t events; epoll_data_t date; }
typedef union epoll_data{ void *ptr; int fd;(主要使用) uint32_t u32; uint64_t u64; } epoll_data_t
返回值: 成功,0 失败,-1
|
1 2 3 4 5 6 7 8 9 10 11
| int epoll_wait(int epfd,struct epoll_event *event,int maxevent,int timeout)
参数: epfd: epoll实例的文件描述符 event: 传出参数,保存发生了改变的文件描述符,是一个数组 maxevent: 第二个参数(结构体数组)的大小 timoeout: 阻塞时间 0: 不阻塞 -1: 阻塞,直到检测到有文件描述符对应的缓冲区发生了改变 >0: 阻塞时长
|
Epoll的底层实现
epoll机制是通过红黑树和双向链表来实现的
实现过程
- 首先epoll_create创建一个epoll文件描述符,底层同时创建一个红黑树,和一个就绪链表
- 红黑树存储所监控的文件描述符的节点数据,就绪链表存储就绪的文件描述符的节点数据
- epoll_ctl将会添加新的描述符,首先判断是红黑树上是否有此文件描述符节点,如果有,则立即返回)如果没有, 则在树干上插入新的节点,并且告知内核注册回调函数
- 当接收到某个文件描述符过来数据时,那么内核将该节点插入到就绪链表里面
- epoll_wait将会接收到消息,并且将数据拷贝到用户空间,清空链表
对于LT模式,epoll_wait清空就绪链表之后会检查该文件描述符是哪一种模式,如果为LT模式,且必须该节点确实有事件未处理,那么就会把该节点重新放入到刚刚删除掉的且刚准备好的就绪链表,epoll_wait马上返回)ET模式不会检查,只会调用一次
Epoll的两种模式
LT(level-triggered)模式,是缺省的工作模式,也就是系统默认的工作模式,并且同时支持阻塞和非阻塞两种socket)在这种工作模式下,如果内核通知你有一个文件描述符发生了改变,只要你并没有对这个文件描述符做操作使其恢复到原本状态,那么内核就会不断通知你
示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109
|
#include <iostream> using namespace std; #include <unistd.h> #include <arpa/inet.h> #include <sys/time.h> #include <sys/types.h> #include <stdio.h> #include <string.h> #include <sys/epoll.h> int main() { int sockfd=socket(AF_INET,SOCK_STREAM,0);
sockaddr_in addr; addr.sin_port=htons(5555); addr.sin_family=AF_INET; addr.sin_addr.s_addr=INADDR_ANY; int ret=bind(sockfd,(struct sockaddr *)&addr,sizeof(addr)); if(ret==-1) { perror("bind"); } listen(sockfd,5);
int epfd=epoll_create(1);
struct epoll_event event; event.events=EPOLLIN; event.data.fd=sockfd; epoll_ctl(epfd,EPOLL_CTL_ADD,sockfd,&event);
struct epoll_event newevent[1024];
while(1) { ret=epoll_wait(epfd,newevent,1024,-1); if(ret==-1) { perror("epoll"); break; } else if(ret==0) { continue; } else if(ret>0) { int i=0; while(i<ret) { if(newevent[i].data.fd==sockfd) { sockaddr_in addr2; socklen_t len=sizeof(addr2); int newsockfd=accept(sockfd,(struct sockaddr *)&addr2,&len); char ip[16]; inet_ntop(AF_INET,(void *)&addr2.sin_addr.s_addr,ip,sizeof(ip)); cout<<"客户端的IP地址为:"<<ip<<endl; cout<<"客户端的端口为:"<<ntohs(addr2.sin_port)<<endl;
struct epoll_event sonevent; sonevent.events=EPOLLIN; sonevent.data.fd=newsockfd; epoll_ctl(epfd,EPOLL_CTL_ADD,newsockfd,&sonevent); } else { char buf[5]; int len_r=read(newevent[i].data.fd,buf,sizeof(buf)); if(len_r==-1) { perror("read"); exit(0); } else if(len_r==0) { cout<<"客户端断开了连接"<<endl; close(newevent[i].data.fd); epoll_ctl(epfd,EPOLL_CTL_DEL,newevent[i].data.fd,NULL); } else if(len_r>0) { cout<<"接收到数据:"<<buf<<endl; write(newevent[i].data.fd,buf,strlen(buf)+1); } sleep(1); } i++; } } } close(epfd); close(sockfd); return 0; }
|
ET(edge-triggered)是一种高速工作模式,只支持NONBLOCK的socket,在这种模式下面,当描述符从未就绪状态变为已就绪状态的时候,内核通过epoll告诉你,然后它就假设你已经知道了文件描述符已经就绪,此后,它就不会再给你发送更多的就绪通知了,直到你做了某些操作导致这个文件描述符不再为就绪状态)
如果一直不对这个文件描述符进行操作,此后内核也不会发送更多的通知
为什么要有ET模式呢
ET模式在很大程度上减少了epoll事件被重复触发的次数,因此效率就比LT模式高
我们假设有一个文件大小为100MB,但是我们用户每次只读取10MB,如果是LT模式的话,我们用阻塞的read函数,内核就会对这个事件重复触发10次,但是如果是ET模式的话,只会触发一次,所以效率就比较高
为什么ET是要用非阻塞的函数呢
我们知道,ET只会通知一次,所以我们就必须在接收到这个通知的时候,一次性将缓冲区的数据读取完,但是我们每次读取的数据大小有限,所以我们就需要循环去读取,也就是
while(read()>0),但是当我们将缓冲区读取完并且read函数为阻塞的情况下,程序就会阻塞在read上,这样这个线程就会卡在这里,没办法去执行其他操作,因为我们的服务器主线程一般都需要通过不断地循环调用epoll_wait(),来确保可以有新的连接,所以如果read是阻塞的情况下,就没办法接收新的连接了
示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135
|
#include <iostream> using namespace std; #include <unistd.h> #include <fcntl.h> #include <arpa/inet.h> #include <sys/time.h> #include <sys/types.h> #include <stdio.h> #include <string.h> #include <sys/epoll.h> int main() { int sockfd=socket(AF_INET,SOCK_STREAM,0);
sockaddr_in addr; addr.sin_port=htons(5555); addr.sin_family=AF_INET; addr.sin_addr.s_addr=INADDR_ANY; int ret=bind(sockfd,(struct sockaddr *)&addr,sizeof(addr)); if(ret==-1) { perror("bind"); } listen(sockfd,5);
int epfd=epoll_create(1);
struct epoll_event event; event.events=EPOLLIN; event.data.fd=sockfd; epoll_ctl(epfd,EPOLL_CTL_ADD,sockfd,&event);
struct epoll_event newevent[1024];
while(1) { ret=epoll_wait(epfd,newevent,1024,-1); if(ret==-1) { perror("epoll"); break; } else if(ret>0) { int i=0; while(i<ret) { if(newevent[i].data.fd==sockfd) { sockaddr_in addr2; socklen_t len=sizeof(addr2); int newsockfd=accept(sockfd,(struct sockaddr *)&addr2,&len); char ip[16]; inet_ntop(AF_INET,(void *)&addr2.sin_addr.s_addr,ip,sizeof(ip)); cout<<"客户端的IP地址为:"<<ip<<endl; cout<<"客户端的端口为:"<<ntohs(addr2.sin_port)<<endl; int flag=fcntl(newsockfd,F_GETFL); flag=flag | O_NONBLOCK; fcntl(newsockfd,F_SETFL,flag);
struct epoll_event sonevent; sonevent.events=EPOLLIN | EPOLLET; sonevent.data.fd=newsockfd; epoll_ctl(epfd,EPOLL_CTL_ADD,newsockfd,&sonevent); } else { int len=0; char buf[5]; while((len=read(newevent[i].data.fd,buf,sizeof(buf)))>0) { cout<<"接收到数据:"<<buf<<endl; write(newevent[i].data.fd,buf,strlen(buf)+1); } if(len==0) { cout<<"客户端断开了连接"<<endl; } else if(len==-1) { if(errno==EAGAIN) { cout<<"所有数据都读取完毕"<<endl; } else { perror("read"); exit(-1); } } char buf[5]; int len_r=read(newevent[i].data.fd,buf,sizeof(buf)); if(len_r==-1) { perror("read"); exit(0); } else if(len_r==0) { cout<<"客户端断开了连接"<<endl; close(newevent[i].data.fd); epoll_ctl(epfd,EPOLL_CTL_DEL,newevent[i].data.fd,NULL); } else if(len_r>0) { cout<<"接收到数据:"<<buf<<endl; write(newevent[i].data.fd,buf,strlen(buf)+1); } sleep(1); } i++; } } } close(epfd); close(sockfd); return 0; }
|
Epoll的优势
- Epoll没有文件描述符数量大小的限制
- Epoll创建的时候是直接在内核中创建,并且添加文件描述符等操作都是直接在内核中进行的,所以不会有从用户态拷贝到内核态的开销,不过返回链表的数据的时候,还是会将链表里的数据拷贝到数组里的,还是会有从内核态到用户态的开销
- Epoll返回触发的时候,只会返回监听事件发生了的文件描述符,每次返回都是有时间出发的文件描述符,所以用户不需要遍历所有的文件描述符
- Epoll底层是红黑树,所以对文件描述符的遍历复杂度很低,效率很高
结尾
很久没写博客了,秋招笔试写了好多啊,看到各位大佬都拿到offer了,就还是会有点焦虑,offer快来吧