OS-并发-事件循环

基于事件的循环

一个典型的事件循环如下:

1
2
3
4
5
while(1){
    events=getEvents()
    for e in events:
        processEvent(e)
}

基于事件循环的并发,调度由程序自己进行,因此调度是可控的。同时,事件循环有两类实现:单线程事件循环、一个主线程负责事件循环并拉起其他线程处理具体事务。本次主要基于单线程事件循环进行。

select 和 poll

select()

select() 是 Linux/UNIX 系统中用于 I/O 多路复用的核心系统调用之一。它允许程序同时监视多个文件描述符(如套接字、管道等),以确定它们是否处于可读、可写或异常状态。

select() 的主要功能是:

  • 监视文件描述符:可以同时监视多个文件描述符,检查它们是否准备好进行读取、写入或是否有异常情况。
  • 阻塞与非阻塞:通过设置超时参数,可以控制 select() 的阻塞行为。如果超时设置为 NULLselect() 会无限期阻塞,直到至少有一个文件描述符准备好。如果超时设置为 0,select() 会立即返回,用于非阻塞检查。
  • 返回就绪描述符select() 返回时,会修改传入的描述符集合,只保留那些已经准备好的描述符。返回值是所有集合中就绪描述符的总数。
1
2
3
#include <sys/select.h>

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
  • nfds:要检查的文件描述符的最大值加 1。
  • readfds:指向一个 fd_set 结构,用于检查可读性。
  • writefds:指向一个 fd_set 结构,用于检查可写性。
  • exceptfds:指向一个 fd_set 结构,用于检查异常情况。
  • timeout:指向一个 struct timeval 结构,用于设置超时时间。如果为 NULLselect() 会无限期阻塞。

文件描述符集合操作

select() 使用 fd_set 结构来表示文件描述符集合,并通过以下宏进行操作:

  • FD_ZERO(set):清除集合中的所有文件描述符。
  • FD_SET(fd, set):将文件描述符 fd 添加到集合中。
  • FD_CLR(fd, set):将文件描述符 fd 从集合中移除。
  • FD_ISSET(fd, set):检查文件描述符 fd 是否在集合中。

性能问题select() 在处理大量文件描述符时性能较差,因为每次调用都需要遍历所有描述符。为此,对于高并发场景,建议使用以下替代方案:

  • poll():支持更大的文件描述符集合,且没有 FD_SETSIZE 的限制。
  • epoll():Linux 特有的高效 I/O 多路复用机制,适用于大规模并发连接。

poll()

poll() 是 Linux/UNIX 系统中用于 I/O 多路复用的另一个核心系统调用,与 select() 类似,但提供了更灵活和高效的文件描述符监视机制。

poll() 的主要功能是:

  • 监视文件描述符:可以同时监视多个文件描述符,检查它们是否准备好进行读取、写入或是否有异常情况。
  • 阻塞与非阻塞:通过设置超时参数,可以控制 poll() 的阻塞行为。如果超时设置为负数,poll() 会无限期阻塞,直到至少有一个文件描述符准备好。如果超时设置为 0,poll() 会立即返回,用于非阻塞检查。
  • 返回就绪描述符poll() 返回时,会修改传入的 pollfd 结构数组,填充每个文件描述符的实际事件状态。返回值是所有 pollfd 结构中 revents 字段非零的元素数量
1
2
3
#include <poll.h>

int poll(struct pollfd *fds, nfds_t nfds, int timeout);
  • fds:指向一个 pollfd 结构数组,用于指定要监视的文件描述符及其事件。
  • nfdsfds 数组中的元素数量。
  • timeout:超时时间(以毫秒为单位)。如果为负数,poll() 会无限期阻塞;如果为 0,poll() 会立即返回。

pollfd 结构

pollfd 结构用于描述要监视的文件描述符及其事件:

1
2
3
4
5
struct pollfd {
    int   fd;         // 文件描述符
    short events;     // 请求监视的事件(输入参数)
    short revents;    // 实际发生的事件(输出参数)
};
  • fd:要监视的文件描述符。如果为负数,eventsrevents 字段会被忽略。
  • events:指定要监视的事件(如 POLLINPOLLOUT 等)。
  • revents:由内核填充,表示实际发生的事件。

事件类型

poll() 支持以下事件类型:

  • POLLIN:有数据可读。
  • POLLPRI:有紧急数据可读(如 TCP 带外数据)。
  • POLLOUT:可写。
  • POLLRDHUP:对端关闭连接或关闭写端(Linux 2.6.17 及以上)。
  • POLLERR:发生错误(仅在 revents 中返回)。
  • POLLHUP:挂起(仅在 revents 中返回)。
  • POLLNVAL:文件描述符无效(仅在 revents 中返回)。

单线程的事件循环

如果我们的事件循环只使用一个线程,那么我们一次只能处理一个任务,好处是许多并发问题不再出现。但随着而来的是另一个问题:我们不能调用会产生阻塞的系统调用。

多线程并发最常见的目的是让 I/O 操作和计算并行,当一个线程因 read 而被阻塞时其他线程可以正常执行。但对于单线程的事件循环来说,阻塞意味着一切都停下来了。

解决方案

异步 I/O

异步 I/O 使得应用程序能够发出 I/O 请求,并在完成前将控制器返回给调用者,同时提供其他 API 使调用者可以确认 I/O 是否完成。

POSIX AIO 的核心思想是:

  • 异步执行:I/O 操作在后台执行,应用程序可以继续执行其他任务。
  • 通知机制:当 I/O 操作完成时,应用程序可以通过信号、线程或轮询方式获知结果。
  • 控制块:每个异步 I/O 操作通过一个 aiocb 结构(Asynchronous I/O Control Block)来描述和控制。

异步 I/O 将提供两类 API:请求执行异步 I/O 操作、检查 I/O 操作完成情况。

比如:aio_read(&aiocb) 将执行异步读取,同时 aio_error(&aiocb) 可以获取异步 I/O 操作的错误状态。

在对于的 aiocb 未完成之前,aio_error(&aiocb) 将返回 EINPROGRESS,以下是一个简单的例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
int fd = open(argv[1], O_RDONLY);
if (fd == -1) {
    perror("open");
    exit(EXIT_FAILURE);
}
struct aiocb aiocb;
char buf[BUF_SIZE];
// 初始化 aiocb 结构
aiocb.aio_fildes = fd;
aiocb.aio_offset = 0;
aiocb.aio_buf = buf;
aiocb.aio_nbytes = BUF_SIZE;
aiocb.aio_reqprio = 0;
aiocb.aio_sigevent.sigev_notify = SIGEV_NONE; // 无通知
// 提交异步读取请求
if (aio_read(&aiocb) == -1) {
    perror("aio_read");
    close(fd);
    exit(EXIT_FAILURE);
}
// 轮询检查操作状态
while (aio_error(&aiocb) == EINPROGRESS) {
    printf("Operation in progress...\n");
    sleep(1);
}
// 处理完成的操作
handle_completion(&aiocb);

当然,如果每定义一个异步事件,就要选择一个时间检查 aio_error 的返回情况,那么未免过于麻烦了。为此,通常利用 UNIX 信号向程序传递信息,UNIX 信号是一种和中断结合的机制,当进程接收到信号时,会暂停当前的任何任务,转而执行注册的信号处理函数。

比如以下代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
void handle(int arg){
    // do some thing
}

int main(){
    signal(SIGHUP,handle);
    while(1){
        // do some thing
    }
    return 0;
}

当接收到 SIGHUP 时会暂停执行循环中的代码,转而执行 handle 函数。

updatedupdated2025-04-272025-04-27