When only one Future needs to run, `.await` can be used directly inside an asynchronous function (`async fn`) or an asynchronous code block (`async {}`). When multiple Futures need to run concurrently, however, awaiting each one directly makes them run serially: every `.await` suspends the surrounding task until that particular Future has completed. The `futures` crate provides many practical tools for running Futures concurrently, such as the `join!` macro and the `select!` macro.
Note: The `futures::future` module provides a series of functions that operate on Futures (a much richer set than the macros). For details, see: docs.rs/futures/lat… docs.rs/futures/lat…
join! macro
The `join!` macro waits for several different Futures to complete while running all of them concurrently.
Let's first look at two wrong versions that use `.await`:
    struct Book;
    struct Music;

    async fn enjoy_book() -> Book { /* ... */ Book }
    async fn enjoy_music() -> Music { /* ... */ Music }

    // Wrong version 1: the two Futures run serially inside the async
    // function instead of at the same time.
    async fn enjoy1_book_and_music() -> (Book, Music) {
        let book = enjoy_book().await;   // suspends here until the book is finished
        let music = enjoy_music().await; // starts only after the book is finished
        (book, music)
    }

    // Wrong version 2: still serial.
    async fn enjoy2_book_and_music() -> (Book, Music) {
        let book_future = enjoy_book();   // async fn is lazy: nothing runs yet
        let music_future = enjoy_music(); // async fn is lazy: nothing runs yet
        // The two .await expressions are still evaluated one after the other.
        (book_future.await, music_future.await)
    }
The two examples above look as if they would run asynchronously without trouble, but in fact you must finish reading the book before you can listen to the music. That is, the tasks inside the asynchronous function execute serially (one after another) rather than concurrently.
This is because a Future is lazy in Rust: it does not start running until it is polled, which happens when `.await` is applied to it. And because the two `.await` expressions appear one after the other in the code, they run sequentially.
To run two Futures concurrently in the correct way, let's try the `futures::join!` macro:
    use futures::join;

    // `join!` returns a tuple whose values are the outputs of the
    // corresponding Futures.
    async fn enjoy_book_and_music() -> (Book, Music) {
        let book_fut = enjoy_book();
        let music_fut = enjoy_music();
        // join! completes only after every Future it manages has completed.
        join!(book_fut, music_fut)
    }

    fn main() {
        futures::executor::block_on(enjoy_book_and_music());
    }
If you want to run multiple asynchronous tasks stored in an array at the same time, you can use the `futures::future::join_all` function.
try_join! macro
Because `join!` must wait for all of the Futures it manages to complete, use `try_join!` if you want to stop as soon as any one Future reports an error. It applies when the Futures return a `Result`.
Note: All Futures passed to `try_join!` must have the same error type. If the error types differ, consider using the `map_err` and `err_into` methods of the `futures::future::TryFutureExt` trait to convert the errors:
    use futures::{
        future::TryFutureExt,
        try_join,
    };

    struct Book;
    struct Music;

    async fn get_book() -> Result<Book, ()> { /* ... */ Ok(Book) }
    async fn get_music() -> Result<Music, String> { /* ... */ Ok(Music) }

    // All Futures passed to try_join! must have the same error type.
    // If the error types differ, convert them with the map_err or err_into
    // methods of the futures::future::TryFutureExt trait.
    async fn get_book_and_music() -> Result<(Book, Music), String> {
        let book_fut = get_book().map_err(|()| "Unable to get book".to_string());
        let music_fut = get_music();
        // try_join! returns early as soon as one Future reports an error.
        try_join!(book_fut, music_fut)
    }

    async fn get_into_book_and_music() -> (Book, Music) {
        get_book_and_music().await.unwrap()
    }

    fn main() {
        futures::executor::block_on(get_into_book_and_music());
    }
select! macro
The `join!` macro lets you process the results only after all of the Futures have finished. The `select!` macro, by contrast, waits on multiple Futures and lets you handle whichever one completes first right away:
    use futures::{
        future::FutureExt, // for `.fuse()`
        pin_mut,
        select,
    };

    async fn task_one() { /* ... */ }
    async fn task_two() { /* ... */ }

    // Race mode: run t1 and t2 concurrently; whichever completes first ends
    // the function, without waiting for the other task.
    async fn race_tasks() {
        // .fuse() wraps each Future so that it implements FusedFuture.
        let t1 = task_one().fuse();
        let t2 = task_two().fuse();

        // pin_mut! pins both Futures on the stack; the resulting
        // Pin<&mut _> handles are Unpin, as select! requires.
        pin_mut!(t1, t2);

        // select! waits on both Futures and handles whichever finishes first.
        select! {
            () = t1 => println!("Task 1 completed first"),
            () = t2 => println!("Task 2 completed first"),
        }
    }
