摘录与 Asynchronous Programming in Rust

一、Getting Started

1.1 Rust 的异步 vs 其他语言的

尽管很多语言都支持异步编程,但实现细节上有很多不一样。Rust的异步实现和大部分语言的在以下方面有区别:

  • RustFutures 是惰性的,并且只有被轮询才会进一步执行。丢弃(Dropping)一个future可以阻止它继续执行。
  • Rust中的异步是零成本的,这意味着你只需要为你所使用的东西付出代价。特别来说,你使用异步时可以不需要堆分配或动态分发,这对性能来说是好事!这也使得你能够在约束环境下使用异步,例如嵌入式系统。
  • Rust不提供内置运行时。相反,运行时由社区维护的库提供。
  • Rust单线程的和多线程的运行时都可用,而他们会有不同的优劣。

Demo

fn get_two_sites() {
    // 生成两个线程来下载网页.
    let thread_one = thread::spawn(|| download("https:://www.foo.com"));
    let thread_two = thread::spawn(|| download("https:://www.bar.com"));

    // 等待两个线程运行下载完成.
    thread_one.join().expect("thread one panicked");
    thread_two.join().expect("thread two panicked");
}

然而,下载网页是小任务,为了这么少量工作创建线程相当浪费。对更大的应用来说,这很容易就会变成瓶颈。在异步Rust,我们能够并发地运行这些任务而不需要额外的线程:

async fn get_two_sites_async() {
    // 创建两个不同的 "futures", 当创建完成之后将异步下载网页.
    let future_one = download_async("https:://www.foo.com");
    let future_two = download_async("https:://www.bar.com");

    // 同时运行两个 "futures" 直到完成.
    join!(future_one, future_two);
}

1.2 async/.await初步

async/.awaitRust内置语法,用于让异步函数编写得像同步代码。async将代码块转化成实现了Future trait的状态机。使用同步方法调用阻塞函数会阻塞整个线程,但阻塞Future只会让出(yield)线程控制权,让其他Future继续执行。

[dependencies]
futures = "0.3"

use futures::executor::block_on;

// 你可以使用async fn语法创建异步函数:
async fn hello_world() {
    println!("hello, world!");
}

fn main() {
    let future = hello_world(); // Nothing is printed
    // 需要执行器来执行这个
    block_on(future); //`future`is run and "hello, world!" is printed
}

Async Demo

async fn learn_and_sing() {
    let song = learn_song().await;
    sing_song(song).await;
}

async fn async_main() {
    let f1 = learn_and_sing();
    let f2 = dance();

    futures::join!(f1, f2);
}

fn main() {
    block_on(async_main());
}

二、执行 Future 与任务(Task)

2.1 Future trait

  • Future traitRust Async编程的核心

  • Future是一种异步计算,它可以产生一个值

  • 实现了Future类型的表示目前可能还不可用的值

  • 下面是一个简化版的Future trait

      trait SimpleFuture {
          type Output;
          fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
      }
      
      enum Poll<T> {
          Ready(T),
          Pending,
      }
    
  • Future代表着一种你可以检验其是否完成的操作

  • Future可以通过调用poll函数来取得进展

    • poll函数会驱动Future尽可能接近完成
    • 如果Future完成了,就返回poll::Ready(result),其中result就是最终结果
    • 如果Future还无法完成:就返回poll::Pending,并当Future准备好去的更多进展时调用一个wakerwake()函数
  • 针对Future,你唯一做的就是使用poll来敲打它,知道一个值掉出来。

wake() 函数

  • wake()函数被调用时:
    • 执行器将驱动Future再次调用poll函数,以便Future能取得更多的进展
  • 没有wake()函数,执行器就不知道特定的Future何时能取得进展(就得不断地poll
  • 通过wake()函数,执行器就确切的知道哪些Future已经准备好进行poll()的调用

伪代码:

pub struct SocketRead<'a> {
    socket: &'a Socket,
}

impl SimpleFuture for SocketRead<'_> {
    type Output = Vec<u8>;

    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if self.socket.has_data_to_read() {
            // The socket has data -- read it into a buffer and return it.
            Poll::Ready(self.socket.read_buf())
        } else {
            // The socket does not yet have data.
            //
            // Arrange for`wake`to be called once data is available.
            // When data becomes available,`wake`will be called, and the
            // user of this`Future`will know to call`poll`again and
            // receive data.
            self.socket.set_readable_callback(wake);
            Poll::Pending
        }
    }
}

真正的Future代码

trait Future {
    type Output;
    fn poll(
        // Note the change from`&mut self`to`Pin<&mut Self>`:
        self: Pin<&mut Self>,
        // and the change from`wake: fn()`to`cx: &mut Context<'_>`:
        cx: &mut Context<'_>,
    ) -> Poll<Self::Output>;
}


/*
async fn read_from_file1() -> String {
    sleep(Duration::new(4,0));
    println!("{:?}", "Processing file 1");
    String::from("Hello, there from file 1")
}
*/
// Future
// use std::thread::sleep;
// use std::time::Duration;
// 等同 async fn read_from_file1() -> String 代码
use std::future::Future;
fn read_from_file1() -> impl Future<Output = String> { 
    async {
        sleep(Duration::new(4,0));
        println!("{:?}", "Processing file 1");
        String::from("Hello, there from file 1")
    }
}

