评论

收藏

[C++] 并发服务器--02(基于I/O复用——运用epoll技术)

编程语言 编程语言 发布于:2021-07-31 13:35 | 阅读数:282 | 评论:0

本文承接自上一博文I/O复用——运用Select函数。
epoll介绍epoll是在2.6内核中提出的。和select类似,它也是一种I/O复用技术,是之前的select和poll的增强版本。
Linux下设计并发网络程序,向来不缺少方法,比如典型的Apache模型(Process Per Connection,简称PPC),TPC(Thread PerConnection)模型,以及select模型和poll模型,那为何还要再引入epoll呢?我们先来看一下常用模型的缺点:
PPC/TPC模型
这两种模型思想类似,就是让每一个到来的连接一边自己做事去,别再来烦我。只是PPC是为它开了一个进程,而TPC开了一个线程。可是别烦我是有代价的,它要时间和空间啊,连接多了之后,那么多的进程/线程切换,这开销就上来了;因此这类模型能接受的最大连接数都不会高,一般在几百个左右。
select模型
1)最大并发数限制,因为一个进程所打开的FD(文件描述符)是有限制的,由FD_SETSIZE设置,默认值是1024/2048,因此Select模型的最大并发数就被相应限制了。自己改改这个FD_SETSIZE?想法虽好,可是先看看下面吧…
2)效率问题,select每次调用都会线性扫描全部的FD集合,这样效率就会呈现线性下降,把FD_SETSIZE改大的后果就是,大家都慢慢来,什么?都超时了??!!
3)内核/用户空间内存拷贝问题,如何让内核把FD消息通知给用户空间呢?在这个问题上select采取了内存拷贝方法。
poll模型
基本上效率和select是相同的,select缺点的2和3它都没有改掉。
epoll的提升
其实把select的缺点反过来那就是Epoll的优点了:
1)epoll没有最大并发连接的限制,上限是最大可以打开文件的数目(一般远大于2048),一般跟系统内存关系很大。具体数目可以cat /proc/sys/fs/file-max察看。
2)效率提升,epoll最大的优点就在于它只管“活跃”的连接,而跟连接总数无关,因此在实际的网络环境中,epoll的效率就会远远高于select和poll。
3)内存拷贝,epoll在这点上使用了“共享内存”,所以这个内存拷贝也省略了。
epoll接口epoll操作过程用到的三个接口如下:
#include <sys/epoll.h>
int epoll_create(int size);  
// 返回:若成功返回非负epoll描述符,否则返回-1,并设置errno的值
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *events);  
// 返回:若成功返回0,否则返回-1,并设置errno的值
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);  
// 返回:若成功返回在timeout时间内就绪的文件描述符数,否则返回-1,并设置errno的值
epoll_create
该函数返回一个epoll的描述符(一个整数)。size用来告诉内核这个监听的数目一共有多大,不同于select()中的第一个参数给出最大监听的fd+1的值。需要注意的是,当创建好epoll句柄后,它就是会占用一个fd值,在linux下如果查看/proc/进程id号/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。
如下图是刚打开服务端程序(本文ET模式服务器程序)的截图:
DSC0000.png

上图中,前三个/dev/pts/1应该表示系统输入、输出及异常,socket表示监听套接字,eventepoll表示进程创建的epoll。
另外,当有新的客户连接到服务端时,截图如下:
DSC0001.png


epoll_ctl
该函数是epoll的事件注册函数。它不同于select()是在监听事件时告诉内核要监听什么类型的事件,而是在这里注册要监听的事件类型。
该函数的参数说明如下(fd是file descriptor的缩写,表示文件描述符):
1)第一个参数是epoll_create函数的返回值。
2)第二个参数表示动作:
EPOLL_CTL_ADD:注册新的fd到epfd中;
EPOLL_CTL_MOD:修改已经注册的fd的监听事件;
EPOLL_CTL_DEL:从epfd中删除一个fd。
3)第三个参数表示需要监听的fd。
4)第四个参数告诉内核需要监听什么事。struct epoll_event的结构如下:
struct epoll_event {
  uint32_t events;  /* Epoll events */
  epoll_data_t data;  /* User data variable */
};
events可以是以下几个宏的集合:
EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
EPOLLOUT:表示对应的文件描述符可以写;
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
EPOLLERR:表示对应的文件描述符发生错误;
EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里

epoll_wait
该函数等待事件的产生,类似于select()调用。参数events用来从内核得到事件的集合;maxevents告诉内核返回的events的最大大小,这个maxevents的值不能大于创建epoll_create()时的size,也必须大于0;参数timeout是超时时间(毫秒,0会立即返回,-1将永久阻塞)。该函数返回需要处理的数目,如返回0表示已超时。
epoll工作模式这部分要详细参考博文Epoll在LT和ET模式下的读写方式(已附于文章最后)。
在一个非阻塞的socket上调用read/write函数, 返回EAGAIN或者EWOULDBLOCK(注: EAGAIN就是EWOULDBLOCK)
从字面上看,意思是:
* EAGAIN:再试一次
* EWOULDBLOCK:如果这是一个阻塞socket,操作将被block
* perror输出:Resource temporarily unavailable
总结:
这个错误表示资源暂时不够,可能read时,读缓冲区没有数据,或者,write时,写缓冲区满了。
遇到这种情况,如果是阻塞socket,read/write就要阻塞掉。而如果是非阻塞socket,read/write立即返回-1,同时errno设置为EAGAIN。所以,对于阻塞socket, read/write返回-1代表网络出错了。但对于非阻塞socket, read/write返回-1不一定网络真的出错了,可能是Resource temporarily unavailable。 这时我们应该再试,直到Resource available。

