Kotlin Flow on Android, from the basics to advanced usage

In coroutines, Flow is a type that emits multiple values sequentially, rather than a suspending function that returns a single value. For example, you can use Flow to receive real-time updates from a database.

Flows are built on top of coroutines and can provide multiple values. Conceptually, a flow is a stream of data that can be computed asynchronously. The emitted values must be of the same type. For example, Flow<Int> is a flow that emits integer values.

A flow is very similar to an Iterator that produces a sequence of values, but it uses suspending functions to produce and consume values asynchronously. This means, for example, that a flow can safely make a network request to produce the next value without blocking the main thread.
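As a minimal sketch of the idea above, the following hypothetical countdown function (not part of the original article) produces a Flow<Int> whose provider suspends between values. It assumes kotlinx.coroutines 1.6 or newer is on the classpath.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical provider: emits 3, 2, 1 and suspends before each value.
fun countdown(): Flow<Int> = flow {
    for (i in 3 downTo 1) {
        delay(100) // suspends the coroutine; no thread is blocked while waiting
        emit(i)
    }
}

fun main() {
    runBlocking {
        // collect is the consumer side; the lambda runs once per emitted value
        countdown().collect { value -> println("received $value") }
    }
}
```

Unlike an Iterator, both the producing side (delay, emit) and the consuming side (collect) are suspending calls here.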

A flow involves three entities:

  • A provider (also called a producer) generates data that is added to the stream. Thanks to coroutines, flows can also produce data asynchronously.
  • **(Optional)** Intermediaries can modify each value emitted into the stream, or the stream itself.
  • A consumer consumes the values from the stream.

In Android, a repository is typically the provider of UI data, with the UI as the consumer that ultimately displays the data. Other times, the UI layer is a provider of user input events, and other layers of the hierarchy consume them. The layers between the provider and the consumer usually act as intermediaries that modify the stream to suit the requirements of the next layer.

Create a Flow

To create a flow, use the flow builder API. The flow builder function creates a new Flow where you can use the emit function to manually send new values into the data flow.

In the following example, a data source fetches the latest news automatically at a fixed interval. Because a suspending function cannot return multiple consecutive values, the data source creates and returns a flow to meet this requirement. In this case, the data source acts as the provider.

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow

class NewsRemoteDataSource(
    private val newsApi: NewsApi,
    private val refreshIntervalMs: Long = 5000
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        while(true) {
            val latestNews = newsApi.fetchLatestNews()
            emit(latestNews) // send the result of the request to the stream
            delay(refreshIntervalMs) // suspend the coroutine for a period of time
        }
    }
}

// Interface that provides a way to make network requests via the suspend function
interface NewsApi {
    suspend fun fetchLatestNews(): List<ArticleHeadline>
}

The flow builder is executed within a coroutine. As such, it benefits from the same asynchronous APIs, with some restrictions:

  • Flows are sequential. Because the provider runs in a coroutine, when it calls a suspending function it suspends until that function returns. In the example, the provider suspends until the fetchLatestNews network request completes. Only then is the result emitted to the stream.
  • With the flow builder, the provider cannot emit values from a different CoroutineContext. Therefore, do not call emit in a different CoroutineContext by creating new coroutines or by using withContext blocks. In these cases, you can use other flow builders such as callbackFlow.

Modify data flow

Intermediaries can use intermediate operators to modify the stream of data without consuming the values. These operators are functions that, when applied to a stream, set up a chain of operations that is not executed until the values are consumed in the future. Learn more about intermediate operators in the Flow reference documentation.

In the following example, the repository layer uses the intermediate operator map to transform the data to be displayed on the View:

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData
) {
    /**
     * Returns the favorite latest news, applying transformations to the flow.
     * These operations are lazy and do not trigger the flow. They only transform
     * the current value emitted by the flow at that point in time.
     */
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            // Intermediate operation to filter the list of favorite topics
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            // Intermediate operation that saves the latest news in the cache
            .onEach { news -> saveInCache(news) }
}

Intermediate operators can be applied one after the other, forming a chain of operations that executes lazily as data items are sent to the data stream. Note that simply applying an intermediate operator to a dataflow does not initiate dataflow collection.
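The laziness described above can be observed with a small sketch. The doubled function and the log list are hypothetical names used only for illustration; nothing is recorded until a terminal operator (toList here) runs.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Records every value that passes through onEach.
val log = mutableListOf<String>()

// Building the chain only describes the work; it does not run it.
fun doubled(): Flow<Int> =
    flowOf(1, 2, 3)
        .map { it * 2 }
        .onEach { log += "saw $it" }

fun main() {
    val flow = doubled()
    println(log.size) // prints 0: no operator has run yet
    val values = runBlocking { flow.toList() } // terminal operator triggers the chain
    println(values) // prints [2, 4, 6]
    println(log) // prints [saw 2, saw 4, saw 6]
}
```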

Collect from Flow

Use a terminal operator to trigger the flow to start listening for values. To get all the values in the stream as they are emitted, use collect. You can learn more about terminal operators in the official Flow documentation.

Because collect is a suspending function, it must be executed within a coroutine. It takes a lambda as a parameter, which is called on every new value. And because it is a suspending function, the coroutine that calls collect may suspend until the flow is closed.

Continuing with the previous example, here is a simple implementation of a ViewModel that consumes data from the repository layer:

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            // Use collect to trigger the flow and consume its elements
            newsRepository.favoriteLatestNews.collect { favoriteNews ->
                // Update the view with the latest favorite newsletter
            }
        }
    }
}

Collecting the flow triggers the provider, which refreshes the latest news and emits the result of the network request at a fixed interval. Because the provider stays active in the while(true) loop, the flow is closed only when the ViewModel is cleared and viewModelScope is cancelled.

Flow collection can stop for the following reasons:

  • The coroutine that collects is cancelled, as shown in the example above. This also stops the underlying provider.
  • The provider finishes emitting items. In this case, the flow is closed and the coroutine that called collect resumes execution.

Flows are cold and lazy unless specified with other intermediate operators. This means that the provider code is executed every time a terminal operator is called on the flow. In the previous example, having multiple collectors would cause the data source to fetch the latest news multiple times, each on its own fixed interval. To optimize and share a flow when multiple consumers collect at the same time, use the shareIn operator.
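A minimal sketch of the cold behavior described above: the hypothetical headlines flow counts how often its provider block runs, with the counter standing in for a network request. Collecting twice runs the provider twice.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Counts how many times the provider block has been executed.
val fetchCount = AtomicInteger(0)

fun headlines(): Flow<String> = flow {
    fetchCount.incrementAndGet() // stands in for a network request
    emit("headline")
}

fun main() {
    val flow = headlines()
    runBlocking {
        flow.toList() // first collector runs the provider
        flow.toList() // second collector runs it again
    }
    println(fetchCount.get()) // prints 2
}
```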

Catch unexpected exceptions

The implementation of the provider can come from a third-party library, which means that it can throw unexpected exceptions. To handle these exceptions, use the catch intermediate operator.

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            newsRepository.favoriteLatestNews
                // Intermediate catch operator. If an exception is thrown,
                // catch it and update the UI
                .catch { exception -> notifyError(exception) }
                .collect { favoriteNews ->
                    // Update the view with the latest favorite newsletter
                }
        }
    }
}

In the preceding example, when an exception occurs, the collect lambda is not invoked because the new data item has not been received.

catch can also emit items to the flow. The example repository layer could emit the cached values instead:

class NewsRepository(...) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            .onEach { news -> saveInCache(news) }
            // If an error occurs, emit the last cached value
            .catch { exception -> emit(lastCachedNews()) }
}

In this example, the collect lambda is invoked when an exception occurs because a new data item was sent into the data stream due to the exception.
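The fallback behavior can be tried without a network layer. In this sketch, flakyNews is a hypothetical provider that fails after its first value, and the catch block emits a stand-in for the cached value.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical provider that emits once and then fails.
fun flakyNews(): Flow<String> = flow {
    emit("fresh")
    throw IllegalStateException("network down")
}

// catch sees the upstream exception and emits a fallback item instead.
fun newsWithFallback(): Flow<String> =
    flakyNews().catch { emit("cached") }

fun main() {
    println(runBlocking { newsWithFallback().toList() }) // prints [fresh, cached]
}
```

Note that catch only handles exceptions thrown upstream of it; an exception thrown inside the collect lambda is not caught by this operator.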

Execute in different CoroutineContext

By default, the provider of a flow builder executes in the CoroutineContext of the coroutine that collects from it and, as stated earlier, it cannot emit values from a different CoroutineContext. In some cases this behavior is undesirable. For instance, in the examples used in this article, the repository layer should not perform operations on Dispatchers.Main, which is used by viewModelScope.

To change the CoroutineContext of a flow, use the intermediate operator flowOn. flowOn changes the CoroutineContext of the upstream flow, that is, the provider and any intermediate operators applied before (above) flowOn. The downstream flow (the intermediate operators after flowOn, along with the consumer) is unaffected and executes in the CoroutineContext used to collect from the flow. If there are multiple flowOn operators, each one changes the upstream from its current position.

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData,
    private val defaultDispatcher: CoroutineDispatcher
) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> // executes on the default dispatcher
                news.filter { userData.isFavoriteTopic(it) }
            }
            .onEach { news -> // executes on the default dispatcher
                saveInCache(news)
            }
            // flowOn affects upstream flows ↑
            .flowOn(defaultDispatcher)
            // downstream stream ↓ is unaffected
            .catch { exception -> // Executed in the context of the consumer
                emit(lastCachedNews())
            }
}

With this code, the onEach and map operators use defaultDispatcher, whereas the catch operator and the consumer execute on Dispatchers.Main, which is used by viewModelScope.

Because the data source layer does I/O work, you should use a dispatcher that is optimized for I/O operations:

class NewsRemoteDataSource(
    ...,
    private val ioDispatcher: CoroutineDispatcher
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        // Executes on the IO dispatcher
        ...
    }
        .flowOn(ioDispatcher)
}
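To see which thread the provider runs on, here is a minimal sketch that replaces the injected dispatcher with a single-thread executor named "producer-thread". That name and the producerThreadName function are assumptions made for illustration only.

```kotlin
import java.util.concurrent.Executors
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.runBlocking

// Returns the name of the thread on which the flow's provider block executed.
fun producerThreadName(): String {
    val executor = Executors.newSingleThreadExecutor { r -> Thread(r, "producer-thread") }
    val dispatcher = executor.asCoroutineDispatcher()
    try {
        val names = flow<String> {
            emit(Thread.currentThread().name) // runs upstream of flowOn
        }.flowOn(dispatcher)
        // first() collects in the caller's context (the runBlocking thread)
        return runBlocking { names.first() }
    } finally {
        dispatcher.close() // shuts down the executor thread
    }
}

fun main() {
    println("provider ran on: ${producerThreadName()}")
    println("collector ran on: ${Thread.currentThread().name}")
}
```

The provider's thread name starts with "producer-thread" (coroutine debug mode may append a suffix), while collection stays on the calling thread.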

Data Flow in the Jetpack Library

Flow is integrated into many Jetpack libraries and is popular among Android third-party libraries. Flow is a great fit for live data updates and endless streams of data.

You can use Flow with Room to be notified when changes are made to your database. When using Data Access Objects (DAO), return the Flow type for real-time updates.

@Dao
abstract class ExampleDao {
    @Query("SELECT * FROM Example")
    abstract fun getExamples(): Flow<List<Example>>
}

Whenever the Example data table changes, a new list is emitted containing new data items from the database.

Transform callback-based APIs into data streams

callbackFlow is a flow builder that lets you convert callback-based APIs into flows.
Unlike the flow builder, callbackFlow allows values to be emitted from a different CoroutineContext with the send function, or from outside a coroutine with the offer function (offer is deprecated in recent kotlinx.coroutines releases in favor of trySend).
Internally, callbackFlow uses a channel, which is conceptually very similar to a blocking queue. A channel is configured with a capacity, the maximum number of elements that can be buffered. The channel created in callbackFlow has a default capacity of 64 elements. When you try to add a new element to a full channel, send suspends the provider until there is room for the new element, whereas offer does not add the element to the channel and immediately returns false.

A Kotlin Channel is similar to a blocking queue, except that it uses a suspending send operation instead of a blocking put, and a suspending receive operation instead of a blocking take.
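The conversion can be sketched as follows. TemperatureSensor is a hypothetical callback-based API invented for this example, not a real library; the sketch uses trySend, the current replacement for offer, and awaitClose to unregister the listener when collection ends.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical callback-based API (single-threaded use only).
class TemperatureSensor {
    private val listeners = mutableListOf<(Int) -> Unit>()
    val listenerCount: Int get() = listeners.size
    fun addListener(listener: (Int) -> Unit) { listeners += listener }
    fun removeListener(listener: (Int) -> Unit) { listeners -= listener }
    fun publish(value: Int) { listeners.toList().forEach { it(value) } }
}

// Wraps the callback API in a flow.
fun TemperatureSensor.readings(): Flow<Int> = callbackFlow {
    val listener: (Int) -> Unit = { value ->
        trySend(value) // never suspends; the value is dropped if the buffer is full
    }
    addListener(listener)
    // Keeps the flow open until the collector stops, then cleans up.
    awaitClose { removeListener(listener) }
}

fun main(): Unit = runBlocking {
    val sensor = TemperatureSensor()
    val collected = async { sensor.readings().take(2).toList() }
    while (sensor.listenerCount == 0) yield() // wait until the listener is registered
    sensor.publish(20)
    sensor.publish(21)
    println(collected.await()) // prints [20, 21]
}
```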

With the launchWhenX methods in the lifecycle-runtime-ktx library, the coroutine that collects from the Channel is suspended while the component's lifecycle state is below X, which avoids exceptions. You can also collect in the UI layer with repeatOnLifecycle(State): the coroutine is cancelled when the lifecycle drops below State and restarted when the lifecycle returns to it.

Using a Channel to carry events therefore looks like a good choice. Event delivery is generally one-to-one, so there is no need for the one-to-many BroadcastChannel (which has been gradually deprecated and replaced by SharedFlow).

How do you create a Channel? Look at the factory function that Channel exposes and consider which arguments to pass:

public fun <E> Channel(

    // Buffer capacity, when the capacity is exceeded, the strategy specified by onBufferOverflow will be triggered
    capacity: Int = RENDEZVOUS,

    // Buffer overflow strategy; the default is SUSPEND, the alternatives are DROP_OLDEST and DROP_LATEST
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,

    // Called for an element that was sent but could not be delivered, for example because the receiver was cancelled or threw an exception
    onUndeliveredElement: ((E) -> Unit)? = null

): Channel<E>

First of all, a Channel is hot: elements can be sent to it at any time, even when there are no subscribers. You therefore have to consider the case where an event is sent while the subscriber coroutine is cancelled, that is, where the Channel receives an event when nobody is receiving. For example, when an Activity uses repeatOnLifecycle to start a coroutine that consumes event messages from a Channel held by the ViewModel, that coroutine is cancelled while the Activity is in the STOPPED state.
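The following sketch illustrates that situation under one assumption: the channel is created with Channel.BUFFERED rather than the default RENDEZVOUS capacity, so events sent while nobody is receiving are kept in the buffer. The event names are made up for the example.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

fun bufferedEventsDemo(): List<String> = runBlocking {
    val events = Channel<String>(capacity = Channel.BUFFERED)

    // No receiver exists yet (think: the Activity is STOPPED).
    events.trySend("navigate")
    events.trySend("show-toast")

    // A receiver starts later and still gets the buffered events, in order.
    events.receiveAsFlow().take(2).toList()
}

fun main() {
    println(bufferedEventsDemo()) // prints [navigate, show-toast]
}
```

With the default RENDEZVOUS capacity there is no buffer, so trySend would fail and send would suspend until a receiver arrives.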

StateFlow (state flow) and SharedFlow (shared flow)

StateFlow and SharedFlow are Flow APIs that allow dataflows to optimally emit state updates and emit values to multiple consumers.

StateFlow and SharedFlow both take over many of the features of Channel. They can be seen as a deliberate move to push Flow to the front and hide Channel behind the scenes.

First of all, both are hot flows, and both support emitting data from outside the builder. Look at their factory functions:

public fun <T> MutableSharedFlow(

    // Number of values replayed to each new subscriber; default 0
    replay: Int = 0,

    // Buffer capacity in addition to replay; default 0
    extraBufferCapacity: Int = 0,

    // Strategy when the buffer overflows; the default is SUSPEND. onBufferOverflow only takes effect when there is at least one subscriber. With no subscribers, only the most recent replay values are kept and onBufferOverflow is never triggered.
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T>

// A MutableStateFlow behaves much like a MutableSharedFlow created with the following parameters
// (it additionally requires an initial value and skips consecutive equal values)

MutableSharedFlow(
    replay = 1,
    onBufferOverflow = BufferOverflow.DROP_OLDEST
)

StateFlow

StateFlow is a state-holder observable flow that emits the current and new state updates to its collectors. The current state value can also be read through its value property. To update state and send it to the flow, assign a new value to the value property of the MutableStateFlow class.

In Android, StateFlow is a great fit for classes that need to maintain an observable mutable state.

Following the earlier examples, you can expose a StateFlow from LatestNewsViewModel so that the View can listen for UI state updates and the screen state survives configuration changes.

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    // Backing property to avoid state updates from other classes
    private val _uiState = MutableStateFlow(LatestNewsUiState.Success(emptyList()))
    // The UI collects from this StateFlow to get its state updates
    val uiState: StateFlow<LatestNewsUiState> = _uiState

    init {
        viewModelScope.launch {
            newsRepository.favoriteLatestNews
                // Update View with the latest favorite news
                // Writes to the value property of MutableStateFlow,
                // adding a new element to the flow and updating all
                // of its collectors
                .collect { favoriteNews ->
                    _uiState.value = LatestNewsUiState.Success(favoriteNews)
                }
        }
    }
}

// Represents different states for the LatestNews screen
sealed class LatestNewsUiState {
    data class Success(val news: List<ArticleHeadline>): LatestNewsUiState()
    data class Error(val exception: Throwable): LatestNewsUiState()
}

The class responsible for updating the MutableStateFlow is the provider, and all classes collecting from the StateFlow are the consumers. Unlike a cold flow built with the flow builder, a StateFlow is hot: collecting from it does not trigger any provider code. A StateFlow is always active and in memory, and it becomes eligible for garbage collection only when there are no other references to it from a garbage collection root.

When a new consumer starts collecting from the flow, it receives the most recent state and any subsequent states. You can find the same behavior in other observable classes such as LiveData.
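A small sketch of this behavior outside Android, using a hypothetical Counter class with the same backing-property pattern. A consumer that arrives after two updates immediately receives the latest state.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking

// Hypothetical state holder; only this class can modify the state.
class Counter {
    private val _count = MutableStateFlow(0) // an initial value is required
    val count: StateFlow<Int> = _count

    fun increment() { _count.value += 1 }
}

fun main() {
    val counter = Counter()
    counter.increment()
    counter.increment()

    println(counter.count.value) // prints 2: the state can be read synchronously
    // A new collector receives the most recent state first.
    println(runBlocking { counter.count.first() }) // prints 2
}
```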

The View listens to the StateFlow as it would to any other flow:

class LatestNewsActivity : AppCompatActivity() {
    private val latestNewsViewModel = // getViewModel()

    override fun onCreate(savedInstanceState: Bundle?) {
        ...
        // Start a coroutine in the lifecycle scope
        lifecycleScope.launch {
            // Each time the lifecycle is in the STARTED state (or above), repeatOnLifecycle launches the block in a new coroutine
            // and cancels it when the lifecycle is STOPPED.
            repeatOnLifecycle(Lifecycle.State.STARTED) {
                // Trigger the flow and start listening for values.
                // Note that collection starts when the lifecycle is STARTED and stops when it is STOPPED
                latestNewsViewModel.uiState.collect { uiState ->
                    // new value received
                    when (uiState) {
                        is LatestNewsUiState.Success -> showFavoriteNews(uiState.news)
                        is LatestNewsUiState.Error -> showError(uiState.exception)
                    }
                }
            }
        }
    }
}

Warning: If the UI needs to be updated, never use the launch or launchIn extension functions to collect streams directly from the UI. These functions handle events even when the View is not visible. This behavior may cause the app to crash. To avoid this, use the repeatOnLifecycle API (shown above).

Note: The repeatOnLifecycle API is only available in the androidx.lifecycle:lifecycle-runtime-ktx:2.4.0-alpha01 library and higher.

To transform any dataflow into a StateFlow, use the stateIn intermediate operator.
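As a rough sketch of stateIn (outside Android, so a plain CoroutineScope stands in for viewModelScope), the hypothetical function below converts a cold flow into a StateFlow with an initial value of 0 and reads the state once the upstream has emitted its last value.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.runBlocking

fun lastStateOfColdFlow(): Int = runBlocking {
    // Stand-in for viewModelScope; the sharing coroutine lives in this scope.
    val scope = CoroutineScope(Dispatchers.Default + Job())
    val cold = flow<Int> { emit(1); emit(2); emit(3) }

    // stateIn needs a scope, a start policy and an initial value.
    val state: StateFlow<Int> =
        cold.stateIn(scope, SharingStarted.Eagerly, initialValue = 0)

    state.first { it == 3 } // suspend until the upstream has emitted its last value
    val latest = state.value
    scope.cancel() // stop sharing
    latest
}

fun main() {
    println(lastStateOfColdFlow()) // prints 3
}
```

Because a StateFlow is conflated, a slow collector may never see the intermediate values 1 and 2; only the latest state is guaranteed.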

StateFlow, Flow, and LiveData

StateFlow and LiveData share similarities. Both are observable data container classes and both follow similar patterns when used in application architecture.

But note that StateFlow and LiveData do behave differently:

  • StateFlow requires an initial state to be passed to the constructor, while LiveData does not.
  • LiveData.observe() automatically unregisters the consumer when the View goes to the STOPPED state, whereas collecting from a StateFlow or any other flow does not stop automatically. To achieve the same behavior, collect the flow from a Lifecycle.repeatOnLifecycle block.

Use shareIn to turn a cold flow into a hot flow

A StateFlow is a hot flow that remains in memory as long as it is collected or any other reference to it exists from a garbage collection root. You can turn a cold flow into a hot one with the shareIn operator.

Taking the callbackFlow from the Kotlin flows guide as an example (it retrieves data from Firestore): instead of having each collector create a new flow, you can use shareIn to share the retrieved data between collectors. You need to pass in the following:

  • A CoroutineScope used to share the flow. This scope should outlive any consumer so that the shared flow stays alive as long as needed.
  • The number of items to replay to each new collector.
  • The start behavior policy.

class NewsRemoteDataSource(...,
    private val externalScope: CoroutineScope,
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        ...
    }.shareIn(
        externalScope,
        replay = 1,
        started = SharingStarted.WhileSubscribed()
    )
}

In this example, the latestNews flow replays the last emitted item to a new collector and remains active as long as externalScope is alive and there are active collectors. The SharingStarted.WhileSubscribed() start policy keeps the upstream provider active while there are active subscribers. Other start policies are available, such as SharingStarted.Eagerly to start the provider immediately, or SharingStarted.Lazily to start sharing after the first subscriber appears and keep the flow active forever.
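The effect of sharing can be checked with a small sketch. It uses SharingStarted.Lazily instead of WhileSubscribed() so that the outcome does not depend on timing, and the upstreamStarts counter and sharedDemo function are hypothetical names. Two collectors are served by a single run of the upstream provider.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.runBlocking

// Counts how many times the upstream provider block has started.
val upstreamStarts = AtomicInteger(0)

fun sharedDemo(): Pair<Int, Int> = runBlocking {
    val externalScope = CoroutineScope(Dispatchers.Default + Job())
    val shared = flow<Int> {
        upstreamStarts.incrementAndGet()
        emit(42)
        awaitCancellation() // stay active, like the while(true) loop in the article
    }.shareIn(externalScope, SharingStarted.Lazily, replay = 1)

    val first = shared.first()  // first subscriber starts the upstream
    val second = shared.first() // second subscriber gets the replayed value
    externalScope.cancel()
    first to second
}

fun main() {
    println(sharedDemo()) // prints (42, 42)
    println(upstreamStarts.get()) // prints 1
}
```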

SharedFlow

The shareIn function returns a SharedFlow, a hot flow that emits values to all consumers that collect from it. A SharedFlow is a highly configurable generalization of StateFlow.

You can create a SharedFlow without using shareIn. For example, you could use a SharedFlow to send ticks to the rest of the app so that all content refreshes periodically at the same time. Apart from fetching the latest news, you might also want to refresh the user information section with its collection of favorite topics. In the following snippet, TickHandler exposes a SharedFlow so that other classes know when to refresh their content. As with StateFlow, use a backing property of type MutableSharedFlow in the class to send items to the flow:

// Class that centralizes when the content of the app needs to be refreshed
class TickHandler(
    private val externalScope: CoroutineScope,
    private val tickIntervalMs: Long = 5000
) {
    // Backing property to avoid flow emissions from other classes
    private val _tickFlow = MutableSharedFlow<Unit>(replay = 0)
    val tickFlow: SharedFlow<Unit> = _tickFlow

    init {
        externalScope.launch {
            while(true) {
                _tickFlow.emit(Unit)
                delay(tickIntervalMs)
            }
        }
    }
}

class NewsRepository(
    ...,
    private val tickHandler: TickHandler,
    private val externalScope: CoroutineScope
) {
    init {
        externalScope.launch {
            // Listen for tick updates
            tickHandler.tickFlow.collect {
                refreshLatestNews()
            }
        }
    }

    suspend fun refreshLatestNews() { ... }
    ...
}

You can customize the SharedFlow behavior in the following ways:

  • replay lets you resend a number of previously emitted values to new subscribers.
  • onBufferOverflow lets you specify a policy for when the buffer is full of items to be sent. The default is BufferOverflow.SUSPEND, which makes the caller suspend. Other options are DROP_LATEST and DROP_OLDEST.

MutableSharedFlow also has a subscriptionCount property that contains the number of active collectors, so you can optimize your business logic accordingly. It also provides a resetReplayCache function for cases where you do not want to replay the latest information sent to the flow.

With the default replay = 0, a SharedFlow drops values that are emitted while there are no subscribers. SharedFlow is similar to BroadcastChannel: it supports multiple subscribers, so the same event can be consumed more than once.
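The effect of replay when nobody is subscribed can be inspected through the replayCache property. The helper function name below is made up for this sketch.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow

// Emits 1, 2, 3 with no subscribers and returns what a new subscriber would be replayed.
fun replayCacheAfterEmits(replay: Int): List<Int> {
    val flow = MutableSharedFlow<Int>(replay = replay)
    // With no subscribers, tryEmit succeeds and only the newest `replay` values are kept.
    flow.tryEmit(1)
    flow.tryEmit(2)
    flow.tryEmit(3)
    return flow.replayCache
}

fun main() {
    println(replayCacheAfterEmits(0)) // prints []: everything was dropped
    println(replayCacheAfterEmits(2)) // prints [2, 3]
}
```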

Related official documents:
https://developer.android.com/kotlin/flow
https://kotlinlang.org/docs/flow.html