Future Demo1

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::thread::sleep;
use std::time::Duration;
struct ReadFileFuture {}
impl Future for ReadFileFuture {
    type Output = String;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        println!("Tokio! Stop polling me");
        cx.waker().wake_by_ref(); // first change
        // Poll::Pending // orig design
        Poll::Ready(String::from("Hello, there from file 1")) // 2nd change
        // why the Poll::Ready cannot print in stdout?
    }
}
#[tokio::main]
async fn main() {
    println!("Hello before reading file!");
    let h1 = tokio::spawn(async {
        let future1 = ReadFileFuture {};
        future1.await
    });
    let h2 = tokio::spawn(async {
        let file2_contents = read_from_file2().await;
        println!("{:?}", file2_contents);
    });
    let _ = tokio::join!(h1, h2);
}
fn read_from_file2() -> impl Future<Output = String> {
    async {
        sleep(Duration::new(2,0));
        println!("{:?}", "Processing file 2");
        String::from("Hello, there from file 2")
    }
}

Future Demo2

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::thread::sleep;
use std::time::{Duration, Instant};

struct AsyncTimer { expiration_time: Instant, }

impl Future for AsyncTimer {
    type Output = String;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if Instant::now() >= self.expiration_time {
            println!("Hello, it's time for Future 1");
            Poll::Ready(String::from("Future 1 has completed"))
        } else {
            println!("Hello, it's not yet time for Future 1, Going to sleep");
            let waker = cx.waker().clone();
            let expiration_time = self.expiration_time;
            std::thread::spawn(move || {
                println!("start new thread with sleep blocking");
                let curr_time = Instant::now();
                if curr_time < expiration_time {
                    std::thread::sleep(expiration_time - curr_time);
                }
                println!("end sleep and wake.wake()");
                waker.wake();
                println!("end wake.wake()");
            });
            println!("end new thread and return Poll::Pending");
            Poll::Pending
        }
    }
}
#[tokio::main]
async fn main() {
    println!("start tokio::main");
    let h1 = tokio::spawn(async {
        println!("start future1 handle");
        // let future1 = AsyncTimer{ expiration_time: Instant::now() + Duration::from_millis(4000), };
        let future1 = AsyncTimer{ expiration_time: Instant::now() + Duration::from_secs(4), };
        println!("generate future1 handle, then run future1.await");
        println!("{:?}", future1.await);
        println!("after future1.await");
    });

    let h2 = tokio::spawn(async {
        println!("start future2 handle");
        let file2_contents = read_from_file2().await;
        println!("{:?}", file2_contents);
        println!("end future2 handle");
    });

    let _ = tokio::join!(h1, h2);
}

fn read_from_file2() -> impl Future<Output = String> {
    async {
        sleep(Duration::new(4,0));
        String::from("Future 2 has completed")
    }
}

2.2. 用 Waker 唤醒任务

  • Future在第一次poll的时候通常无法完成任务,所以Future需要保证在准备好去的更多进展后,可以再次被poll
  • 每次Futurepoll,它都是作为一个任务的一部分
  • 任务(Task)就是被提交给执行者顶层的Future

2.3 应用:构建执行器

RustFuture是惰性的:它们不会干任何事,除非它们被驱动执行。一个驱动future类型的方法是在async函数中使用.await调用,但这只是将问题抛到上一层:谁来跑在顶层async函数返回的future实例呢?为此,我们需要执行Future的执行器。

Future执行器会拿一组顶层Future去跑poll方法,无论这些Future能否进展。通常, 执行器会poll一个future实例来启动。当Future通过调用wake()方法来指示他们准备好继续进展,执行器就会把它们放入队列并再一次poll,重复这一过程直到Future完成。

src/main.rs

#![allow(unused)]
use {
    futures::{
        future::{BoxFuture, FutureExt},
        task::{waker_ref, ArcWake},
    },
    std::{
        future::Future,
        sync::mpsc::{sync_channel, Receiver, SyncSender},
        sync::{Arc, Mutex},
        task::{Context, Poll},
        time::Duration,
    },
    // // 引入之前实现的定时器模块
    // async_tokio::TimerFuture,
};

/// 任务执行器,负责从通道中接收任务然后执行
struct Executor {
    ready_queue: Receiver<Arc<Task>>,
}

///`Spawner`负责创建新的`Future`然后将它发送到任务通道中
#[derive(Clone)]
struct Spawner {
    task_sender: SyncSender<Arc<Task>>,
}

/// 添加一个方法用于生成 Future , 然后将它放入任务通道中
impl Spawner {
    fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
        let future = future.boxed();
        let task = Arc::new(Task {
            future: Mutex::new(Some(future)),
            task_sender: self.task_sender.clone(),
        });
        self.task_sender.send(task).expect("任务队列已满");
    }
}

/// 一个Future,它可以调度自己(将自己放入任务通道中),然后等待执行器去`poll`
struct Task {
    /// 进行中的Future,在未来的某个时间点会被完成
    ///
    /// 按理来说`Mutex`在这里是多余的,因为我们只有一个线程来执行任务。但是由于
    /// Rust并不聪明,它无法知道`Future`只会在一个线程内被修改,并不会被跨线程修改。因此
    /// 我们需要使用`Mutex`来满足这个笨笨的编译器对线程安全的执着。
    ///
    /// 如果是生产级的执行器实现,不会使用`Mutex`,因为会带来性能上的开销,取而代之的是使用`UnsafeCell`
    future: Mutex<Option<BoxFuture<'static, ()>>>,

    /// 可以将该任务自身放回到任务通道中,等待执行器的poll
    task_sender: SyncSender<Arc<Task>>,
}