综上,对于non-blocking的socket,正确的读写操作为:
读:忽略掉errno = EAGAIN的错误,下次继续读。
写:忽略掉errno = EAGAIN的错误,下次继续写。
对于select和epoll的LT模式,这种读写方式是没有问题的。 但对于epoll的ET模式,这种方式还有漏洞。
epoll对文件描述符的操作有两种模式:LT(level trigger)和ET(edge trigger)。LT模式是默认模式,LT模式与ET模式的区别如下:
LT模式:只要某个监听中的文件描述符处于readable/writable状态,无论什么时候进行epoll_wait都会返回该描述符;
ET模式:只有某个监听中的文件描述符从unreadable变为readable或从unwritable变为writable时,epoll_wait才会返回该描述符。
ET模式下套接字要设定为non-blocking
我们先来看一下Linux官方给出的手册关于LT和ET的一个例子:
The epoll event distribution interface is able to behave both as
     edge-triggered (ET) and as level-triggered (LT).  The difference
     between the two mechanisms can be described as follows.  Suppose that
     this scenario happens:
     1. The file descriptor that represents the read side of a pipe (rfd)
      is registered on the epoll instance.
     2. A pipe writer writes 2 kB of data on the write side of the pipe.
     3. A call to epoll_wait(2) is done that will return rfd as a ready
      file descriptor.
     4. The pipe reader reads 1 kB of data from rfd.
     5. A call to epoll_wait(2) is done.
     If the rfd file descriptor has been added to the epoll interface
     using the EPOLLET (edge-triggered) flag, the call to epoll_wait(2)
     done in step 5 will probably hang despite the available data still
     present in the file input buffer; meanwhile the remote peer might be
     expecting a response based on the data it already sent.  The reason
     for this is that edge-triggered mode delivers events only when
     changes occur on the monitored file descriptor.  So, in step 5 the
     caller might end up waiting for some data that is already present
     inside the input buffer.  In the above example, an event on rfd will
     be generated because of the write done in 2 and the event is consumed
     in 3.  Since the read operation done in 4 does not consume the whole
     buffer data, the call to epoll_wait(2) done in step 5 might block
     indefinitely.
     An application that employs the EPOLLET flag should use nonblocking
     file descriptors to avoid having a blocking read or write starve a
     task that is handling multiple file descriptors.  The suggested way
     to use epoll as an edge-triggered (EPOLLET) interface is as follows:
        i   with nonblocking file descriptors; and
        ii  by waiting for an event only after read(2) or write(2)
          return EAGAIN.
     By contrast, when used as a level-triggered interface (the default,
     when EPOLLET is not specified), epoll is simply a faster poll(2), and
     can be used wherever the latter is used since it shares the same
     semantics.

从上述例子,我们可以看出:
1. 在LT模式下,只要某个监听中的文件描述符处于readable/writable状态,无论什么时候进行epoll_wait都会返回该描述符。所以,只要读缓冲区有数据或者写缓冲区仍有空间,那就可读或可写, 都不会因套接字设定为blocking或者non-blocking而导致epoll_wait进入无限期等待;
2. 在ET模式下,只有某个监听中的文件描述符从unreadable变为readable或从unwritable变为writable时,epoll_wait才会返回该描述符。所以当我们没有把读缓冲区的数据全部读完或者没有把写缓冲区的空间写满就返回,套接字就会一直处于不可读或者不可写的状态,这样read/write会阻塞直到套接字可读或可写,但在这种情况下,套接字不可能变为可读或可写,所以read/write会一直阻塞下去,从而导致epoll_wait无限期阻塞于该套接字而无法返回。所以,要将套接字设定为non-blocking,让epoll_waite可以及时返回。
下边两图可形象展示LT和ET的区别:
从socket读数据:
DSC0002.jpg
往socket写数据:
DSC0003.jpg

