Rust AsyncRuntime
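This chapter traces the history of Generators in Rust (renamed Coroutines in current versions) and then generalizes from generators to Async.
When Rust's concurrency model was first being designed, three main directions were considered:
- Stackful coroutines, similar to Green Threads
- Combinators
- Stackless coroutines, similar to Generators
Green Threads were discussed in the article "Rust Future(起步-0)", so here we focus on the other two directions.
Combinators
Futures 0.1 was implemented with combinators, which look similar to JS Promises.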
let future = Connection::connect(conn_str).and_then(|conn| {
    conn.query("somerequest").map(|row| {
        SomeStruct::from(row)
    }).collect::<Vec<SomeStruct>>()
});
let rows: Result<Vec<SomeStruct>, SomeLibraryError> = block_on(future);
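It has three main drawbacks:
- Error messages can be very long and hard to understand
- Memory usage is not ideal
- Borrowing across combinators is not supported
Because of the third point, this approach was eventually abandoned. Not being able to borrow across combinators makes the API very inconvenient to use and forces many extra allocations and copies, which hurts efficiency.
The higher memory usage arises because combinators are a callback-based approach in which each closure stores all the data its computation needs. As a result, when we chain these closures together, the memory required grows with every step.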
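The growth described above can be observed with any chain of closures. The following is a minimal sketch that uses std iterator adapters as a stand-in for Futures 0.1 combinators (an analogy, not the Futures 0.1 API); `chain_sizes` and the 64-byte payloads are hypothetical names and values chosen for illustration.

```rust
use std::mem::size_of_val;

/// Returns the size in bytes of a combinator chain after one, two and
/// three steps. Each closure owns a 64-byte array, so every added step
/// makes the composed value at least 64 bytes larger.
fn chain_sizes() -> (usize, usize, usize) {
    let a = [1u8; 64];
    let b = [2u8; 64];
    let c = [3u8; 64];

    // Step 1: the closure captures `a` by value.
    let one = (0u8..4).map(move |x: u8| {
        let whole = a;
        x.wrapping_add(whole[0])
    });
    let s1 = size_of_val(&one);

    // Step 2: wraps step 1 and additionally stores `b`.
    let two = one.map(move |x: u8| {
        let whole = b;
        x.wrapping_add(whole[0])
    });
    let s2 = size_of_val(&two);

    // Step 3: wraps step 2 and additionally stores `c`.
    let three = two.map(move |x: u8| {
        let whole = c;
        x.wrapping_add(whole[0])
    });
    let s3 = size_of_val(&three);

    (s1, s2, s3)
}
```

Calling `chain_sizes()` returns three strictly increasing sizes, because each adapter stores the previous adapter plus its own closure.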
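Stackless Coroutines (Generator)
This is the model Rust uses today. It has several notable advantages:
- With async/await as keywords, converting ordinary Rust code into a stackless coroutine is easy (it can even be done with a macro).
- No context switching and no saving/restoring of CPU state
- No dynamic stack allocation to manage
- Very memory efficient
- Allows borrowing across yield points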
async fn myfn() {
    let text = String::from("Hello world");
    let borrowed = &text[0..5];
    somefuture.await;
    println!("{}", borrowed);
}
This implementation adds no extra memory or context-switching overhead, and the code is written much like ordinary code.
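To try borrowing across an `.await` on stable Rust without an external runtime, here is a minimal sketch. `YieldOnce`, `NoopWaker`, `greet` and the busy-polling `block_on` are hypothetical helpers written for this illustration; a real executor would park the thread and rely on the waker instead of spinning.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A waker that does nothing; enough for a busy-polling executor.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// A future that returns `Pending` exactly once, then `Ready`.
struct YieldOnce {
    yielded: bool,
}
impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            Poll::Ready(())
        } else {
            self.yielded = true;
            Poll::Pending
        }
    }
}

/// `borrowed` points into `text` and stays valid across the await point.
async fn greet() -> String {
    let text = String::from("Hello world");
    let borrowed = &text[0..5];
    YieldOnce { yielded: false }.await;
    borrowed.to_string()
}

/// Polls the future in a loop until it completes.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

`block_on(greet())` suspends once inside `greet` and then returns the borrowed prefix as an owned `String`.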
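How Generators Work
Using the `yield` keyword inside a closure turns it into a generator. In other words, `yield` is a kind of syntactic sugar: the code is split into parts at each `yield`, so that control can be handed back to the scheduler at those points. Here is a simple closure-based generator: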
// Historical nightly syntax that predates Pin; the feature has since been renamed to coroutines.
#![feature(generators, generator_trait)]
use std::ops::{Generator, GeneratorState};

fn main() {
    let a: i32 = 4;
    let mut gen = move || {
        println!("Hello");
        yield a * 2;
        println!("world!");
    };
    if let GeneratorState::Yielded(n) = gen.resume() {
        println!("Got value {}", n);
    }
    if let GeneratorState::Complete(()) = gen.resume() {
        ()
    };
}
Before the concept of Pin existed, the compiled result might have looked something like this:
fn main() {
    let mut gen = GeneratorA::start(4);
    if let GeneratorState::Yielded(n) = gen.resume() {
        println!("Got value {}", n);
    }
    if let GeneratorState::Complete(()) = gen.resume() {
        ()
    };
}

// If you've ever wondered why the parameters are called Y and R the naming from
// the original rfc most likely holds the answer
enum GeneratorState<Y, R> {
    Yielded(Y),  // originally called `Yield(Y)`
    Complete(R), // originally called `Return(R)`
}

trait Generator {
    type Yield;
    type Return;
    fn resume(&mut self) -> GeneratorState<Self::Yield, Self::Return>;
}

enum GeneratorA {
    Enter(i32),
    Yield1(i32),
    Exit,
}

impl GeneratorA {
    fn start(a1: i32) -> Self {
        GeneratorA::Enter(a1)
    }
}

impl Generator for GeneratorA {
    type Yield = i32;
    type Return = ();
    fn resume(&mut self) -> GeneratorState<Self::Yield, Self::Return> {
        // lets us get ownership over current state
        match std::mem::replace(self, GeneratorA::Exit) {
            GeneratorA::Enter(a1) => {
                /*----code before yield----*/
                println!("Hello");
                let a = a1 * 2;
                *self = GeneratorA::Yield1(a);
                GeneratorState::Yielded(a)
            }
            GeneratorA::Yield1(_) => {
                /*-----code after yield-----*/
                println!("world!");
                *self = GeneratorA::Exit;
                GeneratorState::Complete(())
            }
            GeneratorA::Exit => panic!("Can't advance an exited generator!"),
        }
    }
}
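As you can see, the code before and after the rewrite differs a lot, and that is all due to `yield`, which causes the code to be rewritten as a state machine. Once you understand what `yield` does, you also understand what `await` does; the two are very similar. Before turning to `await`, though, let's go back to the code above. Imagine what would happen if a borrow spanned different yield points. With this naive rewrite, that clearly cannot work.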
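The same state-machine rewrite applies to `async` blocks: whatever is still live across a suspension point has to be stored in the state, just as `Yield1(i32)` stores `a` above. The sketch below measures this on stable Rust; `state_sizes` and the 1024-byte buffer are hypothetical, and only the lower bound for the first future is relied on, since exact layouts are a compiler detail.

```rust
use std::future::ready;
use std::mem::size_of_val;

/// Returns the sizes in bytes of two futures:
/// one keeps a 1 KiB buffer alive across `.await`, the other does not.
fn state_sizes(fill: u8) -> (usize, usize) {
    // `buf` is used after the await, so it must live in the state machine.
    let keeps_buffer = async move {
        let buf = [fill; 1024];
        ready(()).await;
        buf.iter().map(|&b| b as usize).sum::<usize>()
    };

    // `buf` is dropped before the await; only `total` has to be stored.
    let drops_buffer = async move {
        let total = {
            let buf = [fill; 1024];
            buf.iter().map(|&b| b as usize).sum::<usize>()
        };
        ready(()).await;
        total
    };

    (size_of_val(&keeps_buffer), size_of_val(&drops_buffer))
}
```

The first size is at least 1024 bytes because the buffer is part of the suspended state; the second is typically much smaller, although that is not guaranteed.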
Let's look at a bit more code before continuing the discussion. First, here is a simple Generator that contains a borrow.
let mut generator = move || {
    let to_borrow = String::from("Hello");
    let borrowed = &to_borrow;
    yield borrowed.len();
    println!("{} world!", borrowed);
};
What does it look like after the rewrite?
enum GeneratorState<Y, R> {
    Yielded(Y),
    Complete(R),
}

trait Generator {
    type Yield;
    type Return;
    fn resume(&mut self) -> GeneratorState<Self::Yield, Self::Return>;
}

enum GeneratorA {
    Enter,
    Yield1 {
        to_borrow: String,
        borrowed: &String, // uh, what lifetime should this have?
    },
    Exit,
}

impl GeneratorA {
    fn start() -> Self {
        GeneratorA::Enter
    }
}

impl Generator for GeneratorA {
    type Yield = usize;
    type Return = ();
    fn resume(&mut self) -> GeneratorState<Self::Yield, Self::Return> {
        // lets us get ownership over current state
        match std::mem::replace(self, GeneratorA::Exit) {
            GeneratorA::Enter => {
                let to_borrow = String::from("Hello");
                let borrowed = &to_borrow; // <--- NB!
                let res = borrowed.len();
                *self = GeneratorA::Yield1 {to_borrow, borrowed};
                GeneratorState::Yielded(res)
            }
            GeneratorA::Yield1 {to_borrow, borrowed} => {
                println!("{} world!", borrowed);
                *self = GeneratorA::Exit;
                GeneratorState::Complete(())
            }
            GeneratorA::Exit => panic!("Can't advance an exited generator!"),
        }
    }
}
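This code does not compile. The immediate problem is that the `&String` field has no lifetime annotation, and there is no lifetime we could write that the compiler would accept. The local `to_borrow` is created in the `Enter` arm and moved into the `Yield1` variant when that arm finishes, so a reference to the local would be left dangling, which safe Rust does not allow. What we actually need is a reference that points into the enum itself.
So let's reimplement this generator with `unsafe`. The result looks a bit like a self-referential struct.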
enum GeneratorState<Y, R> {
    Yielded(Y),
    Complete(R),
}

trait Generator {
    type Yield;
    type Return;
    fn resume(&mut self) -> GeneratorState<Self::Yield, Self::Return>;
}

enum GeneratorA {
    Enter,
    Yield1 {
        to_borrow: String,
        borrowed: *const String, // NB! This is now a raw pointer!
    },
    Exit,
}

impl GeneratorA {
    fn start() -> Self {
        GeneratorA::Enter
    }
}

impl Generator for GeneratorA {
    type Yield = usize;
    type Return = ();
    fn resume(&mut self) -> GeneratorState<Self::Yield, Self::Return> {
        match self {
            GeneratorA::Enter => {
                let to_borrow = String::from("Hello");
                let borrowed = &to_borrow;
                let res = borrowed.len();
                *self = GeneratorA::Yield1 {to_borrow, borrowed: std::ptr::null()};
                // NB! And we set the pointer to reference the to_borrow string here
                if let GeneratorA::Yield1 {to_borrow, borrowed} = self {
                    *borrowed = to_borrow;
                }
                GeneratorState::Yielded(res)
            }
            GeneratorA::Yield1 {borrowed, ..} => {
                let borrowed: &String = unsafe {&**borrowed};
                println!("{} world", borrowed);
                *self = GeneratorA::Exit;
                GeneratorState::Complete(())
            }
            GeneratorA::Exit => panic!("Can't advance an exited generator!"),
        }
    }
}
Notice the `unsafe` block in the `Yield1` arm. This version compiles, so let's run it.
pub fn main() {
    let mut gen = GeneratorA::start();
    let mut gen2 = GeneratorA::start();
    if let GeneratorState::Yielded(n) = gen.resume() {
        println!("Got value {}", n);
    }
    if let GeneratorState::Yielded(n) = gen2.resume() {
        println!("Got value {}", n);
    }
    if let GeneratorState::Complete(()) = gen.resume() {
        ()
    };
}
OK, now let's add one more line and run it again:
pub fn main() {
    let mut gen = GeneratorA::start();
    let mut gen2 = GeneratorA::start();
    if let GeneratorState::Yielded(n) = gen.resume() {
        println!("Got value {}", n);
    }
    std::mem::swap(&mut gen, &mut gen2); // <--- Big problem!
    if let GeneratorState::Yielded(n) = gen2.resume() {
        println!("Got value {}", n);
    }
    if let GeneratorState::Complete(()) = gen.resume() {
        ()
    };
}
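After adding the `swap` line and running `main`, we observed a SIGSEGV segmentation fault. Although the `unsafe` code compiles, it introduces UB (undefined behavior): after the swap, the raw pointer stored in `gen2` still refers to the memory the string occupied before it was moved, so dereferencing it can crash or print garbage. How do we solve this? The answer is Pin, which the next section covers.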
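The underlying cause can be shown without triggering UB by comparing addresses instead of dereferencing the stale pointer. This is a minimal sketch; `SelfRef` and `swap_breaks_self_reference` are hypothetical names that mirror the `Yield1 { to_borrow, borrowed }` layout used above.

```rust
/// A value holding a raw pointer that is meant to point at its own field.
struct SelfRef {
    data: String,
    ptr: *const String,
}

impl SelfRef {
    fn new(s: &str) -> Self {
        SelfRef { data: s.to_string(), ptr: std::ptr::null() }
    }

    /// Makes `ptr` point at `data` at the value's current address.
    fn init(&mut self) {
        self.ptr = &self.data;
    }

    /// Compares addresses only; the pointer is never dereferenced.
    fn points_at_own_data(&self) -> bool {
        std::ptr::eq(self.ptr, &self.data)
    }
}

/// Returns (valid before swap, valid after swap).
fn swap_breaks_self_reference() -> (bool, bool) {
    let mut a = SelfRef::new("Hello");
    let mut b = SelfRef::new("world");
    a.init();
    b.init();
    let before = a.points_at_own_data() && b.points_at_own_data();

    // Moving the values moves the fields, but the stored addresses stay the same.
    std::mem::swap(&mut a, &mut b);
    let after = a.points_at_own_data() || b.points_at_own_data();

    (before, after)
}
```

After the swap, each value's pointer refers to the other value's storage, which is exactly the situation the `Yield1` arm then dereferences.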
Async and Generator
Finally, let's review the relationship between Async and Generator. Comparing Async code with Generator code makes the similarities easy to see.
let mut gen = move || {
    let to_borrow = String::from("Hello");
    let borrowed = &to_borrow;
    yield borrowed.len();
    println!("{} world!", borrowed);
};

let mut fut = async {
    let to_borrow = String::from("Hello");
    let borrowed = &to_borrow;
    SomeResource::some_task().await;
    println!("{} world!", borrowed);
};
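Both are implemented as state machines and the syntax is similar; the main difference is how the states are defined. `Generator::resume` corresponds to `Future::poll`, and `Yielded` and `Complete` correspond to `Pending` and `Ready`.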
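To make the correspondence concrete, the hand-written `GeneratorA` state machine from earlier can be re-expressed as a `Future` on stable Rust. This is a minimal sketch: `Doubler`, `NoopWaker` and `poll_twice` are hypothetical names, and the future is polled by hand rather than by a real executor.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Same three states as `GeneratorA`: start, suspended, finished.
enum Doubler {
    Enter(i32),
    Suspended(i32),
    Exit,
}

impl Future for Doubler {
    type Output = i32;

    // `poll` plays the role of `resume`:
    // `Pending` ~ `Yielded`, `Ready` ~ `Complete`.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<i32> {
        match std::mem::replace(&mut *self, Doubler::Exit) {
            Doubler::Enter(a) => {
                *self = Doubler::Suspended(a * 2);
                // Ask to be polled again, as a well-behaved future should.
                cx.waker().wake_by_ref();
                Poll::Pending
            }
            Doubler::Suspended(v) => Poll::Ready(v),
            Doubler::Exit => panic!("polled after completion"),
        }
    }
}

/// A waker that does nothing; enough for polling by hand.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls a fresh `Doubler` twice and returns both results.
fn poll_twice(start: i32) -> (Poll<i32>, Poll<i32>) {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Doubler::Enter(start);
    let first = Pin::new(&mut fut).poll(&mut cx);
    let second = Pin::new(&mut fut).poll(&mut cx);
    (first, second)
}
```

`poll_twice(4)` yields `Pending` on the first poll and `Ready(8)` on the second, mirroring the `Yielded` then `Complete` sequence of the generator.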
Generators in std
Rust Nightly provides an implementation of generators (renamed coroutines in recent versions), so let's implement the example above directly with std:
#![feature(coroutines)]
#![feature(coroutine_trait)]
#![feature(stmt_expr_attributes)]
use std::ops::{Coroutine, CoroutineState};

pub fn main() {
    let gen1 = #[coroutine]
    static || {
        let to_borrow = String::from("Hello");
        let borrowed = &to_borrow;
        yield borrowed.len();
        println!("{} world!", borrowed);
    };
    let gen2 = #[coroutine]
    static || {
        let to_borrow = String::from("Hello");
        let borrowed = &to_borrow;
        yield borrowed.len();
        println!("{} world!", borrowed);
    };
    let mut pinned1 = Box::pin(gen1);
    let mut pinned2 = Box::pin(gen2);
    if let CoroutineState::Yielded(n) = pinned1.as_mut().resume(()) {
        println!("Gen1 got value {}", n);
    }
    if let CoroutineState::Yielded(n) = pinned2.as_mut().resume(()) {
        println!("Gen2 got value {}", n);
    };
    let _ = pinned1.as_mut().resume(());
    let _ = pinned2.as_mut().resume(());
}