現(xiàn)在有這么一個場景:我是一個很忙的大老板,我有100個手機,手機來信息了,我的秘書就會告訴我“老板,你的手機來信息了?!蔽液苌鷼猓业拿貢褪沁@樣子,每次手機來信息就只告訴我來信息了,老板趕緊去看。但是她從來不把話說清楚:到底是哪個手機來信息??!我可有100個手機?。∮谑牵抑荒芤粋€一個手機去查看,來確定到底是哪幾個手機來信息了。這就是IO復用中select模型的缺點!老板心想,要是秘書能把來信息的手機直接拿到我桌子上就好了,那么我的效率肯定大增(這就是epoll模型)。

那我們先來總結一下select模型的缺點:

  1. 單個進程能夠監(jiān)視的文件描述符的數(shù)量存在最大限制,通常是1024,當然可以更改數(shù)量,但由于select采用輪詢的方式掃描文件描述符,文件描述符數(shù)量越多,性能越差;(在linux內核頭文件中,有這樣的定義:#define __FD_SETSIZE 1024)

  2. 內核 / 用戶空間內存拷貝問題,select需要復制大量的句柄數(shù)據(jù)結構,產生巨大的開銷;
    select返回的是含有整個句柄的數(shù)組,應用程序需要遍歷整個數(shù)組才能發(fā)現(xiàn)哪些句柄發(fā)生了事件;

  3. select的觸發(fā)方式是水平觸發(fā),應用程序如果沒有完成對一個已經就緒的文件描述符進行IO操作,那么之后每次select調用還是會將這些文件描述符通知進程。

設想一下如下場景:有100萬個客戶端同時與一個服務器進程保持著TCP連接。而每一時刻,通常只有幾百上千個TCP連接是活躍的(事實上大部分場景都是這種情況)。如何實現(xiàn)這樣的高并發(fā)?

粗略計算一下,一個進程最多有1024個文件描述符,那么我們需要開1000個進程來處理100萬個客戶連接。如果我們使用select模型,這1000個進程里某一段時間內只有數(shù)個客戶連接需要數(shù)據(jù)的接收,那么我們就不得不輪詢1024個文件描述符以確定究竟是哪個客戶有數(shù)據(jù)可讀,想想如果1000個進程都有類似的行為,那系統(tǒng)資源消耗可有多大??!

針對select模型的缺點,epoll模型被提出來了!

epoll模型的優(yōu)點

  • 支持一個進程打開大數(shù)目的socket描述符

  • IO效率不隨FD數(shù)目增加而線性下降

  • 使用mmap加速內核與用戶空間的消息傳遞

epoll的兩種工作模式

  • LT(level triggered,水平觸發(fā)模式)是缺省的工作方式,并且同時支持 block 和 non-block socket。在這種做法中,內核告訴你一個文件描述符是否就緒了,然后你可以對這個就緒的fd進行IO操作。如果你不作任何操作,內核還是會繼續(xù)通知你的,所以,這種模式編程出錯誤可能性要小一點。比如內核通知你其中一個fd可以讀數(shù)據(jù)了,你趕緊去讀。你還是懶懶散散,不去讀這個數(shù)據(jù),下一次循環(huán)的時候內核發(fā)現(xiàn)你還沒讀剛才的數(shù)據(jù),就又通知你趕緊把剛才的數(shù)據(jù)讀了。這種機制可以比較好的保證每個數(shù)據(jù)用戶都處理掉了。

  • ET(edge-triggered,邊緣觸發(fā)模式)是高速工作方式,只支持no-block socket。在這種模式下,當描述符從未就緒變?yōu)榫途w時,內核通過epoll告訴你。然后它會假設你知道文件描述符已經就緒,并且不會再為那個文件描述符發(fā)送更多的就緒通知,等到下次有新的數(shù)據(jù)進來的時候才會再次出發(fā)就緒事件。簡而言之,就是內核通知過的事情不會再說第二遍,數(shù)據(jù)錯過沒讀,你自己負責。這種機制確實速度提高了,但是風險相伴而行。

epoll模型API

#include <sys/epoll.h> /* 創(chuàng)建一個epoll的句柄,size用來告訴內核需要監(jiān)聽的數(shù)目一共有多大。當創(chuàng)建好epoll句柄后,
它就是會占用一個fd值,所以在使用完epoll后,必須調用close()關閉,否則可能導致fd被耗盡。*/int epoll_create(int size);  

/*epoll的事件注冊函數(shù)*/int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); 

/*等待事件的到來,如果檢測到事件,就將所有就緒的事件從內核事件表中復制到它的第二個參數(shù)events指向的數(shù)組*/int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

