掘金 后端 ( ) • 2024-04-30 15:01

多路转接IO

多路转接IO(Multiplexed IO)是一种IO模型,它允许一个进程同时监听多个IO事件,并在任何一个IO事件准备就绪时进行处理。多路复用IO通常与事件驱动机制结合使用,可以大大提高程序的并发性能和响应速度。

常见的多路复用IO机制包括:

  1. select:select是Unix系统中最早引入的多路复用IO机制之一,它允许程序同时监听多个文件描述符上的IO事件,并在任何一个IO事件准备就绪时进行处理。select函数的参数包括文件描述符集合和超时时间,在超时时间内阻塞等待IO事件发生。
  2. poll:poll是一种更高效的多路复用IO机制,与select类似,它也允许程序同时监听多个文件描述符上的IO事件,但是不同于select的是,poll使用一个数据结构来保存要监听的文件描述符,而不是使用文件描述符集合。poll函数不需要修改文件描述符的最大限制,并且没有文件描述符集合的大小限制。
  3. epoll:epoll是Linux系统中引入的高性能多路复用IO机制,它通过使用事件驱动的方式来监听IO事件,可以有效地处理大量并发的IO请求。epoll使用一组特殊的系统调用来操作事件集合,并使用内核事件表来保存要监听的事件,因此具有较低的开销和更高的效率。

这篇文章会介绍这三种多路转接IO机制

多路转接IO之Select

select 是 Linux 系统中最早引入的一种多路转接 IO 模型。它允许程序监视多个文件描述符,等待其中任何一个描述符就绪后,就通知程序进行相应的 I/O 操作。

使用 select 的一般步骤如下:

  1. 准备文件描述符集合(通常是使用 fd_set 数据结构):将需要监视的文件描述符添加到文件描述符集合中。
  2. 调用 select 函数:设置超时时间并调用 select 函数,使程序进入阻塞状态,等待文件描述符集合中的任何一个描述符就绪或超时。
  3. 检查就绪的文件描述符:当 select 函数返回时,程序需要遍历文件描述符集合,检查哪些文件描述符已经就绪(可读、可写、有异常等)。
  4. 处理就绪的文件描述符:根据文件描述符的状态进行相应的 I/O 操作。

通过man手册 认识一下select函数 man 2 select

image.png

  • 函数原型
#include <sys/select.h>

int select(int nfds, fd_set *readfds, fd_set *writefds,
           fd_set *exceptfds, struct timeval *timeout);
  • 函数参数

    • nfds:指定待监视的最大文件描述符值加一。
    • readfds:指向可读事件文件描述符集合的指针。调用时用户告知内核需要监视哪些文件描述符的读事件是否就绪,返回时内核告知用户哪些文件描述符的读事件已经就绪。不关心设置为nullptr即可。
    • writefds:指向可写事件文件描述符集合的指针。调用时用户告知内核需要监视哪些文件描述符的写事件是否就绪,返回时内核告知用户哪些文件描述符的写事件已经就绪。不关心设置为nullptr即可。
    • exceptfds:指向异常事件文件描述符集合的指针。调用时用户告知内核需要监视哪些文件描述符的异常事件是否就绪,返回时内核告知用户哪些文件描述符的异常事件已经就绪。不关心设置为nullptr即可。
    • timeout:用来设置select()的等待时间。调用时由用户设置select的等待时间,返回时表示timeout的剩余时间。
  • 参数timeout取值

    • NULL:则表示select()没有timeout,select将一直被阻塞,直到某个文件描述符上发生了事件;
    • 0:仅检测描述符集合的状态,然后立即返回,并不等待外部事件的发生
    • 特定的时间值:如果在指定的时间段里没有事件发生,select将超时返回
  • 函数返回值

    • 如果函数调用成功,则返回有事件就绪的文件描述符个数。
    • 如果timeout时间耗尽,则返回0。
    • 调用失败,返回-1,同时错误码被设置。
  • select函数使用了一组宏来操作文件描述符的集合。

    • FD_CLR(int fd, fd_set *set):从文件描述符集合中清除指定文件描述符。
    • FD_ISSET(int fd, fd_set *set):检查指定文件描述符是否在集合中。
    • FD_SET(int fd, fd_set *set):将指定文件描述符添加到集合中。
    • FD_ZERO(fd_set *set):清空文件描述符集合。

简单的select使用样例:

#include <iostream>
#include <sys/select.h>
#include <unistd.h>
#include <cstring>
#include <cerrno>

int main()
{
    // 创建文件描述符集合 并将将标准输入文件描述符添加到集合中
    fd_set read_fds;
    FD_ZERO(&read_fds);
    FD_SET(0, &read_fds);

    // 设置超时时间为5秒
    struct timeval timeout;
    timeout.tv_sec = 5;
    timeout.tv_usec = 0; // 设置微妙

   
    while (true)
    {
        // 调用select函数等待标准输入就绪
        int ret = select(0 + 1, &read_fds, nullptr, nullptr, &timeout);
        if (ret < 0) // select 函数调用失败,输出错误信息
        {
            std::cerr << "select error: " << strerror(errno) << std::endl;
        }
        else if (ret == 0) // 超时,没有文件描述符就绪
        {
            std::cout << "Timeout" << std::endl;
            sleep(1);
        }
        else
        {
            // 检查标准输入是否就绪
            if (FD_ISSET(0, &read_fds))
            {
                // 从标准输入中读取
                char buffer[1024];
                ssize_t read_size = read(0, buffer, sizeof(buffer));
                if (read_size < 0)
                {
                    // read 函数调用失败,输出错误信息
                    std::cerr << "read error: " << strerror(errno) << std::endl;
                }
                else if (read_size == 0)
                {
                    // 读到文件结尾,输出提示信息
                    std::cout << "End of file" << std::endl;
                }
                else
                {
                    // 成功读取数据,输出读取的内容
                    buffer[read_size] = '\0';
                    std::cout << "Read from stdin: " << buffer << std::endl;
                }
            }
        }
    }
}

使用select监听标准输入的简单demo,通过这个示例先简单的了解select的用法。

上面的代码只关心的标准输入的读事件是否就绪,并且设置了超时时间。如果3秒后标准输入没有输入任何东西,无法从标准输入缓冲区中进行读取,那么就会每隔一秒输出time out信息。

运行结果:

recording.gif

当然 上面的代码超时后,再想向标准输入进行输入就无法进行了。解决这个问题,可以在 select 超时后重新设置文件描述符集合,然后再次调用 select 函数来等待新的输入事件。就是把上面12-17行代码写到循里面即可。

只有一个文件名描述符,是毫无意思的。select主要用在网络编程中。在服务器编程中,一个常见的场景是同时监听多个客户端的连接请求。使用 select 函数可以监视所有客户端的套接字,以确定哪些套接字已经准备好进行读取或写入操作。

Select服务器

实现一个简单的select服务器,用来接受客户端发送的消息并回显给客户端

编写服务器少不了socket,这里先封装一个socket

