Rust AsyncRuntime
经过前面的学习,我们已经大致了解了实现 Future 的完整流程,接下来我们会从社区当中现有的实现方案中汲取经验,优化之前的草台 Runtime。如果你想回头看看,可以从Rust Future(起步-0)开始了解之前的内容。
拆解 Runtime
我们的初步实现大致有两个部分:Executor(包含 Waker)、Reactor(包含 Task)。在之前的草台实现中,我们的 Executor 极其简单,还用了隐患很明显的 std::thread::park(相关问题可以看#2010),这次我们看看如何实现一个更高性能的 block_on + Waker。
block_on 拆解
- 声明部分:
pub fn block_on<F: std::future::Future>(mut f: F) -> F::Output,也就是说block_on函数接受一个 Future,返回 Future 的返回值; - 核心逻辑:
block_on使用一个阻塞循环轮询调用传入 Future 的poll,直到 Future 返回 Ready 后结束 - Future 的
poll函数有两个要求,第一个是需要给 Future 添加 Pin 标记(使其能够自引用),第二个是传入用 Context 包装的 Waker - Waker 负责提供线程唤醒能力,所以需要持有线程的
unpark
Unsafe 版本
如果不依赖外部库,只使用 std 来实现的话,会需要 unsafe 来处理一些特殊逻辑:
- 将 Future Pin 在栈内存上需要
unsafe,Pin 在堆上可以直接使用Box::pin - RawWaker 构建需要
static VTABLE,实际上类似于实现trait - 从 RawWaker 创建 Waker 需要
unsafe
这里使用了 Condvar + Mutex 来处理工作线程暂停,而不是存在死锁隐患的 thread::park。整体来看,代码非常清晰简练,且不需要任何第三方库。
源码参考
use std::sync::{Arc, Condvar, Mutex};
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
#[derive(Default)]
struct Park(Mutex<bool>, Condvar);
fn unpark(park: &Park) {
*park.0.lock().unwrap() = true;
park.1.notify_one();
}
static VTABLE: RawWakerVTable = RawWakerVTable::new(
|clone_me| unsafe {
let arc = Arc::from_raw(clone_me as *const Park);
std::mem::forget(arc.clone());
RawWaker::new(Arc::into_raw(arc) as *const (), &VTABLE)
},
|wake_me| unsafe { unpark(&Arc::from_raw(wake_me as *const Park)) },
|wake_by_ref_me| unsafe { unpark(&*(wake_by_ref_me as *const Park)) },
|drop_me| unsafe { drop(Arc::from_raw(drop_me as *const Park)) },
);
/// Run a `Future`.
pub fn run<F: std::future::Future>(mut f: F) -> F::Output {
let mut f = unsafe { std::pin::Pin::new_unchecked(&mut f) };
let park = Arc::new(Park::default());
let sender = Arc::into_raw(park.clone());
let raw_waker = RawWaker::new(sender as *const _, &VTABLE);
let waker = unsafe { Waker::from_raw(raw_waker) };
let mut cx = Context::from_waker(&waker);
loop {
match f.as_mut().poll(&mut cx) {
Poll::Pending => {
let mut runnable = park.0.lock().unwrap();
while !*runnable {
runnable = park.1.wait(runnable).unwrap();
}
*runnable = false;
}
Poll::Ready(val) => return val,
}
}
}
Safe 版本
参考:Build your own block_on(),原文链接已失效,可以先看译文
借助一些包装好的依赖库,我们可以实现一个没有 unsafe 代码的 block_on 函数,同时,这个版本增加了一些额外优化内容,包括 thread_local! 的 Parker 和 Waker 缓存,以及当递归调用 block_on 时添加了 panic。
我们可以对比上面的 extreme 版本,看看第三方库为我们做了什么:
crossbeam::sync::Parker:一个安全且高性能的线程暂停工具,通过许可证机制解决了thread::park可能因为时序竞争导致死锁的问题。内部其实就是 Condvar + Mutex,额外添加了 AtomicUsize 用于标记当前的线程状态。
当先调用 unpark 后调用 park 时,会跳过 park
fn park(&self, deadline: Option<Instant>) {
// If we were previously notified then we consume this notification and return quickly.
if self
.state
.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
.is_ok()
{
return;
}
self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst);
// similar to extreme
let mut runnable = self.lock.lock().unwrap()
loop {
runnable = self.cvar.wait(runnable).unwrap();
if self
.state
.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
.is_ok() {
return;
}
}
}
async_task::waker_fn:非常简单的waker包装,不再需要unsafe和 vtable,waker_fn只在async_task <= 3.0.0中存在,最新的4.0.0已经去掉了这个 API- 内部逻辑和
extreme创建 Waker 的方法一模一样
pin_utils::pin_mut:用宏包装了在栈上 Pin Future 所需要的unsafe代码,实际和extreme的代码一样
所以我们可以看到,除了对线程暂停的处理之外,其实 extreme 的实践已经几乎就是最佳实践了。
use std::{
cell::RefCell,
task::{Context, Poll, Waker},
};
use crossbeam::sync::Parker;
use async_task::waker_fn;
use pin_utils::pin_mut;
pub fn block_on<F: Future>(future: F) -> F::Output {
pin_mut!(future);
thread_local! {
static CACHE: RefCell<(Parker, Waker)> = {
let parker = Parker::new();
let unparker = parker.unparker().clone();
let waker = waker_fn(move || unparker.unpark());
RefCell::new((parker, waker))
};
}
CACHE.with(|cache| {
let (parker, waker) = &mut *cache.try_borrow_mut().expect("recursive `block_on`");
let cx = &mut Context::from_waker(waker);
loop {
match future.as_mut().poll(cx) {
Poll::Ready(output) => return output,
Poll::Pending => parker.park(),
}
}
})
}
跑分
最后,我们跑个分看看上面两个 block_on 实现在性能上是否能和官方的 futures-rs 比较。
Benchmark
use criterion::criterion_group;
use criterion::criterion_main;
use std::{
pin::Pin,
task::{Context, Poll},
};
use criterion::Criterion;
use extreme::run;
use my_block_on::block_on::block_on;
struct Yields(u32);
impl Future for Yields {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
if self.0 == 0 {
Poll::Ready(())
} else {
self.0 -= 1;
cx.waker().wake_by_ref();
Poll::Pending
}
}
}
fn criterion_benchmark(c: &mut Criterion) {
c.bench_function("safe yield 00", |b| b.iter(|| block_on(Yields(0))));
c.bench_function("safe yield 10", |b| b.iter(|| block_on(Yields(10))));
c.bench_function("safe yield 50", |b| b.iter(|| block_on(Yields(50))));
c.bench_function("extreme yield 00", |b| b.iter(|| run(Yields(0))));
c.bench_function("extreme yield 10", |b| b.iter(|| run(Yields(10))));
c.bench_function("extreme yield 50", |b| b.iter(|| run(Yields(50))));
c.bench_function("futures yield 00", |b| {
b.iter(|| futures::executor::block_on(Yields(0)))
});
c.bench_function("futures yield 10", |b| {
b.iter(|| futures::executor::block_on(Yields(10)))
});
c.bench_function("futures yield 50", |b| {
b.iter(|| futures::executor::block_on(Yields(50)))
});
}
criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
测试设备:Macbook Pro M3Pro 18G RAM
$ cargo bench
safe yield 00 time: [1.3001 ns 1.3342 ns 1.3785 ns]
safe yield 10 time: [34.781 ns 34.870 ns 34.941 ns]
safe yield 50 time: [174.84 ns 175.98 ns 177.02 ns]
extreme yield 00 time: [15.977 ns 16.080 ns 16.218 ns]
extreme yield 10 time: [167.13 ns 168.52 ns 170.03 ns]
extreme yield 50 time: [565.55 ns 573.47 ns 582.58 ns]
futures yield 00 time: [3.0052 ns 3.0777 ns 3.1550 ns]
futures yield 10 time: [23.077 ns 23.379 ns 23.671 ns]
futures yield 50 time: [119.90 ns 121.04 ns 122.28 ns]
其他参考材料
- withoutboats/juliex:一个“可能”最简的线程池 Executor 实现