Rust AsyncRuntime
Futures
在 Rust 中,Future 指的是某个将在未来完成的操作。Rust 的 Async 语法使用基于 Poll 的方法实现,异步任务将会经历三个 Poll、Wait、Wake 三个阶段。
- Poll:Executor 轮询 Future,执行代码直到遇上一个阻塞点,无法继续
- Wait:Reactor 注册阻塞的 Future,等待某个信号发生时唤醒这个 Future
- Wake:Future 被 Reactor 唤醒,由 Executor 继续轮询该 Future
Future 还分为 leaf-future 和 non-leaf-future,简单来说,leaf-future 代表着会真实阻塞 CPU 运行的操作系统资源,non-leaf-future 代表着非阻塞的异步操作。
一般来说,我们不会自行实现一个 Leaf Future,这是 Runtime 需要做的事。Async Runtime 对 OS 资源的操作将是非阻塞的,并返回一个 Leaf Future,它是我们实际等待的 Future 实例。
Leaf Future 的例子:
// stream is a **leaf-future**
let mut stream = tokio::net::TcpStream::connect("127.0.0.1:3000");
Non-leaf Future 是使用 Runtime 的用户通过 async 关键字自行创建的 Future,它可以被 executor 执行。
// Non-leaf-future
let non_leaf = async {
let mut stream = TcpStream::connect("127.0.0.1:3000").await.unwrap();// <- yield
println!("connected!");
let result = stream.write(b"hello world\n").await; // <- yield
println!("message sent!");
...
};
这些任务的关键在于它们能够将控制权交还给 Runtime executor,然后在稍后的时间点重新从它离开的地方继续执行。
与 Leaf Future不同,这类 Future 本身并不代表I/O资源。当 Runtime 轮询这些 Future 时,要么运行一些代码,要么在等待某个资源向我们发出信号以便我们可以继续之前未完成的操作时让出 executor。
Runtimes
Rust 和其他语言(C#, JavaScript, Java, GO)的不同点在于程序语言本身没有提供 Async Runtime。通常,我们希望将通知 Future 继续任务和实际执行 Future 任务的两个职责分开,所以一个 Async Runtime 可以分为两部分:Executor 和 Reactor,这两个部分通过 Waker 类型进行交互。
目前比较流行的 Async Runtime 是 Tokio 和 async_std。Rust 标准库在这方面负责:
- trait Future 表示未来将要完成的操作
- async、await 关键字及挂起、恢复任务的创建方式
- type Waker 用于唤醒挂起的任务
也就是说,Rust 标准库没有提供非阻塞I/O的定义,也没有说明如何创建这些任务或如何运行它们。
IO 密集型任务
我们上面分析了 Async Runtime 的任务处理流程,接下来我们看一个简单的例子:
let non_leaf = async {
let mut stream = TcpStream::connect("127.0.0.1:3000").await.unwrap(); // <-- yield
// request a large dataset
let result = stream.write(get_dataset_request).await.unwrap(); // <-- yield
// wait for the dataset
let mut response = vec![];
stream.read(&mut response).await.unwrap(); // <-- yield
// do some CPU-intensive analysis on the dataset
let report = analyzer::analyze_data(response).unwrap();
// send the results back
stream.write(report).await.unwrap(); // <-- yield
};
根据我们上面对 Future 执行流程的解释,yield 点之间的逻辑会在 executor 线程执行,这意味着 executor 会被 CPU 密集型任务(例如分析数据)阻塞,这会影响其他任务的调度。我们有几种方法解决这个问题:
- 创建一个新的 Leaf Future 将这些 yield 之间的操作发送到另一个线程,并在任务完成时 resolve。
- Runtime 主线程是一个 supervisor,executor 在另一个独立线程当中,这样即使 CPU 密集型任务阻塞了 supervisor,也不影响调度其他任务。
- 可以自行创建一个与 Runtime 兼容的 Reactor,以你认为合适的方式执行分析,并返回一个可以等待的Future
大多数 Runtime 使用方法一,小部分使用方法二。大多数执行器都有办法通过像spawn_blocking 这样的函数来实现方法一。
Waker
Waker 是 Rust 原生提供的类型,为了让 Runtime 的 Executor 和 Reactor 两个部分能够以松耦合的形式组织。没有 Waker 的话,Executor 将是通知正在运行的任务的唯一方式,而有了 Waker ,我们获得了一种松耦合结构,使得用新的底层任务扩展生态系统变得更容易。
实现 Waker
当我们自己实现 Futures 时,遇到的最令人困惑的事情之一是如何实现 Waker。创建 Waker 涉及创建一个 vtable,这使我们能够使用动态分发(Dynamic Dispatch)来调用我们自己构造的类型擦除 trait 对象上的方法。
Rust 中的胖指针
下面这段代码可以让我们看到 Rust 中不同指针类型所占的空间大小:
use std::mem::size_of;
trait SomeTrait {}
fn main() {
println!("======== The size of different pointers in Rust: ========");
println!("&dyn Trait:-----{}", size_of::<&dyn SomeTrait>());
println!("&[&dyn Trait]:--{}", size_of::<&[&dyn SomeTrait]>());
println!("Box<Trait>:-----{}", size_of::<Box<dyn SomeTrait>>());
println!("&i32:-----------{}", size_of::<&i32>());
println!("&[i32]:---------{}", size_of::<&[i32]>());
println!("Box<i32>:-------{}", size_of::<Box<i32>>());
println!("&Box<i32>:------{}", size_of::<&Box<i32>>());
println!("[&dyn Trait;4]:-{}", size_of::<[&dyn SomeTrait; 4]>());
println!("[i32;4]:--------{}", size_of::<[i32; 4]>());
}
为了方便,下面贴出结果(运行环境:Macbook Pro M3Pro)
======== The size of different pointers in Rust: ========
&dyn Trait:-----16
&[&dyn Trait]:--16
Box<Trait>:-----16
&i32:-----------8
&[i32]:---------16
Box<i32>:-------8
&Box<i32>:------8
[&dyn Trait;4]:-64
[i32;4]:--------16
从输出中可以看出,引用的大小各不相同。 许多是8字节(这是64位系统上指针的大小),但有些是16字节。16字节的指针被称为胖指针,因为它们携带了额外的信息。
- 以
&[i32]为例:前 8 个字节是数组第一个元素的指针,后 8 个字节是切片的长度 - 以
&dyn SomeTrait为例:这是一个胖指针,本身是对一个 trait 的应用,前 8 个字节指向特征对象的数据,后 8 个字节指向特征对象的 vtable
这样做的原因是可以允许我们引用一个实际上我们一无所知的对象,除了它实现了我们的 trait 中定义的方法。为了做到这一点,我们需要使用动态分发。
下面的代码演示了如何手动通过纯代码和 vtable 实现一个 trait 对象,而不是直接派生。
// A reference to a trait object is a fat pointer: (data_ptr, vtable_ptr)
trait Test {
fn add(&self) -> i32;
fn sub(&self) -> i32;
fn mul(&self) -> i32;
}
// This will represent our home brewn fat pointer to a trait object
#[repr(C)]
struct FatPointer<'a> {
/// A reference is a pointer to an instantiated `Data` instance
data: &'a mut Data,
/// Since we need to pass in literal values like length and alignment it's
/// easiest for us to convert pointers to usize-integers instead of the other way around.
vtable: *const usize,
}
// This is the data in our trait object. It's just two numbers we want to operate on.
struct Data {
a: i32,
b: i32,
}
// ====== function definitions ======
fn add(s: &Data) -> i32 {
s.a + s.b
}
fn sub(s: &Data) -> i32 {
s.a - s.b
}
fn mul(s: &Data) -> i32 {
s.a * s.b
}
fn main() {
let mut data = Data {a: 3, b: 2};
// vtable is like special purpose array of pointer-length types with a fixed
// format where the three first values has a special meaning like the
// length of the array is encoded in the array itself as the second value.
let vtable = vec![
0, // pointer to `Drop` (which we're not implementing here)
6, // lenght of vtable
8, // alignment
// we need to make sure we add these in the same order as defined in the Trait.
add as usize, // function pointer - try changing the order of `add`
sub as usize, // function pointer - and `sub` to see what happens
mul as usize, // function pointer
];
let fat_pointer = FatPointer { data: &mut data, vtable: vtable.as_ptr()};
let test = unsafe { std::mem::transmute::<FatPointer, &dyn Test>(fat_pointer) };
// And voalá, it's now a trait object we can call methods on
println!("Add: 3 + 2 = {}", test.add());
println!("Sub: 3 - 2 = {}", test.sub());
println!("Mul: 3 * 2 = {}", test.mul());
}
我们可以看到,trait 派生其实可以通过一个包含 Data 值和 vtable 组合成的胖指针转换而成。看过之后,trait 是否显得不是那么神秘了?当我们实现 Waker 的时候,就需要使用到类似的技巧。