#pragma once
#include <iostream>
#include <cstring>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include "log.hpp"
class Sock
{
private:
    const static int backlog = 20;
public:
    Sock(){}
    //初始化套接字
    int Socket()
    {
        //SOCK_STREAM tcp通信协议
        int listen_sock = socket(AF_INET,SOCK_STREAM,0);
        if(listen_sock >= 0 )
        {
            LogMessage(NORMAL,"create socket succseeful");
        }
        else
        {
            LogMessage(FATAL,"create sockt fail %d:%s",errno,strerror(errno));
            exit(1);
        }
        //设置端口复用
		int opt = 1;
		setsockopt(listen_sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
        return listen_sock;
    }

    //绑定ip和端口
    bool Bind(int socket,uint16_t port,std::string ip="")
    {
        struct sockaddr_in src;
        memset(&src,0,sizeof(src));
        src.sin_family = AF_INET;
        src.sin_port = htons(port);
        src.sin_addr.s_addr = ip.empty() ?  INADDR_ANY : inet_addr(ip.c_str());

        int bind_ret = bind(socket,(struct sockaddr*)&src,sizeof(src));
        if(bind_ret == -1)
        {
            LogMessage(FATAL,"bind fail %d:%s",errno,strerror(errno));
            exit(2);
        }
        LogMessage(NORMAL,"bind succseeful");
        return true;
    }
    //设置监听转态
    bool Listen(int socket)
    {
        int listen_ret = listen(socket,backlog);
        if(listen_ret == -1)
        {
            LogMessage(FATAL,"listen fail %d:%s",errno,strerror(errno));
            exit(3);
        }
        LogMessage(NORMAL,"listen succseeful");
        return true;
    }
    //获取新连接
    int Accept(int socket,std::string* ip,uint16_t* port)
    {
        struct sockaddr_in src;
        socklen_t len = sizeof(src);
        int accept_ret = accept(socket,(struct sockaddr*)&src,&len);
        if(accept_ret < 0 )
        {
            LogMessage(FATAL,"accept fail %d:%s",errno,strerror(errno));
            exit(4);
        }
        *ip = inet_ntoa(src.sin_addr);
        *port = ntohs(src.sin_port);
        return accept_ret;
    }
    //建立连接
    bool Connect(int socket,std::string ip,uint16_t port)
    {
        struct sockaddr_in server;
        memset(&server, 0, sizeof(server));
        server.sin_family = AF_INET;
        server.sin_port = htons(port);
        server.sin_addr.s_addr = inet_addr(ip.c_str());
        int connect_ret = connect(socket,(struct sockaddr*)&server,sizeof(server));

        if(connect_ret < 0)
        {
            LogMessage(FATAL,"connect fail %d:%s",errno,strerror(errno));
            exit(5);
        }
        return true;
    }
    ~Sock()
    {}
};

封装了一个日志 方便查看错误信息

// 日志功能
#pragma once
#include <iostream>
#include <ctime>
#include <stdio.h>
#include <stdarg.h>

#define DEBUG 0
#define NORMAL 1
#define WARING 2
#define ERROR 3
#define FATAL 4

#define FILE_NAME "./log.txt"

const char *gLevelMap[] = {
    "DEBUGE",
    "NORMAL",
    "WARING",
    "ERROR",
    "FATAL"};
// 完整的日志功能,至少: 日志等级 时间 支持用户自定义(日志内容, 文件行,文件名)
void LogMessage(int level, const char *format, ...)
{
    // 日志标准部分
    char stdbuffer[1024];
    time_t get_time = time(nullptr);
    // struct tm *localtime(const time_t *timer)
    struct tm *info = localtime(&get_time);
    snprintf(stdbuffer, sizeof(stdbuffer), "[%s]|[%d:%d:%d]", gLevelMap[level], info->tm_year + 1900, info->tm_mon, info->tm_mday);

    // 日志自定义部分
    char logbuffer[1024];
    // 获取可变参数列表
    va_list valist;
    // 为 num 个参数初始化 valist
    va_start(valist, format);
    vsnprintf(logbuffer, sizeof(logbuffer), format, valist);
    // 清理为 valist 保留的内存
    va_end(valist);
    
    //打印完整的日志信息(可重定向到日志文件中)
    
    std::cout << stdbuffer << logbuffer << std::endl;

    // FILE* fp = fopen(FILE_NAME,"a");
    // fprintf(fp,"%s %s\n",stdbuffer,logbuffer);
    // fclose(fp);
}

select类

先把服务器建立完成了,然后在这基础上改造成为select服务器。

#include "Socket.hpp"
#include "log.hpp"

static const uint16_t DEFAULT_PORT = 8080;//默认端口号

class Select
{
public:
    //初始化服务器
    Select(uint16_t port = DEFAULT_PORT):_port(port)
    {
        _listen_fd = _listen_sock.Socket();//创建套接字
        _listen_sock.Bind(_listen_fd,_port);
        _listen_sock.Listen(_listen_fd);
        LogMessage(DEBUG,"SELECT INIT SUCCESS LISTEN_FD = %d",_listen_fd);
    }
    //服务器启动
    void Start()
    {
        std::string ip = "";
        uint16_t port = 0;
        while(true)
        {
            if(_listen_sock.Accept(_listen_fd,&ip,&port) < 0)
                continue;
            LogMessage(NORMAL, "link success %s:%u", ip.c_str(), port);
        }
    }   
    ~Select()
    {

    }
private:
    Sock _listen_sock;//套接字
    int _listen_fd;//套接字描述符
    uint16_t _port;//端口号
};

select服务器要监听大量的套接字描述符,这里将多个文件描述符存放到数组中。添加一个成员变量_fd_array。 而套接字的数量是有限的。这就和fd_set有关了。fd_set是一个结构体。在内核中如下:

image.png

他本质是一个位图。每个位(bit)表示一个文件描述符的状态,通常用 0 表示未就绪,用 1 表示已就绪。 也可以当作数组。数组的每个元素对应一个文件描述符。

在内核中,通过__FD_SETSIZE就能获取当前系统的最大文件描述符的数量。

image.png

所以在select服务器中,监听套接字是有上限的。即_fd_array数组最大个数为__FD_SETSIZE个。 那么在初始化服务器的时候要把这个数组也要初始化,初始化为-1比较合理,因为文件描述符是从0开始的,初始化为-1方便后面进行判断。

#include "Socket.hpp"
#include "log.hpp"

static const uint16_t DEFAULT_PORT = 8080;//默认端口号

class Select
{
public:
    //初始化服务器
    Select(uint16_t port = DEFAULT_PORT):_port(port)
    {
        _listen_fd = _listen_sock.Socket();//创建套接字
        _listen_sock.Bind(_listen_fd,_port);
        _listen_sock.Listen(_listen_fd);
        LogMessage(DEBUG,"SELECT INIT SUCCESS LISTEN_FD = %d",_listen_fd);
        // 初始化文件描述符数组
        for (int i = 0; i < __FD_SETSIZE; ++i)
        {
            _fd_array[i] = default_fd;
        }
    }
    //服务器启动
    void Start()
    {
        std::string ip = "";
        uint16_t port = 0;
        while(true)
        {
            if(_listen_sock.Accept(_listen_fd,&ip,&port) < 0)
                continue;
            LogMessage(NORMAL, "link success %s:%u", ip.c_str(), port);
        }
    }   
    ~Select()
    {

    }
private:
    Sock _listen_sock;//套接字
    int _listen_fd;//套接字描述符
    uint16_t _port;//端口号
    int _fd_array[__FD_SETSIZE];//被监听的文件描述符数组
};

写到select,思路必须有所转化。select他是单线程的,而他的这种行为却很像多线程,一个线程进行监听,监听到了有文件描述符准备好了,就让另一个线程进行处理。而select只是单进程,在调用 select 函数时,进程会传递一个文件描述符集合给操作系统,告诉操作系统要监听哪些文件描述符。然后,操作系统会阻塞进程,直到有文件描述符就绪或超时。

因此,虽然是进程发起了 select 函数的调用,但实际的监听工作是由操作系统来完成的。操作系统会在后台监视这些文件描述符,并在有就绪事件发生时通知进程。这样,进程就可以在监听和处理文件描述符的就绪事件之间切换,实现了并发处理多个连接的能力。

select这种并发编程和多线程的本质区别就是多线程在监听的时候也是进程本身在监听,而select是把监听这个任务交给了操作系统

所以在写select服务器的时候,服务器启动了,直接进行接收新连接,而是调用select让操作系统进行监听。

Start函数

重写一下start函数

void Start()
{
    //将监听的套接字添加到文件描述符集合中
    _fd_array[0] = _listen_fd;
    for (;;)
    {
        //定义文件描述符集合并初始化为空
        fd_set rfds;
        FD_ZERO(&rfds);
        //保存一下最大的文件描述符
        int maxfd = _fd_array[0];
        for (int i = 0; i < __FD_SETSIZE; ++i)
        {
            if (_fd_array[i] == default_fd)
                continue;

            FD_SET(_fd_array[i], &rfds);
            if (maxfd < _fd_array[i])
            {
                maxfd = _fd_array[i];
                LogMessage(DEBUG, "MAX FD IS UPDATA ,MAX FD IS %d", maxfd);
            }
        }
        //设置超时时间
        struct timeval timeout = {0, 0};
        int ret = select(maxfd + 1, &rfds, nullptr, nullptr, &timeout);
        switch (ret)
        {
        case 0:
            LogMessage(DEBUG, "TIME OUT TIME:%d,%d", timeout.tv_sec, timeout.tv_usec);
            sleep(1);
            break;
        case -1:
            LogMessage(DEBUG, "SELECT FAIL");
            break;
        default:
            // 有新连接 进行处理
            LogMessage(DEBUG, "GET A NEW LINK");
            Dispatcher(rfds);
            break;
        }
    }
}

服务器在启动的时候,就应该让select去进行监听,监听到有文件描述符准备就绪的时候,通知进程进行处理。当然就绪的文件描述符可能不止一个,这里传给Dispatcher把整个集合传过去。一般称Dispatcher为分配器。

分配器实现

当操作系统监听到有文件描述符准备就绪的时候,就通知给进程进行处理(这里只关心读状态)。

void Dispatcher(fd_set &rfds)
{
    //遍历准备就绪的文件描述符集合
    for (int i = 0; i < __FD_SETSIZE; ++i)
    {
        int fd = _fd_array[i];
        if (fd == default_fd)
            continue;
        //检查是否就绪
        if (FD_ISSET(fd, &rfds))
        {
            //是否为监听套接字
            if (fd == _listen_fd)
            {
                Accepter();
            }
            else
            {
                Recver(fd, i);
            }
        }
    }
}
  • 进程拿到准备就绪的文件描述符集合的时候,遍历这组集合,检查哪些文件描述符已经就绪。
  • 如果已经就绪,进一步判断当前文件描述符是监听套接字还是其他套接字。
  • 如果是监听到的套接字就进行调用 Accepter 函数接受新的连接。
  • 否则调用 Recver 函数处理接收数据的操作。

Accepter实现

从准备就绪的文件描述符集合中获取到监听套接字之后,就开始接受新的连接。这个过程在tcp协议中就涉及到了三次握手建立连接了。

