掘金 后端 ( ) • 2024-07-02 18:13

theme: cyanosis highlight: xcode


前言

在Linux中有一句经典台词:"Linux一切皆文件"。IO操作是与文件进行交流的唯一方式,也就是说这是与Linux系统交流的唯一手段。就如同人与人之间的交流,如果我们连交流的方式都不甚了解,交流的效率就会变得低下。操作系统也是如此,唯有深入学习IO模型,才能更高效地在Linux进行开发。

IO模型

基本概念

IO模型指的是数据从内核读取到用户态的方式,在Linux中IO模型分为五类,它们分别如下:

  • 阻塞式IO
    • 概念:我们平时所使用的默认文件读写操作就是阻塞式IO,线程在数据未就绪时会一直阻塞着。

    • 优点:简单,逻辑清晰。

    • 缺点:等待期间线程被阻塞,IO密集型程序效率低下。

    • 应用场景:简单的IO操作,系统资源充足时使用。

image.png

  • 非阻塞式IO

    • 概念:线程在进行IO操作时不进行阻塞,继续往下执行,需要不断轮询数据是否就绪。

    • 优点:线程可以继续执行其他任务,提高IO利用率。

    • 缺点:需要不断轮询,提高CPU负担。

    • 应用场景:需要提高应用响应度的场景。

image.png

  • 多路复用
    • 概念:通过"select"、"epoll"等调用,同时等待多个文件。

    • 优点:同时处理多个IO事件,效率高,节省资源。

    • 缺点:实现较为复杂。

    • 应用场景:服务器设计(Reacter),多路复用是服务器应用最广的IO模型。

image.png

  • 信号驱动IO
    • 概念:利用信号通知机制(SIGIO),异步接收数据。

    • 优点:无需轮询IO状态,响应快。

    • 缺点:不适用于信号密集的通信,如tcp。

    • 应用场景:需要异步通知的IO操作,网络UDP通信。

image.png

  • 异步IO

    • 概念:在发起IO操作后立刻返回,内核在完成IO操作后通知进程。

    • 优点:最大化IO效率,可以完全异步处理其他任务。

    • 缺点:实现复杂,对操作系统有要求(不是所有系统都支持异步IO)。

    • 应用场景:高性能服务器设计(Proacter)。

image.png

虽然IO模型分为5大类,但他们也可以被分为同步IO操作与异步IO操作。这里同步与异步是指IO操作是否需要阻塞等待IO事件完成,所以真正意义上的异步IO操作只有异步IO。

多路复用

多路复用是作为服务器应用最广的一种IO模型,能够同时监听多个连接(文件描述符),只要发现文件的IO事件就绪,就能够进行统一操作,从而提高效率。

多路复用随着时间的推移,也衍生出了不同的系统调用接口,每个接口的推出都是为了改进上一个接口的缺点,到现在主流的多路复用接口都为epoll(Linux独有)、kqueue(BSD、MAC)、IOCP(Windows),还有跨平台的第三方IO库,如:libevent、libuv。

本文将按其发展的顺序逐个介绍select、poll、epoll这些Linux中的多路复用接口。不过为了避免写太多重复的代码,所以这里先将socket封装成类。

#include <netinet/in.h>
#include <string>
#include <sys/socket.h>
#include <unistd.h>


const int backlog = 5;

class Socket {
public:
    Socket* Get() { return this; }

    Socket() = default;

    ~Socket() { Close(); }

    void BuildListenSocket(const int port)  {
        _port = port;

        _fd = socket(AF_INET, SOCK_STREAM, 0);
        if(_fd < 0) {
            perror("socket");
            abort();
        }

        Bind();
        Listen();
    }

    ssize_t Recv(std::string& str, const int size) const {
        char buffer[size];
        ssize_t n  = ::recv(_fd , buffer, size, 0);
        buffer[n] = '\0';
        str += buffer;
        return  n;
    }

    ssize_t Send(std::string& buf, const int size) const {
        return ::send(_fd, buf.data(), size, 0);
    }

    explicit Socket(const int fd) :_fd(fd) {}

    int Fd() const { return _fd; }

    void SetFd(const int fd) { _fd = fd; }

    int Accept() {
        int fd = ::accept(_fd, (sockaddr*)&_addr, &_len);
        _port = ntohs(_addr.sin_port);
        if(fd < 0) {
            perror("accept");
        }
        return fd;
    }

    void Listen() const {
        if(::listen(_fd, backlog) != 0) {
            perror("listen");
            abort();
        }
    }

    void Close() const {
        close(_fd);
    }

    void Bind() {
        _addr.sin_family = AF_INET;
        _addr.sin_addr.s_addr = INADDR_ANY;
        _addr.sin_port = htons(_port);

        if(bind(_fd, (sockaddr*)&_addr, _len) != 0) {
            perror("bind");
            abort();
        }
    }

private:
    sockaddr_in _addr{};

    socklen_t _len = sizeof(_addr);
    int _port{};
    int _fd = -1;
};

select

select是最先被推出的多路复用接口,可以同时监听多个文件描述符的读写与异常事件,但需要自己定义数据结构来存储文件描述符和轮询哪些文件描述符就绪了。所以每次使用select都需要写一堆重复的代码。

select在每次调用时,无论我们是否需要监听所有的fd,内核都会根据nfds参数,从0~nfds-1轮询读写、异常FD集合,来查明哪个文件描述符触发了事件,时间复杂度为O(n)。

函数原型与参数:

#include <sys/select>

// 从fd_set移除文件描述符
 void
 FD_CLR(fd, fd_set *fdset);

// 拷贝fd_set
 void
 FD_COPY(fd_set *fdset_orig, fd_set *fdset_copy);

// 检查某个文件描述符是否被事件就绪
 int
 FD_ISSET(fd, fd_set *fdset);

// 为文件描述符设计监听
 void
 FD_SET(fd, fd_set *fdset);

// 初始化fd_set
 void
 FD_ZERO(fd_set *fdset);

 int
 select(int nfds, fd_set *restrict readfds, fd_set *restrict writefds, fd_set *restrict errorfds, struct timeval *restrict timeout);
  • 参数:
    • nfds: 指定要监听的文件描述符中最大的文件描述符+1.

    • readfds: 要监听读事件的文件描述符合集

    • writefds: 要监听写时间的文件描述符合集

    • errorfds: 要监听是否有异常情况的文件描述符合集

    • timeout: 可以为事件设置超时时间,为空时阻塞等待。

select的服务器设计

#include "Socket.hpp"
#include <sys/select.h>
#include <iostream>
#include <vector>

const int selectNum = 10;   //select 最大管理的文件描述符数量。

class Select {
public:
    explicit Select(int port) :_socket(port) {
        Init(port);
    }

    void Handle() {
        for(int i = 0; i < _array.size(); ++i) {
            if(!_array[i])  continue;

            int fd = _array[i]->Fd();

            if(FD_ISSET(fd, &_rfdset)) {
                //  新连接到来的情况
                if(fd == _socket.Fd()) {
                    Socket* socket = new Socket(_socket.Accept());

                    int j;
                    for(j = 1; j < _array.size(); ++j) {
                        if(!_array[j]) {
                            _array[j] = socket;
                            std::cout << "New Connection:" << socket->Fd() << std::endl;
                            break;
                        }
                    }

                    if(j == _array.size()) delete socket;
                }
                else {
                    //正常文件读取
                    std::string buf;
                    ssize_t sz = _array[i]->Recv(buf, 1024);
                    if(sz > 0) {
                        std::cout << "client: " << buf << std::endl;
                        _array[i]->Send(buf, sz);
                    }
                    else {
                        std::cout << "client error or quit:" << _array[i]->Fd() << std::endl;
                        FD_CLR(_array[i]->Fd(), &_rfdset);
                        delete _array[i];
                        _array[i] = nullptr;
                    }
                }
            }
        }
    }

    void Loop() {   //程序入口
        _isRunning = true;
        while (_isRunning) {
            FD_ZERO(&_rfdset);  // fd_set 每次使用前都要初始化
            int maxfd = _socket.Fd();

            for(int i = 0; i < _array.size(); ++i) {
                if(!_array[i])  continue;

                int fd = _array[i]->Fd();
                FD_SET(fd, &_rfdset);
                maxfd = fd > maxfd ? fd : maxfd;
            }

            timeval timeout = {1, 0};

            int n = select(maxfd+1, &_rfdset, nullptr, nullptr, &timeout);
            switch (n) {
                case 0:
                    std::cout << "select timeout, last time: " << timeout.tv_sec << ":" << timeout.tv_usec << std::endl;
                    break;
                case -1:
                    std::cerr << "select error!!!" << std::endl;
                    break;
                default:
                    Handle();
                    break;
            }
        }
    }

    void Stop() { _isRunning = false; }

private:
    void Init(int port) {   //初始化
        _socket.BuildListenSocket(port);
        _array.resize(selectNum);
        for (int i = 0; i < _array.size(); ++i)
            _array[i] = nullptr;

        _array[0] = _socket.Get();
    }

private:
    Socket _socket;
    fd_set _rfdset;
    bool _isRunning{};

    std::vector<Socket*> _array{};  //使用vector来管理文件描述符
};

select的优缺点

  • 优点:

    • 兼容性好:select是最早的多路复用IO模型,大部分操作系统都拥有。
    • 接口简单:只需要传入
  • 缺点:

    • 可监控的文件描述符有限:监控的文件描述符数目取决于sizeof(fd_set)的大小。
    • 使用不便:每次调用select都需要手动设置fd合集。
    • 设计缺陷:
      • 每次调用select都需要将fd合集复制到内核,再将其从内核复制回来用户态。
      • select在内核中需要轮询所有fd集合,效率非常低。

poll

poll比起select多了不少可以监听的事件,并且使用了一个pollfd结构体作为存储文件描述符的容器,使其接口传参和使用上变得更简便了。

poll能够在pollfd数组设置需要监听的文件描述符合集,与select不同,内核只会监听这些被设置了的fd。因此,在文件描述符分布稀疏的情况下,poll比起select效率更高。但,其本质还是轮询所有的文件描述符,所以时间复杂度还是O(n)。

函数原型与参数:

 #include <poll.h>

 int
 poll(struct pollfd fds[], nfds_t nfds, int timeout);
 
struct pollfd {
 int    fd;       /* 需要监听的文件描述符 */
 short  events;   /* 需要监听的事件 */
 short  revents;  /* 实际发生的事件 */
};

// 常用的事件
// POLLIN : 输入
// POLLOUT : 输出
// POLLRDHUP : 对端套接字关闭
// POLLERR : 错误

  • 参数:
    • fds: 需要监听的fd合集

    • nfds: fds数组的数据个数。

    • timeout: 超时时间,为-1时一直阻塞,0时则不阻塞。

poll服务器设计

//
// Created by lang liu on 2024/6/30.
//

#ifndef POLL_HPP
#define POLL_HPP


#include "Socket.hpp"
#include <poll.h>
#include <iostream>
#include <memory>

class Poll {
public:
    Poll(const int num, const int port) : _num(num), _port(port), _isRunning(false) {
        Init();
    }

    void Loop() {
        _isRunning = true;
        while (_isRunning) {
            int timeout = -1;   //-1表示一直阻塞,直到有文件描述符就绪

            int ret = poll(_fds.get(), _num, timeout);

            switch (ret)
            {
            case 0:
                std::cout << "poll timeout" << std::endl;
                break;
            case -1:
                std::cout << "poll error" << std::endl;
                break;
            default:
                Handle();
                break;
            }
        }
    }

private:
     void Handle() {
        for (int i = 0; i < _num; ++i) {
            if(_fds[i].fd == -1)    continue;

            int fd = _fds[i].fd;
            short event = _fds[i].revents;

            if(event & POLLIN) {
                if(fd == _socket->Fd()) {
                    int newFd = _socket->Accept();

                    if(newFd < 0) continue;

                    std::cout << "new connection: " << newFd << std::endl;

                    int j;
                    for(j= 1; j < _num; ++j) {
                        if(_fds[j].fd == -1) {
                            _fds[j].fd = newFd;
                            _fds[j].events = POLLIN;
                            break;
                        }
                    }
                    if(j == _num) close(newFd);
                }
                else {
                    // 普通读事件
                    char buffer[1024]{};
                    ssize_t n = recv(fd, buffer, sizeof(buffer), 0);
                    if(n > 0) {
                        std::cout << "client: " << buffer << std::endl;
                    }
                    else {
                        std::cout << "client quit or error" << std::endl;
                        close(fd);
                        _fds[i].fd = -1;
                        _fds[i].events = 0;
                    }
                }
            }
        }
    }

    void Init() {
        _socket = std::make_unique<Socket>();
        _socket->BuildListenSocket(_port);
        _fds = std::make_unique<pollfd[]>(_num);
        for(int i = 0; i < _num; ++i) {
            _fds[i].fd = -1;
            _fds[i].events = _fds[i].revents = 0;
        }

        _fds[0].fd = _socket->Fd();
        _fds[0].events = POLLIN;
    }

private:
    std::unique_ptr<Socket> _socket;
    std::unique_ptr<pollfd[]> _fds;

    int _num;
    int _port;
    bool _isRunning;
};

#endif //POLL_HPP

poll的优缺点:

  • 优点:

    • 不限文件描述符:poll能够监听的文件描述符没有上限。

    • 更灵活:比起select,poll可以监听更多事件。

    • 使用简便:比起select,poll免去了手动管理fd合集数据结构。

  • 缺点:

    • 效率低下:poll调用时依然需要遍历所有fd合集。

epoll

epoll是linux独有的一种多路复用机制,其在使用上分为了三个接口,分别为epoll_create、epoll_ctl、epoll_wait。虽然接口变多了,但其实在使用上反而变得更简便、优雅了。

在处理大量文件描述符时,epoll的效率远远高于select与poll。这是因为select和poll每次在调用时,都需要将文件描述符集合从用户态拷贝到内核态,然后再将结果内核态重新拷贝回来。而epoll则免去了这种频繁的用户态与内核态之间的拷贝,只需要在内核态复制就绪的fd集合到用户态。