fn new_executor_and_spawner() -> (Executor, Spawner) {
    // 任务通道允许的最大缓冲数(任务队列的最大长度)
    // 当前的实现仅仅是为了简单,在实际的执行中,并不会这么使用
    const MAX_QUEUED_TASKS: usize = 10_000;
    let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS);
    (Executor { ready_queue }, Spawner { task_sender })
}

impl ArcWake for Task {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        // 通过发送任务到任务管道的方式来实现`wake`,这样`wake`后,任务就能被执行器`poll`
        let cloned = arc_self.clone();
        arc_self
            .task_sender
            .send(cloned)
            .expect("任务队列已满");
    }
}

impl Executor {
    fn run(&self) {
        while let Ok(task) = self.ready_queue.recv() {
            // 获取一个future,若它还没有完成(仍然是Some,不是None),则对它进行一次poll并尝试完成它
            let mut future_slot = task.future.lock().unwrap();
            if let Some(mut future) = future_slot.take() {
                // 基于任务自身创建一个`LocalWaker`
                let waker = waker_ref(&task);
                let context = &mut Context::from_waker(&*waker);
                //`BoxFuture<T>`是`Pin<Box<dyn Future<Output = T> + Send + 'static>>`的类型别名
                // 通过调用`as_mut`方法,可以将上面的类型转换成`Pin<&mut dyn Future + Send + 'static>`
                if future.as_mut().poll(context).is_pending() {
                    // Future还没执行完,因此将它放回任务中,等待下次被poll
                    *future_slot = Some(future);
                }
            }
        }
    }
}


use timer_future::TimerFuture;

fn main() {
    let (executor, spawner) = new_executor_and_spawner();

    // 生成一个任务
    spawner.spawn(async {
        println!("howdy!");
        // 创建定时器Future,并等待它完成
        TimerFuture::new(Duration::new(2, 0)).await;
        println!("done!");
    });

    // drop掉任务,这样执行器就知道任务已经完成,不会再有新的任务进来
    drop(spawner);

    // 运行执行器直到任务队列为空
    // 任务运行后,会先打印`howdy!`, 暂停2秒,接着打印`done!`
    executor.run();
}

src/lib.rs

 #![allow(unused)]
use std::{
    future::Future,
    pin::Pin,
    sync::{Arc, Mutex},
    task::{Context, Poll, Waker},
    thread,
    time::Duration,
};

pub struct TimerFuture {
    shared_state: Arc<Mutex<SharedState>>,
}

/// 在`future`线程和`awiting`线程之间共享状态
struct SharedState {
    /// 是否已经达到休眠时间.
    completed: bool,

    ///`TimerFuture`表示正在运行的`waker`.
    /// 线程可以在设置完`completed = true`之后来通知`TimerFuture`任务被唤醒并
    /// 检查`completed = true`,然后继续执行.
    waker: Option<Waker>,
}

impl Future for TimerFuture {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // 检查共享状态,检查定时器是否已经完成.
        let mut shared_state = self.shared_state.lock().unwrap();
        if shared_state.completed {
            Poll::Ready(())
        } else {
            // 设置`waker`, 让线程可以在定时器完成时唤醒当前`waker`,确保
            // 再次轮询`future`并获知`completed = true`.
            //
            // 这样做是非常不错的,而不用每次都重复`clone``waker`. 然而, 这个`TimerFuture`
            // 可以在执行器之间移动, 这可能会导致旧的`waker`指向错误的`waker`, 这会阻止
            //`TimerFuture`被正确得唤醒.
            //
            // 注意:可以使用`Waker::will_wake`函数来做检查, 但是
            // 为了简单起见,我们忽略了他.
            shared_state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

impl TimerFuture {
    /// 创建一个新的`TimerFuture`,它将在提供的超时之后完成.
    pub fn new(duration: Duration) -> Self {
        let shared_state = Arc::new(Mutex::new(SharedState {
            completed: false,
            waker: None,
        }));

        // 创建一个新的线程.
        let thread_shared_state = shared_state.clone();
        thread::spawn(move || {
            thread::sleep(duration);
            let mut shared_state = thread_shared_state.lock().unwrap();
            // 设置状态,表示定时器已经完成,并唤醒轮询`future`中的最后一个
            // 任务 (如果存在的话).
            shared_state.completed = true;
            if let Some(waker) = shared_state.waker.take() {
                waker.wake()
            }
        });

        TimerFuture { shared_state }
    }
}

Step 1

Step 2

Step 3

Step 4

三、async/.await

async/.awaitRust的特殊语法,在发生阻塞的时,它让放弃当前线程控制权成为可能,这就允许在等待操作完成的时候,允许其他代码取得进展。

使用async的两种方式

// This function:
async fn foo(x: &u8) -> u8 { *x }

// Is equivalent to this function:
fn foo_expanded<'a>(x: &'a u8) -> impl Future<Output = u8> + 'a {
    async move { *x }
}

Demo

[dependencies]
async-std = { version = "1.9.0", features = ["unstable"] }

use async_std::io::prelude::*;
use async_std::net;
use async_std::task;

// async 申明函数
// Rust 编译器会自动把 std::io::Result<String> 包装成一个 Feature<OutPut=T> 返回
async fn cheapo_request(host: &str, port: u16, path: &str) -> std::io::Result<String> {
    // .await 会等待,直到 future 变成 ready
    // await 最终会解析出 future 的值
    let mut socket = net::TcpStream::connect((host, port)).await?;
    let request = format!("GET {} HTTP/1.1\r\nHost: {}\r\n\n", path, host);

    socket.write_all(request.as_bytes()).await?;
    socket.shutdown(net::Shutdown::Write)?;

    let mut response = String::new();
    socket.read_to_string(&mut response).await?;

    Ok(response)
}