 void Accepter()
{

    std::string client_ip;
    uint16_t client_port;
    int sock = _listen_sock.Accept(_listen_fd, &client_ip, &client_port);
    if (sock < 0)
    {
        return;
    }
    LogMessage(NORMAL, "link success %s:%u", client_ip.c_str(), client_port);
    //从_fd_array找到空位置存放新的套接字信息
    int pos = 1;
    for (; pos < __FD_SETSIZE; ++pos)
    {
        if (_fd_array[pos] != default_fd)
        {
            continue;
        }
        else
        {
            break;
        }
    }
    //满了 无法存放新连接的套接字
    if (pos == __FD_SETSIZE)
    {
        LogMessage(WARING, "SERVER IS FULL ,CLOSE SOCKET %d NOW", sock);
    }
    else
    {
        //找到了空闲位置,存放新接受的套接字信息
        _fd_array[pos] = sock;
        PrinfFd();
    }
}
  • 调用accept接受新的连接,获取到客户端的ip地址以及端口号。
  • 连接成功打印一下日志 方便查看
  • _fd_array 数组中找到一个空闲的位置,用于存放新的套接字。操作系统就会再次监听到有新的文件描述符准备就绪,通知进程进行处理,然后分配器执行的时候,判断不是监听的套接字,就会从这个套接字中进行Recver操作。
  • 如果 _fd_array 已经满了,无法找到空闲位置,打印警告日志,关闭新接受的套接字。
  • 如果找到了空闲位置,将新接受的套接字存放到 _fd_array 数组中的空闲位置,并打印当前所有套接字的信息。

打印一下当前在线的套接字信息

void PrinfFd()
{
    std::cout << "online fd list";
    for (int i = 0; i < __FD_SETSIZE; ++i)
    {
        if (_fd_array[i] == default_fd)
            continue;

        std::cout << _fd_array[i] << " ";
    }
}

Recver实现

void Recver(int fd, int pos)
{
    char buffer[1024];
    ssize_t read_size = read(fd, buffer, sizeof(buffer - 1));
    if (read_size > 0)
    {
        buffer[read_size - 1] = '\0';
        std::cout << "ECHO# " << buffer << std::endl;
        sleep(1000);
    }
    else if (read_size == 0)
    {
        LogMessage(DEBUG, "CLIENT QUIT,CLOSE FD IS %d", fd);
        close(fd);
        _fd_array[pos] = default_fd;
    }
    else
    {
        LogMessage(DEBUG, "RECV ERROR,FD IS  %d", fd);
        close(fd);
        _fd_array[pos] = default_fd;
    }
}

Recver实现就相对比较简单了,直接从套接字中读取即可,读取到数据存放到数组中即可。 然后简单的回显一下读取到的信息。

完整的select服务器类

#include "Socket.hpp"
#include "log.hpp"

static const uint16_t DEFAULT_PORT = 8080; // 默认端口号
int default_fd = -1;
class Select
{
public:
    // 初始化服务器
    Select(uint16_t port = DEFAULT_PORT) : _port(port)
    {
        _listen_fd = _listen_sock.Socket();
        _listen_sock.Bind(_listen_fd, _port);
        _listen_sock.Listen(_listen_fd);
        LogMessage(DEBUG, "SELECT INIT SUCCESS LISTEN_FD = %d", _listen_fd);
        // 初始化文件描述符
        for (int i = 0; i < __FD_SETSIZE; ++i)
        {
            _fd_array[i] = default_fd;
        }
    }
    // 服务器启动
    void Start()
    {
        //将监听的套接字添加到文件描述符集合中
        _fd_array[0] = _listen_fd;
        for (;;)
        {
            //定义文件描述符集合并初始化为空
            fd_set rfds;
            FD_ZERO(&rfds);
            //保存一下最大的文件描述符
            int maxfd = _fd_array[0];
            for (int i = 0; i < __FD_SETSIZE; ++i)
            {
                if (_fd_array[i] == default_fd)
                    continue;

                FD_SET(_fd_array[i], &rfds);
                if (maxfd < _fd_array[i])
                {
                    maxfd = _fd_array[i];
                    LogMessage(DEBUG, "MAX FD IS UPDATA ,MAX FD IS %d", maxfd);
                }
            }
            //设置超时时间
            struct timeval timeout = {0, 0};
            int ret = select(maxfd + 1, &rfds, nullptr, nullptr, &timeout);
            switch (ret)
            {
            case 0:
                LogMessage(DEBUG, "TIME OUT TIME:%d,%d", timeout.tv_sec, timeout.tv_usec);
                sleep(1);
                break;
            case -1:
                LogMessage(DEBUG, "SELECT FAIL");
                break;
            default:
                // 有新连接 进行处理
                LogMessage(DEBUG, "GET A NEW LINK");
                Dispatcher(rfds);
                break;
            }
        }
    }
    ~Select()
    {
    }

private:
    void PrinfFd()
    {
        std::cout << "online fd list";
        for (int i = 0; i < __FD_SETSIZE; ++i)
        {
            if (_fd_array[i] == default_fd)
                continue;

            std::cout << _fd_array[i] << " ";
        }
    }
    // select 监听到就绪
    void Accepter()
    {

        std::string client_ip;
        uint16_t client_port;
        int sock = _listen_sock.Accept(_listen_fd, &client_ip, &client_port);
        if (sock < 0)
        {
            return;
        }
        LogMessage(NORMAL, "link success %s:%u", client_ip.c_str(), client_port);
        //从_fd_array找到空位置存放新的套接字信息
        int pos = 1;
        for (; pos < __FD_SETSIZE; ++pos)
        {
            if (_fd_array[pos] != default_fd)
            {
                continue;
            }
            else
            {
                break;
            }
        }
        //满了 无法存放新连接的套接字
        if (pos == __FD_SETSIZE)
        {
            LogMessage(WARING, "SERVER IS FULL ,CLOSE SOCKET %d NOW", sock);
        }
        else
        {
            //找到了空闲位置,存放新接受的套接字信息
            _fd_array[pos] = sock;
            PrinfFd();
        }
    }
    void Recver(int fd, int pos)
    {
        char buffer[1024];
        ssize_t read_size = read(fd, buffer, sizeof(buffer - 1));
        if (read_size > 0)
        {
            buffer[read_size - 1] = '\0';
            std::cout << "ECHO# " << buffer << std::endl;
            sleep(1000);
        }
        else if (read_size == 0)
        {
            LogMessage(DEBUG, "CLIENT QUIT,CLOSE FD IS %d", fd);
            close(fd);
            _fd_array[pos] = default_fd;
        }
        else
        {
            LogMessage(DEBUG, "RECV ERROR,FD IS  %d", fd);
            close(fd);
            _fd_array[pos] = default_fd;
        }
    }
    // 事件分配器 select 监听到有就绪的套接字 交给分配器处理
    void Dispatcher(fd_set &rfds)
    {
        //遍历准备就绪的文件描述符集合
        for (int i = 0; i < __FD_SETSIZE; ++i)
        {
            int fd = _fd_array[i];
            if (fd == default_fd)
                continue;
            //检查是否就绪
            if (FD_ISSET(fd, &rfds))
            {
                //是否为监听套接字
                if (fd == _listen_fd)
                {
                    Accepter();
                }
                else
                {
                    Recver(fd, i);
                }
            }
        }
    }

private:
    Sock _listen_sock; // 套接字
    int _listen_fd;    // 套接字描述符
    uint16_t _port;    // 端口号
    int _fd_array[__FD_SETSIZE];
};

以上就是关于select模型的简单实现了。

select的优点

