Coroutine Overview
Subprograms, or functions, are hierarchical calls in all languages. For example: A calls B, B calls C during execution, C returns after execution, B returns after execution, and finally A completes execution. So subroutines are implemented through the stack, and a thread executes a subroutine. Subroutine calls always have one entrance and one return, and the calling sequence is clear.
The coroutine looks like a subprogram. During the execution process, you can interrupt in the subprogram to execute other subprograms, and you can come back and continue execution at the appropriate time.
How Kotlin coroutines work
The general execution process of Kotlin coroutines is shown in the figure above. This process is roughly followed when executing various types of coroutines. It is not a strictly precise execution process.
Create and start the coroutine
fun create.main() { //1. Create coroutine body val coroutine = suspend { println("in coroutine") 5 }.createCoroutine(object: Continuation<Int> { override fun resumeWith(result: Result<Int>) { println("coroutine end: $result") } ? override val context: CoroutineContext get() = EmptyCoroutineContext ? }) ? //2. Execute coroutine coroutine.resume(Unit) }
The output of the above code:
in coroutine coroutine end: Success(5)
Execution process of coroutine
The call stack flow is as follows
- The coroutine we get through suspend block#createCoroutine is actually a SafeContinuation object
- SafeContinuation is actually a proxy class, and the delegate property is the real Continuation object.
- The code in the suspend block is executed in BaseContinuationImpl
- Our anonymous inner class object Continuation is called back
How does the suspend block become a coroutine body and be executed?
We analyze the call stack and know that resumeWith is ultimately executed in BaseContinuationImpl. Let’s take a look at the code.
@SinceKotlin("1.3") internal abstract class BaseContinuationImpl( public val completion: Continuation<Any?>? ) : Continuation<Any?>, CoroutineStackFrame, Serializable { public final override fun resumeWith(result: Result<Any?>) { var current = this var param = result while (true) { probeCoroutineResumed(current) with(current) { val completion = completion!! val outcome: Result<Any?> = try { val outcome = invokeSuspend(param) //1. The suspend block is executed here if (outcome === COROUTINE_SUSPENDED) return Result.success(outcome) } catch (exception: Throwable) { Result.failure(exception) } releaseIntercepted() if (completion is BaseContinuationImpl) { current = completion param=outcome } else { completion.resumeWith(outcome) //2. Our anonymous inner class is called back here return } } } } ? protected abstract fun invokeSuspend(result: Result<Any?>): Any? //3. Abstract method }
At code comment 1., current.invokeSuspend is called to execute the coroutine body we defined, proving that the suspend block is actually a subclass of BaseContinuationImpl
At 2., after the coroutine body is executed, our code receives the completion callback
At 3., you can find that invokeSuspend is an abstract method, and suspend block is the specific implementation of this method.
Next, I will use breakpoints to further analyze which subclass the suspend block is executed through.
You can see that current is a file named {file}
method
{method}
The object in the format of method {variable}$1 proves that the kotlin compiler will help us generate a subclass of BaseContinuationImpl after encountering the suspend keyword.
So, what exactly is this subcategory? After compiling kt into .class and opening it through jadx, the resulting java code is as follows
public final class CreateCoroutineKt { public static final void create.main() { Continuation coroutine = ContinuationKt.createCoroutine(new CreateCoroutineKt$create.main$coroutine$1(null), new CreateCoroutineKt$create.main$coroutine$2()); Unit unit = Unit.INSTANCE; Result.Companion companion = Result.Companion; coroutine.resumeWith(Result.constructor-impl(unit)); } } final class CreateCoroutineKt$create.main$coroutine$1 extends SuspendLambda implements Function1<Continuation<? super Integer>, Object> { int label; ? CreateCoroutineKt$create.main$coroutine$1(Continuation<? super CreateCoroutineKt$create.main$coroutine$1> continuation) { super(1, continuation); } ? @NotNull public final Continuation<Unit> create(@NotNull Continuation<?> continuation) { return new CreateCoroutineKt$create.main$coroutine$1(continuation); } ? @Nullable public final Object invoke(@Nullable Continuation<? super Integer> continuation) { return create(continuation).invokeSuspend(Unit.INSTANCE); } ? @Nullable public final Object invokeSuspend(@NotNull Object obj) { IntrinsicsKt.getCOROUTINE_SUSPENDED(); switch (this.label) { case 0: ResultKt.throwOnFailure(obj); System.out.println((Object) "in coroutine"); //Logic of coroutine body return Boxing.boxInt(5); default: throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine"); } } }
It is obvious that the kt compiler helped us change the suspend keyword into a subclass of SuspendLambda, and rewrote the invokeSuspend method. It is not difficult to guess that SuspendLambda inherits from BaseContinuationImp.
Kotlin full analysis document: “Kotlin Manual”Click to view detailed categories
Coroutine context and scheduler
The context of a coroutine is usually represented by the CoroutineContext type. This type is defined in the Kotlin standard library.
In a coroutine, the context is a collection of various elements. The leading element among them is Job.
When we understand the concurrency and scheduling of coroutines, we involve Job. Kotlin coroutine combines the suspend function and the async keyword to implement concurrent operations of the coroutine (zinyan.com)
This article continues to learn more about Job.
Schedulers and threads
What is a scheduler? The scheduler is a control object that determines which thread or threads a coroutine will execute on.
It can limit the execution of the coroutine to a specific thread, assign the coroutine to a thread pool, or allow the coroutine to run without restrictions.
Coroutine context object: CoroutineContext.
Coroutine dispatcher object: CoroutineDispatcher.
And we usually can define the CoroutineContext object through optional parameters when using launch or async. Then it will help us specify a scheduler object. You can also use the Dispatchers object to define the scheduler
Example:
import kotlinx.coroutines.* ? fun main() = runBlocking<Unit> { //Run in the context of the parent coroutine, that is, runBlocking main coroutine launch { println("main runBlocking : My working thread ${Thread.currentThread().name}") } //Scheduled into the mainline layer and not restricted launch(Dispatchers.Unconfined) { // println("Unconfined : My working thread ${Thread.currentThread().name}") println("What is the end of this node") } //Scheduled to the default thread launch(Dispatchers.Default) { println("Default : My working thread ${Thread.currentThread().name}") } // Schedule to a new thread launch(newSingleThreadContext("ZinyanThread")) { println("ZinyanThreadContext: My working thread ${Thread.currentThread().name}") } } //Output Unconfined : The thread main where I work Default: My working thread DefaultDispatcher-worker-1 ZinyanThreadContext: The thread I work on ZinyanThread main runBlocking : the thread I work on main
The following describes the above four scheduling logics.
launch{…}: By default, it will inherit the context and scheduler from the coroutine object that launched it.
Our example above is that the context is inherited from the runBlocking coroutine object in the main thread, and the result shows that it is running in the main thread.
Dispatchers.Unconfined: It is a special scheduler. In the above example, it runs on the main line layer. But there is a note called unrestricted scheduler. Then you can see that its output is the fastest and earliest. But it just runs to the first suspension point. After suspending, it resumes the coroutine in the thread, and this is entirely determined by the suspending function that was called. The unconstrained scheduler is ideal for coroutines that perform tasks that do not consume CPU time, and that do not update any shared data (such as UI) that is restricted to a specific thread.
It will inherit the external coroutine object by default. When it is restricted to the caller thread, inheriting from it will effectively restrict the coroutine to run on that thread and have predictable FIFO scheduling.
example:
fun main() = runBlocking<Unit> { // Unrestricted - will work with the main thread launch(Dispatchers.Unconfined) { println("Unconfined : worker thread ${Thread.currentThread().name}") delay(500) println("Unconfined : Thread delay ${Thread.currentThread().name}") } launch { //Context of parent coroutine, main runBlocking coroutine println("main runBlocking: worker thread ${Thread.currentThread().name}") delay(1000) println("main runBlocking: thread delay ${Thread.currentThread().name}") } } //Output Unconfined: worker thread main main runBlocking: worker thread main Unconfined: thread delay kotlinx.coroutines.DefaultExecutor main runBlocking: thread delay main
Therefore, the context of the coroutine is inherited from the runBlocking {…} coroutine and runs in the main thread. When the delay function is called, the unrestricted coroutine resumes execution in the default executor thread.
The unconstrained scheduler is an advanced mechanism that can help in certain corner cases without the need to schedule the coroutine for later execution or to have undesirable side effects, since some operations must be performed immediately in the coroutine. The unrestricted scheduler should not be used in normal code.
Dispatchers.Default: Default dispatcher. The default scheduler uses a shared background thread pool. So launch(Dispatchers.Default) { … } uses the same scheduler as GlobalScope.launch { … }.
newSingleThreadContext(“MyOwnThread”): Customized coroutine thread layer. A thread is started for the coroutine to run. A dedicated thread is a very expensive resource. In actual development both must be released when no longer needed, using the close function, or stored in a top-level variable so that it can be reused throughout the application. Otherwise, thread flooding will occur.
Jumps in different threads
Implement the jump of two coroutine threads. Example:
fun main() = runBlocking<Unit> { newSingleThreadContext("Ctx1").use { ctx1 -> newSingleThreadContext("Ctx2").use { ctx2 -> runBlocking(ctx1) { println("Open Ctx1") withContext(ctx2) { println("Ctx2 starts working") } println("Return to Ctx1 ") } } } } //Output Turn on Ctx1 Ctx2 starts working Return to Ctx1
In this example, a context is explicitly specified using runBlocking. And then use withContext in the coroutine to change the context of the coroutine while still residing in the same coroutine.
Get the above output. In this example, the threads created by newSingleThreadContext() are used, and we use the use function in the standard library to release the thread. Avoid thread abuse.
Job in context
The Job in a coroutine is part of the context and can be retrieved from the context using the coroutineContext[Job] expression.
Example:
fun main() = runBlocking<Unit> { println("My job is ${coroutineContext[Job]}") } //Output My job is BlockingCoroutine{Active}@1de0aca6
So what does this do? For example, we can query the activity status of the coroutine
Example:
fun main() = runBlocking<Unit> { println("My job is ${coroutineContext[Job]}") var s = coroutineContext[Job]?.isActive println(s) } //Output true
It shows that my current coroutine object is active.
And why add “?” That’s because the object may be null.
Sub-coroutine
When a coroutine is started in a CoroutineScope by another coroutine, it will inherit the context of the main coroutine through CoroutineScope.coroutineContext. And the Job object of this new coroutine will become the child Job object of the parent coroutine.
When a parent coroutine is canceled, all its child coroutines will also be canceled recursively.
However, when using GlobalScope to start a coroutine, the new coroutine’s job has no parent job. Therefore it is independent of the scope of this launch and operates independently.
Example:
fun main() = runBlocking<Unit> { // Start a coroutine to handle some kind of incoming request (request) val request = launch { // Two sub-jobs are hatched, one of which is started through GlobalScope GlobalScope.launch { println("job1: I run in the coroutine started by GlobalScope") delay(1000) println("job1: After waiting for 1 second, you will find that I am not affected by the cancellation method") } // The other one inherits the context of the parent coroutine launch { delay(100) println("job2: I am a child coroutine object started by a parent coroutine") delay(1000) println("job2: Wait for 1 second. If the parent coroutine is canceled, I will also be canceled. This line should not be printed") } } delay(500) request.cancel() // Cancel the execution of the request (request) delay(1000) // Delay for one second to see what happens println("main: after the entire coroutine is canceled") } //Output job1: I run in the coroutine started by GlobalScope job2: I am a child coroutine object started by the parent coroutine job1: After waiting for 1 second, you will find that I am not affected by the cancellation method main: After the entire coroutine is canceled
We can see it by outputting the results. Only the two methods of job1 are executed. Job2 was also canceled during the cancellation process.
Parent coroutine
Only after we understand the concept of child coroutine can we understand the parent coroutine more clearly.
A parent coroutine always waits for all child coroutines to finish executing. The parent coroutine does not explicitly track the start of all child coroutines, and does not have to wait for them at the end using Job.join:
Example:
fun main() = runBlocking<Unit> { // Start a coroutine to handle some kind of incoming request (request) val request = launch { repeat(3) { i -> // Start a small number of sub-coroutines launch { delay((i + 1) * 200L) // Delay 200 milliseconds, 400 milliseconds, 600 milliseconds println("Coroutine: $i ends") } } println("Return value: The parent coroutine itself has completed execution, but I did not call a method to explicitly close all child coroutines. The transaction of the child coroutines has not yet ended") } request.join() // Wait for the completion of the request, including all its sub-coroutines println("All coroutines end") } //Output Return value: The parent coroutine itself has been executed, but I did not call the method to explicitly close all child coroutines. The transaction of the child coroutines has not yet ended. Coroutine: 0 ended Coroutine: 1 ended Coroutine: 2 ended All coroutines end
We can see that the code of the parent coroutine has been executed and output. But the sub-coroutine is still active, so the entire coroutine is still active.
Of course, if we actively call .cancel(), the sub-coroutine will be forcibly terminated before it has finished running.
This is the relationship between the parent and child coroutines in the coroutine.
Name the coroutine for easy debugging
If the coroutine prints a log, it will have a default ID. But if you are processing some specific requests or logic
If we name the coroutine, we can debug it more conveniently when debugging.
Naming the coroutine is usually handled through CoroutineName.
Example:
val v1 = async(CoroutineName("v1coroutine")) { delay(500) log("Computing v1") 252 } val v2 = async(CoroutineName("v2coroutine")) { delay(1000) log("Computing v2") 6 }
For example, above I named two coroutine objects. This naming result can only be seen in the log log.
When initializing the coroutine, add multiple elements
We learned to initialize the scheduler in the overloaded coroutine, and also learned to add the coroutine name in the previous step.
So if we need to add these two configuration properties at startup, how should we deal with it?
Can be spliced with +. Example:
fun main() = runBlocking<Unit> { // Start a coroutine to handle some kind of incoming request (request) launch(Dispatchers.Default + CoroutineName("test")) { println("My working thread: ${Thread.currentThread().name}") } }
Scope-CoroutineScope
We all understand scope, which is to take effect within a specified space and area. And if we are developing in Android, we use Activity to start a coroutine to handle operations such as network or asynchronous IO reading. All coroutines should be automatically canceled after the Activity is destroyed to avoid memory leaks.
In addition to manually processing and closing it, we can also declare its scope when the coroutine is built.
Example:
class DemoActivity : AppCompatActivity() { //MainScope uses the factory function that comes with the Kotlinx coroutine library. // It is an adapted UI thread using Dispatchers.Main as the scheduler private val mainScope = MainScope() ? //Cancel the scope when closing fun destroy() { mainScope.cancel() } ? fun doSomething() { // In the example, 10 coroutines are started, and each one works for a different length of time repeat(10) { i -> mainScope.launch { delay((i + 1) * 200L) // Delay 200 milliseconds, 400 milliseconds, 600 milliseconds, etc. different times println("Coroutine $i is done") } } } }
Then if we close the activity, the coroutine will also be closed automatically.
Android now provides first-level support for coroutine scope in all entities with life cycles (activity, Fragment, etc.).
Partial data transfer
If we use coroutines, especially child coroutines, parent coroutines are mixed, etc. Then if some data can be passed between coroutines and coroutines. Then the efficiency will be greatly improved.
Kotlin provides: ThreadLocal, asContextElement extension functions to help us, they create additional context elements, retain the value of the given ThreadLocal, and restore it every time the coroutine switches its context.
Example:
import kotlinx.coroutines.* ? val threadLocal = ThreadLocal<String?>() // declare thread-local variable ? fun main() = runBlocking<Unit> { threadLocal.set("main") println("Pre-main, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'") val job = launch(Dispatchers.Default + threadLocal.asContextElement(value = "launch")) { println("Launch start, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'") yield() println("After yield, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'") } job.join() println("Post-main, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'") } //Output Pre-main, current thread: Thread[main,5,main], thread local value: 'main' Launch start, current thread: Thread[DefaultDispatcher-worker-1,5,main], thread local value: 'launch' After yield, current thread: Thread[DefaultDispatcher-worker-1,5,main], thread local value: 'launch' Post-main, current thread: Thread[main,5,main], thread local value: 'main'
In this example, a new coroutine is started in the thread pool using Dispatchers.Default. So it works in different threads in the thread pool, but it still has the value of the thread local variable. For example, the above is to use asContextElement to modify the value of get from main to launch.