所以,在epoll的ET模式下,正确的读写方式为:
读:只要可读,就一直读,直到返回0,或者 errno = EAGAIN
写:只要可写,就一直写,直到数据发送完,或者 errno = EAGAIN
示例
示例1:回射程序(LT模式)
这里服务器端程序主要摘自博文IO多路复用之epoll总结,不过修复了部分Bug,具体如下:
DSC0004.gif DSC0005.gif
1 #include <stdio.h>
  2 #include <stdlib.h>
  3 #include <string.h>
  4 #include <errno.h>
  5 
  6 #include <netinet/in.h>
  7 #include <sys/socket.h>
  8 #include <arpa/inet.h>
  9 #include <sys/epoll.h>
 10 #include <unistd.h>
 11 #include <sys/types.h>
 12 
 13 #define PORT    9877
 14 #define BUFFERSIZ  1024
 15 #define LISTENQ   1024
 16 #define FDSIZE    1024
 17 #define EPOLLEVENTS 100
 18 
 19 //函数声明
 20 //创建套接字并进行绑定
 21 static int socket_bind(int port);
 22 //IO多路复用epoll
 23 static void do_epoll(int listenfd);
 24 //事件处理函数
 25 static void
 26 handle_events(int epollfd, struct epoll_event *events, int num, int listenfd, char *buf);
 27 //处理接收到的连接
 28 static void handle_accpet(int epollfd, int listenfd);
 29 //读处理
 30 static void do_read(int epollfd, int fd, char *buf);
 31 //写处理
 32 static void do_write(int epollfd, int fd, char *buf);
 33 //添加事件
 34 static void add_event(int epollfd, int fd, int state);
 35 //修改事件
 36 static void modify_event(int epollfd, int fd, int state);
 37 //删除事件
 38 static void delete_event(int epollfd, int fd, int state);
 39 //close socket
 40 void Close(int fd);
 41 
 42 int main(int argc, char *argv[])
 43 {
 44   int  listenfd;
 45   listenfd = socket_bind(PORT);
 46   listen(listenfd, LISTENQ);
 47   do_epoll(listenfd);
 48   exit(0);
 49 }
 50 
 51 static int socket_bind(int port)
 52 {
 53   int  listenfd;
 54   struct sockaddr_in servaddr;
 55   listenfd = socket(AF_INET, SOCK_STREAM, 0);
 56   if (listenfd == -1)
 57   {
 58     perror("socket error:");
 59     exit(1);
 60   }
 61   bzero(&servaddr, sizeof(servaddr));
 62   servaddr.sin_family = AF_INET;
 63   servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
 64   servaddr.sin_port = htons(port);
 65   if (bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr)) == -1)
 66   {
 67     perror("bind error: ");
 68     exit(1);
 69   }
 70   return listenfd;
 71 }
 72 
 73 static void do_epoll(int listenfd)
 74 {
 75   int epollfd;
 76   struct epoll_event events[EPOLLEVENTS];
 77   int num;
 78   char buf[BUFFERSIZ];
 79   memset(buf, 0, BUFFERSIZ);
 80   //创建一个描述符
 81   epollfd = epoll_create(FDSIZE);
 82   //添加监听描述符事件
 83   add_event(epollfd, listenfd, EPOLLIN);
 84   for ( ; ; )
 85   {
 86     //获取已经准备好的描述符事件
 87     if ((num = epoll_wait(epollfd, events, EPOLLEVENTS, -1)) == -1) 
 88     {
 89       perror("epoll_pwait");
 90       exit(1);
 91     }
 92     handle_events(epollfd, events, num, listenfd, buf);
 93   }
 94   Close(epollfd);
 95 }
 96 
 97 static void
 98 handle_events(int epollfd, struct epoll_event *events, int num, int listenfd, char *buf)
 99 {
100   int i;
101   int fd;
102   // 遍历
103   for (i = 0; i < num; i++)
104   {
105     fd = events[i].data.fd;
106     //根据描述符的类型和事件类型进行处理
107     if ((fd == listenfd) && (events[i].events & EPOLLIN))
108       handle_accpet(epollfd, listenfd);
109     else if (events[i].events & EPOLLIN)
110       do_read(epollfd, fd, buf);
111     else if (events[i].events & EPOLLOUT)
112       do_write(epollfd, fd, buf);
113   }
114 }
115 static void handle_accpet(int epollfd, int listenfd)
116 {
117   int clifd;
118   struct sockaddr_in cliaddr;
119   socklen_t  cliaddrlen;
120   clifd = accept(listenfd, (struct sockaddr*)&cliaddr, &cliaddrlen);
121   if (clifd == -1)
122     perror("accpet error:");
123   else
124   {
125     printf("accept a new client: %s:%d\n", inet_ntoa(cliaddr.sin_addr), cliaddr.sin_port);
126     //添加一个客户描述符和事件
127     add_event(epollfd, clifd, EPOLLIN);
128   }
129 }
130 
131 static void do_read(int epollfd, int fd, char *buf)
132 {
133   int nread;
134   nread = read(fd, buf, BUFFERSIZ);
135   if (nread == -1)
136   {
137     perror("read error:");
138     //Close(fd);
139     delete_event(epollfd, fd, EPOLLIN);
140   }
141   else if (nread == 0)
142   {
143     fprintf(stderr, "client close.\n");
144     //Close(fd);
145     delete_event(epollfd, fd, EPOLLIN);
146   }
147   else
148   {
149     printf("read message is : %s", buf);
150     //修改描述符对应的事件,由读改为写
151     modify_event(epollfd, fd, EPOLLOUT);
152   }
153 }
154 
155 static void do_write(int epollfd, int fd, char *buf)
156 {
157   int nwrite;
158   nwrite = write(fd, buf, strlen(buf));
159   if (nwrite == -1)
160   {
161     perror("write error:");
162     //Close(fd);
163     delete_event(epollfd, fd, EPOLLOUT);
164   }
165   else
166     modify_event(epollfd, fd, EPOLLIN);
167   memset(buf, 0, BUFFERSIZ);
168 }
169 
170 static void add_event(int epollfd, int fd, int state)
171 {
172   struct epoll_event ev;
173   ev.events = state;
174   ev.data.fd = fd;
175   if((epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev)) == -1)
176   {
177     perror("epoll_ctl: add");
178   }
179 }
180 
181 static void delete_event(int epollfd, int fd, int state)
182 {
183   struct epoll_event ev;
184   ev.events = state;
185   ev.data.fd = fd;
186   if((epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &ev)) == -1)
187   {
188     perror("epoll_ctl: del");
189   }
190   Close(fd);  
191   // 如果描述符fd已关闭,再从epoll中删除fd,则会出现epoll failed: Bad file descriptor问题
192   // 所以要先从epoll中删除fd,在关闭fd. 
193 }
194 
195 static void modify_event(int epollfd, int fd, int state)
196 {
197   struct epoll_event ev;
198   ev.events = state;
199   ev.data.fd = fd;
200   if((epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &ev)) == -1)
201   {
202     perror("epoll_ctl: mod");
203   }
204 }
205 
206 void Close(int fd)
207 {
208   if((close(fd)) < 0)
209   {
210     perror("close socket error");
211     exit(1);
212   }
213 }
servepoll
客户端程序:
1 #include <netinet/in.h>
  2 #include <sys/socket.h>
  3 #include <stdio.h>
  4 #include <string.h>
  5 #include <stdlib.h>
  6 #include <sys/epoll.h>
  7 #include <time.h>
  8 #include <unistd.h>
  9 #include <sys/types.h>
 10 #include <arpa/inet.h>
 11 #include <errno.h>
 12 
 13 #define BUFFERSIZ  1024
 14 #define SERV_PORT   9877
 15 #define FDSIZE    1024
 16 #define EPOLLEVENTS 100
 17 
 18 void handle_connection(int sockfd);
 19 void handle_events(int epollfd, struct epoll_event *events, int num, int sockfd, char *buf);
 20 void add_event(int epollfd, int fd, int state);
 21 void delete_event(int epollfd, int fd, int state);
 22 void modify_event(int epollfd, int fd, int state);
 23 
 24 ssize_t Read(int fd, void *ptr, size_t nbytes);
 25 void Write(int fd, void *ptr, size_t nbytes);
 26 ssize_t writen(int fd, const void *vptr, size_t n);
 27 void Writen(int fd, void *ptr, size_t nbytes);
 28 
 29 void Close(int fd);
 30 
 31 int main(int argc, char **argv)
 32 {
 33   if(argc != 2)
 34     perror("usage: tcpcli <IPaddress>");
 35   
 36   int sockfd;
 37   struct sockaddr_in  servaddr;
 38   if( (sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) 
 39   {
 40     perror("socket error\n");
 41     exit(1);
 42   }
 43   
 44   bzero(&servaddr, sizeof(servaddr));
 45   servaddr.sin_family = AF_INET;
 46   servaddr.sin_port = htons(SERV_PORT);
 47   if((inet_pton(AF_INET, argv[1], &servaddr.sin_addr)) <= 0)
 48   {
 49     perror("inet_pton error");
 50     exit(1);
 51   }
 52 
 53   if((connect(sockfd, (struct sockaddr*) &servaddr, sizeof(servaddr))) < 0)
 54   {
 55     perror("connect error\n");
 56     exit(1);
 57   }
 58   
 59   handle_connection(sockfd);
 60   
 61   Close(sockfd);
 62   exit(0);
 63 }
 64 
 65 void handle_connection(int sockfd)
 66 {
 67   int epollfd;
 68   struct epoll_event events[EPOLLEVENTS];
 69   char buf[BUFFERSIZ];
 70   int num;
 71   epollfd = epoll_create(FDSIZE);
 72   add_event(epollfd, sockfd, EPOLLIN);
 73   add_event(epollfd, STDIN_FILENO, EPOLLIN);
 74   for ( ; ; )
 75   {
 76     num = epoll_wait(epollfd, events, EPOLLEVENTS, -1);
 77     handle_events(epollfd, events, num, sockfd, buf);
 78   }
 79   Close(epollfd);
 80 }
 81 
 82 void handle_events(int epollfd, struct epoll_event *events, int num, int sockfd, char *buf)
 83 {
 84   int fd;
 85   int i;
 86   int stdineof = 0;
 87   for (i = 0; i < num; i++)
 88   {
 89     fd = events[i].data.fd;
 90     if(fd == sockfd)
 91     {
 92       int nread;
 93       nread = Read(sockfd, buf, BUFFERSIZ);
 94       if (nread == 0)
 95       {
 96         if(stdineof == 1)
 97           return;
 98         fprintf(stderr, "server close\n");
 99         Close(sockfd);
100         Close(epollfd);
101         exit(1);
102       }
103       Write(fileno(stdout), buf, nread);
104     }
105     else
106     {
107       int nread;
108       nread = Read(fd, buf, BUFFERSIZ);
109       if (nread == 0)
110       {
111         stdineof = 1;
112         fprintf(stderr, "no inputs\n");
113         Close(fd);
114         continue;
115       }
116       modify_event(epollfd, sockfd, EPOLLOUT);
117       Writen(sockfd, buf, nread);
118       modify_event(epollfd, sockfd, EPOLLIN);
119       memset(buf, 0, BUFFERSIZ);
120     }
121   }
122 }
123 
124 void add_event(int epollfd, int fd, int state)
125 {
126   struct epoll_event ev;
127   ev.events = state;
128   ev.data.fd = fd;
129   if(epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev) == -1)
130   {
131     perror("epoll_ctl: add");
132   }
133 }
134 
135 void delete_event(int epollfd, int fd, int state)
136 {
137   struct epoll_event ev;
138   ev.events = state;
139   ev.data.fd = fd;
140   if(epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &ev) == -1)
141   {
142     perror("epoll_ctl: del");
143   }
144 }
145 
146 void modify_event(int epollfd, int fd, int state)
147 {
148   struct epoll_event ev;
149   ev.events = state;
150   ev.data.fd = fd;
151   if(epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &ev) == -1)
152   {
153     perror("epoll_ctl: mod");
154   }
155 }
156 
157 ssize_t Read(int fd, void *ptr, size_t nbytes)
158 {
159   ssize_t n;
160 
161   if ( (n = read(fd, ptr, nbytes)) == -1)
162     perror("read error");
163   return(n);
164 }
165 
166 void Write(int fd, void *ptr, size_t nbytes)
167 {
168   if (write(fd, ptr, nbytes) != nbytes)
169     perror("write error");
170 }
171 
172 /* Write "n" bytes to a descriptor. */
173 ssize_t writen(int fd, const void *vptr, size_t n)
174 {
175   size_t    nleft;
176   ssize_t    nwritten;
177   const char  *ptr;
178 
179   ptr = vptr;
180   nleft = n;
181   while (nleft > 0) {
182     if ( (nwritten = write(fd, ptr, nleft)) <= 0) {
183       if (nwritten < 0 && errno == EINTR)
184         nwritten = 0;    /* and call write() again */
185       else
186         return(-1);      /* error */
187     }
188 
189     nleft -= nwritten;
190     ptr   += nwritten;
191   }
192   return(n);
193 }
194 
195 void Writen(int fd, void *ptr, size_t nbytes)
196 {
197   if (writen(fd, ptr, nbytes) != nbytes)
198     perror("writen error");
199 }
200 
201 void Close(int fd)
202 {
203   if((close(fd)) < 0)
204   {
205     perror("close error");
206     exit(1);
207   }
208 }
cliepoll
程序运行截图如下:
1)客户端主动连接主动关闭
客户端:
DSC0006.png