fn main() -> std::io::Result<()> {
    let response = task::block_on(cheapo_request("baidu.com", 80, "/"))?;
    println!("{}", response);
    Ok(())
}

第一次对cheapo_request进行poll时:

  • 从函数体顶部开始执行
  • 直到第一个await(针对TcpStream::connect返回的Future)
  • 如果TcpStream::connect还没完成,就会返回Pending
  • 针对cheapo_requestpoll也无法继续,直到connectFuture返回ready

await:

  • 获得Future的所有权,并对其进行poll
  • 如果Future Ready,其最终值就是await表达式的值,这时执行就可以继续了
  • 否则就返回Pending给调用者

第二次对cheapo_requestFuture进行poll

  • 并不在函数体顶部开始执行
  • 它会在connect Future进行poll的地方继续执行,直到它变成Ready,才会继续在函数体往下下走

随着cheapo_requestFuture不断被poll,其执行就是从一个await到下一个await,而且只有子Futureawait变成Ready之后才继续.

cheapo_requestFuture会追踪:

  • 下一次poll应恢复继续的那个点
  • 以及所需的本地状态(变量、参数、临时变量等)

这种途中能暂停执行,然后恢复执行的能力是async所独有的由于await表达式依赖于“可恢复执行”这个特性,所以await只能用在async里。

async 的生命周期

  • 与传统函数不同:async fn,如果它的参数是引用或是其他非'static的,那么它返回的Future就会绑定到参数的生命周期上。
  • 这意味着async fn返回的future,在.await的同时,fn的非'static的参数必须保持有效

Demo

// This function:
async fn foo(x: &u8) -> u8 { *x }

// Is equivalent to this function:
fn foo_expanded<'a>(x: &'a u8) -> impl Future<Output = u8> + 'a {
    async move { *x }
}

