Epoll

Epoll

Linux事件驱动机制 #

在本学人机交互课程的时候,用到过一个渲染库,由于需要监听很多组件的点击和键盘事件,主函数中有一个巨大的轮询机制,并且当时还是写的单线程的处理,这当然是十分鸡肋的一种处理方式。

Linux中,事件驱动机制,也称为IO多路复用。是内核实现的一种机制,在内核2.6以后的版本中才有。

在Linux中,一切皆文件,文件即IO。文件、socket通信、进程间通信管道pipe都是IO事件,都使用统一的fd文件描述符(File Descriptor)来表示。

文件描述符 #

  • 每个文件描述符会与一个打开的文件相对应
  • 不同的文件描述符也可能指向同一个文件
  • 相同的文件可以被不同的进程打开,也可以在同一个进程被多次打开

系统为维护文件描述符,建立了三个表:

  • 进程级的文件描述符表
  • 系统级的文件描述符表
  • 文件系统的i-node表

每一个进程都会维护一个fd表,一般一个进程允许打开的最大文件数量是1024。

事件驱动机制 #

应用程序层面的事件通知机制总是绕不过轮询这个天坑,轮询带来的问题至少有三个:

  1. 不加限制的for循环必然导致cpu跑满
  2. 加了sleep的轮询必然导致一定的拥挤,不必要的等待。
  3. 大IO事件的阻塞,影响了后续的事件处理。

Linux在内核层面提供了epoll的机制来实现事件的通知,在没有事件的时候阻塞应用进程。做到CPU的0浪费。具体的实现方式在应用端只需要三个函数。

  • epoll_create, 创建一个fd池
  • epollctl,向fd池中注册需要监听的事件
  • epollwait,等待事件唤醒自己

我们先从整体了解一下epoll的内部实现机制。

内核中epoll的实现机制 #

Linux系统的一切皆文件的设计中,为文件对象设计了一个poll回调机制,即当某个fd的读写状态满足的时候,即发生poll回调,将fd相关的数据(epitem)塞入一个队列中,并通知操作系统唤醒应用进程。

由此我们可以知道,当应用进程使用epollctl函数注册了事件以后,调用epollwait函数即可进入实际的睡眠状态,直到操作系统接到poll回调将其唤醒。应用线程没有任何浪费的操作。

此处需要注意一点,虽然Linux中一切皆文件,但是并不是所有的文件对象都具有一致的接口,poll接口就不是都支持的,朴素意义(狭义)的文件系统是不支持的。socket是支持的。所以epoll常用于网络IO的处理中。

代码示例 #

关于socket的知识:

  1. int socket(int domain, int type, int protocol); 函数对应一切皆文件的open操作;

  2. AF_INET: IPv4 Internet协议

  3. SOCK_STREAM: TCP字节流

  4. int bind(int sockfd, const struct sockaddr * addr, socklen_t addrlen); 分配一个具体的地址给sockfd

  5. listen() 函数让socket处于监听的状态,此时如果有请求进来,就放入缓冲区,直到缓冲区满;中间调用accept() 函数即可以从缓冲区获取一个请求。listen 非阻塞。

  6. accept() 函数会阻塞进程,并创建一个新的socket,这个socket记录了客户端的ip等信息,后续与客户端通信都是使用这个新的socket。accept只是建立了这个通信的通道,之后的处理要用read和write来处理消息。在epoll中这是两个事件。需要再监听这个新的socket才能在有数据的时候唤醒应用进程。

服务端:


#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>

#include <netinet/in.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <sys/types.h>

#define IPADDRESS   "127.0.0.1"
#define PORT        8787
#define MAXSIZE     1024
#define LISTENQ     5
#define FDSIZE      1000
#define EPOLLEVENTS 100

//创建套接字并进行绑定
static int socket_bind(const char* ip,int port);

//IO多路复用epoll
static void do_epoll(int listenfd);

//事件处理函数
static void handle_events(int epollfd,struct epoll_event *events,int num,int listenfd,char *buf);

//处理接收到的连接
static void handle_accpet(int epollfd,int listenfd);

//读处理
static void do_read(int epollfd,int fd,char *buf);

//写处理
static void do_write(int epollfd,int fd,char *buf);

//添加事件
static void add_event(int epollfd,int fd,int state);

//修改事件
static void modify_event(int epollfd,int fd,int state);

