Asynchronous Programming in Rust: Exploring the futures library (futures-rs)

When learning asynchronous programming in Rust, you will run into the futures library sooner or later, whether you seek it out or not. To understand Rust async in depth, it helps to survey the library as a whole.

futures-rs is a library maintained under the rust-lang organization (the futures project). It is a foundation of Rust asynchronous programming and ships basic executors rather than a full-featured runtime. It defines key traits such as Stream, macros such as join! and select!, and a range of future combinators for controlling asynchronous flow.

Note: The Future trait originated in futures-rs. Rust later moved the core trait into the standard library as std::future::Future in order to support the async/await syntax. In futures 0.3, futures::future::Future is a re-export of that standard-library trait, and the library layers extension traits such as FutureExt on top of it.
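To make the standard-library side of this concrete, here is a minimal sketch that uses only std. The names `Ready`, `ThreadWaker`, `mini_block_on`, and `add_one` are made up for this illustration, and `mini_block_on` is a toy stand-in written under the assumption that a park/unpark loop is good enough for a demo; it is not how futures-executor implements block_on.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Hypothetical hand-written future: ready immediately with a stored value.
struct Ready(Option<u32>);

impl Future for Ready {
    type Output = u32;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u32> {
        // A future must not be polled again after it has completed.
        Poll::Ready(self.0.take().expect("polled after completion"))
    }
}

/// Waker that unparks the thread that is blocked in `mini_block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Toy executor (illustration only): polls one future on the current thread.
fn mini_block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            // Sleep until some waker calls `unpark` on this thread.
            Poll::Pending => thread::park(),
        }
    }
}

/// async/await works with any type that implements `std::future::Future`.
async fn add_one() -> u32 {
    Ready(Some(41)).await + 1
}

fn main() {
    println!("{}", mini_block_on(add_one())); // prints 42
}
```

Nothing here depends on futures-rs, which is the point: the trait and the async/await machinery live in std, while executors and combinators come from libraries.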

The futures-rs library provides many features that are worth mastering for asynchronous programming:

  • Concurrency tools such as join! and select!
  • Traits such as TryFuture, FusedFuture, and Stream
  • Many extension methods for Future, TryFuture, and Stream
  • Support for asynchronous IO
  • The Sink trait
  • A thread pool and asynchronous executors

Official documentation: docs.rs/futures/lat…

futures-rs consists of a series of crates:

Crate name         Description
futures            Re-exports items from the crates below as the public API
futures-core       Core traits and types
futures-task       Tools for working with tasks
futures-channel    Channels for asynchronous communication
futures-executor   Asynchronous task executors built on futures-rs
futures-io         IO-related abstract traits
futures-sink       The Sink trait
futures-macro      Implementations of macros such as join!, try_join!, and select!
futures-util       Common utilities and extension traits

Code Structure

Official code: github.com/rust-lang/f…

  ├── ci                // contains a shell script
  ├── examples          // contains two demos
  ├── futures-test      // general tool library for testing futures-rs (not test cases)
  ├── futures           // re-exports part of the API of the crates below for external use
  ├── futures-channel   // asynchronous communication channels, including oneshot and mpsc
  ├── futures-core      // core traits and types of the futures library
  ├── futures-executor  // asynchronous task executors of the futures library
  ├── futures-io        // abstract traits for asynchronous IO, such as AsyncRead, AsyncWrite, AsyncSeek, AsyncBufRead
  ├── futures-macro     // implementations of macros such as join!, try_join!, and select!
  ├── futures-sink      // the Sink trait
  ├── futures-task      // tools for working with tasks, such as the FutureObj/LocalFutureObj structs and the Spawn/LocalSpawn and ArcWake traits
  └── futures-util      // general utilities and extension traits (such as AsyncReadExt, SinkExt, SpawnExt)

Using the futures library

You can pull in everything futures-rs enables by default or turn on only the features you need, and the library also works without the standard library. To use it, add the dependency to Cargo.toml, for example:

[dependencies]
# Pick exactly one of the following lines.
# Option 1: the futures crate with its default features
# futures = "0.3"
# Option 2: also enable the thread pool (required by the sample code below)
futures = { version = "0.3", features = ["thread-pool"] }
# Option 3: use `futures` in a `#![no_std]` environment
# futures = { version = "0.3", default-features = false }

Sample code:

// main.rs
use futures::channel::mpsc; // message channel
use futures::executor; // future executors
use futures::executor::ThreadPool; // thread pool (needs the `thread-pool` feature)
use futures::StreamExt; // Stream extension methods such as `map` and `collect`

fn main() {
    let pool = ThreadPool::new().expect("Failed to build pool");
    let (tx, rx) = mpsc::unbounded::<i32>();

    // An async block creates a value that implements Future.
    // No executor has been given this future yet, so nothing runs at this point.
    let fut_values = async {
        // A second async block produces another Future. It is not awaited here;
        // it is handed to the thread pool below, which polls it on a worker thread.
        // An executor reaches a future through the std::task::Context that it
        // passes as the second parameter of Future::poll.
        let fut_tx_result = async move {
            (0..100).for_each(|v| {
                tx.unbounded_send(v).expect("Failed to send");
            })
        };

        // Hand the sending future to the thread pool with spawn_ok.
        pool.spawn_ok(fut_tx_result);

        // Apply combinators: double every received value and collect the results.
        // The stream ends once `tx` is dropped at the end of the sending future.
        let fut_values = rx
            .map(|v| v * 2)
            .collect();

        // Wait for fut_values to complete, using whichever executor polls this async block.
        fut_values.await
    };
    // block_on actually runs fut_values: it calls poll on it (which in turn polls
    // the futures nested inside) until it completes, then returns the result.
    let values: Vec<i32> = executor::block_on(fut_values);

    println!("Values={:?}", values);
}
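The comments in the sample mention that an executor reaches a future through the Context passed to Future::poll. The std-only sketch below tries to make that visible: a nested async block is polled through the same Context as its parent, and a wake-up requested deep inside reaches the outermost waker. `CountingWaker`, `YieldOnce`, `drive`, and `nested` are hypothetical names for this illustration, and `drive` busy-polls instead of sleeping, which a real executor would not do.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Counts how often the executor's waker is invoked.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Returns `Pending` on the first poll and asks to be polled again through
/// the `Context` it was handed; completes on the second poll.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Toy driver: busy-polls `fut` and returns (output, polls, wake-ups).
fn drive<F: Future>(fut: F) -> (F::Output, usize, usize) {
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return (out, polls, counter.0.load(Ordering::SeqCst));
        }
    }
}

/// Parent async fn that awaits a child async block.
async fn nested() -> u32 {
    let child = async {
        YieldOnce(false).await;
        21
    };
    // Awaiting the child forwards the parent's Context to it.
    child.await * 2
}

fn main() {
    let (value, polls, wakes) = drive(nested());
    println!("value={value} polls={polls} wakes={wakes}"); // value=42 polls=2 wakes=1
}
```

The first poll reaches `YieldOnce` through parent and child, records one wake-up on the driver's waker, and returns Pending; the second poll completes all three layers.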

futures Crate

The futures crate itself contains almost no logic. It re-exports items from the other crates under futures-rs as the public API, and it exposes only the commonly used modules, traits, functions, and macros rather than everything.
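The re-export pattern can be sketched in a single file with modules standing in for crates. Everything here is hypothetical: `core_like`, `util_like`, and `facade` play the roles of futures-core, futures-util, and futures, while `Job` and `JobExt` mimic the split between a core trait and its extension trait.

```rust
/// Stand-in for a core crate: only the essential trait lives here.
mod core_like {
    pub trait Job {
        fn run(&self) -> u32;
    }
}

/// Stand-in for a utility crate: extension methods built on the core trait.
mod util_like {
    use super::core_like::Job;

    pub trait JobExt: Job {
        fn run_twice(&self) -> u32 {
            self.run() + self.run()
        }
    }

    // Blanket impl: every `Job` gets the extension methods for free.
    impl<T: Job + ?Sized> JobExt for T {}
}

/// Stand-in for the facade crate: no logic, only re-exports.
mod facade {
    pub use super::core_like::Job;
    pub use super::util_like::JobExt;
}

// Users depend on the facade only and never name the inner modules.
use facade::{Job, JobExt};

struct Fixed(u32);

impl Job for Fixed {
    fn run(&self) -> u32 {
        self.0
    }
}

fn main() {
    let job = Fixed(21);
    println!("run = {}, run_twice = {}", job.run(), job.run_twice());
}
```

One benefit of this layout is that the small core can stay stable while the utility layer evolves, and users still get a single import path.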

Note: futures-rs provides a wide variety of APIs, and most programs use only a small part of them. stjepang extracted a subset of them into a lightweight version of futures. For details, see github.com/stjepang/fu …

For example, it re-exports the following async-related traits, modules, and macros:

// trait export
pub use futures_core::future::Future;
pub use futures_core::future::TryFuture;
pub use futures_util::future::FutureExt;
pub use futures_util::future::TryFutureExt;
pub use futures_core::stream::Stream;
pub use futures_core::stream::TryStream;
pub use futures_util::stream::StreamExt;
pub use futures_util::stream::TryStreamExt;
pub use futures_sink::Sink;
pub use futures_util::sink::SinkExt;
pub use futures_io::AsyncBufRead;
pub use futures_io::AsyncRead;
pub use futures_io::AsyncSeek;
pub use futures_io::AsyncWrite;
pub use futures_util::AsyncBufReadExt;
pub use futures_util::AsyncReadExt;
pub use futures_util::AsyncSeekExt;
pub use futures_util::AsyncWriteExt;

// module export
pub use futures_util::{future, sink, stream, task};

// macro export
pub use futures_util::{join, try_join, select, select_biased, pin_mut};

The source code of futures/src/lib.rs is as follows:

//! Abstractions for asynchronous programming.
//! This crate provides a number of core abstractions for writing asynchronous code:
// The futures crate provides a series of core abstractions related to asynchronous programming. It only re-exports other crates under futures-rs and provides APIs for external calls

#[doc(no_inline)]
pub use futures_core::future::{Future, TryFuture}; // export Future and TryFuture
#[doc(no_inline)]
pub use futures_util::future::{FutureExt, TryFutureExt}; // export FutureExt and TryFutureExt

#[doc(no_inline)]
pub use futures_core::stream::{Stream, TryStream}; // export Stream and TryStream
#[doc(no_inline)]
pub use futures_util::stream::{StreamExt, TryStreamExt}; // export StreamExt and TryStreamExt

#[doc(no_inline)]
pub use futures_sink::Sink; // export Sink
#[doc(no_inline)]
pub use futures_util::sink::SinkExt; // export SinkExt

#[cfg(feature = "std")]
#[doc(no_inline)]
// export AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite
pub use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite};
#[cfg(feature = "std")]
#[doc(no_inline)]
// export AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt
pub use futures_util::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt};

// Macro reexports
pub use futures_core::ready; // export ready macro
pub use futures_util::pin_mut; // export pin_mut macro
#[cfg(feature = "std")]
#[cfg(feature = "async-await")]
pub use futures_util::select; // export select macro
#[cfg(feature = "async-await")]
// Export some macros, such as: join, pending, poll, select_biased, try_join
pub use futures_util::{join, pending, poll, select_biased, try_join}; // Async-await

// Module reexports
#[doc(inline)]
// Export a series of modules, such as: future, sink, stream, task
pub use futures_util::{future, sink, stream, task};

#[cfg(feature = "std")]
#[cfg(feature = "async-await")]
pub use futures_util::stream_select; // export stream_select macro

#[cfg(feature = "alloc")]
#[doc(inline)]
pub use futures_channel as channel; // export futures_channel
#[cfg(feature = "alloc")]
#[doc(inline)]
pub use futures_util::lock; // export lock tool

#[cfg(feature = "std")]
#[doc(inline)]
pub use futures_util::io; // export io tool

#[cfg(feature = "executor")]
#[cfg_attr(docsrs, doc(cfg(feature = "executor")))]
pub mod executor {
    //! Built-in executors and related tools.
    //! This module is only available when the `executor` feature of this library is activated.
    // Export some tools under the executor, such as block_on, block_on_stream, etc.
    pub use futures_executor::{
        block_on, block_on_stream, enter, BlockingStream, Enter, EnterError, LocalPool,
        LocalSpawner,
    };

    #[cfg(feature = "thread-pool")]
    #[cfg_attr(docsrs, doc(cfg(feature = "thread-pool")))]
    // Export the thread pool under the executor: ThreadPool and ThreadPoolBuilder
    pub use futures_executor::{ThreadPool, ThreadPoolBuilder};
}

#[cfg(feature = "compat")]
#[cfg_attr(docsrs, doc(cfg(feature = "compat")))]
pub mod compat {
    //! Interop between `futures` 0.1 and 0.3.
    //! This module is only available when the `compat` feature of this library is activated.
    // Export the 0.1/0.3 compatibility tools from the futures_util library
    pub use futures_util::compat::{
        Compat, Compat01As03, Compat01As03Sink, CompatSink, Executor01As03, Executor01CompatExt,
        Executor01Future, Future01CompatExt, Sink01CompatExt, Stream01CompatExt,
    };

    #[cfg(feature = "io-compat")]
    #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
    // Export the IO-related compatibility tools from the futures_util library
    pub use futures_util::compat::{AsyncRead01CompatExt, AsyncWrite01CompatExt};
}

/// Prelude of commonly used items of the futures crate; it must be imported explicitly: use futures::prelude::*;
pub mod prelude {
    //! A "prelude" for crates using the `futures` crate.

    pub use crate::future::{self, Future, TryFuture};
    pub use crate::sink::{self, Sink};
    pub use crate::stream::{self, Stream, TryStream};

    #[doc(no_inline)]
    #[allow(unreachable_pub)]
    pub use crate::future::{FutureExt as _, TryFutureExt as _};
    #[doc(no_inline)]
    pub use crate::sink::SinkExt as _;
    #[doc(no_inline)]
    #[allow(unreachable_pub)]
    pub use crate::stream::{StreamExt as _, TryStreamExt as _};

    #[cfg(feature = "std")]
    pub use crate::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite};

    #[cfg(feature = "std")]
    #[doc(no_inline)]
    #[allow(unreachable_pub)]
    pub use crate::io::{
        AsyncBufReadExt as _, AsyncReadExt as _, AsyncSeekExt as _, AsyncWriteExt as _,
    };
}
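The `as _` form used throughout the prelude deserves a short illustration. It imports a trait so that its methods can be called, without adding the trait's name to the importing scope. The sketch below reproduces that idea with a hypothetical `toolkit` module and `Describe` trait; neither is part of futures-rs.

```rust
mod toolkit {
    pub trait Describe {
        fn describe(&self) -> String;
    }

    impl Describe for i32 {
        fn describe(&self) -> String {
            format!("the number {}", self)
        }
    }

    pub mod prelude {
        // Anonymous re-export: the trait's methods become callable for
        // anyone who glob-imports this module, but the name `Describe`
        // is not introduced into their namespace.
        pub use super::Describe as _;
    }
}

// A glob import of the prelude is enough to call `describe`.
use toolkit::prelude::*;

fn describe_seven() -> String {
    let n: i32 = 7;
    n.describe()
}

fn main() {
    println!("{}", describe_seven()); // prints "the number 7"
}
```

This is presumably why the prelude re-exports the extension traits anonymously: user code gets the extension methods without the extra names competing with its own.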

Official Documentation:

  • futures-rs: github.com/rust-lang/f…
  • futures crates: crates.io/crates/futu…
  • futures doc: docs.rs/futures/lat…
  • futures-futures: docs.rs/futures/lat…
  • Future trait: docs.rs/futures/lat…
  • FutureExt trait: docs.rs/futures/lat…
  • TryFutureExt trait: docs.rs/futures/lat…