存储future或传递future

  • 通常,async的函数在调用后会立即.await,这就不是问题:
    • 例如:foo(&x).await
  • 如果存储future或将其传递给其他任务或者线程,就有问题了。。
  • 一种变通方法:
    • 思路:把使用引用作为参数的async fn转为一个'static future
    • 做法:在async块里,将参数和async fn的调用捆绑到一起(延长参数的生命周期来匹配future
  • async库和闭包都支持move
  • async move块会获得其引用变量的所有权:
    • 允许其比当前所在的作用域活得长
    • 但同时也放弃了与其它代码共享的这些变量的能力

Demo

fn bad() -> impl Future<Output = u8> {
    let x = 5;
    borrow_x(&x) // ERROR:`x`does not live long enough
}

fn good() -> impl Future<Output = u8> {
    async {
        let x = 5;
        borrow_x(&x).await
    }
}

在多线程执行者上进行.await

  • 当使用多线程future执行者时,future就可以在线程间移:
    • 所以async体里面用的变量必须能够在线程间移动
    • 因为任何的.await都可能导致切换到一个新的线程
  • 这意味着使用以下类型是不安全的:
    • Rc&RefCell和任何其它没有实现Send trait的类型,包括没实现Sync trait的引用
    • 注意:调用.await时,只要这些类型不在作用域内,就可以使用他们。
  • 在跨域一个.await期间,持有传统的、对future无感知的锁,也不是好主意:
    • 可导致线程池锁定
    • 为此,可使用futures::lock里的Mutex而不是std:sync里的

四、Pinning

#[derive(Debug)]
struct Test {
    a: String,
    b: *const String,
}

impl Test {
    fn new(txt: &str) -> Self {
        Test {
            a: String::from(txt),
            b: std::ptr::null(),
        }
    }

    fn init(&mut self) {
        let self_ref: *const String = &self.a;
        self.b = self_ref;
    }

    fn a(&self) -> &str {
        &self.a
    }

    fn b(&self) -> &String {
        assert!(!self.b.is_null(), "Test::b called without Test::init being called first");
        unsafe { &*(self.b) }
    }
}



fn main() {
    let mut test1 = Test::new("test1");
    test1.init();
    let mut test2 = Test::new("test2");
    test2.init();
    std::mem::swap(&mut test1, &mut test2);

    println!("a: {}, b: {}", test1.a(), test1.b());
    println!("a: {}, b: {}", test2.a(), test2.b());

}

image.png

Pin的实践

  • Pin类型会包裹指针类型,保证指针指向的值不被移动
  • 例如:Pin<&mut T>,Pin<&T>,Pin<Box<T>>
    • 即使T:!Unpin,也能保证T不被移动

Unpin trait

  • 大多数类型如果被移动,不会造成问题,它们实现了Unpin
  • 指向Unpin类型的指针,可自由的放入或从Pin中取出
    • 例如:u8Unpin的,Pin<&mut u8>和普通的&mut u8一样。
  • 如果类型拥有!Unpin标记,那么在Pin之后它们就无法移动了。

Pin到栈内存的Demo

use std::marker::PhantomPinned;
use std::pin::Pin;

#[derive(Debug)]
struct Test {
    a: String,
    b: *const String,
    _marker: PhantomPinned,
}

impl Test {
    fn new(txt: &str) -> Self {
        Test {
            a: String::from(txt),
            b: std::ptr::null(),
            _marker: PhantomPinned,
        }
    }

    fn init(self: Pin<&mut Self>) {
        let self_ptr: *const String = &self.a;
        let this = unsafe { self.get_unchecked_mut() };
        this.b = self_ptr;
    }

    fn a(self: Pin<&Self>) -> &str {
        &self.get_ref().a
    }

    fn b(self: Pin<&Self>) -> &String {
        unsafe { &*self.b }
    }
}

fn main() {
    println!("Hello, world!");

    let mut test1 = Test::new("test1");
    let mut test1 = unsafe { Pin::new_unchecked(&mut test1) };
    Test::init(test1.as_mut());

    let mut test2 = Test::new("test2");
    let mut test2 = unsafe { Pin::new_unchecked(&mut test2) };
    Test::init(test2.as_mut());
    std::mem::swap(&mut test1, &mut test2);

    println!(
        "test1: {}, {}",
        Test::a(test1.as_ref()),
        Test::b(test1.as_ref())
    );
    println!(
        "test2: {}, {}",
        Test::a(test2.as_ref()),
        Test::b(test2.as_ref())
    );
    // std::mem::swap(test1.get_mut(), test2.get_mut());
}

Pin到堆内存的Demo

#[derive(Debug)]
struct Test {
    a: String,
    b: *const String,
}

impl Test {
    fn new(txt: &str) -> Box<Self> {
        Box::new(Test {
            a: String::from(txt),
            b: std::ptr::null(),
        })
    }

    fn init(self: &mut Self) {
        let self_ptr: *const String = &self.a;
        self.b = self_ptr;
    }

    fn a(self: &Self) -> &str {
        &self.a
    }

    fn b(self: &Self) -> &String {
        unsafe { &*self.b }
    }
}

fn main() {
    println!("Hello, world!");

    let mut test1 = Test::new("test1");
    Test::init(test1.as_mut());

    let mut test2 = Test::new("test2");
    Test::init(test2.as_mut());

    println!("test1: {}, {}", test1.a(), test1.b());
    println!("test2: {}, {}", test2.a(), test2.b());

    std::mem::swap(&mut test1, &mut test2);
    println!("test1: {}, {}", test1.a(), test1.b());
    println!("test2: {}, {}", test2.a(), test2.b());
}

Demo 2

use std::pin::Pin;
use std::marker::PhantomPinned;

#[derive(Debug)]
struct Test {
    a: String,
    b: *const String,
    _marker: PhantomPinned,
}

impl Test {
    fn new(txt: &str) -> Pin<Box<Self>> {
        let t = Test {
            a: String::from(txt),
            b: std::ptr::null(),
            _marker: PhantomPinned,
        };
        let mut boxed = Box::pin(t);
        let self_ptr: *const String = &boxed.a;
        unsafe { boxed.as_mut().get_unchecked_mut().b = self_ptr };

        boxed
    }

    fn a(self: Pin<&Self>) -> &str {
        &self.get_ref().a
    }

    fn b(self: Pin<&Self>) -> &String {
        unsafe { &*(self.b) }
    }
}

pub fn main() {
    let test1 = Test::new("test1");
    let test2 = Test::new("test2");

    println!("a: {}, b: {}",test1.as_ref().a(), test1.as_ref().b());
    println!("a: {}, b: {}",test2.as_ref().a(), test2.as_ref().b());
}

总结

  1. 如果T: Unpin(默认会实现),那么Pin<'a, T>完全等价于&'a mut T。换言之:Unpin意味着这个类型被移走也没关系,就算已经被固定了,所以Pin对这样的类型毫无影响。

  2. 如果T: !Unpin, 获取已经被固定的 T 类型示例的&mut T需要 unsafe。

  3. 标准库中的大部分类型实现Unpin,在Rust中遇到的多数“平常”的类型也是一样。但是,async/await生成的Future是个例外。

  4. 你可以在nightly通过特性标记来给类型添加!Unpin约束,或者在stable给你的类型加std::marker::PhatomPinned字段。

  5. 你可以将数据固定到栈上或堆上

  6. 固定!Unpin对象到栈上需要unsafe

  7. 固定!Unpin对象到堆上不需要unsafeBox::pin可以快速完成这种固定。

  8. 针对已经Pin的数据,如果它是T:!Unpin的,则需要保证它从被Pin后,内存一直有效且不会调整其用途,直到dorp被调用,这是 Pin 协约 中的重要部分。

五、Streams

Stream traitFuture类似,但能在完成前返还(yield)多个值,与标准库中的Iterator类似:

trait Stream {
    /// 由`stream`产生的值的类型.
    type Item;

    /// 尝试解析`stream`中的下一项.
    /// 如果已经准备好,就重新运行`Poll::Pending`, 如果已经完成,就重新
    /// 运行`Poll::Ready(Some(x))`,如果已经完成,就重新运行`Poll::Ready(None)`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<Option<Self::Item>>;
}

一个常见的使用Stream的例子是futures库中通道的Receiver。每次Sender端发送一个值时,它就会返回一个Some(val),并且会在Sender关闭且所有消息都接收后返还None:

async fn send_recv() {
    const BUFFER_SIZE: usize = 10;
    let (mut tx, mut rx) = mpsc::channel::<i32>(BUFFER_SIZE);

    tx.send(1).await.unwrap();
    tx.send(2).await.unwrap();
    drop(tx);

    //`StreamExt::next`类似于`Iterator::next`, 但会返回一个实现
    // 了`Future<Output = Option<T>>`的类型.
    assert_eq!(Some(1), rx.next().await);
    assert_eq!(Some(2), rx.next().await);
    assert_eq!(None, rx.next().await);
}

async fn sum_with_next(mut stream: Pin<&mut dyn Stream<Item = i32>>) -> i32 {
    use futures::stream::StreamExt; // 对于`next`
    let mut sum = 0;
    while let Some(item) = stream.next().await {
        sum += item;
    }
    sum
}

async fn sum_with_try_next(
    mut stream: Pin<&mut dyn Stream<Item = Result<i32, io::Error>>>,
) -> Result<i32, io::Error> {
    use futures::stream::TryStreamExt; // 对于`try_next`
    let mut sum = 0;
    while let Some(item) = stream.try_next().await? {
        sum += item;
    }
    Ok(sum)
}


async fn jump_around(
    mut stream: Pin<&mut dyn Stream<Item = Result<u8, io::Error>>>,
) -> Result<(), io::Error> {
    use futures::stream::TryStreamExt; // 对于`try_for_each_concurrent`
    const MAX_CONCURRENT_JUMPERS: usize = 100;

    stream.try_for_each_concurrent(MAX_CONCURRENT_JUMPERS, |num| async move {
        jump_n_times(num).await?;
        report_n_jumps(num).await?;
        Ok(())
    }).await?;

    Ok(())
}

六、同时执行多个 Future

可以同时执行多个异步操作的方式:

  • join!,等待所有的future完成
  • select!,等待所有的future中的一个完成
  • Spawning,创建一个顶级任务,他会运行一个future直至完成
  • FuturesUnoredered,一组Future,他们会产生每个子Future的结果

join!

use futures::join;

async fn get_book_and_music() -> (Book, Music) {
    let book_fut = get_book();
    let music_fut = get_music();
    join!(book_fut, music_fut)
}

try_join!

对于那些返回Resultfuture,考虑使用try_join!而非join。因为join只会在所有子future都完成后才会完成,它甚至会在子future返回Err之后继续处理。

use futures::try_join;

async fn get_book() -> Result<Book, String> { /* ... */ Ok(Book) }
async fn get_music() -> Result<Music, String> { /* ... */ Ok(Music) }

async fn get_book_and_music() -> Result<(Book, Music), String> {
    let book_fut = get_book();
    let music_fut = get_music();
    try_join!(book_fut, music_fut)
}

use futures::{
    future::TryFutureExt,
    try_join,
};

// 注意,传进 try_join! 的 future 必须要用相同的错误类型。考虑使用 futures::future::TryFutureExt 库的 .map_err(|e| ...) 或 err_into() 函数来统一错误类型:
async fn get_book() -> Result<Book, ()> { /* ... */ Ok(Book) }
async fn get_music() -> Result<Music, String> { /* ... */ Ok(Music) }

async fn get_book_and_music() -> Result<(Book, Music), String> {
    let book_fut = get_book().map_err(|()| "Unable to get book".to_string());
    let music_fut = get_music();
    try_join!(book_fut, music_fut)
}

select!

use futures::{
    future::FutureExt, // 为了`.fuse()`
    pin_mut,
    select,
};

async fn task_one() { /* ... */ }
async fn task_two() { /* ... */ }

async fn race_tasks() {
    let t1 = task_one().fuse();
    let t2 = task_two().fuse();

    pin_mut!(t1, t2);

    select! {
        () = t1 => println!("task one completed first"),
        () = t2 => println!("task two completed first"),
    }
}

// select 也支持 default 和 complete 分支。
// default : 如果选择的 future 尚未完成,就会允许 default 分之, 拥有 default 的 select 总是会立即返回
// complete:它用于所有选择的 future 都已经完成的情况
use futures::{future, select};

async fn count() {
    let mut a_fut = future::ready(4);
    let mut b_fut = future::ready(6);
    let mut total = 0;

    loop {
        select! {
            a = a_fut => total += a,
            b = b_fut => total += b,
            complete => break,
            default => unreachable!(), // 永远不会被执行(futures都准备好了,然后complete分支被执行)
        };
    }
    assert_eq!(total, 10);
}

与 Unpin 和 FusedFuture 交互

  • 前面的例子中,需要在返回的future上调用.fuse(),也调用了pin_mut
    • 因为select里面的future必须实现UnpinFusedFuture这两个trait
  • 必须Unpin:select使用的future不是按值的,而是按可变引用。
    • 未完成的future在调用select后仍可使用
  • 必须FusedFuture:future完成后,select不可以对它进行poll
    • 实现FusedFuturefuture会追踪其完成状态,这样在select循环里,就只会poll没有完成的future

Demo1

use futures::{
    stream::{Stream, StreamExt, FusedStream},
    select,
};

async fn add_two_streams(
    mut s1: impl Stream<Item = u8> + FusedStream + Unpin,
    mut s2: impl Stream<Item = u8> + FusedStream + Unpin,
) -> u8 {
    let mut total = 0;

    loop {
        let item = select! {
            x = s1.next() => x,
            x = s2.next() => x,
            complete => break,
        };
        if let Some(next_num) = item {
            total += next_num;
        }
    }

    total
}

带有 Fuse 和 FuturesUnordered 的 select 循环中的并发任务

有个不太好找但是很趁手的函数叫Fuse::terminated()。这个函数允许构造已经被终止的空future,并且能够在之后填进需要运行的future

use futures::{
    future::{Fuse, FusedFuture, FutureExt},
    stream::{FusedStream, Stream, StreamExt},
    pin_mut,
    select,
};

async fn get_new_num() -> u8 { /* ... */ 5 }

async fn run_on_new_num(_: u8) { /* ... */ }

async fn run_loop(
    mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin,
    starting_num: u8,
) {
    let run_on_new_num_fut = run_on_new_num(starting_num).fuse();
    let get_new_num_fut = Fuse::terminated();
    pin_mut!(run_on_new_num_fut, get_new_num_fut);
    loop {
        select! {
            () = interval_timer.select_next_some() => {
                // 计时器已经完成了.
                // 如果没有`get_new_num_fut`正在执行的话,就启动一个新的.
                if get_new_num_fut.is_terminated() {
                    get_new_num_fut.set(get_new_num().fuse());
                }
            },
            new_num = get_new_num_fut => {
                // 一个新的数字到达了
                // 启动一个新的`run_on_new_num_fut`并且扔掉旧的.
                run_on_new_num_fut.set(run_on_new_num(new_num).fuse());
            },
            // 执行`run_on_new_num_fut`
            () = run_on_new_num_fut => {},
            // 当所有都完成时panic,
            // 因为理论上`interval_timer`会不断地产生值.
            complete => panic!("`interval_timer`completed unexpectedly"),
        }
    }
}

当有很多份相同future的拷贝同时执行时,使用FutureUnordered类型。下面的例子和上面的例子很类似,但会运行run_on_new_num_fut的所有拷贝都到完成状态,而不是当一个新拷贝创建时就中断他们。它也会打印run_on_new_num_fut的返回值:

use futures::{
    future::{Fuse, FusedFuture, FutureExt},
    stream::{FusedStream, FuturesUnordered, Stream, StreamExt},
    pin_mut,
    select,
};

async fn get_new_num() -> u8 { /* ... */ 5 }

async fn run_on_new_num(_: u8) -> u8 { /* ... */ 5 }

// 用从`get_new_num`获取的最新的数字运行`run_on_new_num`.
//
// 每当定时器到期后,都会重新执行`get_new_num`,
// 并立即取消正在执行的`run_on_new_num`,随后用新返回值替换`run_on_new_num`.
async fn run_loop(
    mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin,
    starting_num: u8,
) {
    let mut run_on_new_num_futs = FuturesUnordered::new();
    run_on_new_num_futs.push(run_on_new_num(starting_num));
    let get_new_num_fut = Fuse::terminated();
    pin_mut!(get_new_num_fut);
    loop {
        select! {
            () = interval_timer.select_next_some() => {
                // 计时器已经完成了.
                // 如果没有`get_new_num_fut`正在执行的话,就启动一个新的.
                if get_new_num_fut.is_terminated() {
                    get_new_num_fut.set(get_new_num().fuse());
                }
            },
            new_num = get_new_num_fut => {
                // 一个新的数字到达了,启动一个新的`run_on_new_num_fut`.
                run_on_new_num_futs.push(run_on_new_num(new_num));
            },
            // 执行`run_on_new_num_futs`并检查有没有完成的.
            res = run_on_new_num_futs.select_next_some() => {
                println!("run_on_new_num_fut returned {:?}", res);
            },
            // 当所有都完成时panic,
            // 因为理论上`interval_timer`会不断地产生值.
            complete => panic!("`interval_timer`completed unexpectedly"),
        }
    }
}

七、掌握并喜爱的解决方法

// ? in async Blocks

fn main() -> std::io::Result<()> {
    struct MyError;
    async fn foo() -> Result<(), MyError> {
        Ok(())
    }
    async fn bar() -> Result<(), MyError> {
        Ok(())
    }

    let fut = async {
        foo().await?;
        bar().await?;
        Ok(()) // 报错,cannot infer type of the type parameter`E`declared on the enum`Result`
        // Ok::<(), MyError>(()) // <- note the explicit type annotation here
    };


    Ok(())
}

// Send 模拟

#[derive(Default)]
struct NotSend(Rc<()>);

async fn bar() {}
async fn foo() {
    // let x = NotSend::default(); // 报错,has type`NotSend`which is not`Send`
    {
        let x = NotSend::default();
    }
    bar().await;
}

fn require_send(_: impl Send) {}

fn main() {
    require_send(foo());
}
    
// 递归
async fn recursive() {
    recursive().await; // 报错 recursion in an`async fn`requires boxing
    recursive().await;
}

// 解决方法
use futures::future::{BoxFuture, FutureExt};

fn recursive() -> BoxFuture<'static, ()> {
    async move {
        recursive().await;
        recursive().await;
    }.boxed()
}