epoll的事件注冊函數(shù)epoll_ctl,第一個參數(shù)是 epoll_create() 的返回值,第二個參數(shù)表示動作,使用如下三個宏來表示:

POLL_CTL_ADD    //注冊新的fd到epfd中;EPOLL_CTL_MOD    //修改已經注冊的fd的監(jiān)聽事件;EPOLL_CTL_DEL    //從epfd中刪除一個fd;

struct epoll_event 結構如下:

typedef union epoll_data
{    void        *ptr;    int          fd;    __uint32_t   u32;    __uint64_t   u64;
} epoll_data_t;struct epoll_event 
{    __uint32_t events; /* Epoll events */
    epoll_data_t data; /* User data variable */};

epoll_event結構體中的events 可以是以下幾個宏的集合:

EPOLLIN     //表示對應的文件描述符可以讀(包括對端SOCKET正常關閉);EPOLLOUT    //表示對應的文件描述符可以寫;EPOLLPRI    //表示對應的文件描述符有緊急的數(shù)據(jù)可讀(這里應該表示有帶外數(shù)據(jù)到來);EPOLLERR    //表示對應的文件描述符發(fā)生錯誤;EPOLLHUP    //表示對應的文件描述符被掛斷;EPOLLET     //將EPOLL設為邊緣觸發(fā)(Edge Triggered)模式,這是相對于水平觸發(fā)(Level Triggered)來說的。EPOLLONESHOT//只監(jiān)聽一次事件,當監(jiān)聽完這次事件之后,如果還需要繼續(xù)監(jiān)聽這個socket的話,需要再次把這個socket加入到EPOLL隊列里。

epoll的一個簡單使用范例

#include <sys/socket.h>#include <sys/epoll.h>#include <netinet/in.h>#include <arpa/inet.h>#include <fcntl.h>#include <unistd.h>#include <stdio.h>#include <errno.h>#include <stdlib.h>#include <string.h>#define MAXLINE 5#define OPEN_MAX 100#define LISTENQ 20#define SERV_PORT 5000#define INFTIM 1000void setnonblocking(int sock){    int opts;
    opts=fcntl(sock,F_GETFL);    if(opts<0)
    {
        perror("fcntl(sock,GETFL)");        exit(1);
    }
    opts = opts|O_NONBLOCK;    if(fcntl(sock,F_SETFL,opts)<0)
    {
        perror("fcntl(sock,SETFL,opts)");        exit(1);
    }
}int main(int argc, char* argv[]){    int i, maxi, listenfd, connfd, sockfd,epfd,nfds, portnumber;    ssize_t n;    char line[MAXLINE];    socklen_t clilen;    if ( 2 == argc )
    {        if( (portnumber = atoi(argv[1])) < 0 )
        {            fprintf(stderr,"Usage:%s portnumber/a/n",argv[0]);            return 1;
        }
    }    else
    {        fprintf(stderr,"Usage:%s portnumber/a/n",argv[0]);        return 1;
    }    //聲明epoll_event結構體的變量,ev用于注冊事件,數(shù)組用于回傳要處理的事件

    struct epoll_event ev,events[20];    //生成用于處理accept的epoll專用的文件描述符

    epfd=epoll_create(256);    struct sockaddr_in clientaddr;    struct sockaddr_in serveraddr;
    listenfd = socket(AF_INET, SOCK_STREAM, 0);    //把socket設置為非阻塞方式

    //setnonblocking(listenfd);

    //設置與要處理的事件相關的文件描述符

    ev.data.fd=listenfd;    //設置要處理的事件類型

    ev.events=EPOLLIN|EPOLLET;    //ev.events=EPOLLIN;

    //注冊epoll事件

    epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,&ev);
    bzero(&serveraddr, sizeof(serveraddr));
    serveraddr.sin_family = AF_INET;    char *local_addr="127.0.0.1";
    inet_aton(local_addr,&(serveraddr.sin_addr));//htons(portnumber);

    serveraddr.sin_port=htons(portnumber);
    bind(listenfd,(struct sockaddr *)&serveraddr, sizeof(serveraddr));
    listen(listenfd, LISTENQ);
    maxi = 0;    for ( ; ; ) {        //等待epoll事件的發(fā)生

        nfds=epoll_wait(epfd,events,20,500);        //處理所發(fā)生的所有事件

        for(i=0;i<nfds;++i)
        {            if(events[i].data.fd==listenfd)//如果新監(jiān)測到一個SOCKET用戶連接到了綁定的SOCKET端口,建立新的連接。

            {
                connfd = accept(listenfd,(struct sockaddr *)&clientaddr, &clilen);                if(connfd<0){
                    perror("connfd<0");                    exit(1);
                }                //setnonblocking(connfd);

                char *str = inet_ntoa(clientaddr.sin_addr);                printf("accapt a connection from\n ");                //設置用于讀操作的文件描述符

                ev.data.fd=connfd;                //設置用于注測的讀操作事件

                ev.events=EPOLLIN|EPOLLET;                //ev.events=EPOLLIN;

                //注冊ev

                epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev);
            }            else if(events[i].events&EPOLLIN)//如果是已經連接的用戶,并且收到數(shù)據(jù),那么進行讀入。

            {                printf("EPOLLIN\n");                if ( (sockfd = events[i].data.fd) < 0)                    continue;                if ( (n = read(sockfd, line, MAXLINE)) < 0) {                    if (errno == ECONNRESET) {
                        close(sockfd);
                        events[i].data.fd = -1;
                    } else
                        printf("readline error\n");
                } else if (n == 0) {
                    close(sockfd);
                    events[i].data.fd = -1;
                }                if(n<MAXLINE-2)
                    line[n] = '\0';                //設置用于寫操作的文件描述符

                ev.data.fd=sockfd;                //設置用于注測的寫操作事件

                ev.events=EPOLLOUT|EPOLLET;                //修改sockfd上要處理的事件為EPOLLOUT

                //epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);

            }            else if(events[i].events&EPOLLOUT) // 如果有數(shù)據(jù)發(fā)送

            {
                sockfd = events[i].data.fd;
                write(sockfd, line, n);                //設置用于讀操作的文件描述符

                ev.data.fd=sockfd;                //設置用于注測的讀操作事件

                ev.events=EPOLLIN|EPOLLET;                //修改sockfd上要處理的事件為EPOLIN

                epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);
            }
        }
    }    return 0;
}