The code above runs `t1` and `t2` concurrently. Whichever one finishes first, the corresponding `println!` prints its output, and the function then ends without waiting for the other task to complete.
Note: The Futures passed to the `select!` macro must satisfy `FusedFuture + Unpin`, which the `fuse` method and the `pin_mut!` macro provide. The `.fuse()` method makes a Future implement the `FusedFuture` trait, while the `pin_mut!` macro pins the Future on the stack so that the resulting pinned reference is `Unpin`.
Note: The `select!` macro requires these two trait constraints, `FusedFuture` and `Unpin`, for the following reasons:
- `Unpin`: `select!` does not take ownership of a Future but uses it through a mutable reference. When `select!` ends, a Future that has not yet completed is therefore still owned by the caller and can continue to be used by other code.
- `FusedFuture`: once a Future has completed, `select!` must not poll it again. "Fuse" is meant in the sense of an electrical fuse: once the Future has completed, calling `poll` on it again simply returns `Poll::Pending`.
Only when `FusedFuture` is implemented can `select!` be used together with `loop`. Without it, a Future that has already completed would still be polled by `select!` over and over.
A `Stream` is slightly different: it uses the `FusedStream` trait. A `Stream` gains this trait through `.fuse()` (it can also be implemented manually), and calling the `.next()` or `.try_next()` method on it yields a Future that implements the `FusedFuture` trait:
    use futures::{
        executor,
        pin_mut,
        select,
        stream::StreamExt, // for `.fuse()` and `.next()`
    };

    // The streams could equally be passed in as parameters of type
    // `impl Stream<Item = u8> + FusedStream + Unpin`.
    async fn add_two_streams() -> u8 {
        // .fuse() wraps each Stream so that it implements FusedStream.
        let s1 = futures::stream::once(async { 10 }).fuse();
        let s2 = futures::stream::once(async { 20 }).fuse();

        // pin_mut! pins both Streams on the stack; the resulting
        // Pin<&mut _> handles are Unpin.
        pin_mut!(s1, s2);

        let mut total = 0;

        loop {
            let item = select! {
                x = s1.next() => x,
                x = s2.next() => x,
                complete => break,
                // Never reached here: both streams are always immediately
                // ready, and `complete` runs once they are exhausted.
                default => panic!(),
            };
            if let Some(next_num) = item {
                total += next_num;
            }
        }

        println!("add_two_streams, total = {total}");
        total
    }

    fn main() {
        executor::block_on(add_two_streams());
    }
Note: The `select!` macro also supports the `default` and `complete` branches:
- `complete` branch: runs once all Futures and Streams have completed. It is often used together with `loop`, where the `loop` keeps cycling through all the Futures.
- `default` branch: runs immediately if no Future or Stream is in the `Ready` state.
When using the `select!` macro, one function and one type are especially useful:
- `Fuse::terminated()` function: constructs an empty Future (one that already implements `FusedFuture`) for use in a `select!` loop, to be filled in later with a new Future as needed.
- `FuturesUnordered` type: holds a collection of Futures that run concurrently and yields their outputs as each one completes. New Futures can be pushed into it at any time.
    use std::{future::Future, pin::Pin};

    use futures::{
        executor,
        future::{Fuse, FusedFuture, FutureExt},
        pin_mut,
        select,
        stream::{FuturesUnordered, StreamExt},
    };

    async fn future_in_select() {
        // Create an empty Future that already implements FusedFuture.
        let fut = Fuse::terminated();

        // Create a FuturesUnordered collection; Futures can be pushed at any time.
        let mut async_tasks: FuturesUnordered<Pin<Box<dyn Future<Output = i32>>>> =
            FuturesUnordered::new();
        async_tasks.push(Box::pin(async { 1 }));

        pin_mut!(fut);

        let mut total = 0;

        loop {
            select! {
                // select_next_some is designed for use in `select!`: it yields
                // only the `Some(_)` values of the stream and ignores `None`.
                num = async_tasks.select_next_some() => {
                    println!("first num is {num} and total is {total}");
                    total += num;
                    println!("total is {total}");
                    if total >= 10 { break; }
                    // Check whether the Future slot is empty (terminated).
                    if fut.is_terminated() {
                        // Populate a new Future on demand.
                        fut.set(async { 1 }.fuse());
                    }
                },
                num = fut => {
                    println!("second num is {num} and total is {total}");
                    total += num;
                    println!("now total is {total}");
                    async_tasks.push(Box::pin(async { 1 }));
                },
                complete => break,
                default => panic!(),
            };
        }

        println!("total finally is {total}");
    }

    fn main() {
        executor::block_on(future_in_select());
    }
Summary
The `futures` crate provides many practical tools for running Futures concurrently, such as:
- `join!` macro: runs multiple different Futures concurrently and finishes only when all of them have completed. It can be understood as the concurrency mode in which every task must finish.
- `try_join!` macro: runs multiple different Futures that return `Result` concurrently and stops as soon as any one of them reports an error. It can be understood as the concurrency mode that can end early.
- `select!` macro: runs multiple different Futures concurrently and lets you handle any Future as soon as it completes. It can be understood as the task race mode.
- Using the `select!` macro requires `FusedFuture + Unpin`, which the `fuse` method and the `pin_mut!` macro provide.
Reference
- course.rs/advance/asy…
- huangjj27.github.io/async-book/…
- docs.rs/futures/lat…
- docs.rs/futures/lat…