C++|基于协程io_uring 异步网络库系列 III( Proactor、异步与协程 | C++20 coroutine 教程 | io_uring 异步IO 网络框架 系列笔记)

本系列通过结合 linux 的 io_uring 和 cppcoro (源码需要进行部分修改以适配 linux 下的 g++-11)在网络中的使用学习 C++20 coroutine。值得注意的是,cppcoro 目前已经暂停维护,仍然为 TS 版本的支持,同时其真异步底层支持只支持了 win32 的 IOCP(本身 cppcoro 兼容 MSVC),但是本系列不想涉及 IOCP 和 windows 的部分因为除了跨平台外,没有太大意义(如果采用 windows 的话,C# 是足够好的语言,但是目前广泛的服务器应用一般采用 linux)。
专栏内的所有笔记本身是和他们自洽的(也许漏了一篇讲如何理解协程和函数式编程中的 call/cc 的笔记,博客中也上传了,当然实际这系列笔记不是一个能够快速上手的,而是一个系列的学习,主要目的是供我自己复习或者有对 C++ 协程与 Proactor 网络框架编写感兴趣的读者。
本系列本身是内容自洽的如果前提得到了满足即读者(我)必须具备了 C++20 coroutine 的状态机(当然,通用的状态机本身也可以用协程实现,不过这已经离题了,可以另外开笔记来讲怎么做)基础流程在脑子里以及 io_uring 的 liburing API(不是必须的),而这些内容都可以在上面附的文章中得到答案(以及较官方的资料,如 io_uring 的除了本身的 pdf 、邮件列表、还有专门的网站)。尽管这里废话会很多,虽然废话的存在本身可能是因为要用来构造一种常识/直觉,而省去了会简洁很多但是要求读者分布式去中心化地具有这些常识,实际学到的东西很少,没有任何的挑战,而且必须是烂尾预定的系列,但是我还没学会高效的记录方式,很容易忘掉细节。本身看这些也没有用,可能搞懂现有的基础设施的源码对调优还有一点意义。我备注一下实际如果要读源码的话,需要准备 boost asio 支持 coroutine ts 的版本(主要看 example)、cppcoro 的代码库(以及 readme 的示例)。
cppcoro 源码级使用教程系列: 概述 | C++20 coroutine 教程 | io_uring 异步IO 网络框架 系列笔记_我说我谁呢 --CSDN博客
我们如果根据 ACT + Proactor 模式(POSAv2|参考)写的话,无非是说那些真异步组件全部都返回一个 awaitable (不过你要知道 task 也是 awaiter)就行了,然后马上就 co_await 这个 awaitable。例子用 asio 的:
co_await socket.async_read_some(...)
很明显的这个 async_read_some 应该要返回一个 awaitable / awaiter,或者通过 awaiter transform 来实现。重提一下吧, tranform 是基于协程 R 类型的(协程内部调用了 co_await)await_transform(T&& expr) 方法来进行转换得到一个 awaitable,如果这个 awaitable 不是 awaiter,就会再被 co_await 一次得到 awaiter。
event loop 的部分,即 proactor,这个东西做的事情是不断的分发 completion,也就是他应该是睡死在 io_uring_wait_cqe 上,等于 windows 的 IOCP 的 GetQueuedCompletionStatus(当然要放一个 eventfd 方便唤醒退出什么的) 。
协程的接口设计。主要可能是 acceptor 和 的概念,怎么编写他们的协程(逻辑函数),一般来说,接口其实都是制定好的。一种方案是完形填空,像 netty 那种直接继承一个类,然后 override 类的方法,其中参数什么的其实都规定好的了。在协程里面,其实还是很灵活的,因为知识 co_await 一个 awaiter 的东西(别忘了协程基本都是 awaiter+R,从而才能套娃)。muduo 的做法是规定了(tcp层面)接口,必须绑定这个 callback。这算是 proactor 的好处吧,但是 debug 起来比较麻烦。
以 echo 服务器为例子,协程的话,有一个点在这里就是 acceptor 本身 必须获取到 executor 才能 spawn(当然也可以在学 reactor 其他线程里面 spawn,但是因为这里完全是支持异步的,所以同一个线程中的高效才是协程的性能 benefit)。对此 asio 的方案是通过 co_await 一个东西还获得,起原理是因为一开始在 main 函数里面 co_spawn 的时候已经把这个 executor 给绑定到 listener 上去了。(注意 io_context/io_service 是对 executor 结合其他组件的包装,executor 实际是某个线程,这里的 transform 的源码分析在这一篇笔记里面)
awaitable listener() { auto executor = co_await this_coro::executor; tcp::acceptor acceptor(executor, {tcp::v4(), 55555}); for (; ; ) { tcp::socket socket = co_await acceptor.async_accept(use_awaitable); co_spawn(executor, echo(std::move(socket)), detached); } }

那么问题来了,我们要怎么设计他呢?其实 asio 这种已经是很漂亮干净的方案了。另外的考虑就是,我们对这个有多大的需求?实际第一个是考虑 TCP 和 UDP。我们明确直到 TCP 需要明显划分听者和处理者,而 UDP 不需要 -- 而是直接 recv_some 类似的阻塞即可,收到了直接处理马上发包回去(仍然是异步的)然后投递下一个 recv_some。
但是获取协程当前 executor 又是一个必须的接口,因为实际你不可能只做一个 echo server 这种一次性 UDP 服务的,考虑我们定制一套底层为 UDP 应用层的协议(QUIC、RPC whatever),就必须有一个 demux 要将 recv 到的东西给拆包路由给不同的人,就算是用户态类似的 TCP,也要 spawn 新的读请求。
下面阐述为什么 acceptor 和 connection handler 不能同一个协程中编写,考虑:
s = co_await 入站连接
co_awaitread(s)
此时,本协程将会再度被挂起,因此 acceptor 就罢工了。
我们先回顾经典的 task 运行真异步任务的流程:
  • 一个 async void 作为 top level function / coroutine, 他执行到第一个 co_await 一个 task 生成表达式的时候,一个 task 被生成,task 协程中的内容未被执行(因为 initial_suspend 是 always 马上捕获了 continuation 进堆上的 R::promise_type 中,此处给他个名字 AAAA)。此时 async void 中的 co_await 触发 task 执行(实际执行的是 task 作为一个 awaiter 的 await_suspend(std::coroutine_handle) 函数)。
  • 此时 task 把 async void 的 continuation 捕获了,同时开始执行 task 中的流程(实际是 await_suspend 中调用了堆上 R::promise_type 中原本 AAAA.resume 从而恢复了 task 内部执行)。
  • task 中走到第一个 co_await 某个 awaiter 的地方,此时,task 本身的 continuation 会通过 await_suspend 被捕获,然后进入到 awaiter_suspend 上了。
这个时候,我们的 awaiter 是一个高级的异步应用。问题就是他是怎么实现的呢?在此时,我们先复习一下 co_await expr 会触发什么变换吧。
// https://lewissbaker.github.io/2017/11/17/understanding-operator-co-await { auto&& value = https://www.it610.com/article/; auto&& awaitable = get_awaitable(promise, static_cast(value)); auto&& awaiter = get_awaiter(static_cast(awaitable)); if (!awaiter.await_ready()){ using handle_t = std::experimental::coroutine_handle; using await_suspend_result_t = decltype(awaiter.await_suspend(handle_t::from_promise(p))); if constexpr (std::is_void_v){ awaiter.await_suspend(handle_t::from_promise(p)); } else{ static_assert( std::is_same_v, "await_suspend() must return 'void' or 'bool'."); if (awaiter.await_suspend(handle_t::from_promise(p))){ } } } return awaiter.await_resume(); }


其实就明白了,我们的活动空间只有 await_suspend 里面了。我们必须想办法把捕获到的 handle 作为 completion token 存起来,幸好他是一个指针的东西,是 POD,所以是随便存的,这样 io_uring 的提供的附带空间就可以用上了,不过我们实际还是要封装一个 token 类,为了上层方便获取结果(io_uring 的使用完全是系统调用们,所以这个部分只需要一个 int 字段)。
由于这里的跳转太多了,我画了一幅图,,,但是感觉要看懂他可能要花点时间。很遗憾的,我不打算修缮他了。你看懂这幅图,你就明白了。我时常感觉表达水平太差而说不明白,有些人写的东西我一看就恍然大悟。
C++|基于协程io_uring 异步网络库系列 III( Proactor、异步与协程 | C++20 coroutine 教程 | io_uring 异步IO 网络框架 系列笔记)
文章图片


我需要补充一个点,很早之前的一些笔记里面提到,我之前对公平服务有一定的误区(就是比如你 epoll 一次返回一个列表,如果单个用户服务时间太长了怎么办?),而这个误区通过一篇 IO最后的拼图(还有一个内容是为什么 epoll 不支持磁盘 io)的笔记解决了,其实就是对于非阻塞活动,进行长时间的读写会不会影响到公平性,是不是需要用户态的定时调度(像内核那种)以保证他的公平性呢?答案是否定的。
回到这幅图上来,如果你恢复的时候要回到 main 的时候,会不会出现连接请求响应不及时的情况呢?(starvation 现象) 噢,这下麻烦了,之前我那篇定量分析 redis 单线程为什么快的文章里面漏了一个点,作为服务器来说,单线程 accept 不就饥饿了吗?其实也不贵,回到 IO最后的拼图那篇笔记(这个笔记还是没有上传,所以这里我贴一部分结论过来)里面,既然线程/进程调度(我海不知道具体的线程和进程的调度 timeslice 是否一致)都是几毫秒到十几二十毫秒了,一个 read 或者 write 系统调用的时间应该还行(按1000us 即 4MB (注意 tcp 是整个窗口最大 64KB 因为编号原因,当然 wrap around reuse、long fat pipe 这种另外谈论,比如现在的 40Gbps 网络)内存读写来说只占了最多 1ms,涉及数据、量化计算部分如果有差错请指出,我修改)。一次 context switch 最多就几 us 到几千 us(几ms),meltdown + spectre 之后就不知道了,我没有资料参考(主要和 CPU 型号也有关系)。这个还有一个疑问是和队列长度有关吗?实际也是没有的。就算设置 io_uring 队列才开几百个个 slot(非常值得注意的事情是,如果 queue full 了,会有一些问题出现,比如 setup 时的 IORING_FEAT_NODROP 选项就是用来避免抛弃 cqe 的,所以丢失事件是可能的,至于实际设计多少合适,英语网上有一些 benchmark 的代码达到很好的性能,看看他们是怎么设置的好啦),你算一下,就知道了,注意了数据包大小肯定不能按一个人 4MB(1ms)算的,实际就是十万 QPS 在 1Gig NIC 下,1kbyte 已经极限了(但是实际其他开销太多打个 1 折差不多,100byte 吧)。总之就是单线程数据包100byte差不多目标往 10w QPS 去应该是可以接受的,更高可能得上 POLL 或者 SQ POLLING 了。饿,其实到这里差不多能理解为什么百度云下载这么贵了。当然,你为了避免这个事情,可以计数啊,比如 event loop 的 backlog 队列里面要 resume 的 coroutine_handle 太多的话,就结合 unlikely 宏(有没有用存疑)让某个时刻停下来非阻塞读一下 Completion Queue 看看呗。
好了,这里我回来(这一段本来想说什么忘记了)。
总之,现在 event loop 其实也明白了。剩下最后一个话题是,async_operation 即上面图片中的粉红色要这么写的问题了。 根据 ACT(POSAv2),其实不是易过借火(此时口水话说的太多了)。
反正经过上面的推导(倒不如说是试验,我无论如何无法直接思想实验整个流程),我们知道了粉红头只要负责把 task1 的 coroutine_handle 给恢复了,一切都是正常的了。总而言之就是,只要没到最后的真异步(asynchronous operation as awaiter),你都用类似 task 的东西运行就行了。当然,async void 也被证实是可行的,只不过是 async void 是不用 co_await 就会执行,而且直接回到 main thread 中,也就是说硬脱钩了,你再也没法获取返回的信息了(as co_awaited)。
【C++|基于协程io_uring 异步网络库系列 III( Proactor、异步与协程 | C++20 coroutine 教程 | io_uring 异步IO 网络框架 系列笔记)】

    推荐阅读