帶ET和LT雙模式的epoll服務器

#include <stdio.h>#include <sys/types.h>#include <sys/socket.h>#include <netinet/in.h>#include <arpa/inet.h>#include <unistd.h>#include <string.h>#include <fcntl.h>#include <stdlib.h>#include <sys/epoll.h>#include <pthread.h>#include <errno.h>#include <stdbool.h>#define MAX_EVENT_NUMBER 1024  //event的最大數(shù)量#define BUFFER_SIZE 10      //緩沖區(qū)大小#define ENABLE_ET  1       //是否啟用ET模式/* 將文件描述符設置為非擁塞的  */int SetNonblocking(int fd){    int old_option = fcntl(fd, F_GETFL);    int new_option = old_option | O_NONBLOCK;
    fcntl(fd, F_SETFL, new_option);    return old_option;
}/* 將文件描述符fd上的EPOLLIN注冊到epoll_fd指示的epoll內核事件表中,參數(shù)enable_et指定是否對fd啟用et模式 */void AddFd(int epoll_fd, int fd, bool enable_et){    struct epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN; //注冊該fd是可讀的
    if(enable_et)
    {
        event.events |= EPOLLET;
    }

    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &event);  //向epoll內核事件表注冊該fd
    SetNonblocking(fd);
}/*  LT工作模式特點:穩(wěn)健但效率低 */void lt_process(struct epoll_event* events, int number, int epoll_fd, int listen_fd){    char buf[BUFFER_SIZE];    int i;    for(i = 0; i < number; i++) //number: 就緒的事件數(shù)目
    {        int sockfd = events[i].data.fd;        if(sockfd == listen_fd)  //如果是listen的文件描述符,表明有新的客戶連接到來
        {            struct sockaddr_in client_address;            socklen_t client_addrlength = sizeof(client_address);            int connfd = accept(listen_fd, (struct sockaddr*)&client_address, &client_addrlength);
            AddFd(epoll_fd, connfd, false);  //將新的客戶連接fd注冊到epoll事件表,使用lt模式
        }        else if(events[i].events & EPOLLIN) //有客戶端數(shù)據(jù)可讀
        {            // 只要緩沖區(qū)的數(shù)據(jù)還沒讀完,這段代碼就會被觸發(fā)。這就是LT模式的特點:反復通知,直至處理完成
            printf("lt mode: event trigger once!\n");            memset(buf, 0, BUFFER_SIZE);            int ret = recv(sockfd, buf, BUFFER_SIZE - 1, 0);            if(ret <= 0)  //讀完數(shù)據(jù)了,記得關閉fd
            {
                close(sockfd);                continue;
            }            printf("get %d bytes of content: %s\n", ret, buf);

        }        else
        {            printf("something unexpected happened!\n");
        }
    }
}/* ET工作模式特點:高效但潛在危險 */void et_process(struct epoll_event* events, int number, int epoll_fd, int listen_fd){    char buf[BUFFER_SIZE];    int i;    for(i = 0; i < number; i++)
    {        int sockfd = events[i].d
    http://www.cnblogs.com/skyfsm/p/7102367.html