epoll的接口

  • epoll_create:创建epoll模型
#include <sys/epoll.h>

int epoll_create(int size);   //size选项在2.6.8被废除
int epoll_create1(int flags);  //和epoll_create差不多,不过可以使用一些特殊选项

//返回值都为文件描述符, epfd
  • epoll_ctl:控制epoll模型
int epoll_ctl(int epfd, int op, int fd,
             struct epoll_event *_Nullable event);

// op 选项:
// EPOLL_CTL_ADD
// EPOLL_CTL_MOD
// EPOLL_CTL_DEL

// 常用event选项
// EPOLLIN
// EPOLLOUT
// EPOLLET
// EPOLLERR
  • 参数:

    • epfd:epoll_create创建的epoll模型fd。
    • op:操作选项, 如增添监控。
    • event:需要监听的事件。
  • epoll_wait:获取就绪的文件描述符

#include <sys/epoll.h>

int epoll_wait(int epfd, struct epoll_event *events,
              int maxevents, int timeout);
  • 参数:
    • epfd:epoll模型的文件描述符。

    • events:存储就绪文件描述符队列的数组。

    • maxevents:events数组的大小。

    • timeout:设定超时时间(毫秒)。

epoll工作原理

epoll在调用epoll_create时,会在内核创建出一个epoll模型(eventpoll),这个模型拥有一颗红黑树用于存储监控的文件描述符和存储就绪文件描述符节点的就绪队列。

image.png

epoll在检查哪些文件描述符是否就绪时,不需要像select、poll一样,轮询所有的文件描述符。当某个文件描述符有事件就绪时,epoll模型在其红黑树中查找是否有匹配的文件描述符和事件(O(logn))。如果找到了匹配的结果,则在就绪队列中链入其节点,最后将就绪队列复制回用户态(O(n))。

在了解了这些之后,再来看epoll的三个接口就会发现它们所做的工作都在围绕着epoll模型进行。

  • epoll_create:epoll在使用前,必须先在内核中创建epoll模型,之后的增删查改都在epoll模型中进行。

  • epoll_ctl:在epoll模型中的红黑树进行增删改操作。

  • epoll_wait:epoll模型开始进行事件的监听,如果在红黑树发现有匹配的节点,则链入到就绪队列。

// linux内核中对epoll模型的描述
struct eventpoll {
/* Protect the access to this structure */
spinlock_t lock;

/*
 * This mutex is used to ensure that files are not removed
 * while epoll is using them. This is held during the event
 * collection loop, the file cleanup path, the epoll file exit
 * code and the ctl operations.
 */
struct mutex mtx;

/* Wait queue used by sys_epoll_wait() */
wait_queue_head_t wq;

/* Wait queue used by file->poll() */
wait_queue_head_t poll_wait;

/* List of ready file descriptors */
struct list_head rdllist;

/* RB tree root used to store monitored fd structs */
struct rb_root rbr;

/*
 * This is a single linked list that chains all the "struct epitem" that
 * happened while transferring ready events to userspace w/out
 * holding ->lock.
 */
struct epitem *ovflist;

/* The user that created the eventpoll descriptor */
struct user_struct *user;
};

struct epitem {   //红黑树、就绪队列存放的都是epitem
/* RB tree node used to link this structure to the eventpoll RB tree */
struct rb_node rbn;

/* List header used to link this structure to the eventpoll ready list */
struct list_head rdllink;

/*
 * Works together "struct eventpoll"->ovflist in keeping the
 * single linked chain of items.
 */
struct epitem *next;

/* The file descriptor information this item refers to */
struct epoll_filefd ffd;

/* Number of active wait queue attached to poll operations */
int nwait;

/* List containing poll wait queues */
struct list_head pwqlist;

/* The "container" of this item */
struct eventpoll *ep;

/* List header used to link this item to the "struct file" items list */
struct list_head fllink;

/* The structure that describe the interested events and the source fd */
struct epoll_event event;
};

总结

epoll的代码因为太多,所以我放在github上,感兴趣可以去看看。以下是select、poll、epoll的一些对比。

特性 select poll epoll 系统调用 select poll epoll_create, epoll_ctl, epoll_wait 监控方法 使用位图(bitmap) 使用数组 使用红黑树(rb-tree)和链表 性能 随文件描述符数量线性增长 随文件描述符数量线性增长 O(1)操作,适合大规模文件描述符监控 事件处理方式 每次调用都要重新设置描述符集合 每次调用都要重新设置描述符数组 内核维护就绪事件列表 优点 简单,广泛支持 简单,广泛支持,比select稍高效 高效,适合大规模并发,支持边缘触发和水平触发 缺点 只能监控有限数量的文件描述符,性能随描述符数量增加而下降 性能随描述符数量增加而下降 实现相对复杂,只在Linux上可用