异步入门
为什么需要异步?
异步操作是在非阻塞方案中执行的操作,允许主程序流继续处理。
假设需求场景为客户端从多个服务器下载多个文件。
下载方式 | 缺点 |
---|---|
依次按照顺序 | 必须等待前一个完成 |
多线程 | 为每一个下载任务创建线程,导致内存占满 |
线程池 | 发起下载请求后,需要等待服务端的响应,当前线程会阻塞 |
注意:多线程和线程池都可以是异步的一种实现方式。异步是和同步相对的概念。
异步的优缺点
因为异步操作无须额外的线程负担,并且使用回调的方式进行处理,在设计良好的情况下,处理函数可以不必使用共享变量(即使无法完全不用,最起码可以减少共享变量的数量),减少了死锁的可能。当然异步操作也并非完美无暇。编写异步操作的复杂程度较高,程序主要使用回调方式进行处理,与普通人的思维方式有些初入,而且难以调试。
简单使用
1 | use async_std::task; |
执行结果
1 | dance |
说明:
- 如果使用
sleep(Duration::from_secs(5))
,结果会是按照顺序执行。因为外部的阻塞不能主动唤醒异步内部的线程,所以直接在外部进行阻塞。如果使用异步的sleep
,learn song
会让出资源; - 通过join,能等待多个Future完成,并发执行;
.await
是在代码块中按顺序执行,会阻塞后面的代码,但是此时会让出线程;block_on
会阻塞直到Future执行完成。
Future
标准库定义
1 | use crate::marker::Unpin; |
poll
函数传递一个 &mut Context<'_>
类型参数, 返回一个 Poll
类型参数:
Context 主要包含一个
Waker
对象,由执行器提供,用于告诉执行器,重新执行当前poll
函数1
2
3
4
5
6
7
8
9
10
// Context的生命周期不会比它包含的waker引用更久
pub struct Context<'a> {
waker: &'a Waker,
// Ensure we future-proof against variance changes by forcing
// the lifetime to be invariant (argument-position lifetimes
// are contravariant while return-position lifetimes are
// covariant).
_marker: PhantomData<fn(&'a ()) -> &'a ()>,
}Poll 是一个枚举类型包含两个枚举
Ready<Output>
当任务已经就绪,返回该对象Pending
任务没有就绪时返回该对象,此Future将让出CPU,直到在其他线程或者任务执行调用Waker
为止
实现者需要保证 poll 是非阻塞,如果是阻塞的话会导致循环进行不下去
实现一个 Future 类型的方式
- 方式1:使用
async fn
,编译器会自动生成实现 Future Trait的类型 - 方式2:自定义结构体,并实现 Future Trait
Pin
默认情况下,Rust中所有类型都是可以 move
的,Rust允许按值传递所有类型,并且像 Box<T>
、&mut T
之类的智能指针或者引用允许你通过 mem::swap
进行拷贝交换(移动),这样,如果存在结构体存在自引用,将导致引用失效。
而 async 编译后的结构可能就会出现一种自引用的结构,如下所示:
1 | async { |
这样 AsyncFuture 构造出来后,就存在自引用(AsyncFuture.read_into_buf_fut.buf
指向 AsyncFuture.x
)。但是如果AsyncFuture发生移动,x肯定也会发生移动,如果read_into_buf_fut.buf
还是指向原来的值的话,则会变成无效。而Pin就是为了解决此问题的。
Pin 类型包着指针类型,保证指针背后的值将不被移动。例如 Pin<&mut T>
,Pin<&T>
, Pin<Box<T>>
都保证 T
不会移动(move)。
原理
1 | pub struct Pin<P> { |
首先
Pin<T>
和Box<T>
类似都是一种智能指针。不同点在于Pin<&mut T>
不能通过safe代码拿到&mut T
,因此保证mem::swap
无法调用,也就是P
所指向的T
在内存中固定住,不能移动。Pin::as_mut
返回的仍是Pin<T>
只有
Pin<DerefMut<T: Unpin>>
或者Pin<Deref<T: Unpin>>
或者Pin<T: Unpin>
可以通过get_mut
或者get_ref
拿到T
的引用Pin::new
只能是针对实现了Unpin
的类型(重要)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24impl<P: Deref<Target: Unpin>> Pin<P> {
/// Construct a new `Pin<P>` around a pointer to some data of a type that
/// implements [`Unpin`].
///
/// Unlike `Pin::new_unchecked`, this method is safe because the pointer
/// `P` dereferences to an [`Unpin`] type, which cancels the pinning guarantees.
pub fn new(pointer: P) -> Pin<P> {
// Safety: the value pointed to is `Unpin`, and so has no requirements
// around pinning.
unsafe { Pin::new_unchecked(pointer) }
}
/// Unwraps this `Pin<P>` returning the underlying pointer.
///
/// This requires that the data inside this `Pin` is [`Unpin`] so that we
/// can ignore the pinning invariants when unwrapping it.
pub fn into_inner(pin: Pin<P>) -> P {
pin.pointer
}
}只有
P<T>
的T: Unpin
,才可以new出一个Pin<P<T>>
。这里的T就是应该被pin的实例,可是由于T: Unpin
实际上T的实例并不会被pin。如果要创建不实现
Unpin
的Pin<P<T>>
,可以使用unsafe{ Pin::new_unchecked(&mut t) }
。
本质上实现不移动就是加了一层指针,并未违反任意值都是可以移动的规则。
- 比如
Pin<T>
发生移动时,仅仅是Pin
这个结构发生了移动,但是T
对象并没有移动
- 比如
Unpin
1 | pub auto trait Unpin {} |
定义在std::marker
中,如果T: Unpin
,那么T
在pin
后可以安全地移动,可以拿到&mut T
。Unpin
只对Pin<P<T>>
的T
起作用,不对P
本身起效,例如对Pin<Box<T>>
的Box<T>
是无效的。
默认为以下类型实现了Unpin
:
1 | impl<'a, T: ?Sized + 'a> Unpin for &'a T {} |
async
生成的匿名结构体(impl Future<Output=()>
)没有实现Unpin
。
!Unpin
对Unpin取反,!Unpin的双重否定就是pin。如果一个类型中包含了PhantomPinned,那么这个类型就是!Unpin。
1 |
|
一般在结构体中使用_marker: PhantomPinned
来实现!Unpin
。
完整示例
1 | use std::pin::Pin; |
如果使用Box::pin(t)
创建Pin<Box<T>>
,则会将数据固定到堆上。
1 | impl Test { |
为什么堆上的pin
对象可以进行swap
?
boxed
只是一个栈变量,所指的对象在堆上。通过swap
仅仅bitcopy and swap
两个栈变量test1
和test2
,相当于两者交换了所有权,交换了指向,而堆上的数据不受影响。T
类型对象内存位置固定,所有没有违反Pin
的语义要求。
async/await
async转化的Future对象和其它Future一样是具有惰性的,即在运行之前什么也不做。运行Future最常见的方式是.await
。
async
有如下代码:
1 | async fn async_main() { |
那么实际上会生成一个匿名的Future trait object
,包裹一个 Generator
。也就是一个实现了 Future
的 Generator
。Generator
实际上是一个状态机,配合.await
当每次async
代码块中任何返回 Poll::Pending
则即调用generator yeild
,让出执行权,一旦恢复执行,generator resume
继续执行剩余流程。
1 | pub const fn from_generator<T>(gen: T) -> impl Future<Output = T::Return> |
每一次gen.resume()
会顺序执行async block
中代码直到遇到yield
。async block
中的.await
语句在无法立即完成时会调用yield
交出控制权等待下一次resume
。而当所有代码执行完,也就是状态机入Complete
,async block
返回Poll::Ready
,代表Future
执行完毕。
生成的匿名对象类似如下:
1 | struct AsyncFuture { |
poll
进行轮询每个Future
的状态,如果Poll::Ready()
则进入下一个State
,直到Done
。
生命周期
1 | async fn foo(x: &u8) -> u8 { *x } |
通过将x
移动到async
中,延长x
的生命周期和foo
返回的Future生命周期一致。
move
async 块和闭包允许 move 关键字,就像普通的闭包一样。一个 async move 块将获取它引用变量的所有权,允许它活得比目前的范围长,但放弃了与其它代码分享那些变量的能力。
线程间移动
在使用多线程Future的excutor时,Future可能在线程之间移动,因此在async主体中使用的任何变量都必须能够在线程之间传输,因为任何.await变量都可能导致切换到新线程。
async fn Future是否为Send的取决于是否在.await点上保留非Send类型。编译器尽其所能地估计值在.await点上的保存时间。
1 | async fn foo() { |
如果x
不在await
前drop,那么该Future是非Send的。而Future的异步特性要求需要有Send
约束。
Stream
Stream
是由一系列的Future
组成,我们可以从Stream
读取各个Future
的结果,直到Stream
结束。
定义
1 | trait Stream { |
poll_next
函数有三种可能的返回值,分别如下:
Poll::Pending
说明下一个值还没有就绪,仍然需要等待。Poll::Ready(Some(val))
已经就绪,成功返回一个值,程序可以通过调用poll_next
再获取下一个值。Poll::Ready(None)
表示Stream
已经结束,不应该在调用poll_next
。
迭代
Stream
不支持使用for
,而while let
和 next/try_next
则是允许的。
1 | async fn sum_with_next(mut stream: Pin<&mut dyn Stream<Item = i32>>) -> i32 { |
并发
1 | async fn jump_around( |
Select
select宏也允许并发的执行Future,但是和join、try_join不同的是,select宏只要有一个Future返回,就会返回。
1 | use futures::{select, future::FutureExt, pin_mut}; |
BoxFuture
一个拥有的动态类型[‘ Future ‘],在不是静态输入或需要添加一些间接类型的情况下使用。
比如在递归使用Future时:
1 | use futures::future::{BoxFuture, FutureExt}; |