//删除事件
static void delete_event(int epollfd,int fd,int state);

int main(int argc,char *argv[])
{
    int  listenfd;
    listenfd = socket_bind(IPADDRESS,PORT);
    listen(listenfd,LISTENQ);
    do_epoll(listenfd);
    return 0;
}

static int socket_bind(const char* ip,int port)
{
    int  listenfd;
    struct sockaddr_in servaddr;
    listenfd = socket(AF_INET,SOCK_STREAM,0);
    if (listenfd == -1)
    {
        perror("socket error:");
        exit(1);
    }
    memset(*servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    inet_pton(AF_INET,ip,&servaddr.sin_addr);
    servaddr.sin_port = htons(port);
    if (bind(listenfd,(struct sockaddr*)&servaddr,sizeof(servaddr)) == -1)
    {
        perror("bind error: ");
        exit(1);
    }
    return listenfd;
}

static void do_epoll(int listenfd)
{
    int epollfd;
    struct epoll_event events[EPOLLEVENTS];
    int ret;
    char buf[MAXSIZE];
    memset(buf,0,MAXSIZE);
    //创建一个描述符
    epollfd = epoll_create(FDSIZE);
    //添加监听描述符事件
    add_event(epollfd,listenfd,EPOLLIN);
    for ( ; ; )
    {
        //获取已经准备好的描述符事件
        ret = epoll_wait(epollfd,events,EPOLLEVENTS,-1);
        handle_events(epollfd,events,ret,listenfd,buf);
    }
    close(epollfd);
}

static void
handle_events(int epollfd,struct epoll_event *events,int num,int listenfd,char *buf)
{
    int i;
    int fd;
    //进行选好遍历
    for (i = 0;i < num;i++)
    {
        fd = events[i].data.fd;
        //根据描述符的类型和事件类型进行处理
        if ((fd == listenfd) &&(events[i].events & EPOLLIN))
            handle_accpet(epollfd,listenfd);
        else if (events[i].events & EPOLLIN)
            do_read(epollfd,fd,buf);
        else if (events[i].events & EPOLLOUT)
            do_write(epollfd,fd,buf);
    }
}
static void handle_accpet(int epollfd,int listenfd)
{
    int clifd;
    struct sockaddr_in cliaddr;
    socklen_t  cliaddrlen;
    clifd = accept(listenfd,(struct sockaddr*)&cliaddr,&cliaddrlen);
    if (clifd == -1)
        perror("accpet error:");
    else
    {
        printf("accept a new client: %s:%d\n",inet_ntoa(cliaddr.sin_addr),cliaddr.sin_port);
        //添加一个客户描述符和事件
        add_event(epollfd,clifd,EPOLLIN);
    }
}

static void do_read(int epollfd,int fd,char *buf)
{
    int nread;
    nread = read(fd,buf,MAXSIZE);
    if (nread == -1)
    {
        perror("read error:");
        close(fd);
        delete_event(epollfd,fd,EPOLLIN);
    }
    else if (nread == 0)
    {
        fprintf(stderr,"client close.\n");
        close(fd);
        delete_event(epollfd,fd,EPOLLIN);
    }
    else
    {
        printf("read message is : %s",buf);
        //修改描述符对应的事件,由读改为写
        modify_event(epollfd,fd,EPOLLOUT);
    }
}

static void do_write(int epollfd,int fd,char *buf)
{
    int nwrite;
    nwrite = write(fd,buf,strlen(buf));
    if (nwrite == -1)
    {
        perror("write error:");
        close(fd);
        delete_event(epollfd,fd,EPOLLOUT);
    }
    else
        modify_event(epollfd,fd,EPOLLIN);
    memset(buf,0,MAXSIZE);
}

static void add_event(int epollfd,int fd,int state)
{
    struct epoll_event ev;
    Site is undergoing maintenance = state;
    ev.data.fd = fd;
    epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&ev);
}

static void delete_event(int epollfd,int fd,int state)
{
    struct epoll_event ev;
    ev.events = state;
    ev.data.fd = fd;
    epoll_ctl(epollfd,EPOLL_CTL_DEL,fd,&ev);
}

static void modify_event(int epollfd,int fd,int state)
{
    struct epoll_event ev;
    Site is undergoing maintenance = state;
    ev.data.fd = fd;
    epoll_ctl(epollfd,EPOLL_CTL_MOD,fd,&ev);
}

客户端代码