trait 中的 async

目前,async fn不能在trait中使用。原因一些复杂,但是有计划在未来移除这个限制。

这个问题可以用 async-trait 库来避免

八、Async 生态系统

Rust 没提供什么

  • Rust目前只提供编写async代码的基本要素,标准库中尚未提供执行器、任务、反应器、组合器以及低级I/O futuretrait
  • 社区提供的async生态系统填补了这些空白

Async 运行时

  • Async运行时是用于执行async应用程序的库
  • 运行时通常将一个反应器与一个或多个执行器捆绑在一起
  • 反应器为异步I/O、进程间通信和计时器等外部事件提供订阅机制
  • async运行时中,订阅者通常是代表低级别I/O操作的future
  • 执行者负责安排和执行任务。
    • 它们跟踪正在运行和暂停的任务,对future进行poll直到完成,并在任务能够取得进展时唤醒任务
    • 执行者”一词经常与“运行时”互换使用。
  • 我们使用“生态系统”一词来描述一个与兼容trait和特性捆绑在一起的运行时。

社区提供的 async crates

  • futures crate,提供了StreamSinkAsyncReadAsyncWritetrait,以及组合器等工具。这些可能最终会成为标准库的一部分
  • futures有自己的执行器,但没有自己的反应器,因此它不支持async I/O或计时器future的执行。
  • 因此,它不被认为是完整的运行时。
  • 常见的选择是:与另一个crate中的执行器一起使用来自futures提供的工具

