摘录与 Asynchronous Programming in Rust
一、Getting Started
1.1 Rust 的异步 vs 其他语言的
尽管很多语言都支持异步编程,但实现细节上有很多不一样。Rust
的异步实现和大部分语言的在以下方面有区别:
Rust
中 Futures 是惰性的,并且只有被轮询才会进一步执行。丢弃(Dropping
)一个future
可以阻止它继续执行。Rust
中的异步是零成本的,这意味着你只需要为你所使用的东西付出代价。特别来说,你使用异步时可以不需要堆分配或动态分发,这对性能来说是好事!这也使得你能够在约束环境下使用异步,例如嵌入式系统。Rust
不提供内置运行时。相反,运行时由社区维护的库提供。Rust
里单线程的和多线程的运行时都可用,而他们会有不同的优劣。
Demo
fn get_two_sites() {
// 生成两个线程来下载网页.
let thread_one = thread::spawn(|| download("https:://www.foo.com"));
let thread_two = thread::spawn(|| download("https:://www.bar.com"));
// 等待两个线程运行下载完成.
thread_one.join().expect("thread one panicked");
thread_two.join().expect("thread two panicked");
}
然而,下载网页是小任务,为了这么少量工作创建线程相当浪费。对更大的应用来说,这很容易就会变成瓶颈。在异步Rust
,我们能够并发地运行这些任务而不需要额外的线程:
async fn get_two_sites_async() {
// 创建两个不同的 "futures", 当创建完成之后将异步下载网页.
let future_one = download_async("https:://www.foo.com");
let future_two = download_async("https:://www.bar.com");
// 同时运行两个 "futures" 直到完成.
join!(future_one, future_two);
}
1.2 async/.await初步
async/.await
是Rust
内置语法,用于让异步函数编写得像同步代码。async
将代码块转化成实现了Future trait
的状态机。使用同步方法调用阻塞函数会阻塞整个线程,但阻塞Future
只会让出(yield
)线程控制权,让其他Future
继续执行。
[dependencies]
futures = "0.3"
use futures::executor::block_on;
// 你可以使用async fn语法创建异步函数:
async fn hello_world() {
println!("hello, world!");
}
fn main() {
let future = hello_world(); // Nothing is printed
// 需要执行器来执行这个
block_on(future); //`future`is run and "hello, world!" is printed
}
Async Demo
async fn learn_and_sing() {
let song = learn_song().await;
sing_song(song).await;
}
async fn async_main() {
let f1 = learn_and_sing();
let f2 = dance();
futures::join!(f1, f2);
}
fn main() {
block_on(async_main());
}
二、执行 Future 与任务(Task)
2.1 Future trait
Future trait
是Rust Async
编程的核心Future
是一种异步计算,它可以产生一个值实现了
Future
类型的表示目前可能还不可用的值下面是一个简化版的
Future trait
trait SimpleFuture { type Output; fn poll(&mut self, wake: fn()) -> Poll<Self::Output>; } enum Poll<T> { Ready(T), Pending, }
Future
代表着一种你可以检验其是否完成的操作Future
可以通过调用poll
函数来取得进展poll
函数会驱动Future
尽可能接近完成- 如果
Future
完成了,就返回poll::Ready(result)
,其中result
就是最终结果 - 如果
Future
还无法完成:就返回poll::Pending
,并当Future
准备好去的更多进展时调用一个waker
的wake()
函数
针对
Future
,你唯一做的就是使用poll
来敲打它,知道一个值掉出来。
wake() 函数
- 当
wake()
函数被调用时:- 执行器将驱动
Future
再次调用poll
函数,以便Future
能取得更多的进展
- 执行器将驱动
- 没有
wake()
函数,执行器就不知道特定的Future
何时能取得进展(就得不断地poll
) - 通过
wake()
函数,执行器就确切的知道哪些Future
已经准备好进行poll()
的调用
伪代码:
pub struct SocketRead<'a> {
socket: &'a Socket,
}
impl SimpleFuture for SocketRead<'_> {
type Output = Vec<u8>;
fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
if self.socket.has_data_to_read() {
// The socket has data -- read it into a buffer and return it.
Poll::Ready(self.socket.read_buf())
} else {
// The socket does not yet have data.
//
// Arrange for`wake`to be called once data is available.
// When data becomes available,`wake`will be called, and the
// user of this`Future`will know to call`poll`again and
// receive data.
self.socket.set_readable_callback(wake);
Poll::Pending
}
}
}
真正的Future代码
trait Future {
type Output;
fn poll(
// Note the change from`&mut self`to`Pin<&mut Self>`:
self: Pin<&mut Self>,
// and the change from`wake: fn()`to`cx: &mut Context<'_>`:
cx: &mut Context<'_>,
) -> Poll<Self::Output>;
}
/*
async fn read_from_file1() -> String {
sleep(Duration::new(4,0));
println!("{:?}", "Processing file 1");
String::from("Hello, there from file 1")
}
*/
// Future
// use std::thread::sleep;
// use std::time::Duration;
// 等同 async fn read_from_file1() -> String 代码
use std::future::Future;
fn read_from_file1() -> impl Future<Output = String> {
async {
sleep(Duration::new(4,0));
println!("{:?}", "Processing file 1");
String::from("Hello, there from file 1")
}
}
Future Demo1
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::thread::sleep;
use std::time::Duration;
struct ReadFileFuture {}
impl Future for ReadFileFuture {
type Output = String;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
println!("Tokio! Stop polling me");
cx.waker().wake_by_ref(); // first change
// Poll::Pending // orig design
Poll::Ready(String::from("Hello, there from file 1")) // 2nd change
// why the Poll::Ready cannot print in stdout?
}
}
#[tokio::main]
async fn main() {
println!("Hello before reading file!");
let h1 = tokio::spawn(async {
let future1 = ReadFileFuture {};
future1.await
});
let h2 = tokio::spawn(async {
let file2_contents = read_from_file2().await;
println!("{:?}", file2_contents);
});
let _ = tokio::join!(h1, h2);
}
fn read_from_file2() -> impl Future<Output = String> {
async {
sleep(Duration::new(2,0));
println!("{:?}", "Processing file 2");
String::from("Hello, there from file 2")
}
}
Future Demo2
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::thread::sleep;
use std::time::{Duration, Instant};
struct AsyncTimer { expiration_time: Instant, }
impl Future for AsyncTimer {
type Output = String;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if Instant::now() >= self.expiration_time {
println!("Hello, it's time for Future 1");
Poll::Ready(String::from("Future 1 has completed"))
} else {
println!("Hello, it's not yet time for Future 1, Going to sleep");
let waker = cx.waker().clone();
let expiration_time = self.expiration_time;
std::thread::spawn(move || {
println!("start new thread with sleep blocking");
let curr_time = Instant::now();
if curr_time < expiration_time {
std::thread::sleep(expiration_time - curr_time);
}
println!("end sleep and wake.wake()");
waker.wake();
println!("end wake.wake()");
});
println!("end new thread and return Poll::Pending");
Poll::Pending
}
}
}
#[tokio::main]
async fn main() {
println!("start tokio::main");
let h1 = tokio::spawn(async {
println!("start future1 handle");
// let future1 = AsyncTimer{ expiration_time: Instant::now() + Duration::from_millis(4000), };
let future1 = AsyncTimer{ expiration_time: Instant::now() + Duration::from_secs(4), };
println!("generate future1 handle, then run future1.await");
println!("{:?}", future1.await);
println!("after future1.await");
});
let h2 = tokio::spawn(async {
println!("start future2 handle");
let file2_contents = read_from_file2().await;
println!("{:?}", file2_contents);
println!("end future2 handle");
});
let _ = tokio::join!(h1, h2);
}
fn read_from_file2() -> impl Future<Output = String> {
async {
sleep(Duration::new(4,0));
String::from("Future 2 has completed")
}
}
2.2. 用 Waker 唤醒任务
Future
在第一次poll
的时候通常无法完成任务,所以Future
需要保证在准备好去的更多进展后,可以再次被poll
- 每次
Future
被poll
,它都是作为一个任务的一部分 - 任务(
Task
)就是被提交给执行者顶层的Future
2.3 应用:构建执行器
Rust
的Future
是惰性的:它们不会干任何事,除非它们被驱动执行。一个驱动future
类型的方法是在async
函数中使用.await
调用,但这只是将问题抛到上一层:谁来跑在顶层async
函数返回的future
实例呢?为此,我们需要执行Future
的执行器。
Future
执行器会拿一组顶层Future
去跑poll
方法,无论这些Future
能否进展。通常, 执行器会poll
一个future
实例来启动。当Future
通过调用wake()
方法来指示他们准备好继续进展,执行器就会把它们放入队列并再一次poll
,重复这一过程直到Future
完成。
src/main.rs
#![allow(unused)]
use {
futures::{
future::{BoxFuture, FutureExt},
task::{waker_ref, ArcWake},
},
std::{
future::Future,
sync::mpsc::{sync_channel, Receiver, SyncSender},
sync::{Arc, Mutex},
task::{Context, Poll},
time::Duration,
},
// // 引入之前实现的定时器模块
// async_tokio::TimerFuture,
};
/// 任务执行器,负责从通道中接收任务然后执行
struct Executor {
ready_queue: Receiver<Arc<Task>>,
}
///`Spawner`负责创建新的`Future`然后将它发送到任务通道中
#[derive(Clone)]
struct Spawner {
task_sender: SyncSender<Arc<Task>>,
}
/// 添加一个方法用于生成 Future , 然后将它放入任务通道中
impl Spawner {
fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
let future = future.boxed();
let task = Arc::new(Task {
future: Mutex::new(Some(future)),
task_sender: self.task_sender.clone(),
});
self.task_sender.send(task).expect("任务队列已满");
}
}
/// 一个Future,它可以调度自己(将自己放入任务通道中),然后等待执行器去`poll`
struct Task {
/// 进行中的Future,在未来的某个时间点会被完成
///
/// 按理来说`Mutex`在这里是多余的,因为我们只有一个线程来执行任务。但是由于
/// Rust并不聪明,它无法知道`Future`只会在一个线程内被修改,并不会被跨线程修改。因此
/// 我们需要使用`Mutex`来满足这个笨笨的编译器对线程安全的执着。
///
/// 如果是生产级的执行器实现,不会使用`Mutex`,因为会带来性能上的开销,取而代之的是使用`UnsafeCell`
future: Mutex<Option<BoxFuture<'static, ()>>>,
/// 可以将该任务自身放回到任务通道中,等待执行器的poll
task_sender: SyncSender<Arc<Task>>,
}
fn new_executor_and_spawner() -> (Executor, Spawner) {
// 任务通道允许的最大缓冲数(任务队列的最大长度)
// 当前的实现仅仅是为了简单,在实际的执行中,并不会这么使用
const MAX_QUEUED_TASKS: usize = 10_000;
let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS);
(Executor { ready_queue }, Spawner { task_sender })
}
impl ArcWake for Task {
fn wake_by_ref(arc_self: &Arc<Self>) {
// 通过发送任务到任务管道的方式来实现`wake`,这样`wake`后,任务就能被执行器`poll`
let cloned = arc_self.clone();
arc_self
.task_sender
.send(cloned)
.expect("任务队列已满");
}
}
impl Executor {
fn run(&self) {
while let Ok(task) = self.ready_queue.recv() {
// 获取一个future,若它还没有完成(仍然是Some,不是None),则对它进行一次poll并尝试完成它
let mut future_slot = task.future.lock().unwrap();
if let Some(mut future) = future_slot.take() {
// 基于任务自身创建一个`LocalWaker`
let waker = waker_ref(&task);
let context = &mut Context::from_waker(&*waker);
//`BoxFuture<T>`是`Pin<Box<dyn Future<Output = T> + Send + 'static>>`的类型别名
// 通过调用`as_mut`方法,可以将上面的类型转换成`Pin<&mut dyn Future + Send + 'static>`
if future.as_mut().poll(context).is_pending() {
// Future还没执行完,因此将它放回任务中,等待下次被poll
*future_slot = Some(future);
}
}
}
}
}
use timer_future::TimerFuture;
fn main() {
let (executor, spawner) = new_executor_and_spawner();
// 生成一个任务
spawner.spawn(async {
println!("howdy!");
// 创建定时器Future,并等待它完成
TimerFuture::new(Duration::new(2, 0)).await;
println!("done!");
});
// drop掉任务,这样执行器就知道任务已经完成,不会再有新的任务进来
drop(spawner);
// 运行执行器直到任务队列为空
// 任务运行后,会先打印`howdy!`, 暂停2秒,接着打印`done!`
executor.run();
}
src/lib.rs
#![allow(unused)]
use std::{
future::Future,
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll, Waker},
thread,
time::Duration,
};
pub struct TimerFuture {
shared_state: Arc<Mutex<SharedState>>,
}
/// 在`future`线程和`awiting`线程之间共享状态
struct SharedState {
/// 是否已经达到休眠时间.
completed: bool,
///`TimerFuture`表示正在运行的`waker`.
/// 线程可以在设置完`completed = true`之后来通知`TimerFuture`任务被唤醒并
/// 检查`completed = true`,然后继续执行.
waker: Option<Waker>,
}
impl Future for TimerFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// 检查共享状态,检查定时器是否已经完成.
let mut shared_state = self.shared_state.lock().unwrap();
if shared_state.completed {
Poll::Ready(())
} else {
// 设置`waker`, 让线程可以在定时器完成时唤醒当前`waker`,确保
// 再次轮询`future`并获知`completed = true`.
//
// 这样做是非常不错的,而不用每次都重复`clone``waker`. 然而, 这个`TimerFuture`
// 可以在执行器之间移动, 这可能会导致旧的`waker`指向错误的`waker`, 这会阻止
//`TimerFuture`被正确得唤醒.
//
// 注意:可以使用`Waker::will_wake`函数来做检查, 但是
// 为了简单起见,我们忽略了他.
shared_state.waker = Some(cx.waker().clone());
Poll::Pending
}
}
}
impl TimerFuture {
/// 创建一个新的`TimerFuture`,它将在提供的超时之后完成.
pub fn new(duration: Duration) -> Self {
let shared_state = Arc::new(Mutex::new(SharedState {
completed: false,
waker: None,
}));
// 创建一个新的线程.
let thread_shared_state = shared_state.clone();
thread::spawn(move || {
thread::sleep(duration);
let mut shared_state = thread_shared_state.lock().unwrap();
// 设置状态,表示定时器已经完成,并唤醒轮询`future`中的最后一个
// 任务 (如果存在的话).
shared_state.completed = true;
if let Some(waker) = shared_state.waker.take() {
waker.wake()
}
});
TimerFuture { shared_state }
}
}
三、async/.await
async/.await
是Rust
的特殊语法,在发生阻塞的时,它让放弃当前线程控制权成为可能,这就允许在等待操作完成的时候,允许其他代码取得进展。
使用async
的两种方式
// This function:
async fn foo(x: &u8) -> u8 { *x }
// Is equivalent to this function:
fn foo_expanded<'a>(x: &'a u8) -> impl Future<Output = u8> + 'a {
async move { *x }
}
Demo
[dependencies]
async-std = { version = "1.9.0", features = ["unstable"] }
use async_std::io::prelude::*;
use async_std::net;
use async_std::task;
// async 申明函数
// Rust 编译器会自动把 std::io::Result<String> 包装成一个 Feature<OutPut=T> 返回
async fn cheapo_request(host: &str, port: u16, path: &str) -> std::io::Result<String> {
// .await 会等待,直到 future 变成 ready
// await 最终会解析出 future 的值
let mut socket = net::TcpStream::connect((host, port)).await?;
let request = format!("GET {} HTTP/1.1\r\nHost: {}\r\n\n", path, host);
socket.write_all(request.as_bytes()).await?;
socket.shutdown(net::Shutdown::Write)?;
let mut response = String::new();
socket.read_to_string(&mut response).await?;
Ok(response)
}
fn main() -> std::io::Result<()> {
let response = task::block_on(cheapo_request("baidu.com", 80, "/"))?;
println!("{}", response);
Ok(())
}
第一次对cheapo_request
进行poll
时:
- 从函数体顶部开始执行
- 直到第一个
await
(针对TcpStream::connect
返回的Future
) - 如果
TcpStream::connect
还没完成,就会返回Pending
- 针对
cheapo_request
的poll
也无法继续,直到connect
的Future
返回ready
await
:
- 获得
Future
的所有权,并对其进行poll
- 如果
Future Ready
,其最终值就是await
表达式的值,这时执行就可以继续了 - 否则就返回
Pending
给调用者
第二次对cheapo_request
的Future
进行poll
时
- 并不在函数体顶部开始执行
- 它会在
connect Future
进行poll
的地方继续执行,直到它变成Ready
,才会继续在函数体往下下走
随着cheapo_request
的Future
不断被poll
,其执行就是从一个await
到下一个await
,而且只有子Future
的await
变成Ready
之后才继续.
cheapo_request
的Future
会追踪:
- 下一次
poll
应恢复继续的那个点 - 以及所需的本地状态(变量、参数、临时变量等)
这种途中能暂停执行,然后恢复执行的能力是async
所独有的由于await
表达式依赖于“可恢复执行”这个特性,所以await
只能用在async
里。
async 的生命周期
- 与传统函数不同:
async fn
,如果它的参数是引用或是其他非'static
的,那么它返回的Future
就会绑定到参数的生命周期上。 - 这意味着
async fn
返回的future
,在.await
的同时,fn
的非'static
的参数必须保持有效
Demo
// This function:
async fn foo(x: &u8) -> u8 { *x }
// Is equivalent to this function:
fn foo_expanded<'a>(x: &'a u8) -> impl Future<Output = u8> + 'a {
async move { *x }
}
存储future
或传递future
- 通常,
async
的函数在调用后会立即.await
,这就不是问题:- 例如:
foo(&x).await
- 例如:
- 如果存储
future
或将其传递给其他任务或者线程,就有问题了。。 - 一种变通方法:
- 思路:把使用引用作为参数的
async fn
转为一个'static future
- 做法:在
async
块里,将参数和async fn
的调用捆绑到一起(延长参数的生命周期来匹配future
)
- 思路:把使用引用作为参数的
async
库和闭包都支持move
async move
块会获得其引用变量的所有权:- 允许其比当前所在的作用域活得长
- 但同时也放弃了与其它代码共享的这些变量的能力
Demo
fn bad() -> impl Future<Output = u8> {
let x = 5;
borrow_x(&x) // ERROR:`x`does not live long enough
}
fn good() -> impl Future<Output = u8> {
async {
let x = 5;
borrow_x(&x).await
}
}
在多线程执行者上进行.await
- 当使用多线程
future
执行者时,future
就可以在线程间移:- 所以
async
体里面用的变量必须能够在线程间移动 - 因为任何的
.await
都可能导致切换到一个新的线程
- 所以
- 这意味着使用以下类型是不安全的:
Rc
,&RefCell
和任何其它没有实现Send trait
的类型,包括没实现Sync trait
的引用- 注意:调用
.await
时,只要这些类型不在作用域内,就可以使用他们。
- 在跨域一个
.await
期间,持有传统的、对future
无感知的锁,也不是好主意:- 可导致线程池锁定
- 为此,可使用
futures::lock
里的Mutex
而不是std:sync
里的
四、Pinning
#[derive(Debug)]
struct Test {
a: String,
b: *const String,
}
impl Test {
fn new(txt: &str) -> Self {
Test {
a: String::from(txt),
b: std::ptr::null(),
}
}
fn init(&mut self) {
let self_ref: *const String = &self.a;
self.b = self_ref;
}
fn a(&self) -> &str {
&self.a
}
fn b(&self) -> &String {
assert!(!self.b.is_null(), "Test::b called without Test::init being called first");
unsafe { &*(self.b) }
}
}
fn main() {
let mut test1 = Test::new("test1");
test1.init();
let mut test2 = Test::new("test2");
test2.init();
std::mem::swap(&mut test1, &mut test2);
println!("a: {}, b: {}", test1.a(), test1.b());
println!("a: {}, b: {}", test2.a(), test2.b());
}
Pin的实践
Pin
类型会包裹指针类型,保证指针指向的值不被移动- 例如:
Pin<&mut T>
,Pin<&T>
,Pin<Box<T>>
- 即使
T:!Unpin
,也能保证T
不被移动
- 即使
Unpin trait
- 大多数类型如果被移动,不会造成问题,它们实现了
Unpin
- 指向
Unpin
类型的指针,可自由的放入或从Pin
中取出- 例如:
u8
是Unpin
的,Pin<&mut u8>
和普通的&mut u8
一样。
- 例如:
- 如果类型拥有
!Unpin
标记,那么在Pin
之后它们就无法移动了。
Pin到栈内存的Demo
use std::marker::PhantomPinned;
use std::pin::Pin;
#[derive(Debug)]
struct Test {
a: String,
b: *const String,
_marker: PhantomPinned,
}
impl Test {
fn new(txt: &str) -> Self {
Test {
a: String::from(txt),
b: std::ptr::null(),
_marker: PhantomPinned,
}
}
fn init(self: Pin<&mut Self>) {
let self_ptr: *const String = &self.a;
let this = unsafe { self.get_unchecked_mut() };
this.b = self_ptr;
}
fn a(self: Pin<&Self>) -> &str {
&self.get_ref().a
}
fn b(self: Pin<&Self>) -> &String {
unsafe { &*self.b }
}
}
fn main() {
println!("Hello, world!");
let mut test1 = Test::new("test1");
let mut test1 = unsafe { Pin::new_unchecked(&mut test1) };
Test::init(test1.as_mut());
let mut test2 = Test::new("test2");
let mut test2 = unsafe { Pin::new_unchecked(&mut test2) };
Test::init(test2.as_mut());
std::mem::swap(&mut test1, &mut test2);
println!(
"test1: {}, {}",
Test::a(test1.as_ref()),
Test::b(test1.as_ref())
);
println!(
"test2: {}, {}",
Test::a(test2.as_ref()),
Test::b(test2.as_ref())
);
// std::mem::swap(test1.get_mut(), test2.get_mut());
}
Pin到堆内存的Demo
#[derive(Debug)]
struct Test {
a: String,
b: *const String,
}
impl Test {
fn new(txt: &str) -> Box<Self> {
Box::new(Test {
a: String::from(txt),
b: std::ptr::null(),
})
}
fn init(self: &mut Self) {
let self_ptr: *const String = &self.a;
self.b = self_ptr;
}
fn a(self: &Self) -> &str {
&self.a
}
fn b(self: &Self) -> &String {
unsafe { &*self.b }
}
}
fn main() {
println!("Hello, world!");
let mut test1 = Test::new("test1");
Test::init(test1.as_mut());
let mut test2 = Test::new("test2");
Test::init(test2.as_mut());
println!("test1: {}, {}", test1.a(), test1.b());
println!("test2: {}, {}", test2.a(), test2.b());
std::mem::swap(&mut test1, &mut test2);
println!("test1: {}, {}", test1.a(), test1.b());
println!("test2: {}, {}", test2.a(), test2.b());
}
Demo 2
use std::pin::Pin;
use std::marker::PhantomPinned;
#[derive(Debug)]
struct Test {
a: String,
b: *const String,
_marker: PhantomPinned,
}
impl Test {
fn new(txt: &str) -> Pin<Box<Self>> {
let t = Test {
a: String::from(txt),
b: std::ptr::null(),
_marker: PhantomPinned,
};
let mut boxed = Box::pin(t);
let self_ptr: *const String = &boxed.a;
unsafe { boxed.as_mut().get_unchecked_mut().b = self_ptr };
boxed
}
fn a(self: Pin<&Self>) -> &str {
&self.get_ref().a
}
fn b(self: Pin<&Self>) -> &String {
unsafe { &*(self.b) }
}
}
pub fn main() {
let test1 = Test::new("test1");
let test2 = Test::new("test2");
println!("a: {}, b: {}",test1.as_ref().a(), test1.as_ref().b());
println!("a: {}, b: {}",test2.as_ref().a(), test2.as_ref().b());
}
总结
如果
T: Unpin
(默认会实现),那么Pin<'a, T>
完全等价于&'a mut T
。换言之:Unpin
意味着这个类型被移走也没关系,就算已经被固定了,所以Pin
对这样的类型毫无影响。如果
T: !Unpin
, 获取已经被固定的 T 类型示例的&mut T
需要 unsafe。标准库中的大部分类型实现
Unpin
,在Rust
中遇到的多数“平常”的类型也是一样。但是,async/await
生成的Future
是个例外。你可以在
nightly
通过特性标记来给类型添加!Unpin
约束,或者在stable
给你的类型加std::marker::PhatomPinned
字段。你可以将数据固定到栈上或堆上
固定
!Unpin
对象到栈上需要unsafe
固定
!Unpin
对象到堆上不需要unsafe
。Box::pin
可以快速完成这种固定。针对已经
Pin
的数据,如果它是T:!Unpin
的,则需要保证它从被Pin
后,内存一直有效且不会调整其用途,直到dorp
被调用,这是 Pin 协约 中的重要部分。
五、Streams
Stream trait
与Future
类似,但能在完成前返还(yield
)多个值,与标准库中的Iterator
类似:
trait Stream {
/// 由`stream`产生的值的类型.
type Item;
/// 尝试解析`stream`中的下一项.
/// 如果已经准备好,就重新运行`Poll::Pending`, 如果已经完成,就重新
/// 运行`Poll::Ready(Some(x))`,如果已经完成,就重新运行`Poll::Ready(None)`.
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<Option<Self::Item>>;
}
一个常见的使用Stream
的例子是futures
库中通道的Receiver
。每次Sender
端发送一个值时,它就会返回一个Some(val)
,并且会在Sender
关闭且所有消息都接收后返还None
:
async fn send_recv() {
const BUFFER_SIZE: usize = 10;
let (mut tx, mut rx) = mpsc::channel::<i32>(BUFFER_SIZE);
tx.send(1).await.unwrap();
tx.send(2).await.unwrap();
drop(tx);
//`StreamExt::next`类似于`Iterator::next`, 但会返回一个实现
// 了`Future<Output = Option<T>>`的类型.
assert_eq!(Some(1), rx.next().await);
assert_eq!(Some(2), rx.next().await);
assert_eq!(None, rx.next().await);
}
async fn sum_with_next(mut stream: Pin<&mut dyn Stream<Item = i32>>) -> i32 {
use futures::stream::StreamExt; // 对于`next`
let mut sum = 0;
while let Some(item) = stream.next().await {
sum += item;
}
sum
}
async fn sum_with_try_next(
mut stream: Pin<&mut dyn Stream<Item = Result<i32, io::Error>>>,
) -> Result<i32, io::Error> {
use futures::stream::TryStreamExt; // 对于`try_next`
let mut sum = 0;
while let Some(item) = stream.try_next().await? {
sum += item;
}
Ok(sum)
}
async fn jump_around(
mut stream: Pin<&mut dyn Stream<Item = Result<u8, io::Error>>>,
) -> Result<(), io::Error> {
use futures::stream::TryStreamExt; // 对于`try_for_each_concurrent`
const MAX_CONCURRENT_JUMPERS: usize = 100;
stream.try_for_each_concurrent(MAX_CONCURRENT_JUMPERS, |num| async move {
jump_n_times(num).await?;
report_n_jumps(num).await?;
Ok(())
}).await?;
Ok(())
}
六、同时执行多个 Future
可以同时执行多个异步操作的方式:
join!
,等待所有的future
完成select!
,等待所有的future
中的一个完成Spawning
,创建一个顶级任务,他会运行一个future
直至完成FuturesUnoredered
,一组Future
,他们会产生每个子Future
的结果
join!
use futures::join;
async fn get_book_and_music() -> (Book, Music) {
let book_fut = get_book();
let music_fut = get_music();
join!(book_fut, music_fut)
}
try_join!
对于那些返回Result
的future
,考虑使用try_join!
而非join
。因为join
只会在所有子future
都完成后才会完成,它甚至会在子future
返回Err
之后继续处理。
use futures::try_join;
async fn get_book() -> Result<Book, String> { /* ... */ Ok(Book) }
async fn get_music() -> Result<Music, String> { /* ... */ Ok(Music) }
async fn get_book_and_music() -> Result<(Book, Music), String> {
let book_fut = get_book();
let music_fut = get_music();
try_join!(book_fut, music_fut)
}
use futures::{
future::TryFutureExt,
try_join,
};
// 注意,传进 try_join! 的 future 必须要用相同的错误类型。考虑使用 futures::future::TryFutureExt 库的 .map_err(|e| ...) 或 err_into() 函数来统一错误类型:
async fn get_book() -> Result<Book, ()> { /* ... */ Ok(Book) }
async fn get_music() -> Result<Music, String> { /* ... */ Ok(Music) }
async fn get_book_and_music() -> Result<(Book, Music), String> {
let book_fut = get_book().map_err(|()| "Unable to get book".to_string());
let music_fut = get_music();
try_join!(book_fut, music_fut)
}
select!
use futures::{
future::FutureExt, // 为了`.fuse()`
pin_mut,
select,
};
async fn task_one() { /* ... */ }
async fn task_two() { /* ... */ }
async fn race_tasks() {
let t1 = task_one().fuse();
let t2 = task_two().fuse();
pin_mut!(t1, t2);
select! {
() = t1 => println!("task one completed first"),
() = t2 => println!("task two completed first"),
}
}
// select 也支持 default 和 complete 分支。
// default : 如果选择的 future 尚未完成,就会允许 default 分之, 拥有 default 的 select 总是会立即返回
// complete:它用于所有选择的 future 都已经完成的情况
use futures::{future, select};
async fn count() {
let mut a_fut = future::ready(4);
let mut b_fut = future::ready(6);
let mut total = 0;
loop {
select! {
a = a_fut => total += a,
b = b_fut => total += b,
complete => break,
default => unreachable!(), // 永远不会被执行(futures都准备好了,然后complete分支被执行)
};
}
assert_eq!(total, 10);
}
与 Unpin 和 FusedFuture 交互
- 前面的例子中,需要在返回的
future
上调用.fuse()
,也调用了pin_mut
。- 因为
select
里面的future
必须实现Unpin
和FusedFuture
这两个trait
。
- 因为
- 必须
Unpin:select
使用的future
不是按值的,而是按可变引用。- 未完成的
future
在调用select
后仍可使用
- 未完成的
- 必须
FusedFuture:
在future
完成后,select
不可以对它进行poll
- 实现
FusedFuture
的future
会追踪其完成状态,这样在select
循环里,就只会poll
没有完成的future
- 实现
Demo1
use futures::{
stream::{Stream, StreamExt, FusedStream},
select,
};
async fn add_two_streams(
mut s1: impl Stream<Item = u8> + FusedStream + Unpin,
mut s2: impl Stream<Item = u8> + FusedStream + Unpin,
) -> u8 {
let mut total = 0;
loop {
let item = select! {
x = s1.next() => x,
x = s2.next() => x,
complete => break,
};
if let Some(next_num) = item {
total += next_num;
}
}
total
}
带有 Fuse 和 FuturesUnordered 的 select 循环中的并发任务
有个不太好找但是很趁手的函数叫Fuse::terminated()
。这个函数允许构造已经被终止的空future
,并且能够在之后填进需要运行的future
。
use futures::{
future::{Fuse, FusedFuture, FutureExt},
stream::{FusedStream, Stream, StreamExt},
pin_mut,
select,
};
async fn get_new_num() -> u8 { /* ... */ 5 }
async fn run_on_new_num(_: u8) { /* ... */ }
async fn run_loop(
mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin,
starting_num: u8,
) {
let run_on_new_num_fut = run_on_new_num(starting_num).fuse();
let get_new_num_fut = Fuse::terminated();
pin_mut!(run_on_new_num_fut, get_new_num_fut);
loop {
select! {
() = interval_timer.select_next_some() => {
// 计时器已经完成了.
// 如果没有`get_new_num_fut`正在执行的话,就启动一个新的.
if get_new_num_fut.is_terminated() {
get_new_num_fut.set(get_new_num().fuse());
}
},
new_num = get_new_num_fut => {
// 一个新的数字到达了
// 启动一个新的`run_on_new_num_fut`并且扔掉旧的.
run_on_new_num_fut.set(run_on_new_num(new_num).fuse());
},
// 执行`run_on_new_num_fut`
() = run_on_new_num_fut => {},
// 当所有都完成时panic,
// 因为理论上`interval_timer`会不断地产生值.
complete => panic!("`interval_timer`completed unexpectedly"),
}
}
}
当有很多份相同future
的拷贝同时执行时,使用FutureUnordered
类型。下面的例子和上面的例子很类似,但会运行run_on_new_num_fut
的所有拷贝都到完成状态,而不是当一个新拷贝创建时就中断他们。它也会打印run_on_new_num_fut
的返回值:
use futures::{
future::{Fuse, FusedFuture, FutureExt},
stream::{FusedStream, FuturesUnordered, Stream, StreamExt},
pin_mut,
select,
};
async fn get_new_num() -> u8 { /* ... */ 5 }
async fn run_on_new_num(_: u8) -> u8 { /* ... */ 5 }
// 用从`get_new_num`获取的最新的数字运行`run_on_new_num`.
//
// 每当定时器到期后,都会重新执行`get_new_num`,
// 并立即取消正在执行的`run_on_new_num`,随后用新返回值替换`run_on_new_num`.
async fn run_loop(
mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin,
starting_num: u8,
) {
let mut run_on_new_num_futs = FuturesUnordered::new();
run_on_new_num_futs.push(run_on_new_num(starting_num));
let get_new_num_fut = Fuse::terminated();
pin_mut!(get_new_num_fut);
loop {
select! {
() = interval_timer.select_next_some() => {
// 计时器已经完成了.
// 如果没有`get_new_num_fut`正在执行的话,就启动一个新的.
if get_new_num_fut.is_terminated() {
get_new_num_fut.set(get_new_num().fuse());
}
},
new_num = get_new_num_fut => {
// 一个新的数字到达了,启动一个新的`run_on_new_num_fut`.
run_on_new_num_futs.push(run_on_new_num(new_num));
},
// 执行`run_on_new_num_futs`并检查有没有完成的.
res = run_on_new_num_futs.select_next_some() => {
println!("run_on_new_num_fut returned {:?}", res);
},
// 当所有都完成时panic,
// 因为理论上`interval_timer`会不断地产生值.
complete => panic!("`interval_timer`completed unexpectedly"),
}
}
}
七、掌握并喜爱的解决方法
// ? in async Blocks
fn main() -> std::io::Result<()> {
struct MyError;
async fn foo() -> Result<(), MyError> {
Ok(())
}
async fn bar() -> Result<(), MyError> {
Ok(())
}
let fut = async {
foo().await?;
bar().await?;
Ok(()) // 报错,cannot infer type of the type parameter`E`declared on the enum`Result`
// Ok::<(), MyError>(()) // <- note the explicit type annotation here
};
Ok(())
}
// Send 模拟
#[derive(Default)]
struct NotSend(Rc<()>);
async fn bar() {}
async fn foo() {
// let x = NotSend::default(); // 报错,has type`NotSend`which is not`Send`
{
let x = NotSend::default();
}
bar().await;
}
fn require_send(_: impl Send) {}
fn main() {
require_send(foo());
}
// 递归
async fn recursive() {
recursive().await; // 报错 recursion in an`async fn`requires boxing
recursive().await;
}
// 解决方法
use futures::future::{BoxFuture, FutureExt};
fn recursive() -> BoxFuture<'static, ()> {
async move {
recursive().await;
recursive().await;
}.boxed()
}
trait 中的 async
目前,async fn
不能在trait
中使用。原因一些复杂,但是有计划在未来移除这个限制。
这个问题可以用 async-trait 库来避免
八、Async 生态系统
Rust 没提供什么
Rust
目前只提供编写async
代码的基本要素,标准库中尚未提供执行器、任务、反应器、组合器以及低级I/O future
和trait
- 社区提供的
async
生态系统填补了这些空白
Async 运行时
Async
运行时是用于执行async
应用程序的库- 运行时通常将一个反应器与一个或多个执行器捆绑在一起
- 反应器为异步
I/O
、进程间通信和计时器等外部事件提供订阅机制 - 在
async
运行时中,订阅者通常是代表低级别I/O
操作的future
。 - 执行者负责安排和执行任务。
- 它们跟踪正在运行和暂停的任务,对
future
进行poll
直到完成,并在任务能够取得进展时唤醒任务 - “
执行者
”一词经常与“运行时
”互换使用。
- 它们跟踪正在运行和暂停的任务,对
- 我们使用“
生态系统
”一词来描述一个与兼容trait
和特性捆绑在一起的运行时。
社区提供的 async crates
futures crate
,提供了Stream
、Sink
、AsyncRead
、AsyncWrite
等trait
,以及组合器等工具。这些可能最终会成为标准库的一部分futures
有自己的执行器,但没有自己的反应器,因此它不支持async I/O
或计时器future
的执行。- 因此,它不被认为是完整的运行时。
- 常见的选择是:与另一个
crate
中的执行器一起使用来自futures
提供的工具
流行的运行时
Tokio
:一个流行的async
生态系统,包含HTTP
、gRPC
和跟踪框架async-std
:提供标准库的async
副本smol
:小型、简化的async
运行时。提供可用于包装UnixStream
或TcpListener
等结构的async trait
fuchsia-async
: 用于Fuchsia OS
的执行器
确定生态兼容性
- 与
async I/O
、计时器、进程间通信或任务交互的async
代码通常取决于特定的异步执行器或反应器 - 所有其他
async
代码,如异步表达式、组合器、同步类型和流,通常与生态系统无关,前提是任何嵌套的future
也与生态系统无关 - 在开始一个项目之前,建议研究相关的
async
框架和库,以确保与您选择的运行时以及彼此之间的兼容性。
九、并发 Web 服务器
非并发Demo
use std::fs;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;
fn main() {
// 在端口7878侦听传入链接
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
// 一直阻塞,处理到达这个IP地址的每一个请求
for stream in listener.incoming() {
let stream = stream.unwrap();
handle_connection(stream);
}
}
fn handle_connection(mut stream: TcpStream) {
// 从流中读取前1024字节的数据
let mut buffer = [0; 1024];
stream.read(&mut buffer).unwrap();
let get = b"GET / HTTP/1.1\r\n";
// 根据请求的数据决定响应问候还是404.
let (status_line, filename) = if buffer.starts_with(get) {
("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
} else {
("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
};
let contents = fs::read_to_string(filename).unwrap();
// 将响应写回流并刷新(flush)以确保响应被发送回客户端.
let response = format!("{}{}", status_line, contents);
stream.write(response.as_bytes()).unwrap();
stream.flush().unwrap();
}
异步Demo
[dependencies]
futures = "0.3"
[dependencies.async-std]
version = "1.6"
features = ["attributes"]
src/main.rs
use async_std;
use async_std::io::ReadExt;
use async_std::net::{TcpListener };
use async_std::task::spawn;
use async_web_server::*;
use futures::{AsyncWriteExt, StreamExt};
use std::fs;
use std::time::Duration;
use async_std::io::Read;
use async_std::io::Write;
#[async_std::main]
async fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").await.unwrap();
listener
.incoming()
.for_each_concurrent(None, |tcp_stream| async move {
let stream = tcp_stream.unwrap();
spawn(handle_connection(stream));
})
.await;
print!("Shutting down");
}
async fn handle_connection(mut stream: impl Read + Write + Unpin) {
let mut buffer = [0; 1024];
stream.read(&mut buffer).await.unwrap();
let mut file_type = HtmlFile::NotFound;
if buffer.starts_with(GET) {
file_type = HtmlFile::OK
} else if buffer.starts_with(SLEEP) {
async_std::task::sleep(Duration::from_secs(5)).await;
file_type = HtmlFile::SLEEP
}
build_content(file_type, stream).await;
}
async fn build_content(file: HtmlFile, mut stream: impl Read + Write + Unpin) {
let (path, status_line) = match file {
HtmlFile::OK => ("hello.html", "HTTP/1.1 200 OK"),
HtmlFile::SLEEP => ("hello.html", "HTTP/1.1 200 OK"),
_ => ("404.html", "HTTP/1.1 404 NOT FOUND"),
};
let contents = fs::read_to_string(path).unwrap();
let response = format!(
"{}\r\nContent-Length: {}\r\n\r\n{}",
status_line,
contents.len(),
contents
);
stream.write(response.as_bytes()).await.unwrap();
stream.flush().await.unwrap();
}
#[async_std::test]
async fn test_handle_connection() {
let input_bytes = b"GET / HTTP/1.1\r\n";
let mut contents = vec![0u8; 1024];
contents[..input_bytes.len()].clone_from_slice(input_bytes);
let mut stream = MockTcpStream {
read_data: contents,
write_data: Vec::new(),
};
handle_connection(&mut stream).await;
let mut buf = [0u8; 1024];
stream.read(&mut buf).await.unwrap();
let expected_contents = fs::read_to_string("hello.html").unwrap();
let expected_response = format!("HTTP/1.1 200 OK\r\n\r\n{}", expected_contents);
assert!(stream.write_data.starts_with(expected_response.as_bytes()));
}
文件src/lib.rs
use async_std::io::{Read, Write};
use futures::task::{Context, Poll};
use std::cmp::min;
use std::pin::Pin;
pub const GET: &[u8] = b"GET / HTTP/1.1\r\n";
pub const SLEEP: &[u8] = b"GET /sleep HTTP/1.1\r\n";
pub enum HtmlFile {
OK,
SLEEP,
NotFound,
}
pub struct MockTcpStream {
pub read_data: Vec<u8>,
pub write_data: Vec<u8>,
}
impl Read for MockTcpStream {
fn poll_read(
self: Pin<&mut Self>,
_: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<std::io::Result<usize>> {
let size: usize = min(self.read_data.len(), buf.len());
buf[..size].copy_from_slice(&self.read_data[..size]);
Poll::Ready(Ok(size))
}
}
impl Write for MockTcpStream {
fn poll_write(
mut self: Pin<&mut Self>,
_: &mut Context<'_>,
buf: &[u8],
) -> Poll<std::io::Result<usize>> {
self.write_data = Vec::from(buf);
Poll::Ready(Ok(buf.len()))
}
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<std::io::Result<()>> {
Poll::Ready(Ok(()))
}
fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<std::io::Result<()>> {
Poll::Ready(Ok(()))
}
}
impl Unpin for MockTcpStream {}