服务器端:
DSC0007.png

2)客户端主动连接被动关闭
客户端:
DSC0008.png

服务器端:
DSC0009.png


示例2:回射程序(ET模式)
这里我们只给出服务器端的程序。这部分程序部分参考自博文Epoll在LT和ET模式下的读写方式。具体程序如下:
1 #include <stdio.h>
  2 #include <stdlib.h>
  3 #include <string.h>
  4 #include <errno.h>
  5 #include <fcntl.h>
  6 
  7 #include <netinet/in.h>
  8 #include <sys/socket.h>
  9 #include <arpa/inet.h>
 10 #include <sys/epoll.h>
 11 #include <unistd.h>
 12 #include <sys/types.h>
 13 
 14 #define PORT    9877
 15 #define BUFFERSIZ  1024
 16 #define LISTENQ   1024
 17 #define FDSIZE    1024
 18 #define EPOLLEVENTS 100
 19 
 20 //函数声明
 21 //创建套接字并进行绑定
 22 int socket_bind(int port);
 23 //IO多路复用epoll
 24 void do_epoll(int listenfd);
 25 //事件处理函数
 26 void handle_events(int epollfd, struct epoll_event *events, int num, int listenfd, char *buf);
 27 //处理接收到的连接
 28 void handle_accpet(int epollfd, int listenfd);
 29 //读处理
 30 void do_read(int epollfd, int fd, char *buf);
 31 //写处理
 32 void do_write(int epollfd, int fd, char *buf);
 33 
 34 //设置socket连接为非阻塞模式
 35 void setnonblocking(int sockfd);
 36 //close socket
 37 void Close(int fd);
 38 
 39 int main(int argc, char *argv[])
 40 {
 41   int  listenfd;
 42   listenfd = socket_bind(PORT);
 43   listen(listenfd, LISTENQ);
 44   do_epoll(listenfd);
 45   exit(0);
 46 }
 47 
 48 int socket_bind(int port)
 49 {
 50   int  listenfd;
 51   struct sockaddr_in servaddr;
 52   listenfd = socket(AF_INET, SOCK_STREAM, 0);
 53   if (listenfd == -1)
 54   {
 55     perror("socket error:");
 56     exit(1);
 57   }
 58   setnonblocking(listenfd);
 59   bzero(&servaddr, sizeof(servaddr));
 60   servaddr.sin_family = AF_INET;
 61   servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
 62   servaddr.sin_port = htons(port);
 63   if (bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr)) == -1)
 64   {
 65     perror("bind error: ");
 66     exit(1);
 67   }
 68   return listenfd;
 69 }
 70 
 71 void do_epoll(int listenfd)
 72 {
 73   int epollfd;
 74   struct epoll_event ev;
 75   struct epoll_event events[EPOLLEVENTS];
 76   int num;
 77   char buf[BUFFERSIZ];
 78   memset(buf, 0, BUFFERSIZ);
 79   //创建一个描述符
 80   epollfd = epoll_create(FDSIZE);
 81   //添加监听描述符事件
 82   ev.events = EPOLLIN;
 83   ev.data.fd = listenfd;
 84   if (epoll_ctl(epollfd, EPOLL_CTL_ADD, listenfd, &ev) == -1) 
 85   {
 86     perror("epoll_ctl: add");
 87     exit(1);
 88   }
 89 
 90   for ( ; ; )
 91   {
 92     //获取已经准备好的描述符事件
 93     if ((num = epoll_wait(epollfd, events, EPOLLEVENTS, -1)) == -1) 
 94     {
 95       perror("epoll_wait");
 96       exit(1);
 97     }
 98     handle_events(epollfd, events, num, listenfd, buf);
 99   }
100   Close(epollfd);
101 }
102 
103 void handle_events(int epollfd, struct epoll_event *events, int num, int listenfd, char *buf)
104 {
105   int i;
106   int fd;
107   // 遍历
108   for (i = 0; i < num; i++)
109   {
110     fd = events[i].data.fd;
111     //根据描述符的类型和事件类型进行处理
112     if ((fd == listenfd) && (events[i].events & EPOLLIN))
113       handle_accpet(epollfd, listenfd);
114     else if (events[i].events & EPOLLIN)
115       do_read(epollfd, fd, buf);
116     else if (events[i].events & EPOLLOUT)
117       do_write(epollfd, fd, buf);
118   }
119 }
120 
121 void handle_accpet(int epollfd, int listenfd)
122 {
123   int connfd;
124   struct sockaddr_in cliaddr;
125   socklen_t  cliaddrlen;
126   struct epoll_event ev;
127   while ((connfd = accept(listenfd, (struct sockaddr *) &cliaddr, &cliaddrlen)) > 0) 
128   {
129     printf("accept a new client: %s:%d\n", inet_ntoa(cliaddr.sin_addr), cliaddr.sin_port);
130     setnonblocking(connfd);
131     ev.events = EPOLLIN | EPOLLET;
132     ev.data.fd = connfd;
133     if (epoll_ctl(epollfd, EPOLL_CTL_ADD, connfd, &ev) == -1) 
134     {
135       perror("epoll_ctl: add");
136       exit(1);
137     }
138   }
139   if (connfd == -1) 
140   {
141     if (errno != EAGAIN && errno != ECONNABORTED 
142         && errno != EPROTO && errno != EINTR) 
143       perror("accept");
144   }
145 }
146 
147 void do_read(int epollfd, int fd, char *buf)
148 {
149   int nread;
150   int n = 0;
151   struct epoll_event ev;
152   while ((nread = read(fd, buf + n, BUFFERSIZ-1)) > 0) 
153   {
154     n += nread;
155   }
156   if (nread == -1 && errno != EAGAIN) 
157   {
158     perror("read error");
159     
160     ev.data.fd = fd;
161     ev.events = EPOLLIN | EPOLLET;
162     if (epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &ev) == -1) 
163     {
164       perror("epoll_ctl: mod");
165     }
166     Close(fd);
167   }
168   else if(nread == 0)
169   {
170     perror("client close");
171     
172     ev.data.fd = fd;
173     ev.events = EPOLLIN | EPOLLET;
174     if (epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &ev) == -1) 
175     {
176       perror("epoll_ctl: mod");
177     }
178     Close(fd);
179   }
180   else
181   {
182     printf("read message is : %s", buf); 
183     ev.data.fd = fd;
184     ev.events = EPOLLOUT | EPOLLET;
185     if (epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &ev) == -1) 
186     {
187       perror("epoll_ctl: mod");
188     }
189   }
190 }
191 
192 void do_write(int epollfd, int fd, char *buf)
193 {
194   int nwrite, data_size = strlen(buf);
195   int n = data_size;
196   struct epoll_event ev;
197   int flag = 0;
198   while (n > 0) 
199   {
200     nwrite = write(fd, buf + data_size - n, n);
201     if (nwrite < n) 
202     {
203       if (nwrite == -1 && errno != EAGAIN) 
204       {
205         flag = 1;
206         perror("write error");
207   
208         ev.data.fd = fd;
209         ev.events = EPOLLOUT | EPOLLET;
210         if (epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &ev) == -1) 
211         {
212           perror("epoll_ctl: mod");
213         }
214         Close(fd);
215       }
216       break;
217     }
218     n -= nwrite;
219   }
220   
221   if(flag != 1)
222   {
223     ev.data.fd = fd;
224     ev.events =  EPOLLIN | EPOLLET;
225     if (epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &ev) == -1) 
226     {
227       perror("epoll_ctl: mod");
228     }
229   }
230   // 这句很重要
231   memset(buf, 0, BUFFERSIZ);
232 }
233 
234 void setnonblocking(int sockfd) 
235 {
236   int opts;
237   opts = fcntl(sockfd, F_GETFL);
238   if(opts < 0) 
239   {
240     perror("Error: fcntl(F_GETFL)\n");
241     exit(1);
242   }
243   opts = (opts | O_NONBLOCK);
244   if(fcntl(sockfd, F_SETFL, opts) < 0) 
245   {
246     perror("Error: fcntl(F_SETFL)\n");
247     exit(1);
248   }
249 }
250 
251 void Close(int fd)
252 {
253   if((close(fd)) < 0)
254   {
255     perror("close socket error");
256     exit(1);
257   }
258 }
servepoll2
参考资料IO多路复用之epoll总结
Epoll在LT和ET模式下的读写方式
epoll精髓
Linux Epoll介绍和程序实例
特附博文Epoll在LT和ET模式下的读写方式在一个非阻塞的socket上调用read/write函数,返回EAGAIN或者EWOULDBLOCK(注: EAGAIN就是EWOULDBLOCK)。从字面上看,EAGAIN,再试一次;EWOULDBLOCK,如果这是一个阻塞socket,操作将被block,perror输出: Resource temporarily unavailable。
总结:
这个错误表示资源暂时不够,能read时,读缓冲区没有数据,或者write时,写缓冲区满了。遇到这种情况,如果是阻塞socket,read/write就要阻塞掉;而如果是非阻塞socket,read/write则立即返回-1, 同时errno设置为EAGAIN。
所以,对于阻塞socket,read/write返回-1代表网络出错了。但对于非阻塞socket,read/write返回-1不一定网络真的出错了。可能是Resource temporarily unavailable。这时你应该再试,直到Resource available。
综上,对于non-blocking的socket,正确的读写操作为:
读:忽略掉errno = EAGAIN的错误,下次继续读
写:忽略掉errno = EAGAIN的错误,下次继续写
对于select和epoll的LT模式,这种读写方式是没有问题的。但对于epoll的ET模式,这种方式还有漏洞。
epoll的两种模式LT和ET
二者的差异在于level-trigger模式下只要某个socket处于readable/writable状态,无论什么时候进行epoll_wait都会返回该socket;而edge-trigger模式下只有某个socket从unreadable变为readable或从unwritable变为writable时,epoll_wait才会返回该socket。
所以,在epoll的ET模式下,正确的读写方式为:
读:只要可读,就一直读,直到返回0,或者 errno = EAGAIN
写:只要可写,就一直写,直到数据发送完,或者 errno = EAGAIN
正确的读
1 n = 0;
2 while ((nread = read(fd, buf + n, BUFSIZ-1)) > 0) {
3   n += nread;
4 }
5 if (nread == -1 && errno != EAGAIN) {
6   perror("read error");
7 }
正确的写
1 int nwrite, data_size = strlen(buf);
 2 n = data_size;
 3 while (n > 0) {
 4   nwrite = write(fd, buf + data_size - n, n);
 5   if (nwrite < n) {
 6     if (nwrite == -1 && errno != EAGAIN) {
 7       perror("write error");
 8     }
 9     break;
10   }
11   n -= nwrite;
12 }
正确的accept,accept 要考虑 2 个问题
1)阻塞模式 accept 存在的问题
考虑这种情况:TCP连接被客户端夭折,即在服务器调用accept之前,客户端主动发送RST终止连接,导致刚刚建立的连接从就绪队列中移出,如果套接口被设置成阻塞模式,服务器就会一直阻塞在accept调用上,直到其他某个客户建立一个新的连接为止。但是在此期间,服务器单纯地阻塞在accept调用上,就绪队列中的其他描述符都得不到处理。
解决办法是把监听套接口设置为非阻塞,当客户在服务器调用accept之前中止某个连接时,accept调用可以立即返回-1,这时源自Berkeley的实现会在内核中处理该事件,并不会将该事件通知给epool,而其他实现把errno设置为ECONNABORTED或者EPROTO错误,我们应该忽略这两个错误。
2)ET模式下accept存在的问题
考虑这种情况:多个连接同时到达,服务器的TCP就绪队列瞬间积累多个就绪连接,由于是边缘触发模式,epoll只会通知一次,accept只处理一个连接,导致TCP就绪队列中剩下的连接都得不到处理。
解决办法是用while循环抱住accept调用,处理完TCP就绪队列中的所有连接后再退出循环。如何知道是否处理完就绪队列中的所有连接呢?accept返回-1并且errno设置为EAGAIN就表示所有连接都处理完。
综合以上两种情况,服务器应该使用非阻塞地accept,accept在ET模式下的正确使用方式为:
1 while ((conn_sock = accept(listenfd,(struct sockaddr *) &remote, (size_t *)&addrlen)) > 0) {
2   handle_client(conn_sock);
3 }
4 if (conn_sock == -1) {
5   if (errno != EAGAIN && errno != ECONNABORTED && errno != EPROTO && errno != EINTR)
6   perror("accept");
7 }
一道腾讯后台开发的面试题
使用Linux epoll模型,水平触发模式;当socket可写时,会不停的触发socket可写的事件,如何处理?
第一种最普遍的方式:
需要向socket写数据的时候才把socket加入epoll,等待可写事件。
接受到可写事件后,调用write或者send发送数据。
当所有数据都写完后,把socket移出epoll。
这种方式的缺点是,即使发送很少的数据,也要把socket加入epoll,写完后在移出epoll,有一定操作代价。
一种改进的方式:
开始不把socket加入epoll,需要向socket写数据的时候,直接调用write或者send发送数据。如果返回EAGAIN,把socket加入epoll,在epoll的驱动下写数据,全部数据发送完毕后,再移出epoll。
这种方式的优点是:数据不多的时候可以避免epoll的事件处理,提高效率。
最后贴一个使用epoll、ET模式的简单HTTP服务器代码:
1 #include <sys/socket.h>
  2 #include <sys/wait.h>
  3 #include <netinet/in.h>
  4 #include <netinet/tcp.h>
  5 #include <sys/epoll.h>
  6 #include <sys/sendfile.h>
  7 #include <sys/stat.h>
  8 #include <unistd.h>
  9 #include <stdio.h>
 10 #include <stdlib.h>
 11 #include <string.h>
 12 #include <strings.h>
 13 #include <fcntl.h>
 14 #include <errno.h> 
 15 
 16 #define MAX_EVENTS 10
 17 #define PORT 8080
 18 
 19 //设置socket连接为非阻塞模式
 20 void setnonblocking(int sockfd) {
 21   int opts;
 22 
 23   opts = fcntl(sockfd, F_GETFL);
 24   if(opts < 0) {
 25     perror("fcntl(F_GETFL)\n");
 26     exit(1);
 27   }
 28   opts = (opts | O_NONBLOCK);
 29   if(fcntl(sockfd, F_SETFL, opts) < 0) {
 30     perror("fcntl(F_SETFL)\n");
 31     exit(1);
 32   }
 33 }
 34 
 35 int main(){
 36   struct epoll_event ev, events[MAX_EVENTS];
 37   int addrlen, listenfd, conn_sock, nfds, epfd, fd, i, nread, n;
 38   struct sockaddr_in local, remote;
 39   char buf[BUFSIZ];
 40 
 41   //创建listen socket
 42   if( (listenfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
 43     perror("sockfd\n");
 44     exit(1);
 45   }
 46   setnonblocking(listenfd);
 47   bzero(&local, sizeof(local));
 48   local.sin_family = AF_INET;
 49   local.sin_addr.s_addr = htonl(INADDR_ANY);;
 50   local.sin_port = htons(PORT);
 51   if( bind(listenfd, (struct sockaddr *) &local, sizeof(local)) < 0) {
 52     perror("bind\n");
 53     exit(1);
 54   }
 55   listen(listenfd, 20);
 56 
 57   epfd = epoll_create(MAX_EVENTS);
 58   if (epfd == -1) {
 59     perror("epoll_create");
 60     exit(EXIT_FAILURE);
 61   }
 62 
 63   ev.events = EPOLLIN;
 64   ev.data.fd = listenfd;
 65   if (epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev) == -1) {
 66     perror("epoll_ctl: listen_sock");
 67     exit(EXIT_FAILURE);
 68   }
 69 
 70   for (;;) {
 71     nfds = epoll_wait(epfd, events, MAX_EVENTS, -1);
 72     if (nfds == -1) {
 73       perror("epoll_pwait");
 74       exit(EXIT_FAILURE);
 75     }
 76 
 77     for (i = 0; i < nfds; ++i) {
 78       fd = events[i].data.fd;
 79       if (fd == listenfd) {
 80         while ((conn_sock = accept(listenfd,(struct sockaddr *) &remote, 
 81                 (size_t *)&addrlen)) > 0) {
 82           setnonblocking(conn_sock);
 83           ev.events = EPOLLIN | EPOLLET;
 84           ev.data.fd = conn_sock;
 85           if (epoll_ctl(epfd, EPOLL_CTL_ADD, conn_sock,
 86                 &ev) == -1) {
 87             perror("epoll_ctl: add");
 88             exit(EXIT_FAILURE);
 89           }
 90         }
 91         if (conn_sock == -1) {
 92           if (errno != EAGAIN && errno != ECONNABORTED 
 93               && errno != EPROTO && errno != EINTR) 
 94             perror("accept");
 95         }
 96         continue;
 97       }  
 98       if (events[i].events & EPOLLIN) {
 99         n = 0;
100         while ((nread = read(fd, buf + n, BUFSIZ-1)) > 0) {
101           n += nread;
102         }
103         if (nread == -1 && errno != EAGAIN) {
104           perror("read error");
105         }
106         ev.data.fd = fd;
107         ev.events = events[i].events | EPOLLOUT;
108         if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev) == -1) {
109           perror("epoll_ctl: mod");
110         }
111       }
112       if (events[i].events & EPOLLOUT) {
113         sprintf(buf, "HTTP/1.1 200 OK\r\nContent-Length: %d\r\n\r\nHello World", 11);
114         int nwrite, data_size = strlen(buf);
115         n = data_size;
116         while (n > 0) {
117           nwrite = write(fd, buf + data_size - n, n);
118           if (nwrite < n) {
119             if (nwrite == -1 && errno != EAGAIN) {
120               perror("write error");
121             }
122             break;
123           }
124           n -= nwrite;
125         }
126         close(fd);
127       }
128     }
129   }
130 
131   return 0;
132 }
View Code



关注下面的标签,发现更多相似文章