Android Datastore dynamic creation and source code analysis

Knowledge points involved

1. Principles of coroutines: see the introductory blog post "A short story explains the relationship between processes, threads, and Kotlin coroutines"
2. Channel: see "Android-kotlin-Channel is explained in detail"
3. Coroutines: CompletableDeferred and structured concurrency

An encapsulated DataStoreUtils tool is available on GitHub.

Purpose of this blog

Our company's use of SharedPreferences can easily lead to ANRs. We are investigating whether DataStore can replace our current SharedPreferences usage and solve the ANR problem, so we first need to study its source code.

Directory
  • Version introduction
  • Migrate SharedPreferences data to DataStore
  • Dynamically create DataStore
  • Store parameters
  • Summary
Version introduction
implementation "androidx.datastore:datastore-preferences:1.0.0"
Migrate SharedPreferences data to DataStore

To migrate, the data stored in SharedPreferences has to be moved into a DataStore, so the DataStore must be built first.
The examples commonly found online for building and migrating a DataStore look like this:

//Migration use
private val Context.dataStore: DataStore<Preferences> by preferencesDataStore(
    name = "userSharePreFile",
    produceMigrations = { context ->
        listOf(
            SharedPreferencesMigration(
                context,
                 "userSharePreFile"
            )
        )
    }
)

//or
//This way of constructing a DataStore exists only in the alpha versions; it cannot be found in 1.0.0.
var dataStore: DataStore<Preferences> = context.createDataStore(
        name = "userSharePreFile"
 )
//or
//Build directly
private val Context.dataStore: DataStore<Preferences> by preferencesDataStore(
        name = "userSharePreFile"
)

All three variants above create the DataStore as an extension on Context, and they share one drawback: the name must be known in advance. If your existing code creates SharedPreferences with a name passed in from outside, these ways of creating a DataStore directly will not suit you.

Let's look through the source code of the old (alpha) version to see how it builds the DataStore.
//alpha version construction method
var dataStore: DataStore<Preferences> = context.createDataStore(
        name = "userSharePreFile"
 )

fun Context.createDataStore(
    name: String,
    corruptionHandler: ReplaceFileCorruptionHandler<Preferences>? = null,
    //①
    migrations: List<DataMigration<Preferences>> = listOf(),
    //②
    scope: CoroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob())
): DataStore<Preferences> =
    PreferenceDataStoreFactory.create(
        //③
        produceFile = {
            File(this.filesDir, "datastore/$name.preferences_pb")
        },
        corruptionHandler = corruptionHandler,
        migrations = migrations,
        scope = scope
    )

Clearly, PreferenceDataStoreFactory.create is what returns the DataStore.
① The migrations to run during the build, for example from a SharedPreferences file
② The coroutine scope; by default it runs on Dispatchers.IO
③ The location where the new file is stored
Now let's look at the other way to create a DataStore, through by preferencesDataStore

private val Context.dataStore: DataStore<Preferences> by preferencesDataStore(
        name = "userSharePreFile"
)

public fun preferencesDataStore(
    name: String,
    corruptionHandler: ReplaceFileCorruptionHandler<Preferences>? = null,
    //①
    produceMigrations: (Context) -> List<DataMigration<Preferences>> = { listOf() },
    //②
    scope: CoroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob())
): ReadOnlyProperty<Context, DataStore<Preferences>> {
    return PreferenceDataStoreSingletonDelegate(name, corruptionHandler, produceMigrations, scope)
}

internal class PreferenceDataStoreSingletonDelegate internal constructor(
    private val name: String,
    private val corruptionHandler: ReplaceFileCorruptionHandler<Preferences>?,
    private val produceMigrations: (Context) -> List<DataMigration<Preferences>>,
    private val scope: CoroutineScope
) : ReadOnlyProperty<Context, DataStore<Preferences>> {

    private val lock = Any()

    @GuardedBy("lock")
    @Volatile
    private var INSTANCE: DataStore<Preferences>? = null

    override fun getValue(thisRef: Context, property: KProperty<*>): DataStore<Preferences> {
        return INSTANCE ?: synchronized(lock) {
            if (INSTANCE == null) {
                val applicationContext = thisRef.applicationContext

                INSTANCE = PreferenceDataStoreFactory.create(
                    corruptionHandler = corruptionHandler,
                    migrations = produceMigrations(applicationContext),
                    scope = scope
                ) {
                    applicationContext.preferencesDataStoreFile(name)
                }
            }
            INSTANCE!!
        }
    }
}

//File storage location
public fun Context.preferencesDataStoreFile(name: String): File =
    this.dataStoreFile("$name.preferences_pb")

Side note: this uses Kotlin's delegated property syntax (the by keyword).
① The migrations to run, for example from SharedPreferences files
② The coroutine scope; by default it runs on Dispatchers.IO
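
The delegate shown above combines two ideas: a ReadOnlyProperty that backs an extension property, and double-checked locking so that only one instance is ever built. The following is a minimal sketch of that pattern using only the Kotlin standard library. FakeContext, FakeStore, SingletonStoreDelegate and createdStores are hypothetical stand-ins invented for this illustration; they are not part of the DataStore library.

```kotlin
import kotlin.properties.ReadOnlyProperty
import kotlin.reflect.KProperty

// Hypothetical stand-ins for Context and DataStore, so the sketch runs without Android.
class FakeContext(val appName: String)
class FakeStore(val fileName: String)

// Counts how many stores were really built, to show that the delegate caches one instance.
var createdStores = 0

class SingletonStoreDelegate(private val name: String) :
    ReadOnlyProperty<FakeContext, FakeStore> {

    private val lock = Any()

    @Volatile
    private var instance: FakeStore? = null

    override fun getValue(thisRef: FakeContext, property: KProperty<*>): FakeStore {
        // Fast path without locking; the slow path checks again under the lock.
        return instance ?: synchronized(lock) {
            instance ?: FakeStore("$name.preferences_pb").also {
                createdStores++
                instance = it
            }
        }
    }
}

// Extension property backed by the delegate, mirroring `by preferencesDataStore(...)`.
val FakeContext.store: FakeStore by SingletonStoreDelegate("userSharePreFile")
```

Reading FakeContext("a").store and FakeContext("b").store returns the same FakeStore, because the delegate object, not the receiver, holds the cached instance. That is also why the name has to be fixed where the property is declared.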

Both the old (alpha) API and by preferencesDataStore ultimately obtain the DataStore through PreferenceDataStoreFactory.create. Let's continue with the implementation in PreferenceDataStoreFactory.kt.

//PreferenceDataStoreFactory.kt
 public fun create(
        corruptionHandler: ReplaceFileCorruptionHandler<Preferences>? = null,
        //Migrations to run, e.g. from SharedPreferences files
        migrations: List<DataMigration<Preferences>> = listOf(),
         //IO
        scope: CoroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob()),
        //The directory location where the dataStore file is stored
        produceFile: () -> File
    ): DataStore<Preferences> {
        val delegate = DataStoreFactory.create(//Create SingleProcessDataStore
            serializer = PreferencesSerializer,
            corruptionHandler = corruptionHandler,
            migrations = migrations,
            scope = scope
        ) {
            //Omit code
        }
        //Pass in SingleProcessDataStore
        return PreferenceDataStore(delegate)
    }

//updateData has to be called explicitly; if it is not, the migration logic is not triggered.
//The extension function DataStore<Preferences>.edit, discussed below, comes back to this.
internal class PreferenceDataStore(private val delegate: DataStore<Preferences>):
    DataStore<Preferences> by delegate {
    override suspend fun updateData(transform: suspend (t: Preferences) -> Preferences):
        Preferences {
            return delegate.updateData {
                val transformed = transform(it)
                (transformed as MutablePreferences).freeze()
                transformed
            }
        }
}

Continue to look at DataStoreFactory.create

//DataStoreFactory.kt
fun <T> create(
        produceFile: () -> File,
        serializer: Serializer<T>,
        corruptionHandler: ReplaceFileCorruptionHandler<T>? = null,
        migrations: List<DataMigration<T>> = listOf(),
        scope: CoroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob())
    ): DataStore<T> =
        //Find the finally created class
        SingleProcessDataStore(
            produceFile = produceFile,
            serializer = serializer,
            corruptionHandler = corruptionHandler ?: NoOpCorruptionHandler(),
            initTasksList = listOf(DataMigrationInitializer.getInitializer(migrations)),
            scope = scope
        )

That settles it: the DataStore is ultimately created as a SingleProcessDataStore.

The picture below summarizes the call chains of the old alpha API and of the new by preferencesDataStore.

DataStore.jpg

Now that we know this much, let's build the DataStore dynamically.

Dynamically create DataStore
fun preferencesMigrationDataStore(sharedPreferName: String) {
    val dataStore = PreferenceDataStoreFactory.create(
        corruptionHandler = ReplaceFileCorruptionHandler<Preferences>(
            produceNewData = { emptyPreferences() }
        ),
        //The SharedPreferences file to migrate
        migrations = listOf(SharedPreferencesMigration(mContext, sharedPreferName)),
        //Run on Dispatchers.IO
        scope = CoroutineScope(Dispatchers.IO + SupervisorJob())
    ) {
        //DataStore file name
        mContext.preferencesDataStoreFile(sharedPreferName)
    }

    runBlocking {
        //This call is required; without it the migration logic is not triggered
        dataStore.updateData {
            it.toPreferences()
        }
    }
}

migrations: the SharedPreferences file(s) you want to migrate
scope: the coroutine scope in which data is written, here Dispatchers.IO
After this code runs, the SharedPreferences .xml file disappears and a new /datastore/xxx.preferences_pb file appears in the files directory.
Do not run the migration for the same SharedPreferences file repeatedly, for example once now and again a second later; otherwise an error is reported.
SharedPrefs.png
dataStore_migrate.jpg
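
One way to avoid building (and migrating) the same store twice when names arrive at runtime is to cache one instance per name. The sketch below shows that idea with the JVM standard library only. NamedInstanceCache is a hypothetical helper written for this illustration and is not a DataStore API; in a real project the factory lambda would presumably wrap the PreferenceDataStoreFactory.create call shown above.

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Hypothetical helper: keeps exactly one instance per name.
class NamedInstanceCache<T : Any>(private val factory: (String) -> T) {

    private val instances = ConcurrentHashMap<String, T>()

    // computeIfAbsent applies the factory at most once per key,
    // even when several threads ask for the same name at the same time.
    fun get(name: String): T = instances.computeIfAbsent(name) { factory(it) }
}
```

Every caller that asks for the same name receives the same object, so the factory, and therefore the migration inside it, runs only once per file name for the lifetime of the cache.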

Store parameters

/**
 * @param key the preference name
 * @param value the value to store
 */
private fun putInt(key: String, value: Int) {
    runBlocking {
        dataStore.edit { //①
            it[intPreferencesKey(key)] = value
        }
    }
}
//Similar key functions provided by the library include:
intPreferencesKey
doublePreferencesKey
stringPreferencesKey
....

For the details of ①, open edit: it turns out to be an extension function.

public suspend fun DataStore<Preferences>.edit(
    transform: suspend (MutablePreferences) -> Unit
): Preferences {
    return this.updateData {//The call is PreferenceDataStore.updateData()
        //it.toMutablePreferences() returns something like map
        it.toMutablePreferences().apply { transform(this) }
    }
}

transform is the lambda that the caller passes inside { }. Next, let's look at the code of the PreferenceDataStore class.

//From the previous part of the code, we can know that delegate = SingleProcessDataStore
internal class PreferenceDataStore(private val delegate: DataStore<Preferences>):
    DataStore<Preferences> by delegate {
    override suspend fun updateData(transform: suspend (t: Preferences) -> Preferences):
        Preferences {
            return delegate.updateData {//Call SingleProcessDataStore.updateData
                //Return to the previous {} which is it.toMutablePreferences().apply { transform(this) }
                val transformed = transform(it)
                (transformed as MutablePreferences).freeze()
                transformed //Get the user's content data that needs to be changed
            }
        }
}

delegate.updateData() is called in the code, so continue to look at the updateData of SingleProcessDataStore

//SingleProcessDataStore.kt
 override suspend fun updateData(transform: suspend (t: T) -> T): T {
        val ack = CompletableDeferred<T>()
        val currentDownStreamFlowState = downstreamFlow.value
        //The transform is wrapped in a Message.Update. coroutineContext is the caller's coroutine context, i.e. the coroutine started by our runBlocking, here on the main thread
        val updateMsg = Message.Update(transform, ack, currentDownStreamFlowState, coroutineContext)
        //Dispatch the message; actor is a SimpleActor
        actor.offer(updateMsg)
        //Suspends here until the Preferences arrive; handleUpdate delivers them later through update.ack.completeWith
        return ack.await()
    }

internal class SimpleActor<T>(
    private val scope: CoroutineScope,//Dispatchers.IO + SupervisorJob()
    onComplete: (Throwable?) -> Unit,
    onUndeliveredElement: (T, Throwable?) -> Unit,
    private val consumeMessage: suspend (T) -> Unit
) {
    private val messageQueue = Channel<T>(capacity = UNLIMITED)
    private val remainingMessages = AtomicInteger(0)
    //... omit
    //Enqueue the message that was just wrapped
    fun offer(msg: T) {
        check(
            //Send the encapsulated message body
            messageQueue.trySend(msg)
                .onClosed { throw it ?: ClosedSendChannelException("Channel was closed normally") }
                .isSuccess
        )
        if (remainingMessages.getAndIncrement() == 0) {
            scope.launch {
                check(remainingMessages.get() > 0)
                do {
                   // scope = Dispatchers.IO + SupervisorJob()
                    scope.ensureActive()
                    //Take out the encapsulated message body and then perform task processing
                    consumeMessage(messageQueue.receive())
                } while (remainingMessages.decrementAndGet() != 0)
            }
        }
    }
}

Tip: a Channel is used here for communication between coroutines; it handles concurrent senders safely.
At this point we can see that the task is handed off from the runBlocking coroutine (on the main thread) to Dispatchers.IO.
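
The queue-plus-counter trick in SimpleActor.offer can be tried without coroutines. The sketch below is a thread-based analogue, not the library's code: it swaps Channel for ConcurrentLinkedQueue, scope.launch for an ExecutorService, and CompletableDeferred for CompletableFuture. QueueActor, Add and sumThroughActor are hypothetical names made up for this illustration.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical thread-based analogue of SimpleActor.
class QueueActor<T : Any>(
    private val executor: ExecutorService,
    private val consume: (T) -> Unit
) {
    private val queue = ConcurrentLinkedQueue<T>()
    private val remaining = AtomicInteger(0)

    fun offer(msg: T) {
        queue.add(msg)
        // Only the caller that moves the counter from 0 to 1 starts a drain loop,
        // so at most one loop consumes messages at any time.
        if (remaining.getAndIncrement() == 0) {
            executor.execute {
                do {
                    consume(checkNotNull(queue.poll()))
                } while (remaining.decrementAndGet() != 0)
            }
        }
    }
}

// A message carrying a value plus an "ack" that the sender can wait on.
class Add(val value: Int, val ack: CompletableFuture<Int> = CompletableFuture())

fun sumThroughActor(n: Int): Int {
    val executor = Executors.newFixedThreadPool(4)
    try {
        var total = 0 // only ever touched by the single drain loop
        val actor = QueueActor<Add>(executor) { msg ->
            total += msg.value
            msg.ack.complete(total)
        }
        val messages = (1..n).map { Add(it) }
        messages.forEach(actor::offer)
        // Blocks until the consumer completes the last ack, like ack.await() does.
        return messages.last().ack.get()
    } finally {
        executor.shutdown()
    }
}
```

Even though the pool has four threads, the messages are processed strictly one after another, which is the property the DataStore relies on to serialize its file updates.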

private val actor = SimpleActor<Message<T>>(
        scope = scope,// CoroutineScope(Dispatchers.IO + SupervisorJob())
        onComplete = { /* ....omitted */ },
        onUndeliveredElement = { msg, ex ->
          //.....omitted
        }
      ) { msg ->
        //Process distributed tasks, msg is the updateMsg just encapsulated
        when (msg) {
            is Message.Read -> {//Read
                handleRead(msg)
            }
            is Message.Update -> {//Update
                handleUpdate(msg)
            }
        }
    }
 private suspend fun handleUpdate(update: Message.Update<T>) {
        update.ack.completeWith(
            runCatching {
                when (val currentState = downstreamFlow.value) {
                    is Data -> {
                        //Write data to file
                        transformAndWrite(update.transform, update.callerContext)
                    }
                    is ReadException, is UnInitialized -> {
                        if (currentState === update.lastState) {
                            //Read the file ①
                            readAndInitOrPropagateAndThrowFailure()
                            //Write data to file ②
                            transformAndWrite(update.transform, update.callerContext)
                        } else {
                            throw (currentState as ReadException).readException
                        }
                    }

                    is Final -> throw currentState.finalException // won't happen
                }
            }
        )
    }

On first use, downstreamFlow.value is UnInitialized.
Note the update.ack.completeWith call here: it completes the deferred with the result of the update, either a value or an exception.

updateData is shown again to point out where the caller waits for that result.
 override suspend fun updateData(transform: suspend (t: T) -> T): T {
        val ack = CompletableDeferred<T>()
        val currentDownStreamFlowState = downstreamFlow.value
        val updateMsg =
            Message.Update(transform, ack, currentDownStreamFlowState, coroutineContext)
        actor.offer(updateMsg)
        return ack.await() //Waits here until update.ack.completeWith delivers the result; without this line the main thread would not be held up.
    }

So with runBlocking, the main thread is blocked until the result comes back. If the UI needs to refresh in the meantime, this can cause an ANR in bad cases.

With that covered, let's continue with the read in ①.

 private suspend fun readAndInitOrPropagateAndThrowFailure() {
        try {
            readAndInit()
        } catch (throwable: Throwable) {
            downstreamFlow.value = ReadException(throwable)
            throw throwable
        }
    }

 private suspend fun readAndInit() {
        check(downstreamFlow.value == UnInitialized || downstreamFlow.value is ReadException)
        //A Mutex is a lock designed for coroutines. For details, see https://www.kotlincn.net/docs/reference/coroutines/shared-mutable-state-and-concurrency.html
        val updateLock = Mutex()
        //Read dataStore file
        var initData = readDataOrHandleCorruption()
        var initializationComplete: Boolean = false
        
        //This is where SharedPreferences data is converted into the DataStore
        val api = object : InitializerApi<T> {
            override suspend fun updateData(transform: suspend (t: T) -> T): T {
                return updateLock.withLock() {
                    if (initializationComplete) {
                        throwIllegalStateException(
                            "InitializerApi.updateData should not be " +
                                "called after initialization is complete."
                        )
                    }
                    //Transform is the method to migrate data
                    val newData = transform(initData)
                    //What is done here is to compare the old and new values. If they are different, write them.
                    if (newData != initData) {
                        //write file
                        writeData(newData)
                        initData = newData
                    }
                    initData
                }
            }
        }
        //initTasks holds the migrations to run, here the SharedPreferences to convert
        initTasks?.forEach { it(api) }
        initTasks = null
        updateLock.withLock {
            initializationComplete = true
        }
        //Here is the data after migration is completed, stored in flow.value
        downstreamFlow.value = Data(initData, initData.hashCode())
    }

//Read dataStore file
private suspend fun readDataOrHandleCorruption(): T {
        try {
            return readData()
        } catch (ex: CorruptionException) {
            val newData: T = corruptionHandler.handleCorruption(ex)
            try {
                writeData(newData)
            } catch (writeEx: IOException) {
                ex.addSuppressed(writeEx)
                throw ex
            }
            return newData
        }
    }

 private suspend fun readData(): T {
        try {
            FileInputStream(file).use { stream ->
                return serializer.readFrom(stream)
            }
        } catch (ex: FileNotFoundException) {
            if (file.exists()) {
                throw ex
            }
            return serializer.defaultValue
        }
    }

file is the DataStore file we store data in; its path ends in "datastore/$name.preferences_pb".

With the read in ① covered, let's look at ②, writing data to the file. The method that does this is transformAndWrite().

//....
transformAndWrite(update.transform, update.callerContext)
//...

 private suspend fun transformAndWrite(
         //From Message.Update.transform
        transform: suspend (t: T) -> T,
        //From Message.Update.callerContext
        callerContext: CoroutineContext
    ): T {
        val curDataAndHash = downstreamFlow.value as Data<T>
        curDataAndHash.checkHashCode()

        val curData = curDataAndHash.value
        //callerContext is the context of our runBlocking coroutine, i.e. the main thread
        //The current value is handed to the caller's transform, which returns the new value
        val newData = withContext(callerContext) { transform(curData) }

        curDataAndHash.checkHashCode()
        //Here is data comparison
        return if (curData == newData) {
            curData
        } else {
            //Write the data to the file
            writeData(newData)
            //Save into flow.value
            downstreamFlow.value = Data(newData, newData.hashCode())
            newData
        }
    }

private val SCRATCH_SUFFIX = ".tmp"
//Write the data to the file
internal suspend fun writeData(newData: T) {
        file.createParentDirectories()
        //The file created here is "datastore/$name.preferences_pb.tmp"
        val scratchFile = File(file.absolutePath + SCRATCH_SUFFIX)
        try {
            FileOutputStream(scratchFile).use { stream ->
                serializer.writeTo(newData, UncloseableOutputStream(stream))
                stream.fd.sync()
            }
            //Rename back to file, where file is the name of our target file dataStore
            if (!scratchFile.renameTo(file)) {
                //Rename fails and throws exception
                throwIOException(
                    "Unable to rename $scratchFile." +
                        "This likely means that there are multiple instances of DataStore " +
                        "for this file. Ensure that you are only creating a single instance of " +
                        "datastore for this file."
                )
            }
        } catch (ex: IOException) {
            if (scratchFile.exists()) {
                scratchFile.delete()
            }
            throw ex
        }
    }
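
The scratch-file steps in writeData (write to a .tmp file, sync it to disk, then rename it over the target) can be tried in isolation. The following is a minimal sketch using plain java.io. writeAtomically is a hypothetical helper written for this illustration; it writes raw bytes instead of going through a Serializer.

```kotlin
import java.io.File
import java.io.FileOutputStream
import java.io.IOException

// Hypothetical helper mirroring the scratch-file steps of writeData.
fun writeAtomically(target: File, bytes: ByteArray) {
    target.absoluteFile.parentFile?.mkdirs()
    val scratch = File(target.absolutePath + ".tmp")
    try {
        FileOutputStream(scratch).use { stream ->
            stream.write(bytes)
            // Force the bytes to disk before the rename makes them visible.
            stream.fd.sync()
        }
        // Readers see either the old file or the complete new one, never a partial write.
        if (!scratch.renameTo(target)) {
            throw IOException("Unable to rename $scratch to $target")
        }
    } catch (ex: IOException) {
        // Do not leave a half-written scratch file behind.
        if (scratch.exists()) scratch.delete()
        throw ex
    }
}
```

Whether renameTo replaces an existing target depends on the platform; this sketch assumes a POSIX-style file system such as the one on Android, where it does.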

At this point, we have completed the entire process of updating the value.

Summary

1. File writes happen on Dispatchers.IO.
2. Using runBlocking blocks the main thread. If the UI needs to refresh at that moment, an ANR can occur in bad cases.

/**
 * @param key the preference name
 * @param value the value to store
 */
private fun putInt(key: String, value: Int) {
    runBlocking {
        dataStore.edit {
            it[intPreferencesKey(key)] = value
        }
    }
}

public suspend fun DataStore<Preferences>.edit(
    transform: suspend (MutablePreferences) -> Unit
): Preferences {
    return this.updateData {
        it.toMutablePreferences().apply { transform(this) }
    }
}

//update logic
 private suspend fun handleUpdate(update: Message.Update<T>) {
        update.ack.completeWith(//Notification result callback
            //.....omitted
        )
    }

//transform is the content in {} above
 override suspend fun updateData(transform: suspend (t: T) -> T): T {
        val ack = CompletableDeferred<T>()
        val currentDownStreamFlowState = downstreamFlow.value
        val updateMsg =
            Message.Update(transform, ack, currentDownStreamFlowState, coroutineContext)
        actor.offer(updateMsg)
        return ack.await() //Waits here until update.ack.completeWith delivers the result; without this line the main thread would not be held up.
        //Side note: the update itself would still be executed without ack.await()
    }

Therefore, consider running read/update operations that have to wait for a result inside withContext(Dispatchers.IO) { ... } rather than in runBlocking on the main thread.

3. When a value is updated, the new data is compared with the old. If they are equal, nothing is written; otherwise the data is written to the file and downstreamFlow.value is updated.

 return if (curData == newData) {
            curData
        } else {
            writeData(newData)
            downstreamFlow.value = Data(newData, newData.hashCode())
            newData
        }

4. Concurrency is handled with a Channel: updates coming from different coroutines are queued and consumed one at a time on Dispatchers.IO, so file updates never run concurrently.

5. Once SharedPreferences has been migrated to DataStore, stop using SharedPreferences. If you keep using it, its values will diverge from those in DataStore.
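
Point 3 above, the compare-before-write step, can be sketched in a few lines. CachedValue is a hypothetical in-memory analogue written for this illustration; the persist callback stands in for writeData and current stands in for downstreamFlow.value. It is not thread-safe on its own, because in DataStore that job is done by the actor from point 4.

```kotlin
// Hypothetical in-memory analogue of the compare-then-write step.
class CachedValue<T>(initial: T, private val persist: (T) -> Unit) {

    var current: T = initial
        private set

    fun update(transform: (T) -> T): T {
        val newValue = transform(current)
        if (newValue != current) {
            // Only touch the "disk" when something really changed.
            persist(newValue)
            current = newValue
        }
        return current
    }
}
```

Writing the same value twice therefore costs one comparison instead of one file write, which is why equality of the stored data type matters.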