Kotlin data flow overview

Contents

    • 1. What is a data flow?
    • 2. Creating a data flow
    • 3. Modifying a data flow
    • 4. Collecting from a data flow
    • 5. Catching exceptions in a data flow
    • 6. Executing in a different CoroutineContext
    • 7. Data flows in Jetpack libraries
    • 8. Converting callback-based APIs into data flows

1. What is a data flow?

Data flows (Kotlin's Flow type) are built on top of coroutines and, unlike a suspend function, which returns a single value, can emit multiple values sequentially. Conceptually, a data flow is a stream of data that can be computed asynchronously.

A data flow involves three entities:

  • A producer generates data that is added to the stream. Thanks to coroutines, flows can also produce data asynchronously.
  • (Optional) Intermediaries can modify each value emitted into the stream, or the stream itself.
  • A consumer consumes the values from the stream.
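To make the three roles concrete, here is a minimal sketch, assuming kotlinx-coroutines-core 1.6 or later on the classpath (where collect accepts a lambda directly). The function name pipeline is purely illustrative: a flow builder plays the producer, map plays the optional intermediary, and collect plays the consumer.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlocking

fun pipeline(): List<String> = runBlocking {
    // Producer: emits 1, 2, 3 into the stream
    val producer: Flow<Int> = flow {
        for (i in 1..3) emit(i)
    }
    // Intermediary (optional): transforms each emitted value
    val intermediary: Flow<String> = producer.map { "item $it" }
    // Consumer: collect starts the flow and receives every value
    val consumed = mutableListOf<String>()
    intermediary.collect { consumed += it }
    consumed
}
```

Calling pipeline() returns the three transformed items in emission order.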

2. Creating a data flow

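To create a data flow, use the flow builder APIs. The flow builder function creates a new flow, and inside it you can call the emit function to send new values into the stream manually. In the following example, a data source fetches the latest news at a fixed interval. Because a suspend function cannot return multiple consecutive values, the data source creates and returns a flow instead; here, the data source acts as the producer.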

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow

class NewsRemoteDataSource(
    private val newsApi: NewsApi,
    private val refreshIntervalMs: Long = 5000
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        while (true) {
            val latestNews = newsApi.fetchLatestNews()
            emit(latestNews) // Emits the result of the request to the flow
            delay(refreshIntervalMs) // Suspends the coroutine for some time
        }
    }
}

// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
    suspend fun fetchLatestNews(): List<ArticleHeadline>
}
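NewsApi and ArticleHeadline are not defined in this article, so the following self-contained sketch swaps in a hypothetical FakeNewsApi that returns plain strings. The polling producer has the same shape as latestNews above. take(3) is added only so that the endless while (true) loop stops after three emissions.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for NewsApi: each call returns a numbered headline
class FakeNewsApi {
    private var calls = 0

    suspend fun fetchLatestNews(): List<String> {
        delay(10) // simulate network latency
        calls++
        return listOf("headline #$calls")
    }
}

// Same shape as NewsRemoteDataSource.latestNews: fetch, emit, wait, repeat
fun latestNews(api: FakeNewsApi, refreshIntervalMs: Long = 50): Flow<List<String>> = flow {
    while (true) {
        val news = api.fetchLatestNews() // the producer suspends here until the "request" completes
        emit(news)
        delay(refreshIntervalMs)
    }
}

fun firstThreeRefreshes(): List<List<String>> = runBlocking {
    // take(3) cancels the producer right after the third emission
    latestNews(FakeNewsApi()).take(3).toList()
}
```

Because each fetch completes before the next emit, the headlines arrive strictly in order.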

The flow builder runs inside a coroutine, so it benefits from the same asynchronous APIs, but with some restrictions:

  • Flows are sequential. Because the producer runs in a coroutine, when it calls a suspend function, the producer suspends until that function returns. In this example, the producer suspends until the fetchLatestNews network request completes; only then is the result emitted to the stream.
  • With the flow builder, the producer cannot emit values from a different CoroutineContext. Therefore, do not call emit from a different CoroutineContext by launching new coroutines or by using withContext blocks. In such cases, use other flow builders such as callbackFlow.
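The second restriction can be observed directly. In this sketch (all names are illustrative), emitting from inside withContext makes collection fail with an IllegalStateException raised by the flow builder's context check. channelFlow, a sibling of callbackFlow that also accepts values sent from other coroutines, delivers the same value without error.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Breaks the rule: emit is called from a different CoroutineContext
val emitsFromOtherContext: Flow<Int> = flow {
    withContext(Dispatchers.Default) {
        emit(1)
    }
}

// channelFlow allows values to be sent from other coroutines and contexts
val sendsFromOtherContext: Flow<Int> = channelFlow {
    launch(Dispatchers.Default) { send(1) }
}

fun flowBuilderRejectsOtherContext(): Boolean = runBlocking {
    try {
        emitsFromOtherContext.toList()
        false
    } catch (e: IllegalStateException) {
        true // thrown by the flow builder's context-preservation check
    }
}

fun channelFlowValues(): List<Int> = runBlocking { sendsFromOtherContext.toList() }
```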

3. Modifying a data flow

Intermediaries can use intermediate operators such as map to modify the stream of data without consuming the values. These operators are functions that, when applied to a stream of data, set up a chain of operations that is not executed until the values are consumed in the future.

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData
) {
    /**
     * Returns the favorite latest news applying transformations on the flow.
     * These operations are lazy and don't trigger the flow. They just transform
     * the current value emitted by the flow at that point in time.
     */
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            // Intermediate operation to filter the list of favorite topics
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            // Intermediate operation to save the latest news in the cache
            .onEach { news -> saveInCache(news) }
}
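To see that intermediate operators are lazy, the following sketch counts how often a hypothetical cache write in onEach runs. flowOf stands in for newsRemoteDataSource.latestNews, and a simple prefix check stands in for userData.isFavoriteTopic; both are assumptions for illustration.

```kotlin
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

data class LazinessResult(
    val cacheWritesBeforeCollect: Int,
    val cacheWritesAfterCollect: Int,
    val collected: List<List<String>>
)

fun lazinessDemo(): LazinessResult = runBlocking {
    var cacheWrites = 0
    // Hypothetical source and favorite-topic check
    val favorites = flowOf(listOf("kotlin: flows", "sports: scores"))
        .map { news -> news.filter { it.startsWith("kotlin") } }
        .onEach { cacheWrites++ } // stands in for saveInCache(news)

    // Building the chain above has not run map or onEach yet
    val before = cacheWrites
    // toList is a terminal operator: only now do map and onEach execute
    val collected = favorites.toList()
    LazinessResult(before, cacheWrites, collected)
}
```

The counter stays at 0 until toList runs, then becomes 1 for the single emitted list.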

4. Collecting from a data flow

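Use a terminal operator to trigger the flow to start listening for values. To get all the values in the stream as they are emitted, use collect. Because collect is a suspend function, it must be called from a coroutine. It takes a lambda that is invoked for every new value, and the calling coroutine may stay suspended until the flow is closed.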

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            // Trigger the flow and consume its elements using collect
            newsRepository.favoriteLatestNews.collect { favoriteNews ->
                // Update View with the latest favorite news
            }
        }
    }
}

Flow collection can stop for the following reasons:

  • The coroutine that collects is cancelled. This also stops the underlying producer. In the example above, the producer loops forever with while (true), so the stream is closed only when the ViewModel is cleared and viewModelScope is cancelled.
  • The producer finishes emitting items. In this case, the flow is closed and the coroutine that called collect resumes execution.
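Both ways of stopping can be reproduced in a small sketch (illustrative names; it assumes kotlinx.coroutines 1.6 or later, where collect accepts a lambda without an extra import). A finally block in an endless producer records when cancellation tears it down, and a finite flowOf flow shows collect returning once the producer has finished.

```kotlin
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun stopReasons(): List<String> = runBlocking {
    val events = mutableListOf<String>()

    // Case 1: an endless producer, like the while (true) news poller
    val endless = flow<Int> {
        try {
            var i = 0
            while (true) {
                emit(i++)
                delay(10)
            }
        } finally {
            events += "producer stopped" // runs when the collecting coroutine is cancelled
        }
    }
    val job = launch { endless.collect { } }
    delay(35)
    job.cancelAndJoin() // comparable to viewModelScope being cancelled

    // Case 2: a finite producer; collect returns once everything is emitted
    flowOf(1, 2, 3).collect { }
    events += "collect returned"

    events
}
```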

5. Catching exceptions in a data flow

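The implementation of the producer can come from a third-party library, so it may throw unexpected exceptions. To handle these exceptions, use the catch intermediate operator. catch only handles exceptions thrown upstream of it (by the producer or by earlier operators); an exception thrown inside the collect lambda is not caught. Inside catch you can also emit values to the flow, for example a fallback.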

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            newsRepository.favoriteLatestNews
                // Intermediate catch operator. If an exception is thrown,
                // catch and update the UI
                .catch { exception -> notifyError(exception) }
                .collect { favoriteNews ->
                    // Update View with the latest favorite news
                }
        }
    }
}
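The sketch below (hypothetical producer and names; assumes kotlinx.coroutines 1.6 or later) shows two properties of catch: it handles an exception thrown upstream and may emit a fallback value, but it lets an exception thrown in the collect lambda, which is downstream of it, propagate unchanged.

```kotlin
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Upstream failure: catch reports it and emits a fallback value
fun catchUpstream(): Pair<List<String>, String?> = runBlocking {
    var reported: String? = null
    val values = flow<String> {
        emit("first")
        throw IllegalStateException("network down") // simulated producer failure
    }
        .catch { e ->
            reported = e.message // plays the role of notifyError(exception)
            emit("fallback")     // catch may emit replacement values
        }
        .toList()
    values to reported
}

// Downstream failure: an exception thrown in collect bypasses catch
fun catchIgnoresDownstream(): Boolean = runBlocking {
    try {
        flowOf(1)
            .catch { emit(-1) }
            .collect { throw IllegalArgumentException("thrown in collect") }
        false
    } catch (e: IllegalArgumentException) {
        true
    }
}
```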

6. Executing in a different CoroutineContext

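By default, the producer of a flow builder executes in the CoroutineContext of the coroutine that collects from it, and, as mentioned earlier, it cannot emit values from a different CoroutineContext. To change the CoroutineContext of a flow, use the intermediate operator flowOn. flowOn changes the context of the upstream flow, that is, the producer and any intermediate operators applied before flowOn. The downstream flow (the operators after flowOn and the consumer) is not affected and keeps running in the collector's context.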

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData,
    private val defaultDispatcher: CoroutineDispatcher
) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> // Executes on the default dispatcher
                news.filter { userData.isFavoriteTopic(it) }
            }
            .onEach { news -> // Executes on the default dispatcher
                saveInCache(news)
            }
            // flowOn affects the upstream flow ↑
            .flowOn(defaultDispatcher)
            // the downstream flow ↓ is not affected
            .catch { exception -> // Executes in the consumer's context
                emit(lastCachedNews())
            }
}
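One way to check which context each stage runs in is to record thread names. This sketch assumes a JVM target and uses a single-threaded executor with a made-up thread name, upstream-worker, as the dispatcher passed to flowOn (in place of defaultDispatcher above).

```kotlin
import java.util.concurrent.Executors
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.single
import kotlinx.coroutines.runBlocking

// Returns (thread that ran the upstream map, thread that ran the downstream map)
fun flowOnThreads(): Pair<String, String> {
    // Hypothetical worker thread name, chosen so it is easy to recognize
    val dispatcher = Executors
        .newSingleThreadExecutor { runnable -> Thread(runnable, "upstream-worker") }
        .asCoroutineDispatcher()
    try {
        return runBlocking {
            flowOf(Unit)
                .map { Thread.currentThread().name } // upstream: runs on the worker
                .flowOn(dispatcher)                  // affects only the operators above
                .map { upstream -> upstream to Thread.currentThread().name } // downstream: collector's thread
                .single()
        }
    } finally {
        dispatcher.close() // shuts down the underlying executor
    }
}
```

The first name is the worker's; the second is whatever thread called runBlocking.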

7. Data flows in Jetpack libraries

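Many Jetpack libraries integrate with Flow. For example, with Room you can declare a DAO query that returns a Flow to be notified of changes in the database: every time the Example table changes, a new list containing the current rows is emitted.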

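import androidx.room.Dao
import androidx.room.Query
import kotlinx.coroutines.flow.Flow

// Example is an @Entity class defined elsewhere in the app
@Dao
abstract class ExampleDao {
    @Query("SELECT * FROM Example")
    abstract fun getExamples(): Flow<List<Example>>
}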
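Room's generated DAO implementation is not reproduced here. The sketch below only mimics the observable behavior described above with a hypothetical in-memory DAO backed by a MutableStateFlow: each insert replaces the table contents, and a collector receives the new list. StateFlow conflates values, so a slow collector may skip intermediate states; the yield() calls give the collector a chance to run between inserts on runBlocking's single thread.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

data class Example(val id: Int, val name: String)

// Hypothetical in-memory stand-in for a Room DAO; NOT how Room is implemented
class InMemoryExampleDao {
    private val table = MutableStateFlow<List<Example>>(emptyList())

    // Emits the current rows, then a new list after every change
    fun getExamples(): Flow<List<Example>> = table

    fun insert(example: Example) {
        table.value = table.value + example
    }
}

fun observeInserts(): List<List<Example>> = runBlocking {
    val dao = InMemoryExampleDao()
    val seen = mutableListOf<List<Example>>()
    val job = launch { dao.getExamples().take(3).toList(seen) }
    yield() // let the collector start and receive the initial empty list
    dao.insert(Example(1, "a"))
    yield() // let the collector receive the first change
    dao.insert(Example(2, "b"))
    job.join()
    seen
}
```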

8. Converting callback-based APIs into data flows

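callbackFlow is a flow builder that lets you convert callback-based APIs into flows. Unlike the flow builder, callbackFlow allows values to be sent from a different CoroutineContext with the send function, or from outside a coroutine with the trySend function. The block should end with awaitClose, which keeps the flow open until it is closed or cancelled and is where you unregister the callback; if the block returns while the flow is still open, callbackFlow fails with an IllegalStateException.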

import com.google.firebase.firestore.DocumentReference
import com.google.firebase.firestore.FirebaseFirestore
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow

class FirestoreUserEventsDataSource(
    private val firestore: FirebaseFirestore
) {
    // Method to get user events from the Firestore database
    // (UserEvents and the getEvents() extension are app-defined, not part of the Firestore SDK)
    fun getUserEvents(): Flow<UserEvents> = callbackFlow {

        // Reference to use in Firestore; document() returns a DocumentReference
        var eventsDocument: DocumentReference? = null
        try {
            eventsDocument = FirebaseFirestore.getInstance()
                .collection("collection")
                .document("app")
        } catch (e: Throwable) {
            // If Firebase cannot be initialized, close the stream of data;
            // flow consumers will stop collecting and the coroutine will resume
            close(e)
        }

        // Registers a callback with Firestore, which will be called on new events
        val subscription = eventsDocument?.addSnapshotListener { snapshot, _ ->
            if (snapshot == null) { return@addSnapshotListener }
            // Sends events to the flow; consumers will get the new events.
            // trySend does not throw: a failed send (for example, because the
            // flow is already closed) is reported through the returned ChannelResult
            trySend(snapshot.getEvents())
        }

        // The callback inside awaitClose will be executed when the flow is
        // either closed or cancelled.
        // In this case, remove the callback from Firestore
        awaitClose { subscription?.remove() }
    }
}
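The same pattern works with any listener-style API. Because Firestore cannot run in isolation, this sketch uses a hypothetical TickerSource with addListener and removeListener methods. callbackFlow registers a listener that forwards each value with trySend, and awaitClose unregisters it once collection stops, here after take(2) has received two values.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical callback-based API, standing in for a listener such as Firestore's
class TickerSource {
    private val listeners = mutableListOf<(Int) -> Unit>()

    fun addListener(listener: (Int) -> Unit) { listeners += listener }
    fun removeListener(listener: (Int) -> Unit) { listeners -= listener }
    fun fire(value: Int) = listeners.toList().forEach { it(value) }
    val listenerCount: Int get() = listeners.size
}

// Adapts the listener API to a Flow
fun TickerSource.ticks(): Flow<Int> = callbackFlow {
    val listener: (Int) -> Unit = { value ->
        trySend(value) // non-suspending, so it can be called from a plain callback
    }
    addListener(listener)
    awaitClose { removeListener(listener) } // unregister when collection stops
}

// Returns (values received, listeners still registered afterwards)
fun collectTwoTicks(): Pair<List<Int>, Int> = runBlocking {
    val source = TickerSource()
    val received = async { source.ticks().take(2).toList() }
    while (source.listenerCount == 0) yield() // wait until callbackFlow has registered
    source.fire(10)
    source.fire(20)
    received.await() to source.listenerCount
}
```

After collection finishes, no listener remains registered, which is the cleanup the Firestore example performs with subscription?.remove().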