日常学习

Rust Async

May 09, 2021

并发模型

Rust Async 的现状

  1. Rust Async 的特点:
    • Futures是rust 内置的
    • Async是zero-cost 的,即: 并不需要 在堆上 进行内存分配 和动态分发,即高性能(you can use async without heap allocations and dynamic dispatch, which is great for performance!)
    • 没有内建的runtime, 由社区提供
    • 可以支持 单线程 或者 多线程 的runtime
  2. Rust Async vs threads 的比较
    • threads: 适合少量的工作, 不用改变代码结构, 生成 线程 与线程之间的切换 是非常昂贵的, 线程池可以一定程度上减轻此类开销
    • Async: 可以显著的减少CPU 与 内存的开销, 尤其是在对于 大量IO任务时。 比thread 模式能够处理更多的任务,因为 该模型,使用 较少的 threads 处理大量的 task。 但是其二进制的文件会比传统的非异步编码的代码要大。
  3. 异步编程的支持:
    • 标准库: 提供 Future trait 抽象
    • Rust Compiler: 提供对 async/await 语法的支持
    • futures crate: 提供 工具类型, macros, 以及 方法
    • async runtime: 提供对 async code, IO, task spawn 的运行。有 TOkio, async-std
  4. 编译 与 debug:
    • 为了支持异步代码: rust 需要使用异常一些更复杂的 语言特性, 比如lifetimes pinning。 你可能将经常遇到此类错误
    • runtime errors: 编译器 遇到 async function,将产生一个 状态机(state machine), Stack traces 将包含状态机内部的详细信息, 对比 Rust 同步代码, runtime errors debug 要复杂不少
  5. New failure modes: 异步Rust中可能会出现一些新颖的故障模式,例如,如果您从异步上下文中调用了阻止函数,或者您错误地实现了Future特性。 这样的错误可以无声地通过编译器,有时甚至可以通过单元测试。 本书旨在为您提供对底层概念的深刻理解,可以帮助您避免这些陷阱。

Rust 异步编程 简单示例:

// Cargo.toml
[dependencies]
futures = "0.3"

// main.rs
use futures::executor::block_on;

async fn hello_world() {
  println!("hello, world!");
}

fn main() {
  let future = hello_world(); // Nothing is printed
  block_on(future); // `future` is run and "hello, world!" is printed
}


// await 示例: 

async fn learn_song() -> Song { /* ... */ }
async fn sing_song(song: Song) { /* ... */ }
async fn dance() { /* ... */ }


// 这里使用 block_on 导致  learn_song  aing_song dance 的 串行执行, 因为block_on 将阻塞thread,直到 Future 执行完成
fn main() {
  let song = block_on(learn_song());
  block_on(sing_song(song));
  block_on(dance());
}



// 这里使用await 可以将 thread 让出,以便  Future f2 执行。 join 能够同时执行 两个 future
async fn learn_and_sing() {
  // Wait until the song has been learned before singing it.
  // We use `.await` here rather than `block_on` to prevent blocking the // thread, which makes it possible to `dance` at the same time.
  let song = learn_song().await;
  sing_song(song).await;
}


async fn async_main() {
  let f1 = learn_and_sing(); 
  let f2 = dance();
  // `join!` is like `.await` but can wait for multiple futures concurrently. 
  // If we're temporarily blocked in the `learn_and_sing` future, the `dance`
  // future will take over the current thread. If `dance` becomes blocked, 
  // `learn_and_sing` can take back over. If both futures are blocked, then 
  // `async_main` is blocked and will yield to the executor. 
  futures::join!(f1, f2);
}



fn main() { 
  block_on(async_main());
}

Future Trait: rust 异步编程的 核心点, Future 即是一个可以产生 value 的异步计算抽象。简单的 Future 可以如下:

trait SimpleFuture {
    type Output;
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
}
enum Poll<T> {
    Ready(T),
    Pending,
}
pub struct SocketRead<'a> {
    socket: &'a Socket,
}
impl SimpleFuture for SocketRead<'_> {
    type Output = Vec<u8>;
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if self.socket.has_data_to_read() {
            // The socket has data-- read it into a buffer and return it.
            Poll::Ready(self.socket.read_buf())
        } else {
            // The socket does not yet have data. //
            // Arrange for `wake` to be called once data is available.
            // When data becomes available, `wake` will be called, and the
            // user of this `Future` will know to call `poll` again and
            // receive data. self.socket.set_readable_callback(wake); Poll::Pending
        }
    }
}


use std::{
    future::Future,
    pin::Pin,
    sync::{Arc, Mutex},
    task::{Context, Poll, Waker},
    thread,
    time::Duration,
};


pub struct TimerFuture {
    shared_state: Arc<Mutex<SharedState>>, // 需要使用锁, 跨 thread 更改变量
}

/// Shared state between the future and the waiting thread
struct SharedState {
    /// Whether or not the sleep time has elapsed
    completed: bool,

    // The waker for the task that `TimerFuture` is running on.
    // The thread can use this after setting `completed = true` to tell
    // `TimerFuture`'s task to wake up, see that `completed = true`, and
    // move forward.
    waker: Option<Waker>,
}


impl Future for TimerFuture {
    type Output = ();
    // 需要注意 这里面 self参数类型为Pin<&mut Self> 以及cx 为Context
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Look at the shared state to see if the timer has already completed.
        let mut shared_state = self.shared_state.lock().unwrap();
        if shared_state.completed {
            Poll::Ready(())
        } else {
            // Set waker so that the thread can wake up the current task
            // when the timer has completed, ensuring that the future is polled
            // again and sees that `completed = true`.
            //
            // It's tempting to do this once rather than repeatedly cloning
            // the waker each time. However, the `TimerFuture` can move between
            // tasks on the executor, which could cause a stale waker pointing
            // to the wrong task, preventing `TimerFuture` from waking up
            // correctly.
            //
            // N.B. it's possible to check for this using the `Waker::will_wake`
            // function, but we omit that here to keep things simple.
            shared_state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}


impl TimerFuture {
    // Create a new `TimerFuture` which will complete after the provided
    // timeout.
    pub fn new(duration: Duration) -> Self {
        let shared_state = Arc::new(Mutex::new(SharedState {
            completed: false,
            waker: None,
        }));

        // Spawn the new thread
        // 这个 Future 实现的 比较奇怪,直接 使用另一个 thread 中sleep 进行 timer 的实现, 所以 在上面 TimerFuture 中的 Sharedstate 需要进行 Mutex 进行保护
        // 这也是后面提到 Runtime  需要提供 Timer的 重要原因吧
        let thread_shared_state = shared_state.clone();
        thread::spawn(move || {
            thread::sleep(duration);
            let mut shared_state = thread_shared_state.lock().unwrap();
            // Signal that the timer has completed and wake up the last
            // task on which the future was polled, if one exists.
            shared_state.completed = true;
            if let Some(waker) = shared_state.waker.take() {
                waker.wake()
            }
        });

        TimerFuture { shared_state }
    }
}


// Executor 的实现:

/// Task executor that receives tasks off of a channel and runs them.
struct Executor {
    ready_queue: Receiver<Arc<Task>>,
}

/// `Spawner` spawns new futures onto the task channel.
#[derive(Clone)]
struct Spawner {
    task_sender: SyncSender<Arc<Task>>,
}

/// A future that can reschedule itself to be polled by an `Executor`.
struct Task {
    /// In-progress future that should be pushed to completion.
    ///
    /// The `Mutex` is not necessary for correctness, since we only have
    /// one thread executing tasks at once. However, Rust isn't smart
    /// enough to know that `future` is only mutated from one thread,
    /// so we need to use the `Mutex` to prove thread-safety. A production
    /// executor would not need this, and could use `UnsafeCell` instead.
    future: Mutex<Option<BoxFuture<'static, ()>>>,

    /// Handle to place the task itself back onto the task queue.
    task_sender: SyncSender<Arc<Task>>,
}

fn new_executor_and_spawner() -> (Executor, Spawner) {
    // Maximum number of tasks to allow queueing in the channel at once.
    // This is just to make `sync_channel` happy, and wouldn't be present in
    // a real executor.
    const MAX_QUEUED_TASKS: usize = 10_000;
    let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS);
    (Executor { ready_queue }, Spawner { task_sender })
}