流行的运行时

  • Tokio:一个流行的async生态系统,包含HTTPgRPC和跟踪框架
  • async-std:提供标准库的async副本
  • smol:小型、简化的async运行时。提供可用于包装UnixStreamTcpListener等结构的async trait
  • fuchsia-async: 用于Fuchsia OS的执行器

确定生态兼容性

  • async I/O、计时器、进程间通信或任务交互的async代码通常取决于特定的异步执行器或反应器
  • 所有其他async代码,如异步表达式、组合器、同步类型和流,通常与生态系统无关,前提是任何嵌套的future也与生态系统无关
  • 在开始一个项目之前,建议研究相关的async框架和库,以确保与您选择的运行时以及彼此之间的兼容性。

九、并发 Web 服务器

非并发Demo

use std::fs;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;

fn main() {
    // 在端口7878侦听传入链接
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    // 一直阻塞,处理到达这个IP地址的每一个请求
    for stream in listener.incoming() {
        let stream = stream.unwrap();

        handle_connection(stream);
    }
}

fn handle_connection(mut stream: TcpStream) {
    // 从流中读取前1024字节的数据
    let mut buffer = [0; 1024];
    stream.read(&mut buffer).unwrap();

    let get = b"GET / HTTP/1.1\r\n";

    // 根据请求的数据决定响应问候还是404.
    let (status_line, filename) = if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
    };
    let contents = fs::read_to_string(filename).unwrap();

    // 将响应写回流并刷新(flush)以确保响应被发送回客户端.
    let response = format!("{}{}", status_line, contents);
    stream.write(response.as_bytes()).unwrap();
    stream.flush().unwrap();
}