  • 可以同时等待多个文件描述符,并且只负责等待,实际的IO操作由accept、read、write等接口来完成,这些接口在进行IO操作时不会被阻塞。
  • select同时等待多个文件描述符,因此可以将“等”的时间重叠,提高了IO的效率。
  • 可以监听任意类型的文件描述符,包括套接字、标准输入输出、管道等,使得程序的编写更加灵活多样。
  • select 函数是 POSIX 标准的一部分,几乎在所有主流操作系统上都有实现,因此具有良好的跨平台兼容性。
  • 可以设置超时时间,允许程序在一定时间内等待文件描述符就绪,避免了永久阻塞的情况。

select的缺点

  • 每次调用select,都需要手动设置fd集合,从接口使用角度来说也非常不便。
  • 每次调用select,都需要把fd集合从用户态拷贝到内核态,这个开销在fd很多时会很大。
  • 同时每次调用select都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也很大。
  • elect 函数有一定的限制,比如最大文件描述符数量的限制,通常是 1024。这个限制可以通过修改操作系统参数来调整,但在某些操作系统上可能会有限制。

多路转接之poll

poll 函数与 select 类似,也是一种 I/O 多路复用的机制,但相较于 select,它有一些优点和特点:

  1. 无文件描述符数量限制: poll 函数没有 select 的文件描述符数量限制,因此可以监听任意数量的文件描述符,不会受到 FD_SETSIZE 的限制。
  2. 无需维护文件描述符集合: 在使用 select 时,需要维护一组文件描述符的集合,而 poll 函数不需要这样做,它只需要传入一个指向 pollfd 结构体数组的指针即可。
  3. 事件通知更精确: poll 函数通过 pollfd 结构体中的 revents 字段来通知发生的事件,而不像 select 需要在每次调用后遍历整个文件描述符集合来判断哪些文件描述符发生了事件。
  4. 效率略高: 在某些场景下,poll 函数的效率可能比 select 高一些,因为它不需要复制文件描述符集合到内核空间,也不需要在内核空间和用户空间之间进行数据拷贝。
  5. 可移植性较好: poll 函数是 POSIX 标准的一部分,因此具有较好的跨平台兼容性,在几乎所有主流的操作系统上都有实现。

通过man手册认识poll函数man 2 poll

image.png poll函数原型

int poll(struct pollfd *fds, nfds_t nfds, int timeout);
  • 函数参数

    • 参数fds: fds是一个poll函数监听的结构列表. 每一个元素中, 包含了三部分内容: 文件描述符, 监听的事件集合, 返 回的事件集合.
    • 参数nfds: 表示fds数组的长度。
    • 参数timeout: 表示poll函数的超时时间,单位是毫秒(ms)。对比select使用更简单。
  • 函数返回值:

    • 如果函数调用成功,则返回有事件就绪的文件描述符个数。
    • 如果timeout时间耗尽,则返回0。
    • 果函数调用失败,则返回-1,同时错误码会被设置。
  • poll调用失败时,错误码可能被设置为:

    • EFAULT:fds数组不包含在调用程序的地址空间中。
    • EINTR:此调用被信号所中断。
    • EINVAL:nfds值超过RLIMIT_NOFILE值。
    • ENOMEM:核心内存不足。

struct pollfd结构体

struct pollfd结构体包含三个成员:

  • fd:特定的文件描述符,若设置为负值则忽略events字段并且revents字段返回0。
  • events:需要监视该文件描述符上的哪些事件。
  • revents:poll函数返回时告知用户该文件描述符上的哪些事件已经就绪。
struct pollfd {
   int   fd;         /* file descriptor */
   short events;     /* requested events */
   short revents;    /* returned events */
};

events和revents的取值:

image.png

#include "Socket.hpp"
#include "log.hpp"
#include <poll.h>
#include <unistd.h>

static const uint16_t DEFAULT_PORT = 8080; // 默认端口号
static const int fd_num_max = 64;

int default_fd = -1;
int no_event = 0;
class Poll
{
public:
    Poll(uint16_t port = DEFAULT_PORT) : _port(port)
    {
        _listen_fd = _listen_sock.Socket();
        _listen_sock.Bind(_listen_fd, _port);
        _listen_sock.Listen(_listen_fd);
        LogMessage(DEBUG, "SELECT INIT SUCCESS LISTEN_FD = %d", _listen_fd);
        for (int i = 0; i < fd_num_max; ++i)
        {
            _event_fds[i].fd = default_fd;
            _event_fds[i].events = no_event;
            _event_fds[i].revents = no_event;
        }
    }

    void Start()
    {
        _event_fds[0].fd = _listen_fd;
        _event_fds[0].events = POLLIN;
        int timeout = 3000;
        for (;;)
        {
            int n = poll(_event_fds, fd_num_max, timeout);
            switch (n)
            {
            case 0:
                LogMessage(DEBUG, "TIME OUT TIME:%d", timeout);
                sleep(1);
                break;
            case -1:
                LogMessage(DEBUG, "POLL FAIL");
                break;
            default:
                Dispatcher();
                break;
            }
        }
    }
    ~Poll()
    {
    }

private:
    // 事件分配器 poll 监听到有就绪的套接字 交给分配器处理
    void Dispatcher()
    {
        // 遍历准备就绪的文件描述符集合
        for (int i = 0; i < fd_num_max; ++i)
        {
            int fd = _event_fds[i].fd;
            if (fd == default_fd)
                continue;
            // 检查是否就绪
            if (_event_fds[i].events & POLLIN)
            {
                // 是否为监听套接字
                if (fd == _listen_fd)
                {
                    Accepter();
                }
                else
                {
                    Recver(fd, i);
                }
            }
        }
    }
    void Accepter()
    {

        std::string client_ip;
        uint16_t client_port;
        int sock = _listen_sock.Accept(_listen_fd, &client_ip, &client_port);
        if (sock < 0)
        {
            return;
        }
        LogMessage(NORMAL, "link success %s:%u", client_ip.c_str(), client_port);
        // 从_fd_array找到空位置存放新的套接字信息
        int pos = 1;
        for (; pos < fd_num_max; ++pos)
        {
            if (_event_fds[pos].fd != default_fd)
            {
                continue;
            }
            else
            {
                break;
            }
        }
        // 满了 无法存放新连接的套接字
        if (pos == fd_num_max)
        {
            LogMessage(WARING, "SERVER IS FULL ,CLOSE SOCKET %d NOW", sock);
        }
        else
        {
            // 找到了空闲位置,存放新接受的套接字信息
            _event_fds[pos].fd = sock;
            _event_fds[pos].events = POLLIN;
            _event_fds[pos].revents = no_event;
            PrinfFd();
        }
    }
    void Recver(int fd, int pos)
    {
        char buffer[1024];
        ssize_t read_size = read(fd, buffer, sizeof(buffer - 1));
        if (read_size > 0)
        {
            buffer[read_size - 1] = '\0';
            std::cout << "ECHO# " << buffer << std::endl;
            sleep(1000);
        }
        else if (read_size == 0)
        {
            LogMessage(DEBUG, "CLIENT QUIT,CLOSE FD IS %d", fd);
            close(fd);
            _event_fds[pos].fd = default_fd;
        }
        else
        {
            LogMessage(DEBUG, "RECV ERROR,FD IS  %d", fd);
            close(fd);
            _event_fds[pos].fd = default_fd;
        }
    }
    void PrinfFd()
    {
        std::cout << "online fd list";
        for (int i = 0; i < fd_num_max; ++i)
        {
            if (_event_fds[i].fd == default_fd)
                continue;

            std::cout << _event_fds[i].fd << " ";
        }
    }

private:
    Sock _listen_sock;
    uint16_t _port;
    int _listen_fd;
    struct pollfd _event_fds[fd_num_max];
};

关于poll简单了解一下即可,最重要的是下面介绍的epoll。

多路转接IO之epoll

epoll 是一种非常有用的机制。它相比于传统的 selectpoll 具有许多优点,主要包括以下特性:

  1. 高效: epoll 使用了红黑树(Red-Black Tree)和事件就绪链表(Ready List)的组合,这种数据结构使得 epoll 在处理大量文件描述符时具有更高的效率。
  2. 扩展性好: epoll 支持大量的并发连接,可以处理成千上万个文件描述符,因此非常适用于高并发的服务器程序。
  3. 事件驱动: epoll 是一种事件驱动的机制,当某个文件描述符上有事件发生时,epoll 会立即通知应用程序,而不需要应用程序轮询文件描述符状态。
  4. 支持边缘触发和水平触发: epoll 提供了两种工作模式,即边缘触发(Edge Triggered)和水平触发(Level Triggered),应用程序可以根据需要选择合适的模式。
  5. 文件描述符状态记录在内核中: 与 selectpoll 不同,epoll 将文件描述符的状态记录在内核空间中,减少了每次调用时需要复制文件描述符集合的开销。
  6. 零拷贝: epoll 支持使用 sendfile()splice() 等系统调用进行零拷贝传输,提高了数据传输的效率。

epoll在2.5.44内核中被引进,它几乎具备了select和poll的所有优点,被公认为Linux2.6下性能最好的多路I/O就绪通知方法。

epoll相关的系统调用函数

epoll相关的系统调用函数有三个,分别是epoll_create,epoll_ctl,epoll_wait;

  • epoll_create
    • epoll_create函数用于创建一个epoll模型 函数原型
int epoll_create(int size);
  • 函数参数

    • size 参数指定了内核事件表的大小,它表示可以监听的文件描述符的数量上限。这个参数在大多数情况下可以被忽略(设置为0),内核会自动调整事件表的大小。
  • 返回值

    • 调用成功时,epoll_create 返回一个新的 epoll 实例的文件描述符(非负整数),失败时返回 -1,并设置 errno 表示具体的错误原因。
    • 用完之后, 必须调用close()关闭.
  • epoll_ctl

epoll_ctl 是用于控制 epoll 实例的操作函数,它主要用于添加、修改和删除需要监听的文件描述符以及设置感兴趣的事件。其函数原型如下:

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
  • 函数参数

    • epfd 参数是 epoll 实例的文件描述符,即之前使用 epoll_create 创建的返回值。
    • op 参数表示操作类型,有三种取值:
      • EPOLL_CTL_ADD:向 epoll 实例中添加新的文件描述符,并设置关注的事件。
      • EPOLL_CTL_MOD:修改已经存在的文件描述符的关注事件。
      • EPOLL_CTL_DEL:从 epoll 实例中删除文件描述符。
    • fd 参数是需要操作的文件描述符。
    • event 参数是一个指向 struct epoll_event 结构体的指针,用于指定需要添加、修改或删除的文件描述符的事件。
      • 对应的结构体如下:
      typedef union epoll_data {
         void        *ptr;
         int          fd;
         uint32_t     u32;
         uint64_t     u64;
      } epoll_data_t;
      
      struct epoll_event {
         uint32_t     events;      /* Epoll events */
         epoll_data_t data;        /* User data variable */
      };
      
      • struct epoll_event结构中有两个成员,第一个成员events表示的是需要监视的事件,第二个成员data是一个联合体结构,一般选择使用该结构当中的fd,表示需要监听的文件描述符。

      • events的常用取值如下:

      • EPOLLIN:表示对应的文件描述符可以读(包括对端SOCKET正常关闭)。

      • EPOLLOUT:表示对应的文件描述符可以写。

      • EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来)。

      • EPOLLERR:表示对应的文件描述符发送错误。

      • EPOLLHUP:表示对应的文件描述符被挂断,即对端将文件描述符关闭了。

      • EPOLLET:将epoll的工作方式设置为边缘触发(Edge Triggered)模式。

      • EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听该文件描述符的话,需要重新将该文件描述符添加到epoll模型中。

  • 返回值 调用成功时,epoll_ctl 返回 0,表示操作成功。失败时返回 -1,并设置 errno 表示具体的错误原因。

  • epoll_wiat
    epoll_wait 是用于等待 epoll 实例中的事件发生的函数。 函数原型如下:

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
  • 函数参数

    • epfd 参数是 epoll 实例的文件描述符,即之前使用 epoll_create 创建的返回值。
    • events 参数是一个指向 struct epoll_event 结构体数组的指针,用于存储发生的事件。
    • maxevents 参数表示 events 数组的大小,即最多能够存储多少个事件。
    • timeout 参数表示等待事件发生的超时时间,单位是毫秒。传递 -1 表示无限等待,传递 0 表示立即返回,传递一个正数表示等待指定的毫秒数。
  • 返回值

    • 调用成功时,epoll_wait 返回发生事件的文件描述符的个数。失败时返回 -1,并设置 errno 表示具体的错误原因。

epoll原理

当某一进程调用epoll_create函数时,Linux内核会创建一个eventpoll结构体,也就是我们所说的epoll模型,eventpoll结构体当中的成员rbr(红黑树)和rdlist(双链表)与epoll的使用方式密切相关。

struct eventpoll{
	...
	//红黑树的根节点,这棵树中存储着所有添加到epoll中的需要监视的事件
	struct rb_root rbr;
	//就绪队列中则存放着将要通过epoll_wait返回给用户的满足条件的事件
	struct list_head rdlist;
	...
}

图解:

image.png

  • epoll模型当中的红黑树本质就是告诉内核,需要监视哪些文件描述符上的哪些事件,调用epll_ctl函数实际就是在对这颗红黑树进行对应的增删改操作。
  • epoll模型当中的就绪队列本质就是告诉内核,哪些文件描述符上的哪些事件已经就绪了,调用epoll_wait函数实际就是在从就绪队列当中获取已经就绪的事件。

在epoll中,每个事件都有一个对应的epitem结构体。红黑树和就绪队列中的节点就是基于epitem结构中的rbn成员和rdlink成员的。epitem结构当中的成员ffd记录的是指定的文件描述符值,event成员记录的就是该文件描述符对应的事件。

struct epitem{
	struct rb_node rbn; //红黑树节点
	struct list_head rdllink; //双向链表节点
	struct epoll_filefd ffd; //事件句柄信息
	struct eventpoll *ep; //指向其所属的eventpoll对象
	struct epoll_event event; //期待发生的事件类型
}
  • 对于epitem结构当中rbn成员来说,ffd与event的含义是,需要监视ffd上的event事件是否就绪。

  • 对于epitem结构当中的rdlink成员来说,ffd与event的含义是,ffd上的event事件已经就绪了。

  • 当调用epoll_ctl向epoll模型中添加事件时,本质就是向红黑树中新增节点。如果设置了EPOLLONESHOT选项当监听完这次事件之后,如果还需要继续监听该文件描述符的话,则需要重新将该文件描述符添加到epoll模型中。本质就是当设置了EPOLLONESHOT选项的事件就绪时,操作系统会自动将其从红黑树当中删除。

  • 如果没有设置EPOLLONESHOT选项,那么该节点插入红黑树后就会一直存在,除非用户调用epoll_ctl将该节点从红黑树当中删除(第二个参数op进行设置)。

回调机制

  • 所有添加到红黑树中的事件,都会与网卡设备的驱动程序建立回调方法,这个回调方法在内核中叫做ep_poll_callback
  • 对于select和poll来说,操作系统监视多个文件描述符上的事件是否就绪的时候,需要让操作系统主动对这些文件描述符轮询进行检测。这肯定会增加操作系统的负担的。
  • 而对于epoll来说,操作系统不需要在主动进行事件的检测,当红黑树中的监听事件就绪的时候,会自动调用回调机制,将就绪的事件添加到就绪队列中。
  • 当应用层调用epoll_wait函数获取就绪事件的时候,只需要关注底层的就绪队列是否为空,如果不为空,就将队列中就绪的事件拷贝给应用层即可。
  • 采用回调机制的最大的好处就是,不需要操作系统主动对就绪事件进行检测了,当事件就绪的时候,会自动调用对应的回调函数进行处理。

注意:

  • 只有添加到红黑树中的事件才会与底层建立回调方法,因此只有当红黑树当中对应的事件就绪时,才会执行对应的回调方法将其添加到就绪队列当中。
  • 当不断有监视的事件就绪时,会不断调用回调方法向就绪队列当中插入节点,而上层也会不断调用epoll_wait函数从就绪队列当中获取节点,这是典型的生产者消费者模型。
  • 由于就绪队列可能会被多个执行流同时访问,因此必须要使用互斥锁对其进行保护,eventpoll结构当中有lock和mtx成员就是用于保护临界资源的,因此epoll本身是线程安全的。
  • eventpoll结构当中的wq(wait queue)就是等待队列,当多个执行流想要同时访问同一个epoll模型时,就需要在该等待队列下进行等待。

epoll三部曲

  1. 创建 epoll 实例(调用epoll_create)
  2. 注册文件描述符(调用epoll_ctl)
  3. 等待事件和处理(调用epoll_wait)

epoll服务器

只是为了介绍epoll相关系统调用的使用,实现一个简单的epoll服务器接受客户端发送的信息并且回显。

继续使用上面封装的socket。

封装一个epoller类

由于epoll提供的系统调用比较多一点,封装一个epoller类,方便后续使用。

#pragma once
#include <sys/epoll.h>
#include <unistd.h>
#include "log.hpp"
static const int LISTEN_FD_SIZE = 128;
class Epoller
{
public:
    Epoller()
    {
        _efd = epoll_create(LISTEN_FD_SIZE);
        if(_efd == -1)
        {
            LogMessage(DEBUG,"EPOLL_CREATE FAIL");
        }
        else
        {
            LogMessage(DEBUG,"EPOLL_CREATE SUCCESS");
        }
    }   
    int EpollerWait(struct epoll_event *events, int maxevents)
    {
        int wait_ret = epoll_wait(_efd,events,maxevents,_timeout);
        if(wait_ret == -1)
        {
            LogMessage(DEBUG,"EPOLL_WAIT FAIL");
        }
        return wait_ret;      
    }
    int EpollUpdata( int op, int fd, uint32_t event)
    {
        int ctl_ret =0;
        if(op == EPOLL_CTL_DEL)//del
        {
            ctl_ret = epoll_ctl(_efd,op,fd,nullptr);//删除时不需要关心event
            if(ctl_ret == -1)
            {
                LogMessage(DEBUG,"EPOLL_CTL DELETE FAIL");
            }
        }
        else//mod or add
        {
            struct epoll_event ev;
            ev.events = event;
            ev.data.fd = fd;

            ctl_ret = epoll_ctl(_efd,op,fd,&ev);
            if(ctl_ret == -1)
            {
                LogMessage(DEBUG,"EPOLL_CTL FAIL");
            }
        }
        return ctl_ret;
    }

    ~Epoller()
    {
        if(_efd >= 0)
        {
            close(_efd);
        }
    }
private:
    int _efd;//epoll实列的文件描述符
    int _timeout{1000};
};

这个 Epoller 类封装了 epoll 的基本操作,包括创建 epoll 实例、等待就绪事件和更新事件集合。下面是对该类的一些说明:

  • 构造函数:在构造函数中使用 epoll_create 创建了一个 epoll 实例,并在日志中记录了创建是否成功。
  • EpollerWait 方法:用于等待就绪事件,调用 epoll_wait 函数。如果等待失败,将会在日志中记录失败信息,并返回 -1。
  • EpollUpdata 方法:用于更新事件集合,可以添加、修改或删除文件描述符和关联的事件。根据传入的操作类型 op,分别调用 epoll_ctl 函数执行对应的操作。如果操作失败,将会在日志中记录失败信息,并返回 -1。
  • 析构函数:关闭 epoll 实例的文件描述符,避免资源泄漏。

EpollServer类实现

#pragma once
#include "log.hpp"
#include "Epoller.hpp"
#include "Socket.hpp"
#include <sys/epoll.h>

static const int PORT = 8080;//默认端口号
static const int NUM = 64;//默认就绪事件数量
class EpollServer
{
public:
    EpollServer(uint16_t port = PORT) : _port(port), _epoller(), _listen_sock()
    {
        _listen_fd = _listen_sock.Socket();
        _listen_sock.Bind(_listen_fd, _port);
        _listen_sock.Listen(_listen_fd);
        LogMessage(DEBUG, "EPOLL INIT SUCCESS LISTEN_FD = %d", _listen_fd);
    }
    void Start()
    {
        // 将_listen_fd添加到epoll中 只关心读事件
        _epoller.EpollUpdata(EPOLL_CTL_ADD, _listen_fd, EPOLLIN);
        //用于存储就绪的事件。最多可以存储的就绪事件数量。
        struct  epoll_event revs[NUM];
        for(;;)
        {
            int n  = _epoller.EpollerWait(revs,NUM);
            if(n > 0 )
            {
                //有事件就绪  让事件分配器进行处理
                LogMessage(DEBUG,"event-ready object fd is %d",revs[0].data.fd);
                sleep(1);//防止刷屏 每隔一秒打印一次
                Dispatcher();
            }
            else if( n == 0)
            {
                LogMessage(DEBUG,"time out");
            }
            else
            {
                LogMessage(DEBUG,"WAIT FAIL");
            }
        }
        
    }
    ~EpollServer()
    {
        if(_listen_fd >= 0)
        {
            close(_listen_fd);
        }
    }
private:
    void Dispatcher();
private:
    uint16_t _port;//端口号 
    Sock _listen_sock;//套接字
    int _listen_fd;//套接字描述符
    Epoller _epoller;//epoll
};

这个 EpollServer 类使用了之前定义的 Epoller 类来处理 epoll 相关操作。以下是对该类的一些说明:

  • 构造函数:在构造函数中创建了一个监听套接字,并将其绑定到指定的端口。然后将该监听套接字添加到 epoll 实例中进行监听。
  • Start 方法:在一个无限循环中调用 EpollerWait 方法等待就绪事件。如果有事件就绪,则调用事件分配器 Dispatcher 进行处理。如果等待超时,则记录超时信息。如果等待失败,则记录失败信息。
  • 析构函数:关闭监听套接字的文件描述符,避免资源泄漏。

这样一个简单的epoll服务器框架就搭建好了。
进行简单的测试。

recording.gif 使用本地回环地址(127.0.0.1)进行测试。发现只要有连接到来,上层不进行处理,就会一直打印。下面就实现事件分配器进行处理事件。

分配器实现

这里的分配器的实现和select逻辑基本一样。遍历存储就绪事件数组查找就绪的事件。

void Dispatcher(struct epoll_event revs[], int num)
{
    for (int i = 0; i < num; ++i)
    {
        uint32_t event = revs[i].events;
        int fd = revs[i].data.fd;
        if (event & EPOLLIN)
        {
            if (fd == _listen_fd)
            {
                // 接受客户端的连接请求
                Accept();
            }
            else
            { 
                //从套接字中读取信息
                Recver(fd);
            }
        }
    }
}

这里只关心读事件,并根据事件类型进行相应的处理。

Accepter实现

void Accept()
{
    std::string ip;
    uint16_t port;
    int sock = _listen_sock.Accept(_listen_fd,&ip,&port);
    if(sock > 0)
    {
        //不能直接从套接字中读取  交给epoll 
        _epoller.EpollUpdata(EPOLL_CTL_ADD,sock,EPOLLIN);
        LogMessage(DEBUG,"GET A NEW LINK # ip = %s port = %d",ip.c_str(),port);
    }
}

调用_listen_sock.Accept() 从监听套接字中接受新的连接,并将新连接的套接字描述符添加到 epoll 实例中。 需要注意 这里不能直接从套接字中读取,交付epoll进行监听。通知对应的进程在进行处理。

Recver实现

 void Recver(int fd)
{
    char buffer[1024];//读取缓冲区
    ssize_t read_size = read(fd,buffer,sizeof(buffer));
    if(read_size > 0)
    {
        buffer[read_size-1] = '\0';
        std::cout << "get a message " << buffer << std::endl;

        //回显
        std::string echo_str = "server echo #";
        echo_str += buffer;
        write(fd,echo_str.c_str(),echo_str.size());
    }
    else if(read_size == 0)
    {
        LogMessage(DEBUG,"CLIENT QUIT FD = %d",fd);
        _epoller.EpollUpdata(EPOLL_CTL_DEL,fd,0);
        close(fd);
    }
    else
    {
        LogMessage(DEBUG,"READ FAIL FROM %d",fd);
        _epoller.EpollUpdata(EPOLL_CTL_DEL,fd,0);
        close(fd);
    }
}

epool监听到事件就绪,交给事件分配器,事件分配器进行判断在进行读取。 Recver从套接字中读取数据,并进行了回显处理。在读取到数据后,它将数据发送回客户端,并在读取失败或客户端关闭连接时关闭相关的文件描述符,并从 epoll 实例中移除该文件描述符。 没有设置 EPOLLONESHOT选项,所以要手动从epoll模型中删除。

完整的EpollServer类

#pragma once
#include "log.hpp"
#include "Epoller.hpp"
#include "Socket.hpp"
#include <sys/epoll.h>

