Rust async programming: using join! and select! to run multiple Futures concurrently

When only one Future needs to run, you can .await it directly inside an async fn or an async {} block. When several Futures need to run concurrently, awaiting them one after another does not help: each .await suspends the enclosing async function until that particular Future completes, so the Futures run serially. The futures crate provides utilities for running Futures concurrently, such as the join! and select! macros.

Note: The futures::future module also provides a set of functions for working with Futures, and it is much richer than the macros. For details, see: docs.rs/futures/lat… docs.rs/futures/lat…

join! macro

The join! macro waits for several different Futures to complete and runs them concurrently while it waits.

Let’s first look at two wrong versions using .await:

struct Book;
struct Music;

async fn enjoy_book() -> Book { /* ... */ Book }
async fn enjoy_music() -> Music { /* ... */ Music }

// Wrong version 1: the two Futures run one after the other, not at the same time
async fn enjoy1_book_and_music() -> (Book, Music) {
    // Each .await suspends this function until that Future completes
    let book = enjoy_book().await; // enjoy_music() has not even been created yet
    let music = enjoy_music().await; // starts only after the book is finished
    (book, music)
}

// Wrong version 2: still serial, even though both Futures are created up front
async fn enjoy2_book_and_music() -> (Book, Music) {
    let book_future = enjoy_book(); // async fn is lazy: nothing runs yet
    let music_future = enjoy_music(); // async fn is lazy: nothing runs yet
    // music_future is first polled only after book_future has completed
    (book_future.await, music_future.await)
}

Both examples look as if they run asynchronously, but in fact you have to finish the book before you can listen to the music. The tasks inside the async function run serially (one after the other) instead of concurrently.

This is because a Future in Rust is lazy: it does not start running until it is polled, which is what .await does. Since the two .await expressions appear one after the other in the code, the Futures also run one after the other.
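The laziness described above can be observed with a small std-only sketch. The `block_on` helper, `NoopWaker`, `step`, and `lazy_demo` are hypothetical names made up for this illustration; the spin-polling `block_on` is only a stand-in for a real executor such as `futures::executor::block_on`.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough for futures that never wait on anything.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical minimal executor: polls the future in a loop until it is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Records its name in the log when (and only when) it is actually run.
async fn step(name: &'static str, log: &RefCell<Vec<&'static str>>) {
    log.borrow_mut().push(name);
}

/// Returns the order in which the events happened.
fn lazy_demo() -> Vec<&'static str> {
    let log = RefCell::new(Vec::new());
    block_on(async {
        let book = step("book", &log); // created, but nothing has run yet
        let music = step("music", &log); // created, but nothing has run yet
        log.borrow_mut().push("both futures created");
        music.await; // runs first, because it is awaited first
        book.await;
    });
    log.into_inner()
}
```

Calling `lazy_demo()` should show that creating the two futures logs nothing, and that the order of the `.await` expressions, not the order of creation, decides the execution order.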

To run the two Futures concurrently, use the futures::join! macro:

use futures::join;

// `join!` returns a tuple containing the output of each `Future`.
async fn enjoy_book_and_music() -> (Book, Music) {
    let book_fut = enjoy_book();
    let music_fut = enjoy_music();
    // join! completes only after all of the Futures it manages have completed
    join!(book_fut, music_fut)
}

fn main() {
    futures::executor::block_on(enjoy_book_and_music());
}
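To make the difference between serial awaiting and joining visible, here is a std-only sketch that logs the order of events. It is not how `futures::join!` is implemented; `join2`, `YieldOnce`, `task`, `block_on`, and the two `run_*` functions are hypothetical helpers, and `join2` only illustrates the idea of polling both Futures on every wake-up.

```rust
use std::cell::RefCell;
use std::future::{poll_fn, Future};
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical spin-polling executor, for demonstration only.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Returns `Pending` once and then `Ready`: a stand-in for waiting on I/O.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            return Poll::Ready(());
        }
        self.0 = true;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

type Log = RefCell<Vec<String>>;

async fn task(name: &str, log: &Log) {
    log.borrow_mut().push(format!("{name} start"));
    YieldOnce(false).await; // give other futures a chance to run
    log.borrow_mut().push(format!("{name} end"));
}

/// Hand-rolled join of two futures: polls both until both are done.
async fn join2<A: Future, B: Future>(a: A, b: B) -> (A::Output, B::Output) {
    let mut a = pin!(a);
    let mut b = pin!(b);
    let (mut out_a, mut out_b) = (None, None);
    poll_fn(|cx| {
        if out_a.is_none() {
            if let Poll::Ready(v) = a.as_mut().poll(cx) {
                out_a = Some(v);
            }
        }
        if out_b.is_none() {
            if let Poll::Ready(v) = b.as_mut().poll(cx) {
                out_b = Some(v);
            }
        }
        if out_a.is_some() && out_b.is_some() {
            Poll::Ready((out_a.take().unwrap(), out_b.take().unwrap()))
        } else {
            Poll::Pending
        }
    })
    .await
}

fn run_sequential() -> Vec<String> {
    let log = Log::default();
    block_on(async {
        task("book", &log).await;
        task("music", &log).await;
    });
    log.into_inner()
}

fn run_joined() -> Vec<String> {
    let log = Log::default();
    block_on(join2(task("book", &log), task("music", &log)));
    log.into_inner()
}
```

In `run_sequential` the book task finishes before the music task even starts, while in `run_joined` both tasks start before either of them ends.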

If you want to run a collection of asynchronous tasks (for example an array or Vec of Futures) at the same time, you can use the futures::future::join_all function.

try_join! macro

join! has to wait for all of its Futures to complete. If you want to stop as soon as one Future reports an error, use try_join! instead; it applies when the Futures return a Result. When any Future resolves to Err, try_join! returns that error immediately, without waiting for the remaining Futures.

Note: All Futures passed to try_join! must have the same error type. If the error types differ, consider using the map_err and err_into methods of futures::future::TryFutureExt to convert them:

use futures::{
    future::TryFutureExt,
    try_join,
};

struct Book;
struct Music;

async fn get_book() -> Result<Book, ()> { /* ... */ Ok(Book) }
async fn get_music() -> Result<Music, String> { /* ... */ Ok(Music) }

// All Futures passed to try_join! must have the same error type.
// Here the () error of get_book() is converted to String with map_err;
// err_into is an alternative when a From/Into conversion exists.
async fn get_book_and_music() -> Result<(Book, Music), String> {
    let book_fut = get_book().map_err(|()| "Unable to get book".to_string());
    let music_fut = get_music();
    // try_join! returns as soon as one of the Futures resolves to an Err
    try_join!(book_fut, music_fut)
}

async fn get_into_book_and_music() -> (Book, Music) {
    get_book_and_music().await.unwrap()
}

fn main() {
    futures::executor::block_on(get_into_book_and_music());
}

select! macro

The join! macro only lets you process the results once all Futures have finished. The select! macro also waits on several Futures, but as soon as any one of them completes, its result can be handled immediately:

use futures::{
    future::FutureExt, // for `.fuse()`
    pin_mut,
    select,
};

async fn task_one() { /* ... */ }
async fn task_two() { /* ... */ }

// Race mode: run t1 and t2 concurrently; whichever completes first ends the
// function, without waiting for the other task
async fn race_tasks() {
    // .fuse() wraps each Future so that it implements the FusedFuture trait
    let t1 = task_one().fuse();
    let t2 = task_two().fuse();

    // pin_mut! pins the Futures to the stack; the resulting Pin<&mut _> values are Unpin
    pin_mut!(t1, t2);

    // select! waits on both Futures and runs the branch of whichever completes first
    select! {
        () = t1 => println!("Task 1 completed first"),
        () = t2 => println!("Task 2 completed first"),
    }
}

The code above runs t1 and t2 concurrently. Whichever completes first, its println! branch prints the corresponding message, and then the function returns without waiting for the other task to complete.

Note: every Future used in a select! branch must satisfy two trait bounds, FusedFuture + Unpin. The .fuse() method wraps a Future so that it implements FusedFuture, and the pin_mut! macro pins it to the stack, which yields a Pin<&mut _> that is Unpin. The two bounds are needed for the following reasons:

  • Unpin: select! does not take ownership of the Futures; it uses them by mutable reference. When select! finishes, any Future that has not completed is still owned by the caller and can be used again.
  • FusedFuture: once a Future has completed, select! must not poll it again. "Fuse" is meant in the sense of an electrical fuse: once a fused Future has completed, polling it again simply returns Poll::Pending.

This is what makes it possible to use select! inside a loop: FusedFuture lets select! skip the Futures that have already completed. Without it, a completed Future would be polled again on the next iteration.
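The fuse behaviour can be sketched with the standard library alone. The `Fused` wrapper below is a hypothetical stand-in written for this illustration, not the implementation of `futures::future::Fuse`; it only mirrors the behaviour described above (remember completion, return Poll::Pending afterwards). `NoopWaker` and `poll_three_times` are likewise made-up helpers.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for a fused future, for illustration only.
struct Fused<F> {
    inner: Option<Pin<Box<F>>>, // `None` once the inner future has completed
}

impl<F: Future> Fused<F> {
    fn new(fut: F) -> Self {
        Fused { inner: Some(Box::pin(fut)) }
    }

    /// True once the inner future has completed and must not be polled again.
    fn is_terminated(&self) -> bool {
        self.inner.is_none()
    }
}

impl<F: Future> Future for Fused<F> {
    type Output = F::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // `Fused<F>` is `Unpin` because the inner future lives in a pinned Box
        let this = self.get_mut();
        let Some(fut) = this.inner.as_mut() else {
            return Poll::Pending; // already completed: never touch the inner future again
        };
        match fut.as_mut().poll(cx) {
            Poll::Ready(out) => {
                this.inner = None; // "blow the fuse"
                Poll::Ready(out)
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls a fused future three times; `Some(v)` means Ready(v), `None` means Pending.
fn poll_three_times() -> (Vec<Option<u8>>, bool) {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Fused::new(async { 7u8 });
    let mut results = Vec::new();
    for _ in 0..3 {
        match Pin::new(&mut fut).poll(&mut cx) {
            Poll::Ready(v) => results.push(Some(v)),
            Poll::Pending => results.push(None),
        }
    }
    (results, fut.is_terminated())
}
```

The first poll yields the value; the later polls are harmless and report Pending, and `is_terminated()` tells a caller such as a select loop that this Future can be skipped.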

Stream is slightly different: it uses the FusedStream trait. A Stream gets this trait through .fuse() (or a manual implementation), and calling .next() or .try_next() on it returns a Future that implements FusedFuture:

use futures::{
    executor, // for `block_on`
    pin_mut,
    select,
    stream::StreamExt, // for `.fuse()` and `.next()`
};

async fn add_two_streams() -> u8 {
    // In general, select! needs streams that satisfy:
    // mut s1: impl Stream<Item = u8> + FusedStream + Unpin,
    // mut s2: impl Stream<Item = u8> + FusedStream + Unpin,

    // .fuse() wraps the Stream so that it implements FusedStream
    let s1 = futures::stream::once(async { 10 }).fuse();
    let s2 = futures::stream::once(async { 20 }).fuse();

    // pin_mut! pins the Streams to the stack; the resulting Pin<&mut _> values are Unpin
    pin_mut!(s1, s2);

    let mut total = 0;

    loop {
        let item = select! {
            x = s1.next() => x,
            x = s2.next() => x,
            complete => break,
            default => panic!(), // never runs here: the streams are always ready, then `complete` fires
        };
        if let Some(next_num) = item {
            total += next_num;
        }
    }
    println!("add_two_streams, total = {total}");
    total
}

fn main() {
    executor::block_on(add_two_streams());
}

Note: The select! macro also supports default and complete branches:

  • complete branch: runs when all Futures and Streams in the select! have completed. It is often combined with loop, which keeps calling select! until every Future has finished.
  • default branch: runs immediately if none of the Futures or Streams is Ready when select! polls them.

Two functions and types are particularly useful together with select!:

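  • Fuse::terminated(): constructs an empty Future that is already terminated (and implements FusedFuture). It can be placed in a select! loop and filled in with a new Future later, as needed.
  • FuturesUnordered: a collection of Futures that runs all of them concurrently and yields their results as they complete. Futures can be pushed into it while the loop is running, so several instances of the same task can be in flight at once.
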
use std::{future::Future, pin::Pin};

use futures::{
    executor, // for `block_on`
    future::{Fuse, FusedFuture, FutureExt},
    pin_mut,
    select,
    stream::{FuturesUnordered, StreamExt},
};

async fn future_in_select() {
    // Create an empty Future that is already terminated; it implements FusedFuture
    let fut = Fuse::terminated();
    // Create a FuturesUnordered collection that can hold many boxed Futures
    let mut async_tasks: FuturesUnordered<Pin<Box<dyn Future<Output = i32>>>> =
        FuturesUnordered::new();
    async_tasks.push(Box::pin(async { 1 }));

    pin_mut!(fut);

    let mut total = 0;
    loop {
        select! {
            // select_next_some() is meant for use in select!: it resolves only with
            // the `Some(_)` values produced by the stream and ignores `None`
            num = async_tasks.select_next_some() => {
                println!("first num is {num} and total is {total}");
                total += num;
                println!("total is {total}");
                if total >= 10 { break; }
                // Check whether fut has terminated
                if fut.is_terminated() {
                    // Fill in a new Future on demand
                    fut.set(async { 1 }.fuse());
                }
            },
            num = fut => {
                println!("second num is {num} and total is {total}");
                total += num;
                println!("now total is {total}");
                async_tasks.push(Box::pin(async { 1 }));
            },
            complete => break,
            default => panic!(),
        };
    }

    println!("total finally is {total}");
}

fn main() {
    executor::block_on(future_in_select());
}

Summary

The futures crate provides many practical tools for running Futures concurrently, such as:

  1. join! macro: runs several different Futures concurrently and completes only when all of them have completed. Think of it as the "every task must finish" concurrency mode.
  2. try_join! macro: runs several different Futures that return Result concurrently and returns as soon as one of them reports an error. Think of it as a concurrency mode that can end early.
  3. select! macro: runs several different Futures concurrently and lets you handle whichever one completes first. Think of it as a race between tasks.
  4. Futures used with select! must satisfy FusedFuture + Unpin, which the fuse method and the pin_mut! macro provide.

Reference

  • course.rs/advance/asy…
  • huangjj27.github.io/async-book/…
  • docs.rs/futures/lat…
  • docs.rs/futures/lat…