异步Demo

[dependencies]
futures = "0.3"

[dependencies.async-std]
version = "1.6"
features = ["attributes"]

src/main.rs

use async_std;
use async_std::io::ReadExt;
use async_std::net::{TcpListener };
use async_std::task::spawn;
use async_web_server::*;
use futures::{AsyncWriteExt, StreamExt};
use std::fs;
use std::time::Duration;
use async_std::io::Read;
use async_std::io::Write;

#[async_std::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").await.unwrap();

    listener
        .incoming()
        .for_each_concurrent(None, |tcp_stream| async move {
            let stream = tcp_stream.unwrap();
            spawn(handle_connection(stream));
        })
        .await;

    print!("Shutting down");
}

async fn handle_connection(mut stream: impl Read + Write + Unpin) {
    let mut buffer = [0; 1024];
    stream.read(&mut buffer).await.unwrap();

    let mut file_type = HtmlFile::NotFound;

    if buffer.starts_with(GET) {
        file_type = HtmlFile::OK
    } else if buffer.starts_with(SLEEP) {
        async_std::task::sleep(Duration::from_secs(5)).await;
        file_type = HtmlFile::SLEEP
    }

    build_content(file_type, stream).await;
}

async fn build_content(file: HtmlFile, mut stream: impl Read + Write + Unpin) {
    let (path, status_line) = match file {
        HtmlFile::OK => ("hello.html", "HTTP/1.1 200 OK"),
        HtmlFile::SLEEP => ("hello.html", "HTTP/1.1 200 OK"),
        _ => ("404.html", "HTTP/1.1 404 NOT FOUND"),
    };

    let contents = fs::read_to_string(path).unwrap();
    let response = format!(
        "{}\r\nContent-Length: {}\r\n\r\n{}",
        status_line,
        contents.len(),
        contents
    );
    stream.write(response.as_bytes()).await.unwrap();
    stream.flush().await.unwrap();
}

#[async_std::test]
async fn test_handle_connection() {
    let input_bytes = b"GET / HTTP/1.1\r\n";
    let mut contents = vec![0u8; 1024];
    contents[..input_bytes.len()].clone_from_slice(input_bytes);
    let mut stream = MockTcpStream {
        read_data: contents,
        write_data: Vec::new(),
    };

    handle_connection(&mut stream).await;
    let mut buf = [0u8; 1024];
    stream.read(&mut buf).await.unwrap();

    let expected_contents = fs::read_to_string("hello.html").unwrap();
    let expected_response = format!("HTTP/1.1 200 OK\r\n\r\n{}", expected_contents);
    assert!(stream.write_data.starts_with(expected_response.as_bytes()));
}

文件src/lib.rs

use async_std::io::{Read, Write};
use futures::task::{Context, Poll};

use std::cmp::min;
use std::pin::Pin;

pub const GET: &[u8] = b"GET / HTTP/1.1\r\n";
pub const SLEEP: &[u8] = b"GET /sleep HTTP/1.1\r\n";

pub enum HtmlFile {
    OK,
    SLEEP,
    NotFound,
}

pub struct MockTcpStream {
    pub read_data: Vec<u8>,
    pub write_data: Vec<u8>,
}

impl Read for MockTcpStream {
    fn poll_read(
        self: Pin<&mut Self>,
        _: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<std::io::Result<usize>> {
        let size: usize = min(self.read_data.len(), buf.len());
        buf[..size].copy_from_slice(&self.read_data[..size]);
        Poll::Ready(Ok(size))
    }
}

impl Write for MockTcpStream {
    fn poll_write(
        mut self: Pin<&mut Self>,
        _: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<std::io::Result<usize>> {
        self.write_data = Vec::from(buf);
        Poll::Ready(Ok(buf.len()))
    }

    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        Poll::Ready(Ok(()))
    }

    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        Poll::Ready(Ok(()))
    }
}

impl Unpin for MockTcpStream {}