Skip to content

A coroutine framework aimed at high-concurrency io with reasonable latency, based on io_uring.

License

Notifications You must be signed in to change notification settings

Codesire-Deng/co_context

Folders and files

NameName
Last commit message
Last commit date

Latest commit

8f33887 · Jul 8, 2024
May 19, 2023
Dec 31, 2023
Sep 6, 2023
Dec 31, 2023
Dec 31, 2023
Jul 8, 2024
Jan 8, 2024
Dec 31, 2023
Nov 25, 2023
Dec 31, 2023
Dec 31, 2023
Dec 30, 2023
Jun 27, 2022
Oct 1, 2023
May 17, 2022

Repository files navigation

简中 | English

co_context

License Platform Language Ubuntu 22.04 (gcc) Ubuntu 22.04 (clang)

co_context 是一个协程并发框架,提供可靠的性能和强易用性,让 C++ 初学者也能轻松写出高并发程序。 co_context 基于 Linux io_uring,其性能通常优于 epoll。

不了解协程?

我在 Bilibili 上传了协程系列视频,请多指教!

已有功能

  1. 支持 read write accept timeout 等 io_uring 提供的所有系统调用,总计 74 个功能。
  2. 并发支持: any, some, all, mutex, semaphore, condition_variable, channel
  3. 调度提示: yield, resume_on
  4. 取消 IO/协程:timeout, timeout_at, stop_token

编译和运行

依赖

  1. GCC 11.3.0 或 Clang 14 或更高。
  2. [可选] mimalloc 从包管理器或源代码安装。
  3. Linux 内核版本 >= 5.6,建议 >= 5.11,越新越好。
    • 运行 uname -r 即可查看你的内核版本。
    • 由于开发环境是 Linux 6.0,在其他版本下可能出现兼容性错误。如遇问题,请将报错发到issue,非常感谢!
    • docker 将继承宿主机的 Linux 内核版本。 因此,docker 无法解决 Linux 内核版本过低的问题。

编译命令

cmake -B build && cmake --build build -j
build/example/timer # 跑一个小示例

代码示例

基础用法

创建一个 io_context,用于运行协程:

    using namespace co_context;
    io_context context;

使用 task<> 定义一个 socket 监听协程。task<> 就像一个普通的 void 函数,但可以在里面使用 co_await

task<> server(uint16_t port) {
    acceptor ac{inet_address{port}};
    for (int sockfd; (sockfd = co_await ac.accept()) >= 0;) {// 异步接受 client
        co_spawn(session(co_context::socket{sockfd})); // 每个连接生成一个 worker 任务
    }
}

继续使用 task<T> 描述业务逻辑,例如读取 socket 的内容并输出到 stdout:

task<> session(co_context::socket sock) {
    char buf[8192];
    int nr = co_await sock.recv(buf);

    // 不断接收字节流并打印到stdout
    while (nr > 0) {
        co_await lazy::write(STDOUT_FILENO, {buf, (size_t)nr});
        nr = co_await sock.recv(buf);
    }
}

task<T> 和普通 T 函数一样可以任意嵌套:

task<int> the_answer() {
    co_return 42;
}
task<> universe() {
    printf("The answer is %d\n", co_await the_answer());
    co_return;
}

如何写一个 main()

int main(int argc, const char *argv[]) {
    if (argc < 3) {
        printf("Usage:\n  %s hostname port\n  %s -l port\n", argv[0], argv[0]);
        return 0;
    }

    io_context context; // 1. 定义一个 io_context

    int port = atoi(argv[2]);
    if (strcmp(argv[1], "-l") == 0) {
        context.co_spawn(server(port)); // 2. 至少创建一个 task<>
    } else {
        context.co_spawn(client(argv[1], port));
    }

    context.start(); // 3. 启动 io_context 线程
    context.join();  // 4. 需要时等待 io_context 线程

    return 0;
}

更多特性示例

一秒触发器

task<> my_clock() {
    for (int cnt = 0;;) {
        printf("Time = %d\n", cnt++);
        co_await timeout(1s);
    }
}

网络超时

秒懂的超时控制。

task<> session(co_context::socket peer) {
    char buf[8192];
    int nr = co_await timeout(peer.recv(buf), 3s); // 限时3秒

    while (nr > 0) {
        co_await lazy::write(STDOUT_FILENO, {buf, (size_t)nr});
        nr = co_await timeout(peer.recv(buf), 3s); // 限时3秒
    }

    log_error(-nr);
}