static const int PORT = 8080; // 默认端口号
static const int NUM = 64;    // 默认就绪事件数量
class EpollServer
{
public:
    EpollServer(uint16_t port = PORT) : _port(port), _epoller(), _listen_sock()
    {
        _listen_fd = _listen_sock.Socket();
        _listen_sock.Bind(_listen_fd, _port);
        _listen_sock.Listen(_listen_fd);
        LogMessage(DEBUG, "EPOLL INIT SUCCESS LISTEN_FD = %d", _listen_fd);
    }
    void Start()
    {
        // 将_listen_fd添加到epoll中 只关心读事件
        _epoller.EpollUpdata(EPOLL_CTL_ADD, _listen_fd, EPOLLIN);
        // 用于存储就绪的事件。最多可以存储的就绪事件数量。
        struct epoll_event revs[NUM];
        for (;;)
        {
            int n = _epoller.EpollerWait(revs, NUM);
            if (n > 0)
            {
                // 有事件就绪  让事件分配器进行处理
                LogMessage(DEBUG, "event-ready object fd is %d", revs[0].data.fd);
                sleep(1);
                Dispatcher(revs, n);
            }
            else if (n == 0)
            {
                LogMessage(DEBUG, "time out");
            }
            else
            {
                LogMessage(DEBUG, "WAIT FAIL");
            }
        }
    }
    ~EpollServer()
    {
        if (_listen_fd >= 0)
        {
            close(_listen_fd);
        }
    }

private:
    void Dispatcher(struct epoll_event revs[], int num)
    {
        for (int i = 0; i < num; ++i)
        {
            uint32_t event = revs[i].events;
            int fd = revs[i].data.fd;
            if (event & EPOLLIN)
            {
                if (fd == _listen_fd)
                {
                    // 接受客户端的连接请求
                    Accept();
                }
                else
                { 
                    //从套接字中读取信息
                    Recver(fd);
                }
            }
        }
    }
    void Accept()
    {
        std::string ip;
        uint16_t port;
        int sock = _listen_sock.Accept(_listen_fd,&ip,&port);
        if(sock > 0)
        {
            //不能直接从套接字中读取  交给epoll 
            _epoller.EpollUpdata(EPOLL_CTL_ADD,sock,EPOLLIN);
            LogMessage(DEBUG,"GET A NEW LINK # ip = %s port = %d",ip.c_str(),port);
        }
    }
    void Recver(int fd)
    {
        char buffer[1024];//读取缓冲区
        ssize_t read_size = read(fd,buffer,sizeof(buffer));
        if(read_size > 0)
        {
            buffer[read_size-1] = '\0';
            std::cout << "get a message " << buffer << std::endl;

            //回显
            std::string echo_str = "server echo #";
            echo_str += buffer;
            write(fd,echo_str.c_str(),echo_str.size());
        }
        else if(read_size == 0)
        {
            LogMessage(DEBUG,"CLIENT QUIT FD = %d",fd);
            _epoller.EpollUpdata(EPOLL_CTL_DEL,fd,0);
            close(fd);
        }
        else
        {
            LogMessage(DEBUG,"READ FAIL FROM %d",fd);
            _epoller.EpollUpdata(EPOLL_CTL_DEL,fd,0);
            close(fd);
        }
    }

private:
    uint16_t _port;    // 端口号
    Sock _listen_sock; // 套接字
    int _listen_fd;    // 套接字描述符
    Epoller _epoller;  // epoll
};

epoll的工作模式

epoll的工作模式有两种 分别是水平触发(Level-Triggered)LT模式和边缘触发(Edge-Triggered)ET模式。

LT模式

在水平触发模式下,当文件描述符上有数据可读时,Epoll通知对应的进程即使进程没有读取全部的数据,下次仍然会通知。

如果对应的文件描述符套接字中有数据,而进程没有读取,下次epoll仍然会通知进程该文件描述符可读。

在LT模式下,进程要处理完文件描述符上的所有数据,否则会一直触发可读事件。

ET模式

在边缘模式下,Epoll仅在该文件描述符的状态发生改变时通知对应的进程,例如从未就绪转态到就绪态。

如果文件描述符仍然是就绪状态对应的进程也没有读取数据,Epoll不会再次通知对应的进程。

这种模式下,进程需要注意,一旦文件描述符状态发生变化,立即读取数据并处理,否则可能错过事件。

LT VS ET

  • epoll默认的工作模式就是LT模式,如果要将epoll改为ET工作模式,则需要在添加事件时设置EPOLLET选项。
  • 使用 ET 能够减少 epoll 触发的次数,相当于一个文件描述符就绪之后, 不会反复被提示就绪, 看起来就比 LT 更高效一些. 但是在 LT 情况下如果也能做到 每次就绪的文件描述符都立刻处理, 不让这个就绪被重复提示的话, 其实性能也是一样的。
  • 使用 ET 能够减少 epoll 触发的次数。但是代价就是强逼着程序猿一次响应就绪过程中就把所有的数据都处理完。ET的代码复杂度也更高了。

在LT模式下,支持堵塞读写和非阻塞读写。而在ET模式下仅支持非阻塞读写。

ET模式和非阻塞IO

使用 ET 模式的 epoll, 需要将文件描述设置为非阻塞. 这个不是接口上的要求, 而是 "工程实践" 上的要求。

比如下面这样的场景:

  1. 服务器收到客户端10kb数据的请求,服务器会向客户端发送一个应答,如果客户端收不到应答,不会向服务器发送第二个10k的请求。

image.png 2. 如果服务器进程的代码是阻塞式的IO,并且一次只读取1kb数据时(read不能保证一次就把所有的数据都读出来, 参考 man 手册的说明, 可能被信号打断),剩下的9kb数据就会待在缓冲区中。

image.png 3. 此时由于epoll是ET模式,文件描述符的状态没有发生改变,就不会通知对应的进程,剩下的这9kb数据一直在缓冲区存放,直到下一次客户端向服务器写数据时,文件描述符的状态才会发生改变才会通知对应的进程。

image.png 这时候,就有问题来了。

- 服务器只读到1kb数据, 要10kb读完才会给客户端返回响应数据。
- 而客户端要读到服务器的响应。
- 而只有客户端发送了下一个请求,服务器才会通知对应的进程, 才能去读缓冲区中剩余的数据。

这就陷入僵局了,两者谁也服谁的样子。 为了解决上面的问题,(阻塞式read不一定能一次性把缓冲区的数据读完),使用非阻塞轮询的方式进行读取缓冲区,保证一定把完整的请求读取出。

而如果是LT则没这个问题. 只要缓冲区中的数据没读完, 就能够让 epoll_wait 返回文件描述符读就绪。

所以在ET模式下,仅支持非阻塞轮询IO。

总结

常见的多路转接技术有 select、poll 和 epoll,它们都是在 Linux 系统中使用的。这三种 I/O 模型各有特点,下面对它们进行简要总结:

  • Select

    • 特点:最古老的一种多路转接方式,在 UNIX 系统中广泛使用。
    • 优点:跨平台支持良好,适用于小规模的 I/O 操作。
    • 缺点:效率低下,无法处理大量的文件描述符。
    • 适用场景:适用于小规模的 I/O 操作,对性能要求不高的情况。
  • Poll

    • 特点:较 Select 更高效。
    • 优点:提供了更高的效率,能够处理大量的文件描述符。
    • 缺点:随着文件描述符数量的增加,性能仍会下降。
    • 适用场景:适用于中等规模的 I/O 操作,性能要求较高的情况。
  • Epoll

    • 特点:Linux 2.6 内核引入的新型多路转接技术,目前是 Linux 平台上的首选。
    • 优点:具有更高的性能和伸缩性,能够处理大量的并发连接。
    • 缺点:不跨平台,仅适用于 Linux 系统。
    • 适用场景:适用于大规模的高性能服务器程序,能够处理大量的并发连接和高负载情况。
  • 参考源码