impl Spawner {
    fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
        let future = future.boxed();
        let task = Arc::new(Task {
            future: Mutex::new(Some(future)),
            task_sender: self.task_sender.clone(),
        });
        self.task_sender.send(task).expect("too many tasks queued");
    }
}


impl ArcWake for Task {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        // Implement `wake` by sending this task back onto the task channel
        // so that it will be polled again by the executor.
        let cloned = arc_self.clone();
        arc_self
            .task_sender
            .send(cloned)
            .expect("too many tasks queued");
    }
}

impl Executor {
    fn run(&self) {
        while let Ok(task) = self.ready_queue.recv() {
            // Take the future, and if it has not yet completed (is still Some),
            // poll it in an attempt to complete it.
            let mut future_slot = task.future.lock().unwrap();
            if let Some(mut future) = future_slot.take() {
                // Create a `LocalWaker` from the task itself
                let waker = waker_ref(&task);
                let context = &mut Context::from_waker(&*waker);
                // `BoxFuture<T>` is a type alias for
                // `Pin<Box<dyn Future<Output = T> + Send + 'static>>`.
                // We can get a `Pin<&mut dyn Future + Send + 'static>`
                // from it by calling the `Pin::as_mut` method.
                if let Poll::Pending = future.as_mut().poll(context) {
                    // We're not done processing the future, so put it
                    // back in its task to be run again in the future.
                    *future_slot = Some(future);
                }
            }
        }
    }
}



fn main() {
    let (executor, spawner) = new_executor_and_spawner();

    // Spawn a task to print before and after waiting on a timer.
    spawner.spawn(async {
        println!("howdy!");
        // Wait for our timer future to complete after two seconds.
        TimerFuture::new(Duration::new(2, 0)).await;
        println!("done!");
    });

    // Drop the spawner so that our executor knows it is finished and won't
    // receive more incoming tasks to run.
    drop(spawner);

    // Run the executor until the task queue is empty.
    // This will print "howdy!", pause, and then print "done!".
    executor.run();
}

Executors and System IO:

Executor: 谁来调用 Future 的poll 方法? 答案是 Future Executor。 executor 调用一大堆 Futures 的poll方法 以便让Future 取得进展, 当 Future 能够 取得进一步进展时,通过调用wake 方法, 以便 executor 再次执行 Future。

System IO: 在上面的 SimpleFuture 代码中, 谁来执行 wake 方法呢? self.socket.set_readable_callback(wake) 又是如何触发的呢? 答案是 epoll 的IO多路复用,可以让我们 使用thread 对 socket文件进行 监听,循环检测 IO 事件。

Executors: 单线程 与 多线程

多线程执行程序可同时在多个任务上取得进展。 对于具有许多任务的工作负载,它可以极大地加快执行速度,但是在任务之间同步数据通常会更加昂贵。 在单线程和多线程运行时之间进行选择时,建议测量应用程序的性能。

任务可以在创建任务的线程上运行,也可以在单独的线程上运行。
异步运行时通常提供将任务生成到单独线程上的功能。 即使任务在单独的线程上执行,它们也应该是非阻塞的。 为了在多线程执行器上安排任务,它们也必须是Send。 一些运行时提供了 生成并发送任务 的功能,以确保每个任务都在生成它的线程上执行。 它们还可以提供用于将阻塞任务生成到专用线程上的功能,这对于从其他库运行阻塞同步代码很有用。

工具以及 trait:

Pin:

The Pin type wraps pointer types, guaranteeing that the values behind the pointer won’t be moved. For example, Pin<&mut T> , Pin<&T> , Pin<Box> all guarantee that T won't be moved if T: !Unpin . Most types don't have a problem being moved. These types implement a trait called Unpin . Pointers to Unpin types can be freely placed into or taken out of Pin . For example, u8 is Unpin , so Pin<&mut u8> behaves just like a normal &mut u8 . However, types that can't be moved after they're pinned have a marker called !Unpin . Futures created by async/await is an example of this.