void log_error(int err) {
    switch (err) {
        case ECANCELED:
            log::e("timeout!\n");
            break;
        default:
            log::e("%s\n", strerror(err));
            break;
    }
}

负载均衡

每个 io_context 代表一个线程。要负载均衡,只需将 task<> 分配到合理的 io_context

示例:echo_server_MT.cpp

借助 resume_on(),可以在运行时任意切换线程。

示例:resume_on.cpp

Generator

同步执行的生成器,可配合std::range需要 g++

co_context::generator<int> gen_iota(int x) {
    while (true)
        co_yield x++;
}

int main() {
    using namespace std::views;

    for (int x : gen_iota(1) | drop(5) | take(3)) {
        std::cout << x << " ";
    }
    // 输出 6 7 8

    return 0;
}

链接 IO

&& 来链接两个 I/O。链接 I/O 可以减少重入内核态和调度器,增强性能表现。(默认只返回最后一个返回值。)

nr = co_await (
    peer.send({buf, (size_t)nr})
    && peer.recv(buf)
);

此例子利用 link_io 大幅增强 echo_server 的性能

channel(实验性)

借鉴自 Go 语言的阻塞队列。

示例:channel.cpp

Draft

性能

co_context 在开发过程中表现出惊人的性能。早期测试见我的博客。下一个开发周期将进行更多测试。

协程方案的局限场景

由于内置动态内存分配,基于协程的异步框架可能不是性能的最优解,如果你正处于类似 30ns 延迟的极端性能场景,且不在乎编程复杂度,推荐关注 sender/receiver model,而无需尝试协程。

协程方案的适用场景

如果你希望异步框架能够最佳地平衡「开发、维护成本」和「项目质量、性能」,从而最大化经济效益,推荐你关注协程方案。感性理解:协程 + 内核态 I/O 的性能类似于 Redis 的网络模块。

关于缓存友好问题

co_context 竭尽所能避免缓存问题:

  1. co_context 的主线程和任意 worker 的数据交换中没有使用互斥锁,极少使用原子变量。
  2. co_context 的数据结构保证「可能频繁读写」的 cacheline 最多被两个线程访问,无论并发强度有多大。这个保证本身也不经过互斥锁或原子变量。(若使用原子变量,高竞争下性能损失约 33%~70%)
  3. 对于可能被多于两个线程读写的 cacheline,co_context 保证乒乓缓存问题最多发生常数次。
  4. 在 AMD-5800X,3200 Mhz-ddr4 环境下,若绕过 io_uring,co_context 的线程交互频率可达 1.25 GHz。
  5. 在一个本地测试中(I7-8550U 移动端),单线程的协程切换的平均延迟为 4.6 ns,代码于 test/benchmark/lazy_yield
  6. 在一个本地测试中(R7-5800X 桌面端),跨线程的协程切换的平均延迟为 37 ns,代码于 test/ctx_swtch.cpp
  7. 协程自身的缓存不友好问题(主要由 operator new 引起),需要借助其他工具来解决,例如 mimalloc

协程存在的问题

弱点

  1. 除非编译器优化,每个协程都需要通过 operator new 来分配 frame:
    • 多线程高频率动态内存分配可能引发性能问题;
    • 在嵌入式或异构(例如 GPU)环境下,缺乏动态内存分配能力,难以工作。
  2. 除非编译器优化,协程的可定制点太多,需要大量间接调用/跳转(而不是内联),同样引起性能问题。
    • 目前,编译器通常难以内联协程
    • HALO 优化理论:P0981R0
  3. 动态分配间接调用的存在,导致协程暂时无法成为异步框架的最优方法。

拆分子协程?

  • 出于性能考虑,不要将大协程拆分为几个小协程,因为会增加动态内存分配次数。
    • 可以做 placement new 吗?

与异步框架高度耦合

  1. 暂停和恢复都需要通过异步框架。
  2. 表达式模板的潜力不如 sender/receiver 模型:
    • 协程是顺序/分支/循环结构,s/r是表达式。

draft

  • 研究 liburingcxx 如何支持多生产者,多消费者并行(线程池中每个线程同时是 IO 生产者和消费者)
  • Coroutine 解决内联和动态内存分配问题
  • 表达式模板解决 task && ||
  • std::execution 能否兼容

线程池实现

  • 一个内核线程 polling,一个主线程收集提交、收割推送I/O,其他固定 worker 线程,thread bind core
  • 节能模式:信号量表示允许的 idle worker 线程数量。低延模式:每个 worker 都 polling
  • 每个 worker 自带两条任务队列(一个sqe,一个cqe),固定长度,原子变量,cacheline友好。sqe放不下就放 std::queue,等有空位再放入共享cache。
  • 主线程cqe推送满了就切换到提交sqe
  • 主线程sqe提交满了就切换到推送cqe

eager_io

一种激进的 IO awaiter,在构造函数中初始化 IO 请求并提交。

在被 co_await 时,若 IO 早已完成,则无需让出。否则,需要等待 IO 完成后由调度器唤醒。

eager_io 的动机

  1. 可以轻易部署并行化的 IO 请求,且对于 caller 协程来说是非阻塞的。还可进化出可取消的协程。
  2. 尽早提交 IO 请求,可能带来更低的延迟。

eager_io 的缺点

涉及多线程并行,需要同步 IO 的状态(未完成、已完成)。至少要保证:调度器必须确保 「eager_io 已经知悉 IO 已完成」,否则可能丢数据。

eager_io 的实现

TODO: 改用原子变量,弃用检查队列

co_context 假设大多数 eager_io 会陷入「等待状态」,以此为优化立足点

  1. eager_io 的 coroutine state(promise) 是调度器负责决定由谁销毁(由调度器或者由协程自己)。
  2. eager_io 发起 IO 前,自我标记为「初始状态」「无结果」「无权销毁」,然后发起 IO。
  3. eager_io 在「初始状态」下被 co_await,检查结果:
    1. 为「无结果」,则自我标记为「等待状态」「有权销毁」「有结果」,让出执行权
    2. 为「有结果」,自我标记为「IO 后状态」(保持「无权销毁」),继续执行。
    3. 析构时,「有权销毁」则销毁协程,否则自我标记为「待销毁」。
  4. 调度器收割 IO 时,检查协程的标记:
    1. 为「等待状态」,则将协程加入调度队列,令其自行销毁。
    2. 为「初始状态」(初始、等待叠加态),向协程标记「有结果」,随后将协程加入检查队列
  5. 调度器完成一轮提交/收割后,轮询检查队列:
    1. 若协程为「等待状态」,则弹出检查队列,并加入调度队列,令其自行销毁。
    2. 若协程为「初始状态」或者「IO 后状态」,不管它。
    3. 若协程为「待销毁」,销毁它,弹出检查队列。

xxx <-> is_detached is_waiting is_ready

manager:

  • ready: xx0 to xx1
    • 1x1 : manager delete task_info, do not resume.
    • 001 : worker will delete task_info, do not resume.
    • 011 : worker will delete task_info, resume

worker:

  • wait: x0x to x1x
    • 11x : wait after detached, logic error
    • 010 : suspend, worker will delete task_info
    • 011 : do not suspend, worker will delete task_info
  • detach: 0xx to 1xx
    • 1x1 : worker will delete task_info
    • 100 : manager will delete task_info
    • 110 : detach after waited, logic error

此实现中可能的漏洞:

  1. 未反省协程发生异常时的内存模型
  2. 等你来发现……

lazy_io

一种懒惰的 IO awaiter,在,在构造函数时什么都不做。

在被 co_await 时暂停,并发起 IO 请求,未来等待由调度器唤醒。当前线程轮询可以切入的协程。

lazy_io 的实现

  1. lazy_io 返回一个 awaiter,其中的 await_suspend 负责主要逻辑:
    1. 提交一个 IO 请求。
    2. 找到一个已收割的 IO 请求,恢复它
  2. awaiterawait_resume 返回特定结果。
  3. 析构时,销毁协程。

semaphore

仅运行在用户态 co_context 的信号量

semaphore 的动机

限制 co_spawn 和同类活跃协程的并发量

semaphore 的实现

  1. 参考 std::semaphore,优化 binary_semaphore 的原子变量
  2. 链表栈模拟无锁队列,均摊O(1)
  3. acquire 分别在栈上创建 awaiter,形成等待链表
  4. release 时放出一个release请求,由io_context处理(强制单消费者),放入某个reap_swap