本文主要梳理 Rust 和 Python 的 async 实现中涉及的一些通用概念和实现机制。头脑中储备一些异步编程底层的实现原理,可以帮助我们更好地掌握异步编程。
协程:可暂停可恢复
正常函数调用的控制流是“单入单出”,从调用开始,正常或异常返回后结束,调用的栈帧也随之销毁。而异步编程要求在函数执行到一半时,“暂停”控制流,在未来的某个时刻再“恢复”。由于控制流尚未结束,因此调用链路上的栈帧还不能被销毁,这些信息需要以某种形式保存。可暂停可恢复的控制流,加上它所保存的信息,就可以称为“协程”。
栈帧(Stack Frame)
函数调用过程中使用的临时变量会记录到栈上,这些信息是与某个函数的某次调用绑定的,调用结束后就被废弃,这些数据就是栈帧。物理形态上,通常栈帧是“叠”在一起的,例如函数 A 中调用了函数 B,而 B 又调用了 C,则在 C 运行中,栈的状态类似下图:
| ... | |
Python 记录栈帧
Python coroutine[1] 的处理方式是直接保存栈帧。调用的最内层通过 yield
暂停控制流,中间层通过 yield from
或 await
[2] 将内层的
coroutine 一路往外传,需要恢复时,再使用 send
方法恢复执行[3]:
def inner(): |
注意在 coroutine 中,最终的返回值是通过 StopIteration
带出来的。
此外,外层拿到的 coro
其实包含了最内层 inner
的栈帧(需要了解
yield from 的机制),因此第二次调用
coro.send(None)
时,会从 inner
函数 yield
处恢复执行。
Rust 编译成状态机
对于缺少 GC 的语言来说,移动、复制栈帧是个原理可行,实际几乎不可行的操作。这些语言里手工创建的指针,可以指向栈上分配的内存,指针还可能被其它线程引用。栈帧移动时,这些指针都需要“修复”;栈帧复制时,数据多了份引用,内存释放又成问题。
Rust 使用了“状态机”的方式来实现控制流的暂停、恢复的能力[4]。
首先是最内层的暂停逻辑,与 Python 不同,内层没有专门的暂停机制,只约定了接口,如果(因为资源未就绪)要暂停,则返回一个特殊值(Poll::Pending
),由调用方来决定是否真的暂停和处理恢复。
pub trait Future { |
Python 的中间层会通过 yield from
向外传递栈帧[5],那 Rust
的中间层如何对外层提供暂停、恢复的能力呢?Rust 里提供了 await
关键词来表达等待内层的 future[6]:
fn inner1() -> impl Future<Output = u32> { |
那么 async/await
底层发生了什么?Rust 编译器会做这么几件事:
- 遇到
async fn
定义时,会把middle
方法的返回改为Future<Output=...>
- 将代码逻辑以
await
为拆分点,拆成状态机的 N 个状态,每个状态存储下个 await 可见的变量和 future - 将两个 await 之间的代码,转换成状态机的转移逻辑
上面的例子编译器会编译成类似下面的这些代码[7]:
// 状态存储 |
可以看到中间层返回的 StateMachine 本身记录了内部调用的 Future 所处的状态。最外层的调用方如果需要恢复执行,只需再调用 middle
返回 future 的 poll
方法即可,
middle
会根据当前状态决定去 poll
哪个内层 future。
轮询与中断/回调
异步编程的特征之一,是当资源未就绪时,先暂停当前控制流,先执行其它可推进的逻辑,等资源就绪时,再恢复之前暂停的控制流。那什么时候才知道资源就绪呢?一般有两种方法:轮询与中断。
轮询
轮询很好理解,就是外围调用方不断调用 poll
方法去查看当前资源的状态是否就绪:
future = middle(); |
但如果是这么做,资源未就绪前会不断执行 future.poll
,浪费 CPU。此时空闲的 CPU
可以用来处理其它就绪的 future,于是可以把所有需要轮询的协程添加到一个队列里,这样一个线程就可以处理 N 个协程。伪代码如下:
loop { |
这会引申出一个问题:在 ① 中,如果 middle future 的结果就绪了,接下来需要执行哪部分代码呢?显然需要从 future 暂停的地方接着执行(即 outer 的后续逻辑),但我们怎么找到外层的逻辑?
一种想法是把外层逻辑也封装成一个 future[8],队列里直接存放outer
future 而不是 middle future,恢复时只要执行 outer future 的 poll
方法即可。这就是异步编程的传染性,只要内部有一处异步,它的每个调用方都需要是异步的,一直到顶层的 main 函数[9]。
于是就像有多个线程一样,我们的队列里可以存放 N 个顶层的 future,可以类比成轮询
N 个 main 函数。这个不断从队列中获取新的协程并调用 poll
的角色在 Rust 里叫
executor[10]。
loop { |
② 中的逻辑会不断把未就绪的 future 放入队列,这样每轮轮询时都会 poll 所有future,这样依旧会浪费很多资源(CPU & IO),最理想的方式是每次 poll 时只poll 那些“很有希望 ready”的 future。这就是我们下面要说的“中断”的模式,当资源就绪时,再把 future 加入队列。
中断与 waker
我们希望 future 只在资源就绪时才被重新放回队列[11],于是 executor 需要提供如下方法(伪代码):
let mut ready_queue = Queue::new(); |
现在的问题是:“谁”负责在“什么时候”调用 wake_up
方法?
先来看“谁”的问题,唤醒的条件是资源就绪,那必然是资源的拥有者来唤醒,而只有“最内层”的协程才知道它等待的是什么资源,因此需要最内层的协程(通过注册回调函数)来触发。但是 wake_up
唤醒的时候得唤醒最外层的协程,即上面伪代码的参数 n
,于是每次调用 poll 都需要把 n
一路下传到最内层:
fn run() { |
当然,伪代码里用 future 的序号 n
来唤醒外层 future 是一个实现细节。回过头来看 rust Future
接口,它包含了一个 Context
的引用,cx.waker()
可以获得“唤醒器”,再调用wake
方法即可唤醒对应的最外层的协程。与 n
一样,每次对
poll
的调用,都需要把 cx
一路下传到最内层。
pub trait Future { |
另一个问题是“什么时候”调用,显然是“资源就绪”时。那怎么知道资源什么时候就绪?这就需要资源的提供方来通知了。通常异步编程多是在处理 IO,对于 IO 一般是操作系统通过 select
或者 epoll
等等机制提供了异步通知的能力。代码里需要在等待资源时加上回调函数。整体逻辑如下图:
其中的 reactor 会监听所有在等待的资源,如果某个资源就绪了,同步的 poll
会返回就绪的资源,reactor 会调用它们的回调函数(即 wake
方法来唤醒)。Rust 里一般把 executor 和 reactor 合起来称为 Runtime。
Python 实现
前文的描述都是以 Rust 为样例,这是因为 Rust 里的角色分得相对更清楚一些。像 executor 和 reactor 的能力,在 Python 里都囊括在 event loop 里了,能监听什么资源,也被安排得明明白白了。
Python 里也经常用到
Future
,但它的概念和 Rust 里的不太一样,Python 中的 Future
本身是一个协程(实现了
await
方法),另外有一个 set_result
方法能设置最终结果,结果设置后,协程就能正常返回了(类似Rust里返回 Poll::Ready
)。
Python 里的一个典型协程工作流如下所示:
图里包含了比较多的细节,整体逻辑和 Rust 类似,注意几点:
inner
注册监听事件时,Python 的做法是创建一个 future、注册事件,await future
- 事件的注册最终都是调用 loop 的 API 来完成,也说明 Python 的 loop 包含了多个角色
- 几乎所有的操作都是异步的,包括注册,也是通过
loop.call_soon
延迟执行的 future.set_result
之后,也是通过call_soon
延迟唤醒协程- 唤醒后的协程,是直接从断点处恢复的(通过栈帧机制),与 Rust 不同
- Event Loop 直接操作的是
task
而不是coroutine
,它是一个包装类,提供了取消、唤醒等功能
小结
异步编程的优势主要是节省线程数量(从而节省线程占用的栈等资源),也有说减少线程切换来节省 CPU 消耗。但总的来说,异步的最大作用和目标是提高吞吐而非降低延时。
但是,异步编程的缺点也很明显,最关键的是它的“传染性”,只要有一处要异步,所有地方都需要异步。另一个是“隔离性”,它的生态和同步的方法天然不通,一般为了支持异步,几乎所有同步的标准库都需要重写一个异步版本的。我甚至认为如果“高吞吐”不是产品的核心特性(如网关),就不应该使用异步框架。
本文尝试挖掘 Rust 和 Python 实现异步框架的模式,让我们对异步的底层实现建立一个概念,希望借助这些概念,去理解、解决编程中遇到的异步相关问题。文章主要讲解了三方面的内容:
- 协程的核心是控制流的中断和恢复,Python 为代表的 GC 语言用的是存储栈帧的方式,而以 Rust 为代表的非 GC 语言使用了编译成状态机的方式。
- 异步的优势想要体现,需要满足一个线程可以处理多个协程的能力。轮询的想法引导我们创建了 executor 处理协程队列的思路;中断的想法引导我们理清 reactor 的作用以及上下层需要传递的信息。
- 最后是过程中列举了 Rust 和 Python 典型的协程工作流,可以从实现上相互印证两种具体的实现思路。但在编程的使用方来看二者的 API 又没有太大的差异。
Python 的 coroutine 和 generator 基本是同一套实现机制,本文里有时会混用两个术语 ↩
如果用 await 则要求内层调用实现了
__await__
方法 ↩ref: https://peps.python.org/pep-0342/#new-generator-method-send-value ↩
推荐看这篇文章:https://os.phil-opp.com/async-await/#the-async-await-pattern ↩
这里说法不太准确,但不影响理解。
yield from
只是把各个 coroutine 连接在一起,不会真的返回栈帧 ↩在有 await 及编译器支持之前,基本是需要人肉做状态的保存和恢复的 ↩
代码改编自 https://os.phil-opp.com/async-await/#the-async-await-pattern ↩
这里的含义是 outer 方法也使用
await
来获取结果。 ↩如果调用方自己不做成异步,则需要在代码里“同步”等待 future.poll 返回 ready,或者等待统一轮询队列的就绪通知,无论如何,它所在的线程在内部的异步任务完成前是不会释放的,就达不到异步编程“节省线程”的目的了。 ↩
文中只展示了简单的模型,executor 的实现可以相当复杂,参考 Making the Tokio scheduler 10x faster ↩
当 future 刚被创建时我们并不知道它是否就绪,此时也需要放入队列触发第一次 poll,在 poll 里如果资源未就绪,由 future 来注册后续的回调,因此当 future 第二次通过回调再被加入队列时,就“有信心”它依赖的资源就绪了。 ↩