AB

Rust Future 深入(Executor-2)

2025-09-01

Rust AsyncRuntime

在之前的实现中,我们接触的 executor 都只是一个 block_on ,其作用只是执行 Future 并阻塞,实际上没有让 Future 起到并发的作用。这次我们看看距离真正能够异步执行的 executor 还有多远。

目标

一个正常的 executor 应该具备什么 API 呢,我们参考一下 smol-rs/async-executor的例子:

RUST
use async_executor::Executor;
use futures_lite::future;

// Create a new executor.
let ex = Executor::new();

// Spawn a task.
let task = ex.spawn(async {
    println!("Hello world");
});

// Run the executor until the task completes.
future::block_on(ex.run(task));

他的示例 executor 包含了一个 spawn 方法,可以将一个 async block 或者任意 Future 包装成一个 Task,然后 run 它。从语义上看很不错,block_on 也没有白费,可以当作 join 使用,那么我们就从 spawn 开始吧。

定义 spawn

让我们先回忆一下 thread::spawn 是什么样的:将我们的逻辑包装成一个线程,放进线程池里,然后交给操作系统运行,返回一个 JoinHandle。我们的 spawn 也应该是这样,将 Future 参数包装成 Task 发送给一个 work loop 队列,由队列去 run 这个 Task。

先把函数签名写出来:

RUST
type JoinHandle<R> = Pin<Box<dyn Future<Output = R> + Send>>;

pub fn spawn<F, R>(future: F) -> JoinHandle<R>
where
    F: Future<Output = R> + Send + 'static,
    R: Send + 'static,
{
	todo!()
}

从外表上看,我们的 JoinHandle 完全就是这个 Future 本身,使用起来也很直观:从 spawn 开始,然后 block_on 结束。不过如果要将 Future 返回出去,同时还要发送给任务队列的话,就需要拷贝它,超出了 Pin<Box<F>> 的能力范畴,所以我们考虑将它替换成一个接收返回值的 oneshot

RUST
use futures::channel::oneshot;

pub fn spawn<F, R>(future: F) -> JoinHandle<R>
where
    F: Future<Output = R> + Send + 'static,
    R: Send + 'static,
{
    let (s, r) = oneshot::channel();
    let future = async move {
        let _ = s.send(future.await);
    };
	todo!();
    Box::pin(async { r.await.unwrap() })
}

补上 Task

到这里,我们发现还缺少对 Task 和任务队列的实现,我们先定义一下 Task 和任务队列。

RUST
use crossbeam::channel;
use once_cell::sync::Lazy;

struct Task {
    future: Pin<Box<dyn Future<Output = ()> + Send>>,
}

impl Task {
	fn run(self: Arc<Task>) {
		todo()!
	}
}

static QUEUE: Lazy<channel::Sender<Arc<Task>>> = Lazy::new(|| {
    let (sender, receiver) = channel::unbounded::<Arc<Task>>();

    for _ in 0..num_cpus::get().max(1) {
        let receiver = receiver.clone();
        thread::spawn(move || receiver.iter().for_each(|task| task.run()));
    }

    sender
});

这个 struct Task 看起来也很草台,完全就是一个 Future 套壳,这一看就不太能满足我们的需求。但一个 Task 具体需要做什么呢?也许需要写更多一点代码才能知道。这里我们直接把任务队列定义成了 static 变量,因为我们没有打算把代码封装成 struct Executor

现在,我们可以把 spawn 补完了:

RUST
pub fn spawn<F, R>(future: F) -> JoinHandle<R>
where
    F: Future<Output = R> + Send + 'static,
    R: Send + 'static,
{
    let (s, r) = oneshot::channel();
    let future = async move {
        let _ = s.send(future.await);
    };

    let task = Arc::new(Task {
        future: Mutex::new(Box::pin(future)),
    });
    QUEUE.send(task).unwrap();

    Box::pin(async { r.await.unwrap() })
}

Task::run 内部实际是在调用 Future::poll,将 waker 传递给 Future,waker 的实际逻辑就是重新让任务进入队列。waker 是能够跨线程的,所以 task 需要使用 Arc 来包裹。

RUST
impl Task {
    fn run(self: Arc<Task>) {
        let task = self.clone();
        let waker = async_task::waker_fn(move || {
			QUEUE.send(task.clone()).unwrap();
        });

        let cx = &mut Context::from_waker(&waker);
        let poll = self.future.try_lock().unwrap().as_mut().poll(cx);
    }
}

处理 Task 的边缘场景

看起来好像完成了所有的逻辑,我们来理一下 Task 的完整生命周期是怎样的:

  1. 初始化,spawn 将 Task 发送到 QUEUE
  2. QUEUE 执行 task.run,触发 Future::poll
  3. future 开始执行,在 await 处暂停,返回 Pending
  4. future 被唤醒,执行 waker,将 Task 再次发送到 QUEUE

然后思考几个问题:

  1. 如果 future 返回了 Ready,之后 waker 依然被执行了,如何才能不 poll 一个已经完成的 Future?
  2. 如果某个 Task 被连续唤醒了两次,应该怎么办?
  3. 如果 future 执行的同时触发了 waker,它会不会立即被另一个线程捕获?