Stream Trait: like Future 但是能够 在完成之前 传递多个数值 like Iterator,即: 返回对象为 Poll<Option<Self::Item> >。 stream 可以实现并行,函数有 for_each_concurrent, try_for_each_concurrent

多个Future 同时执行:

// book 与 music 串行执行, music 等待book 执行完 才能执行
async fn get_book_and_music() -> (Book, Music) {
    let book = get_book().await;
    let music = get_music().await;
    (book, music)
}
// 错误的尝试,将 book 与 music 并行执行
async fn get_book_and_music() -> (Book, Music) {
    let book_future = get_book();
    let music_future = get_music();
    (book_future.await, music_future.await)
}
// 真正的并行执行
async fn get_book_and_music() -> (Book, Music) {
    let book_fut = get_book();
    let music_fut = get_music();
    join!(book_fut, music_fut)
}


#![allow(unused)]
fn main() {
use futures::{
    future::FutureExt, // for `.fuse()`
    pin_mut,
    select,
};

async fn task_one() { /* ... */ }
async fn task_two() { /* ... */ }

async fn race_tasks() {
    let t1 = task_one().fuse();
    let t2 = task_two().fuse();

    pin_mut!(t1, t2);

    select! {
        () = t1 => println!("task one completed first"),
        () = t2 => println!("task two completed first"),
    }
}
}
#![allow(unused)]
fn main() {
use futures::{future, select};

async fn count() {
    let mut a_fut = future::ready(4);
    let mut b_fut = future::ready(6);
    let mut total = 0;

    loop {
        select! {
            a = a_fut => total += a,
            b = b_fut => total += b,
            complete => break,
            default => unreachable!(), // never runs (futures are ready, then complete)
        };
    }
    assert_eq!(total, 10);
}
}

Async Blocks 编码存在的一些问题:

// 产生 编译错误
let fut = async {
    foo().await?;
    bar().await?;
    Ok(())
};

// 正确的解决方法
#![allow(unused)]
fn main() {
struct MyError;
async fn foo() -> Result<(), MyError> { Ok(()) }
async fn bar() -> Result<(), MyError> { Ok(()) }
let fut = async {
    foo().await?;
    bar().await?;
    Ok::<(), MyError>(()) // <- note the explicit type annotation here
};
}
// 通过编译
use std::rc::Rc;
#[derive(Default)]
struct NotSend(Rc<()>);
async fn bar() {}
async fn foo() {
    NotSend::default();
    bar().await;
}

fn require_send(_: impl Send) {}

fn main() {
    require_send(foo());
}

// 产生编译错误
use std::rc::Rc;
#[derive(Default)]
struct NotSend(Rc<()>);
async fn bar() {}
async fn foo() {
    let x = NotSend::default();
    bar().await;
}
fn require_send(_: impl Send) {}
fn main() {
   require_send(foo());
}


// 正确的解决方法
use std::rc::Rc;
#[derive(Default)]
struct NotSend(Rc<()>);
async fn bar() {}
async fn foo() {
    {
        let x = NotSend::default();
    }
    bar().await;
}
fn require_send(_: impl Send) {}
fn main() {
   require_send(foo());
}

// This function:
async fn foo() {
    step_one().await;
    step_two().await;
}
// generates a type like this:
enum Foo {
    First(StepOne),
    Second(StepTwo),
}

// So this function:
async fn recursive() {
    recursive().await;
    recursive().await;
}

// generates a type like this:  因为类型对象存在递归,导致 无法通过编译,需要通过 Box 进行封装 来进行规避
enum Recursive {
    First(Recursive),
    Second(Recursive),
}

// --------------------
// In order to allow this, we have to introduce an indirection using Box. Unfortunately, compiler limitations mean that just wrapping the calls to recursive() in Box::pin isn't enough. To make this work, we have to make recursive into a non-async function which returns a .boxed() async block:

use futures::future::{BoxFuture, FutureExt};

fn recursive() -> BoxFuture<'static, ()> {
    async move {
        recursive().await;
        recursive().await;
    }.boxed()
}

Async 生态:

相关资料: