In coroutines, a flow is a type that can emit multiple values sequentially, as opposed to a suspend function, which returns only a single value. For example, you can use a flow to receive live updates from a database.
Flows are built on top of coroutines and can provide multiple values. Conceptually, a flow is a stream of data that can be computed asynchronously. The emitted values must all be of the same type; for example, Flow<Int> is a flow that emits integer values.
A flow is very similar to an Iterator that produces a sequence of values, but it uses suspend functions to produce and consume values asynchronously. This means, for example, that a flow can safely make a network request to produce the next value without blocking the main thread.
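The difference between an iterator-style producer and a flow can be sketched in a few lines. This is an illustrative example with hypothetical function names: the sequence computes each value on the calling thread, while the flow may call a suspend function such as delay between values (standing in for a network request) without blocking a thread.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Synchronous producer: each value is computed on the calling thread.
fun numbersSequence(): Sequence<Int> = sequence {
    for (i in 1..3) {
        yield(i)
    }
}

// Asynchronous producer: the builder block may suspend between emissions.
fun numbersFlow(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(10) // stands in for a network call; suspends instead of blocking
        emit(i)
    }
}

fun main() = runBlocking {
    println(numbersSequence().toList()) // consumed by ordinary code
    println(numbersFlow().toList())     // consumed from a coroutine
}
```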
Three entities are involved in a stream of data:
- A producer produces data that is added to the stream. Thanks to coroutines, flows can also produce data asynchronously.
- **(Optional)** Intermediaries can modify each value emitted into the stream, or the stream itself.
- A consumer consumes the values from the stream.
In Android, a repository is typically the producer of UI data, and the user interface (UI) is the consumer that ultimately displays it. At other times, the UI layer is a producer of user input events that other layers of the hierarchy consume. Layers between the producer and the consumer usually act as intermediaries that modify the stream of data to adjust it to the requirements of the next layer.
Create a Flow
To create a flow, use the flow builder API. The flow builder function creates a new flow in which you can manually emit new values into the stream of data using the emit function.
In the following example, a data source fetches the latest news automatically at a fixed interval. Because a suspend function cannot return multiple consecutive values, the data source creates and returns a flow to fulfill this requirement. In this case, the data source acts as the producer.
```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow

class NewsRemoteDataSource(
    private val newsApi: NewsApi,
    private val refreshIntervalMs: Long = 5000
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        while (true) {
            val latestNews = newsApi.fetchLatestNews()
            emit(latestNews) // Emits the result of the request to the flow
            delay(refreshIntervalMs) // Suspends the coroutine for some time
        }
    }
}

// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
    suspend fun fetchLatestNews(): List<ArticleHeadline>
}
```
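The data source above depends on an ArticleHeadline type and a real NewsApi. A self-contained sketch that runs outside Android redeclares minimal versions of those types and plugs in a hypothetical FakeNewsApi. take(3) stops the otherwise endless while(true) producer after three refreshes, and the 10 ms interval is an assumption chosen only to keep the example fast.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Minimal stand-ins for the article's model and API types.
data class ArticleHeadline(val title: String)

interface NewsApi {
    suspend fun fetchLatestNews(): List<ArticleHeadline>
}

// Hypothetical fake API that returns a numbered headline on every call.
class FakeNewsApi : NewsApi {
    private var calls = 0
    override suspend fun fetchLatestNews(): List<ArticleHeadline> {
        calls++
        return listOf(ArticleHeadline("Headline #$calls"))
    }
}

class NewsRemoteDataSource(
    private val newsApi: NewsApi,
    private val refreshIntervalMs: Long = 5000
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        while (true) {
            emit(newsApi.fetchLatestNews())
            delay(refreshIntervalMs)
        }
    }
}

fun firstRefreshes(count: Int): List<List<ArticleHeadline>> = runBlocking {
    // take() cancels the endless producer after `count` items.
    NewsRemoteDataSource(FakeNewsApi(), refreshIntervalMs = 10).latestNews
        .take(count)
        .toList()
}

fun main() {
    firstRefreshes(3).forEach { println(it) }
}
```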
The flow builder is executed within a coroutine. Thus, it benefits from the same asynchronous APIs, but some restrictions apply:
- Flows are sequential. Because the producer runs in a coroutine, when it calls a suspend function, the producer suspends until the suspend function returns. In the example, the producer suspends until the fetchLatestNews network request completes. Only then is the result emitted to the stream.
- With the flow builder, the producer cannot emit values from a different CoroutineContext. Therefore, don't call emit in a different CoroutineContext by creating new coroutines or by using withContext blocks. In these cases, you can use other flow builders such as callbackFlow.
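The restriction on emitting from another CoroutineContext is enforced at runtime. The sketch below is a minimal illustration: calling emit inside withContext fails with an IllegalStateException, while channelFlow, a channel-backed builder from the same family as callbackFlow, accepts values sent from another dispatcher.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Violates the flow invariant: emit() is called from a different CoroutineContext.
val badFlow = flow<Int> {
    withContext(Dispatchers.Default) {
        emit(1)
    }
}

// channelFlow is backed by a channel, so sending from another context is allowed.
val goodFlow = channelFlow<Int> {
    withContext(Dispatchers.Default) {
        send(1)
    }
}

fun collectBad(): String = runBlocking {
    try {
        badFlow.toList()
        "no error"
    } catch (e: IllegalStateException) {
        "IllegalStateException" // "Flow invariant is violated"
    }
}

fun collectGood(): List<Int> = runBlocking { goodFlow.toList() }

fun main() {
    println(collectBad())
    println(collectGood())
}
```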
Modify data flow
Intermediaries can use intermediate operators to modify the stream of data without consuming the values. These operators are functions that, when applied to a stream of data, set up a chain of operations that aren't executed until the values are consumed in the future. Learn more about intermediate operators in the Flow reference documentation.
In the following example, the repository layer uses the intermediate operator map to transform the data to be displayed on the View:
```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData
) {
    /**
     * Returns the favorite latest news by applying transformations on the flow.
     * These operations are lazy and don't trigger the flow. They just transform
     * the current value emitted by the flow at that point in time.
     */
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            // Intermediate operation to filter the list of favorite topics
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            // Intermediate operation to save the latest news in the cache
            .onEach { news -> saveInCache(news) }
}
```
Intermediate operators can be applied one after the other, forming a chain of operations that are executed lazily when an item is emitted into the flow. Note that simply applying an intermediate operator to a stream does not start the flow collection.
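Laziness is easy to observe with a side-effecting operator. In this sketch the log list is a hypothetical stand-in for saveInCache: building the chain leaves it empty, and only the terminal operator toList runs map and onEach.

```kotlin
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

fun lazinessDemo(): Pair<List<String>, List<String>> = runBlocking {
    val log = mutableListOf<String>()

    // Building the chain only describes the work; nothing runs yet.
    val chain = flowOf(1, 2, 3)
        .map { it * 10 }
        .onEach { log += "onEach($it)" }

    val beforeCollect = log.toList() // still empty

    chain.toList() // terminal operator: now every operator runs
    beforeCollect to log.toList()
}

fun main() {
    val (before, after) = lazinessDemo()
    println("before collect: $before")
    println("after collect:  $after")
}
```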
Collect from Flow
Use a terminal operator to trigger the flow to start listening for values. To get all the values in the stream as they're emitted, use collect. You can learn more about terminal operators in the official Flow documentation.
Because collect is a suspend function, it needs to be executed within a coroutine. It takes a lambda as a parameter that is called on every new value. Since it is a suspend function, the coroutine that calls collect may suspend until the flow is closed.
Continuing with the previous example, here is a simple implementation of a ViewModel that consumes data from the repository layer:
```kotlin
import androidx.lifecycle.ViewModel
import androidx.lifecycle.viewModelScope
import kotlinx.coroutines.launch

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            // Trigger the flow and consume its elements using collect
            newsRepository.favoriteLatestNews.collect { favoriteNews ->
                // Update View with the latest favorite news
            }
        }
    }
}
```
Collecting the flow triggers the producer, which refreshes the latest news and emits the result of the network request at a fixed interval. Because the producer always remains active with the while(true) loop, the stream of data is closed only when the ViewModel is cleared and viewModelScope is cancelled.
Flow collection can stop for the following reasons:
- The coroutine that collects is cancelled, as shown in the previous example. This also stops the underlying producer.
- The producer finishes emitting items. In this case, the stream of data is closed and the coroutine that called collect resumes execution.
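Both stop conditions can be observed in one small sketch, assuming kotlinx.coroutines 1.6 or newer (where collect accepts a lambda without an extra import). A finite flow lets collect return; an endless producer, like the while(true) loop above, only stops when the collecting coroutine is cancelled, and its finally block runs at that point.

```kotlin
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun completionDemo(): List<String> = runBlocking {
    val events = mutableListOf<String>()

    // Case 1: the producer finishes, so collect() returns and execution continues.
    flowOf("a", "b").collect { events += "got $it" }
    events += "collect returned"

    // Case 2: an endless producer stops only when the collecting coroutine is cancelled.
    val endless = flow<Unit> {
        try {
            while (true) {
                emit(Unit)
                delay(10)
            }
        } finally {
            events += "producer stopped"
        }
    }
    val job = launch { endless.collect { } }
    delay(50)
    job.cancelAndJoin() // waits until the producer's finally block has run
    events
}

fun main() {
    println(completionDemo())
}
```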
Flows are cold and lazy unless specified otherwise with other intermediate operators. This means that the producer code is executed each time a terminal operator is called on the flow. In the previous example, having multiple flow collectors causes the data source to fetch the latest news multiple times, on different fixed intervals. To optimize and share a flow when multiple consumers collect at the same time, use the shareIn operator.
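The cold behavior can be made visible by counting how often the producer block runs. In this sketch, producerRuns is a hypothetical counter that replaces the network request; two terminal operator calls run the producer twice.

```kotlin
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

var producerRuns = 0

// A cold flow: its block runs once per terminal operator call.
val coldNews = flow<String> {
    producerRuns++ // in the article's example this would be a network request
    emit("latest news")
}

fun collectTwice(): Int = runBlocking {
    producerRuns = 0
    coldNews.toList() // first collector runs the producer
    coldNews.toList() // second collector runs it again
    producerRuns
}

fun main() {
    println("producer ran ${collectTwice()} times")
}
```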
Catch unexpected exceptions
The implementation of the producer can come from a third-party library. This means that it can throw unexpected exceptions. To handle these exceptions, use the catch intermediate operator.
```kotlin
import androidx.lifecycle.ViewModel
import androidx.lifecycle.viewModelScope
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.launch

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            newsRepository.favoriteLatestNews
                // Intermediate catch operator. If an exception is thrown,
                // catch it and update the UI
                .catch { exception -> notifyError(exception) }
                .collect { favoriteNews ->
                    // Update View with the latest favorite news
                }
        }
    }
}
```
In the previous example, when an exception occurs, the collect lambda isn't called, because no new item has been received.
catch can also emit items to the flow. The example repository layer could emit the cached values instead:
```kotlin
class NewsRepository(...) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            .onEach { news -> saveInCache(news) }
            // If an error happens, emit the last cached values
            .catch { exception -> emit(lastCachedNews()) }
}
```
In this example, when an exception occurs, the collect lambda is called, because a new item has been emitted to the stream as a result of the exception.
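Below is a small runnable sketch of the fallback pattern, with a simulated IOException instead of a real network failure and a hypothetical cachedNews list in place of lastCachedNews(). The fallback item is emitted by catch, so it does not pass through the map operator placed above it.

```kotlin
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import java.io.IOException

// Hypothetical cache standing in for lastCachedNews() in the article.
val cachedNews = listOf("cached headline")

val newsWithFallback = flow<List<String>> {
    emit(listOf("fresh headline"))
    throw IOException("network down") // simulated failure on the next refresh
}
    .map { news -> news.map { it.uppercase() } }
    .catch { emit(cachedNews) } // upstream failure replaced by a fallback item

fun main() = runBlocking {
    println(newsWithFallback.toList())
}
```

catch only intercepts exceptions thrown upstream of it; an exception thrown inside the collect lambda is not handled by this operator.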
Execute in different CoroutineContext
By default, the producer of a flow builder executes in the CoroutineContext of the coroutine that collects from it, and, as previously mentioned, it cannot emit values from a different CoroutineContext. This behavior might be undesirable in some cases. For instance, in the examples used in this article, the repository layer shouldn't be performing operations on Dispatchers.Main, which is used by viewModelScope.
To change the CoroutineContext of a flow, use the intermediate operator flowOn. flowOn changes the CoroutineContext of the upstream flow, meaning the producer and any intermediate operators applied before (or above) flowOn. The downstream flow (the intermediate operators after flowOn, along with the consumer) is not affected and executes on the CoroutineContext used to collect from the flow. If there are multiple flowOn operators, each one changes the upstream from its current location.
```kotlin
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData,
    private val defaultDispatcher: CoroutineDispatcher
) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> // Executes on the default dispatcher
                news.filter { userData.isFavoriteTopic(it) }
            }
            .onEach { news -> // Executes on the default dispatcher
                saveInCache(news)
            }
            // flowOn affects the upstream flow ↑
            .flowOn(defaultDispatcher)
            // the downstream flow ↓ is not affected
            .catch { exception -> // Executes in the consumer's context
                emit(lastCachedNews())
            }
}
```
With this code, the onEach and map operators use the defaultDispatcher, whereas the catch operator and the consumer are executed on Dispatchers.Main, which is used by viewModelScope.
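To see which thread each part of the chain runs on, the following sketch replaces the injected dispatcher with a single-thread executor whose thread is named io-worker (an assumption for illustration only). Everything above flowOn records that thread, while the operator below flowOn runs on the thread that called collect.

```kotlin
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import java.util.concurrent.Executors

fun threadsDemo(): Pair<Set<String>, Set<String>> {
    // Stand-in for an injected dispatcher, with a recognizable thread name.
    val executor = Executors.newSingleThreadExecutor { r -> Thread(r, "io-worker") }
    val ioDispatcher = executor.asCoroutineDispatcher()
    val upstreamThreads = mutableSetOf<String>()
    val downstreamThreads = mutableSetOf<String>()
    try {
        runBlocking {
            flow<Int> {
                for (i in 1..3) {
                    upstreamThreads += Thread.currentThread().name
                    emit(i)
                }
            }
                .map { it * 2 }       // upstream of flowOn: also runs on io-worker
                .flowOn(ioDispatcher) // moves everything above this line
                .map {
                    // downstream of flowOn: runs in the collector's context
                    downstreamThreads += Thread.currentThread().name
                    it
                }
                .toList()
        }
    } finally {
        ioDispatcher.close() // shuts down the underlying executor
    }
    return upstreamThreads to downstreamThreads
}

fun main() {
    val (upstream, downstream) = threadsDemo()
    println("upstream: $upstream, downstream: $downstream")
}
```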
Because the data source layer is doing I/O work, you should use a dispatcher that is optimized for I/O operations:
```kotlin
class NewsRemoteDataSource(
    ...,
    private val ioDispatcher: CoroutineDispatcher
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        // Executes on the IO dispatcher
        ...
    }
        .flowOn(ioDispatcher)
}
```
Data Flow in the Jetpack Library
Flow is integrated into many Jetpack libraries, and it's popular among Android third-party libraries. Flow is a great fit for live data updates and endless streams of data.
You can use Flow with Room to be notified of changes in a database. When using data access objects (DAO), return a Flow type to get live updates.
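```kotlin
import androidx.room.Dao
import androidx.room.Query
import kotlinx.coroutines.flow.Flow

@Dao
abstract class ExampleDao {
    @Query("SELECT * FROM Example")
    abstract fun getExamples(): Flow<List<Example>>
}
```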
Every time there's a change in the Example table, a new list is emitted with the new items in the database.
Transform callback-based APIs into data streams
callbackFlow is a flow builder that lets you convert callback-based APIs into flows.
Unlike the flow builder, callbackFlow allows values to be emitted from a different CoroutineContext with the send function, or from outside a coroutine with the offer function. (In kotlinx.coroutines 1.5 and later, offer is deprecated in favor of trySend.)
Internally, callbackFlow uses a channel, which is conceptually very similar to a blocking queue. A channel is configured with a capacity, the maximum number of elements that can be buffered. The channel created in callbackFlow has a default capacity of 64 elements. When you try to add a new element to a full channel, send suspends the producer until there's space for the new element, whereas offer does not add the element to the channel and returns false immediately (trySend instead returns a failed ChannelResult).
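A typical callbackFlow conversion registers a listener, forwards callbacks into the channel, and unregisters in awaitClose. PriceListener and FakePriceApi below are hypothetical types standing in for a real callback-based SDK, and the sketch uses trySend, the non-suspending replacement for offer.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlin.concurrent.thread

// Hypothetical callback-based API, used only for illustration.
interface PriceListener {
    fun onPrice(price: Int)
    fun onComplete()
}

class FakePriceApi {
    var unregistered = false
        private set

    fun register(listener: PriceListener) {
        // Deliver callbacks from a background thread, as many SDKs do.
        thread {
            for (price in listOf(10, 20, 30)) listener.onPrice(price)
            listener.onComplete()
        }
    }

    fun unregister(listener: PriceListener) {
        unregistered = true
    }
}

fun priceUpdates(api: FakePriceApi): Flow<Int> = callbackFlow<Int> {
    val listener = object : PriceListener {
        override fun onPrice(price: Int) {
            trySend(price) // non-suspending; fails instead of suspending if the buffer is full
        }
        override fun onComplete() {
            channel.close() // completes the flow for collectors
        }
    }
    api.register(listener)
    // Suspend until the channel is closed or the collector cancels, then clean up.
    awaitClose { api.unregister(listener) }
}

fun main() = runBlocking {
    println(priceUpdates(FakePriceApi()).toList())
}
```

Without awaitClose the builder block would return right after register and the flow would fail with an IllegalStateException, so awaitClose both keeps the producer alive and gives a single place to release the listener.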
A Kotlin Channel is similar to a blocking queue, except that a Channel uses a suspending send operation instead of a blocking put, and a suspending receive operation instead of a blocking take.
With the launchWhenX methods from the lifecycle-runtime-ktx library, the coroutine that collects from a Channel is suspended while the component's lifecycle is below state X, to avoid exceptions. You can also use repeatOnLifecycle(State) to collect in the UI layer: when the lifecycle falls below State, the coroutine is cancelled, and it is restarted when the lifecycle returns to State.
Using a Channel to carry events seems like a good choice. Generally speaking, events are delivered one-to-one, so there is no need for the one-to-many BroadcastChannel (which is being phased out and replaced by SharedFlow).
How do you create a Channel? Look at the Channel factory function and consider which parameters to pass in:
```kotlin
public fun <E> Channel(
    // Buffer capacity; when it is exceeded, the onBufferOverflow strategy applies
    capacity: Int = RENDEZVOUS,
    // Buffer overflow strategy: SUSPEND by default; DROP_OLDEST and DROP_LATEST are also available
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
    // Handles elements that could not be delivered, for example because
    // the subscriber was cancelled or an exception was thrown
    onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E>
```
First of all, a Channel is hot: a producer can send elements to it at any time, whether or not anyone is receiving (with a buffered channel they wait in the buffer; with the default rendezvous channel, send suspends until a receiver arrives). You therefore have to consider the case where an event is sent while the subscriber coroutine is cancelled, that is, the Channel receives an event while it has no subscriber. For example, an Activity uses repeatOnLifecycle to start a coroutine that consumes event messages from a Channel held by the ViewModel, and the coroutine is cancelled because the Activity has entered the STOPPED state.
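The hot, buffered behavior described above can be tried with a plain Channel. In this sketch the event strings are hypothetical. A capacity of 2 with BufferOverflow.DROP_OLDEST means send never suspends: events sent while nobody is receiving wait in the buffer, and the oldest one is discarded when the buffer is full.

```kotlin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

fun bufferedEventsDemo(): List<String> = runBlocking {
    // Room for two pending events; when full, drop the oldest instead of suspending.
    val events = Channel<String>(capacity = 2, onBufferOverflow = BufferOverflow.DROP_OLDEST)

    // Sent while nobody is receiving (for example, the UI collector is stopped).
    events.send("ShowToast(1)")
    events.send("ShowToast(2)")
    events.send("Navigate") // buffer full: "ShowToast(1)" is dropped

    events.close() // lets the collection below finish after draining the buffer

    // A subscriber that arrives later still receives what is buffered.
    events.receiveAsFlow().toList()
}

fun main() {
    println(bufferedEventsDemo())
}
```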
StateFlow (state flow) and SharedFlow (shared flow)
StateFlow and SharedFlow are Flow APIs that enable flows to optimally emit state updates and emit values to multiple consumers.
StateFlow and SharedFlow both have many of Channel's features; they can be seen as an important step that brings Flow to the front and hides Channel behind the scenes.
First of all, both are hot streams and support emitting data from outside the builder. Just look at their factory functions:
```kotlin
public fun <T> MutableSharedFlow(
    // Number of values replayed to each new subscriber, 0 by default
    replay: Int = 0,
    // Buffer capacity in addition to replay, 0 by default
    extraBufferCapacity: Int = 0,
    // Strategy when the buffer overflows, SUSPEND by default. onBufferOverflow only takes
    // effect when there is at least one subscriber. With no subscribers, only the most
    // recent `replay` values are kept and onBufferOverflow has no effect.
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T>
```
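```kotlin
// MutableStateFlow behaves like a MutableSharedFlow created with these parameters,
// plus an initial value emitted with tryEmit and distinctUntilChanged() applied
// (a StateFlow never re-emits a value equal to the current one)
MutableSharedFlow(
    replay = 1,
    onBufferOverflow = BufferOverflow.DROP_OLDEST
)
```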
StateFlow
StateFlow is a state-holder observable flow that emits the current and new state updates to its collectors. The current state value can also be read through its value property. To update the state and send it to the flow, assign a new value to the value property of the MutableStateFlow class.
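A minimal sketch of the value property, using a hypothetical CounterHolder class: the backing MutableStateFlow is private and consumers only see a read-only StateFlow. It also uses the update extension (available in kotlinx.coroutines 1.5.1 and later), which performs a read-modify-write atomically.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.update

class CounterHolder {
    // Mutable backing property, visible only inside the class.
    private val _count = MutableStateFlow(0)

    // Read-only view exposed to consumers.
    val count: StateFlow<Int> = _count.asStateFlow()

    fun increment() {
        // Atomic update; safer than `value = value + 1` when several
        // coroutines may write concurrently.
        _count.update { it + 1 }
    }

    fun reset() {
        _count.value = 0 // plain assignment also emits to collectors
    }
}

fun main() {
    val holder = CounterHolder()
    holder.increment()
    holder.increment()
    println(holder.count.value)
}
```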
In Android, StateFlow is a great fit for classes that need to maintain an observable mutable state.
Following the examples from Kotlin flows, a StateFlow can be exposed from LatestNewsViewModel so that the View can listen for UI state updates and the screen state survives configuration changes.
```kotlin
import androidx.lifecycle.ViewModel
import androidx.lifecycle.viewModelScope
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.launch

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    // Backing property to avoid state updates from other classes
    private val _uiState =
        MutableStateFlow<LatestNewsUiState>(LatestNewsUiState.Success(emptyList()))
    // The UI collects from this StateFlow to get its state updates
    val uiState: StateFlow<LatestNewsUiState> = _uiState

    init {
        viewModelScope.launch {
            newsRepository.favoriteLatestNews
                // Update View with the latest favorite news.
                // Writes to the value property of MutableStateFlow,
                // adding a new element to the flow and updating all
                // of its collectors
                .collect { favoriteNews ->
                    _uiState.value = LatestNewsUiState.Success(favoriteNews)
                }
        }
    }
}

// Represents different states for the LatestNews screen
sealed class LatestNewsUiState {
    data class Success(val news: List<ArticleHeadline>) : LatestNewsUiState()
    data class Error(val exception: Throwable) : LatestNewsUiState()
}
```
The class responsible for updating a MutableStateFlow is the producer, and all classes collecting from the StateFlow are the consumers. Unlike a cold flow built using the flow builder, a StateFlow is hot: collecting from the flow doesn't trigger any producer code. A StateFlow is always active and in memory, and it becomes eligible for garbage collection only when there are no other references to it from a garbage collection root.
When a new consumer starts collecting from the flow, it receives the last state in the stream and any subsequent states. You can find this behavior in other observable classes like LiveData.
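Because a StateFlow is hot and always holds a value, a collector that arrives late does not see the history, only the current state. A minimal sketch with hypothetical state strings:

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking

fun lateCollectorDemo(): String = runBlocking {
    val uiState = MutableStateFlow("Loading")

    // Updates happen before anyone collects; no collector is needed to produce them.
    uiState.value = "Success(3 items)"
    uiState.value = "Success(5 items)"

    // A collector that starts now sees only the current state.
    uiState.first()
}

fun main() {
    println(lateCollectorDemo())
}
```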
As with any other flow, the View listens to the StateFlow:
```kotlin
import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import androidx.lifecycle.Lifecycle
import androidx.lifecycle.lifecycleScope
import androidx.lifecycle.repeatOnLifecycle
import kotlinx.coroutines.launch

class LatestNewsActivity : AppCompatActivity() {
    private val latestNewsViewModel = // getViewModel()

    override fun onCreate(savedInstanceState: Bundle?) {
        ...
        // Start a coroutine in the lifecycle scope
        lifecycleScope.launch {
            // repeatOnLifecycle launches the block in a new coroutine every time the
            // lifecycle is in the STARTED state (or above) and cancels it when it's STOPPED.
            repeatOnLifecycle(Lifecycle.State.STARTED) {
                // Trigger the flow and start listening for values.
                // Note that this happens when the lifecycle is STARTED and stops
                // collecting when the lifecycle is STOPPED
                latestNewsViewModel.uiState.collect { uiState ->
                    // New value received
                    when (uiState) {
                        is LatestNewsUiState.Success -> showFavoriteNews(uiState.news)
                        is LatestNewsUiState.Error -> showError(uiState.exception)
                    }
                }
            }
        }
    }
}
```
Warning: Never collect a flow from the UI directly with launch or the launchIn extension function if the UI needs to be updated. These functions process events even when the View is not visible, which can lead to app crashes. To avoid that, use the repeatOnLifecycle API as shown above.
Note: The repeatOnLifecycle API is only available in the androidx.lifecycle:lifecycle-runtime-ktx:2.4.0-alpha01 library and higher.
To convert any flow into a StateFlow, use the stateIn intermediate operator.
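A short sketch of stateIn, turning a cold flowOf into a StateFlow. The three-parameter form takes a scope, a SharingStarted policy, and the initial value that every StateFlow requires; the separate Job is an assumption made here so the sharing coroutine can be cancelled explicitly at the end.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.runBlocking

fun stateInDemo(): Pair<Int, Int> = runBlocking {
    // Separate Job so the sharing coroutine can be cancelled independently.
    val sharingScope = CoroutineScope(coroutineContext + Job())

    val state: StateFlow<Int> = flowOf(1, 2, 3).stateIn(
        scope = sharingScope,
        started = SharingStarted.Eagerly,
        initialValue = 0
    )

    val initial = state.value            // 0: the sharing coroutine has not run yet
    val latest = state.first { it == 3 } // suspends until the last upstream value arrives
    sharingScope.cancel()
    initial to latest
}

fun main() {
    println(stateInDemo())
}
```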
StateFlow, Flow, and LiveData
StateFlow and LiveData have similarities. Both are observable data holder classes, and both follow a similar pattern when used in your app architecture.
Note, however, that StateFlow and LiveData do behave differently:
- StateFlow requires an initial state to be passed in to the constructor, while LiveData does not.
- LiveData.observe() automatically unregisters the consumer when the View goes to the STOPPED state, whereas collecting from a StateFlow or any other flow does not stop automatically. To achieve the same behavior, you need to collect the flow from a Lifecycle.repeatOnLifecycle block.
Use shareIn to make cold flows hot
A StateFlow is a hot flow: it remains in memory as long as it is collected, or while any other references to it exist from a garbage collection root. You can turn cold flows hot by using the shareIn operator.
Taking the callbackFlow created in Kotlin flows as an example, instead of having each collector create a new flow, you can share the data retrieved from Firestore between collectors by using shareIn. You need to pass in the following:
- A CoroutineScope that is used to share the flow. This scope should live longer than any consumer, to keep the shared flow alive as long as needed.
- The number of items to replay to each new collector.
- The start behavior policy.
```kotlin
class NewsRemoteDataSource(...,
    private val externalScope: CoroutineScope,
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        ...
    }.shareIn(
        externalScope,
        replay = 1,
        started = SharingStarted.WhileSubscribed()
    )
}
```
In this example, the latestNews flow replays the last emitted item to a new collector and remains active as long as externalScope is alive and there are active collectors. The SharingStarted.WhileSubscribed() start policy keeps the upstream producer active while there are active subscribers. Other start policies are available, such as SharingStarted.Eagerly to start the producer immediately, or SharingStarted.Lazily to start sharing after the first subscriber appears and keep the flow active forever.
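The effect of shareIn can be checked by counting upstream runs, in contrast to the cold behavior described earlier. In this sketch, upstreamRuns is a hypothetical counter and SharingStarted.Eagerly is chosen so the single upstream run does not depend on subscriber timing; two collectors both receive the replayed value while the producer runs only once.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.runBlocking

fun sharedDemo(): Triple<String, String, Int> = runBlocking {
    var upstreamRuns = 0
    val externalScope = CoroutineScope(coroutineContext + Job())

    val latestNews = flow<String> {
        upstreamRuns++ // would be the network request in the article's data source
        emit("latest news")
    }.shareIn(externalScope, started = SharingStarted.Eagerly, replay = 1)

    // Two independent collectors, both served from the single upstream run.
    val first = latestNews.first()
    val second = latestNews.first()
    externalScope.cancel()
    Triple(first, second, upstreamRuns)
}

fun main() {
    println(sharedDemo())
}
```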
SharedFlow
The shareIn function returns a SharedFlow, a hot flow that emits values to all consumers that collect from it. A SharedFlow is a highly configurable generalization of StateFlow.
You can create a SharedFlow without using shareIn. As an example, you could use a SharedFlow to send ticks to the rest of the app so that all the content refreshes periodically at the same time. Apart from fetching the latest news, you might also want to refresh the user information section with the user's collection of favorite topics. In the following snippet, a TickHandler exposes a SharedFlow so that other classes know when to refresh their content. As with StateFlow, use a backing property of type MutableSharedFlow in a class to send items to the flow:
```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.launch

// Class that centralizes when the content of the app needs to be refreshed
class TickHandler(
    private val externalScope: CoroutineScope,
    private val tickIntervalMs: Long = 5000
) {
    // Backing property to avoid flow emissions from other classes
    private val _tickFlow = MutableSharedFlow<Unit>(replay = 0)
    val tickFlow: SharedFlow<Unit> = _tickFlow

    init {
        externalScope.launch {
            while (true) {
                _tickFlow.emit(Unit)
                delay(tickIntervalMs)
            }
        }
    }
}

class NewsRepository(
    ...,
    private val tickHandler: TickHandler,
    private val externalScope: CoroutineScope
) {
    init {
        externalScope.launch {
            // Listen for tick updates
            tickHandler.tickFlow.collect {
                refreshLatestNews()
            }
        }
    }

    suspend fun refreshLatestNews() { ... }
    ...
}
```
You can customize the SharedFlow behavior in the following ways:
- replay lets you resend a number of previously emitted values to new subscribers.
- onBufferOverflow lets you specify a policy for when the buffer is full of items to be sent. The default value is BufferOverflow.SUSPEND, which makes the caller suspend. Other options are DROP_LATEST or DROP_OLDEST.
MutableSharedFlow also has a subscriptionCount property that contains the number of active collectors, so that you can optimize your business logic accordingly. MutableSharedFlow also contains a resetReplayCache function for cases where you don't want to replay the latest information sent to the flow.
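The replay cache and the two members mentioned above can be inspected directly. A minimal sketch: with no subscribers, tryEmit only updates the replay cache, which keeps the most recent replay values; resetReplayCache empties it, and subscriptionCount reports the number of active collectors.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow

fun replayConfigDemo(): Triple<List<Int>, List<Int>, Int> {
    val events = MutableSharedFlow<Int>(replay = 2)

    // No subscribers: tryEmit succeeds and only updates the replay cache.
    events.tryEmit(1)
    events.tryEmit(2)
    events.tryEmit(3)
    val cached = events.replayCache // snapshot of the two most recent values

    events.resetReplayCache() // new subscribers will no longer receive 2 and 3
    val afterReset = events.replayCache

    return Triple(cached, afterReset, events.subscriptionCount.value)
}

fun main() {
    println(replayConfigDemo())
}
```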
SharedFlow discards emitted values when there are no subscribers (apart from those kept in its replay cache). SharedFlow is similar to BroadcastChannel: it supports multiple subscribers, so the same event can be consumed multiple times.
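The following sketch illustrates both points with a MutableSharedFlow in its default configuration (replay = 0, no extra buffer), like the TickHandler above. The first emit happens before anyone subscribes and is dropped; the two later values reach both subscribers. CoroutineStart.UNDISPATCHED is used only to make sure the subscribers are registered before the emissions.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

fun broadcastDemo(): Pair<List<Int>, List<Int>> = runBlocking {
    val ticks = MutableSharedFlow<Int>() // replay = 0, no extra buffer

    ticks.emit(0) // no subscribers yet: this value is simply dropped

    // UNDISPATCHED runs each subscriber until it suspends, so it is registered now.
    val a = async(start = CoroutineStart.UNDISPATCHED) { ticks.take(2).toList() }
    val b = async(start = CoroutineStart.UNDISPATCHED) { ticks.take(2).toList() }

    ticks.emit(1) // suspends until both subscribers have received it
    ticks.emit(2)
    a.await() to b.await()
}

fun main() {
    println(broadcastDemo())
}
```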
Related official documents:
https://developer.android.com/kotlin/flow
https://kotlinlang.org/docs/flow.html