Rust AsyncRuntime
并发编程是很基础的内容,这次学习主要是系统了解一下常见的单机并发手段,以及相应的区别。最终的学习目标是手写一个简单的 Async Runtime。
参考资料主要是 cfsamson 的 Futures Explained in 200 lines of Rust。github.io 原文已不可追溯(原文被删除可能是因为该作者后来又写了一本书《Asynchronous Programming in Rust》),可以在 github 找到原文的 fork,或是从互联网档案馆找到原始网站。
先了解一些基础知识
1 OS 线程
最基础的并发手段,运行时就是操作系统本身,各种编程语言操作线程的方式都差不多。
| 优势 | 劣势 |
|---|---|
| 简单 | 线程的栈空间很大,OS 需要分配大量内存,任务数较多时会遇到内存瓶颈 |
| 易于使用 | 内核态操作多,开销大 |
| 线程间切换快 | 由操作系统调度,CPU 时间不可保证 |
| 无需任何其他库就可以处理并发 | 多系统兼容性不能保证 |
在很多场景下,OS 线程都是很好的解决方案,他有很多优势,但同样也有很多场景不适合他,例如Web场景需要应对大量 IO 和网络请求,小任务数量大。同时Web负载是动态的,可能会出现无法预测的任务数量,导致难以维护。
点击展开代码示例
use std::thread;
fn main() {
println!("So we start the program here!");
let t1 = thread::spawn(move || {
thread::sleep(std::time::Duration::from_millis(200));
println!("We create tasks which gets run when they're finished!");
});
let t2 = thread::spawn(move || {
thread::sleep(std::time::Duration::from_millis(100));
println!("We can even chain callbacks...");
let t3 = thread::spawn(move || {
thread::sleep(std::time::Duration::from_millis(50));
println!("...like this!");
});
t3.join().unwrap();
});
println!("While our tasks are executing we can do other stuff here.");
t1.join().unwrap();
t2.join().unwrap();
}
2 Green Threads
绿色线程是相对于 OS 线程而言的,它完全在用户空间中调度,基本思路和操作系统基本相同:为每个任务创建一个线程,设置堆栈,保存CPU的状态,并通过执行“上下文切换”在不同任务(线程)之间跳转。
| 优势 | 劣势 |
|---|---|
| 易于使用(看起来代码和 OS 线程差不多) | 堆栈可能需要增长 |
| 上下文切换很快 | 每次切换需要保存栈帧 |
| 每个线程只需要分配少量堆栈空间,节约内存资源 | 这不是零成本抽象(所以Rust去掉了它) |
| 可以集成抢占式调度,获得大量稳定的控制权 | 多系统兼容比较困难 |
此类系统的核心是调度器,每当线程执行到阻塞资源时,会触发上下文切换,CPU 会切换到主线程(调度器线程),然后由调度器指派下一个任务。Rust 曾经有绿色线程,但在 1.0 发布前就移除了。
点击展开代码示例
# #![feature(asm, naked_functions)]
# use std::ptr;
#
# const DEFAULT_STACK_SIZE: usize = 1024 * 1024 * 2;
# const MAX_THREADS: usize = 4;
# static mut RUNTIME: usize = 0;
#
# pub struct Runtime {
# threads: Vec<Thread>,
# current: usize,
# }
#
# #[derive(PartialEq, Eq, Debug)]
# enum State {
# Available,
# Running,
# Ready,
# }
#
# struct Thread {
# id: usize,
# stack: Vec<u8>,
# ctx: ThreadContext,
# state: State,
# task: Option<Box<dyn Fn()>>,
# }
#
# #[derive(Debug, Default)]
# #[repr(C)]
# struct ThreadContext {
# rsp: u64,
# r15: u64,
# r14: u64,
# r13: u64,
# r12: u64,
# rbx: u64,
# rbp: u64,
# thread_ptr: u64,
# }
#
# impl Thread {
# fn new(id: usize) -> Self {
# Thread {
# id,
# stack: vec![0_u8; DEFAULT_STACK_SIZE],
# ctx: ThreadContext::default(),
# state: State::Available,
# task: None,
# }
# }
# }
#
# impl Runtime {
# pub fn new() -> Self {
# let base_thread = Thread {
# id: 0,
# stack: vec![0_u8; DEFAULT_STACK_SIZE],
# ctx: ThreadContext::default(),
# state: State::Running,
# task: None,
# };
#
# let mut threads = vec![base_thread];
# threads[0].ctx.thread_ptr = &threads[0] as *const Thread as u64;
# let mut available_threads: Vec<Thread> = (1..MAX_THREADS).map(|i| Thread::new(i)).collect();
# threads.append(&mut available_threads);
#
# Runtime {
# threads,
# current: 0,
# }
# }
#
# pub fn init(&self) {
# unsafe {
# let r_ptr: *const Runtime = self;
# RUNTIME = r_ptr as usize;
# }
# }
#
# pub fn run(&mut self) -> ! {
# while self.t_yield() {}
# std::process::exit(0);
# }
#
# fn t_return(&mut self) {
# if self.current != 0 {
# self.threads[self.current].state = State::Available;
# self.t_yield();
# }
# }
#
# fn t_yield(&mut self) -> bool {
# let mut pos = self.current;
# while self.threads[pos].state != State::Ready {
# pos += 1;
# if pos == self.threads.len() {
# pos = 0;
# }
# if pos == self.current {
# return false;
# }
# }
#
# if self.threads[self.current].state != State::Available {
# self.threads[self.current].state = State::Ready;
# }
#
# self.threads[pos].state = State::Running;
# let old_pos = self.current;
# self.current = pos;
#
# unsafe {
# switch(&mut self.threads[old_pos].ctx, &self.threads[pos].ctx);
# }
# true
# }
#
# pub fn spawn<F: Fn() + 'static>(f: F){
# unsafe {
# let rt_ptr = RUNTIME as *mut Runtime;
# let available = (*rt_ptr)
# .threads
# .iter_mut()
# .find(|t| t.state == State::Available)
# .expect("no available thread.");
#
# let size = available.stack.len();
# let s_ptr = available.stack.as_mut_ptr();
# available.task = Some(Box::new(f));
# available.ctx.thread_ptr = available as *const Thread as u64;
# ptr::write(s_ptr.offset((size - 8) as isize) as *mut u64, guard as u64);
# ptr::write(s_ptr.offset((size - 16) as isize) as *mut u64, call as u64);
# available.ctx.rsp = s_ptr.offset((size - 16) as isize) as u64;
# available.state = State::Ready;
# }
# }
# }
#
# fn call(thread: u64) {
# let thread = unsafe { &*(thread as *const Thread) };
# if let Some(f) = &thread.task {
# f();
# }
# }
#
# #[naked]
# fn guard() {
# unsafe {
# let rt_ptr = RUNTIME as *mut Runtime;
# let rt = &mut *rt_ptr;
# println!("THREAD {} FINISHED.", rt.threads[rt.current].id);
# rt.t_return();
# };
# }
#
# pub fn yield_thread() {
# unsafe {
# let rt_ptr = RUNTIME as *mut Runtime;
# (*rt_ptr).t_yield();
# };
# }
#
# #[naked]
# #[inline(never)]
# unsafe fn switch(old: *mut ThreadContext, new: *const ThreadContext) {
# asm!("
# mov %rsp, 0x00($0)
# mov %r15, 0x08($0)
# mov %r14, 0x10($0)
# mov %r13, 0x18($0)
# mov %r12, 0x20($0)
# mov %rbx, 0x28($0)
# mov %rbp, 0x30($0)
#
# mov 0x00($1), %rsp
# mov 0x08($1), %r15
# mov 0x10($1), %r14
# mov 0x18($1), %r13
# mov 0x20($1), %r12
# mov 0x28($1), %rbx
# mov 0x30($1), %rbp
# mov 0x38($1), %rdi
# ret
# "
# :
# : "r"(old), "r"(new)
# :
# : "alignstack"
# );
# }
# #[cfg(not(windows))]
fn main() {
let mut runtime = Runtime::new();
runtime.init();
Runtime::spawn(|| {
println!("I haven't implemented a timer in this example.");
yield_thread();
println!("Finally, notice how the tasks are executed concurrently.");
});
Runtime::spawn(|| {
println!("But we can still nest tasks...");
Runtime::spawn(|| {
println!("...like this!");
})
});
runtime.run();
}
# #[cfg(windows)]
# fn main() { }
3 基于回调的并发
这一节主要是在讲 JavaScript 的回调式异步编程。
基于回调的方法的核心思想是保存一个指向一组我们希望稍后运行的指令的指针,以及运行这些指令所需的所有状态,例如 JS 的回调函数或 Rust 的闭包。
| 优势 | 劣势 |
|---|---|
| 易于实现 | 每个任务都需要保存回调,这可能导致内存使用量随着回调数量增长而增长 |
| 没有上下文切换 | 难以 Debug,会产生回调地狱 |
| 内存开销比较低(大多数时候) | 编程方式完全不同,需要大量重写代码 |
| - | 不适合 Rust 的所有权模型 |
点击展开代码示例
fn program_main() {
println!("So we start the program here!");
set_timeout(200, || {
println!("We create tasks with a callback that runs once the task finished!");
});
set_timeout(100, || {
println!("We can even chain sub-tasks...");
set_timeout(50, || {
println!("...like this!");
})
});
println!("While our tasks are executing we can do other stuff instead of waiting.");
}
fn main() {
RT.with(|rt| rt.run(program_main));
}
use std::sync::mpsc::{channel, Receiver, Sender};
use std::{cell::RefCell, collections::HashMap, thread};
thread_local! {
static RT: Runtime = Runtime::new();
}
struct Runtime {
callbacks: RefCell<HashMap<usize, Box<dyn FnOnce() -> ()>>>,
next_id: RefCell<usize>,
evt_sender: Sender<usize>,
evt_reciever: Receiver<usize>,
}
fn set_timeout(ms: u64, cb: impl FnOnce() + 'static) {
RT.with(|rt| {
let id = *rt.next_id.borrow();
*rt.next_id.borrow_mut() += 1;
rt.callbacks.borrow_mut().insert(id, Box::new(cb));
let evt_sender = rt.evt_sender.clone();
thread::spawn(move || {
thread::sleep(std::time::Duration::from_millis(ms));
evt_sender.send(id).unwrap();
});
});
}
impl Runtime {
fn new() -> Self {
let (evt_sender, evt_reciever) = channel();
Runtime {
callbacks: RefCell::new(HashMap::new()),
next_id: RefCell::new(1),
evt_sender,
evt_reciever,
}
}
fn run(&self, program: fn()) {
program();
for evt_id in &self.evt_reciever {
let cb = self.callbacks.borrow_mut().remove(&evt_id).unwrap();
cb();
if self.callbacks.borrow().is_empty() {
break;
}
}
}
}
4 Promise
Promise、Future 这些名字基本上都在指代类似的东西,他是处理回调复杂度的一种方式:
function timer(ms) {
return new Promise((resolve) => setTimeout(resolve, ms))
}
timer(200)
.then(() => return timer(100))
.then(() => return timer(50))
.then(() => console.log("I'm the last one));
Promises 会返回一个状态机,它会处于三种状态中的一种:pending、fulfilled、rejected。上面示例中调用 timer 会生成一个 pending 状态的 Promise。得益于 Promise 状态机,我们可以获得更好的语法:
async function run() {
await timer(200);
await timer(100);
await timer(50);
console.log("I'm the last one");
}
运行函数可以被视为一个可暂停的任务,由多个子任务组成。在每个“await”点,它会将控制权让给调度器(在这种情况下,它是众所周知的Javascript事件循环)。一旦其中一个子任务的状态变为 fulfilled 或 rejected,调度器会继续执行下一个子任务。
在语法上,Rust 的 Futures 0.1 与上面的 promise 示例非常相似,而 Rust 的 Futures 0.3 则与我们上一个示例中的 async/await 非常相似。值得一提的是,JS 的 Promise 是积极求值的,而 Rust 的 Futures 是惰性求值的,需要被轮询之后才会开始执行操作。