为了解决这几个问题,我们应该给 Task 添加一个状态标记,来区分当前 Task 的状态。

RUST
struct Task {
    state: AtomicUsize,
    future: Mutex<Pin<Box<dyn Future<Output = ()> + Send>>>,
}

const WOKEN: usize = 0b01;
const RUNNING: usize = 0b10;

pub fn spawn<F, R>(future: F) -> JoinHandle<R>
where
    F: Future<Output = R> + Send + 'static,
    R: Send + 'static,
{
    let (s, r) = oneshot::channel();
    let future = async move {
        let _ = s.send(future.await);
    };

    let task = Arc::new(Task {
		//////////////////////////
		// 缓存 Task 的运行状态,初始状态为 0
        state: AtomicUsize::new(0),
        //////////////////////////
        future: Mutex::new(Box::pin(future)),
    });
    QUEUE.send(task).unwrap();

    Box::pin(async { r.await.unwrap() })
}

在每个调用 Task 和 waker 的关键位置,都需要根据当前的状态判断是否可以继续操作。

  1. run 将状态设置为 0b10
  2. waker 判断当前状态为 0b00,将 task 发送到队列
  3. waker 将第一位设置为 1
  4. poll 执行完成之后,判断当前状态是否为 0b11,也就是 poll 执行完成前被 waker 唤醒,前一次唤醒会失效,所以重新唤醒
  5. poll 执行完成后将第二位设置为 0
RUST
impl Task {
    fn run(self: Arc<Task>) {
        let task = self.clone();
        let waker = async_task::waker_fn(move || {
            if task.state.fetch_or(WOKEN, Ordering::SeqCst) == 0 {
                QUEUE.send(task.clone()).unwrap();
            }
        });

        self.state.store(RUNNING, Ordering::SeqCst);
        let cx = &mut Context::from_waker(&waker);
        let poll = self.future.try_lock().unwrap().as_mut().poll(cx);

        if poll.is_pending() && self.state.fetch_and(!RUNNING, Ordering::SeqCst) == WOKEN | RUNNING
        {
            QUEUE.send(self).unwrap();
        }
    }
}

简化

Task 的状态流转代码不多,但是理解起来很复杂。幸运的是,我们可以直接使用 async_task

RUST
static QUEUE: Lazy<channel::Sender<Task>> = Lazy::new(|| {
    let (sender, receiver) = channel::unbounded::<Task>();

    for _ in 0..num_cpus::get().max(1) {
        let receiver = receiver.clone();
        thread::spawn(move || receiver.iter().for_each(|task| task.run()));
    }

    sender
});

type Task = async_task::Task<()>;
type JoinHandle<R> = Pin<Box<dyn Future<Output = R> + Send>>;

fn spawn<F, R>(future: F) -> JoinHandle<R>
where
    F: Future<Output = R> + Send + 'static,
    R: Send + 'static,
{
    let (task, handle) = async_task::spawn(future, |t| QUEUE.send(t).unwrap(), ());
    task.schedule();
    Box::pin(async { handle.await.unwrap() })
}

当然,async_task 的好处不止是简化了代码,同时它也提高了我们代码的可靠性(毕竟它的实现不是这么寥寥几行)。例如,当 async_task 完成 task 后,会直接 drop future,而不是等到所有引用都失效再回收。

处理 panic

最后,我们还没有处理 future 当中可能出现的 panic,我们使用一下 async_task 提供的类型定义,它会返回 Option<R> 而不是 R

RUST
type JoinHandle<R> = async_task::JoinHandle<R, ()>;

impl<R> Future for JoinHandle<R> {
    type Output = R;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match Pin::new(&mut self.0).poll(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(output) => Poll::Ready(output.expect("task failed")),
        }
    }
}

同时,我们在 QUEUE 当中 catch 一下可能出现的 panic,避免因为任务导致整个 Runtime 挂掉。

RUST
use std::panic::catch_unwind;

static QUEUE: Lazy<channel::Sender<Task>> = Lazy::new(|| {
    let (sender, receiver) = channel::unbounded::<Task>();

    for _ in 0..num_cpus::get().max(1) {
        let receiver = receiver.clone();
        thread::spawn(move || {
            receiver.iter().for_each(|task| {
                let _ = catch_unwind(|| task.run());
            })
        });
    }

    sender
});

总结

以上大致就实现了一个高性能的 async Executor,其任务分发逻辑和 tokioasync_stdsmol 没有本质区别。同时,由于我们的实现没有包含任何多余的内存分发和回收逻辑,其性能是非常好的。

唯一缺少的部分是关于 Work Stealing 的。目前我们只有一个任务队列,所有的线程都需要从一个队列中竞争获取任务,这会影响性能表现。比较好的实践是每个线程拥有一个自己的队列,当自己队列为空时,从其他队列中窃取任务,这就减少了竞争的出现。

后续,我们将关注如何将现有的 IO 操作转化为异步操作。