A brief introduction to Kotlin coroutines, coroutine context, and dispatchers (Dispatchers)

Coroutine Overview

Subroutines, or functions, are called hierarchically in every language. For example, A calls B, and B calls C during its execution; C returns when it finishes, then B returns, and finally A completes. Subroutine calls are therefore implemented with a stack, and a thread executes one subroutine at a time. A subroutine call always has one entry and one return, and the calling sequence is clear.

A coroutine looks like a subroutine, but during execution it can be suspended partway through so that other code can run, and it can be resumed later from the point where it left off.
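To make suspending and resuming concrete, here is a minimal sketch that uses the standard library's sequence builder, which is itself built on coroutines. The function name interleavingDemo and the log messages are illustrative only.

```kotlin
// Illustrative sketch: the producer block is suspended at every yield()
// and resumed when the consumer asks for the next element.
fun interleavingDemo(): List<String> {
    val log = mutableListOf<String>()
    val numbers = sequence {
        log += "producer: yielding 1"
        yield(1) // suspends here until the next value is requested
        log += "producer: yielding 2"
        yield(2)
        log += "producer: finished"
    }
    for (n in numbers) {
        log += "consumer: got $n"
    }
    return log
}

fun main() {
    interleavingDemo().forEach(::println)
}
```

The producer and consumer lines alternate in the log, which shows that the producer block really does stop in the middle and continue later, all on a single thread.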

How Kotlin coroutines work

The general execution flow of a Kotlin coroutine is shown in the figure above. Coroutines of every kind roughly follow this flow, although it is not a strictly precise description.

Create and start the coroutine

import kotlin.coroutines.*

fun main() {
    //1. Create coroutine body
    val coroutine = suspend {
        println("in coroutine")
        5
    }.createCoroutine(object : Continuation<Int> {
        // Called once when the coroutine body completes
        override fun resumeWith(result: Result<Int>) {
            println("coroutine end: $result")
        }

        override val context: CoroutineContext
            get() = EmptyCoroutineContext
    })

    //2. Execute coroutine
    coroutine.resume(Unit)
}

The output of the above code:

in coroutine
coroutine end: Success(5)

Execution process of coroutine

The call stack shows the following flow:

  1. The coroutine returned by calling createCoroutine on the suspend block is actually a SafeContinuation object.
  2. SafeContinuation is a proxy class; its delegate property is the real Continuation object.
  3. The code in the suspend block is executed from BaseContinuationImpl.
  4. Finally, our anonymous Continuation object is called back.

How does the suspend block become a coroutine body and be executed?

From the call stack we know that resumeWith is ultimately executed in BaseContinuationImpl. Let's take a look at the code.

@SinceKotlin("1.3")
internal abstract class BaseContinuationImpl(
    public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
    public final override fun resumeWith(result: Result<Any?>) {
        var current = this
        var param = result
        while (true) {
            probeCoroutineResumed(current)
            with(current) {
                val completion = completion!!
                val outcome: Result<Any?> =
                    try {
                        val outcome = invokeSuspend(param) //1. The suspend block is executed here
                        if (outcome === COROUTINE_SUSPENDED) return
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        Result.failure(exception)
                    }
                releaseIntercepted()
                if (completion is BaseContinuationImpl) {
                    current = completion
                    param = outcome
                } else {
                    completion.resumeWith(outcome) //2. Our anonymous inner class is called back here
                    return
                }
            }
        }
    }

    protected abstract fun invokeSuspend(result: Result<Any?>): Any? //3. Abstract method
}

At code comment 1, current.invokeSuspend is called to execute the coroutine body we defined, which shows that the suspend block is actually a subclass of BaseContinuationImpl.

At 2, after the coroutine body has finished, our own code receives the completion callback.

At 3, you can see that invokeSuspend is an abstract method; the suspend block is the concrete implementation of this method.
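The example above never takes the COROUTINE_SUSPENDED branch, because its body does not suspend. The following is a minimal sketch, using only the standard library, of a body that does suspend and is resumed later by the caller. The function name suspendAndResumeDemo and the variable saved are illustrative assumptions, not part of the original code.

```kotlin
import kotlin.coroutines.*

// Illustrative sketch: the body suspends once, the caller resumes it later.
fun suspendAndResumeDemo(): List<String> {
    val events = mutableListOf<String>()
    var saved: Continuation<Int>? = null

    val coroutine = suspend {
        events += "body: started"
        // Store the continuation instead of resuming it, so the body suspends here
        val value = suspendCoroutine<Int> { cont -> saved = cont }
        events += "body: resumed with $value"
        value + 1
    }.createCoroutine(object : Continuation<Int> {
        override val context: CoroutineContext = EmptyCoroutineContext
        override fun resumeWith(result: Result<Int>) {
            events += "completion: $result"
        }
    })

    coroutine.resume(Unit)            // runs the body up to the suspension point
    events += "caller: coroutine is suspended"
    saved!!.resume(41)                // continues the body after suspendCoroutine
    return events
}

fun main() {
    suspendAndResumeDemo().forEach(::println)
}
```

The first resume returns to the caller as soon as the body suspends, and the completion callback only runs after the second resume lets the body finish.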

Next, I will use breakpoints to further analyze which subclass the suspend block is executed through.

You can see that current is an object whose class name follows the format {file}Kt${method}${variable}$1. This proves that the Kotlin compiler generates a subclass of BaseContinuationImpl for us when it encounters the suspend keyword.

So, what exactly is this subclass? After compiling the .kt file into .class files and opening them with jadx, the resulting Java code is as follows:

public final class CreateCoroutineKt {
    public static final void main() {
        Continuation coroutine = ContinuationKt.createCoroutine(new CreateCoroutineKt$main$coroutine$1(null), new CreateCoroutineKt$main$coroutine$2());
        Unit unit = Unit.INSTANCE;
        Result.Companion companion = Result.Companion;
        coroutine.resumeWith(Result.constructor-impl(unit));
    }
}
final class CreateCoroutineKt$main$coroutine$1 extends SuspendLambda implements Function1<Continuation<? super Integer>, Object> {
    int label;

    CreateCoroutineKt$main$coroutine$1(Continuation<? super CreateCoroutineKt$main$coroutine$1> continuation) {
        super(1, continuation);
    }

    @NotNull
    public final Continuation<Unit> create(@NotNull Continuation<?> continuation) {
        return new CreateCoroutineKt$main$coroutine$1(continuation);
    }

    @Nullable
    public final Object invoke(@Nullable Continuation<? super Integer> continuation) {
        return create(continuation).invokeSuspend(Unit.INSTANCE);
    }

    @Nullable
    public final Object invokeSuspend(@NotNull Object obj) {
        IntrinsicsKt.getCOROUTINE_SUSPENDED();
        switch (this.label) {
            case 0:
                ResultKt.throwOnFailure(obj);
                System.out.println((Object) "in coroutine"); //Logic of coroutine body
                return Boxing.boxInt(5);
            default:
                throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
        }
    }
}

Clearly, the Kotlin compiler turned the suspend block into a subclass of SuspendLambda and overrode the invokeSuspend method. As you might guess, SuspendLambda inherits (through ContinuationImpl) from BaseContinuationImpl.
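The role of the label field becomes clearer when there is more than one state. Below is a hand-written, heavily simplified model of such a state machine. The class LabelMachine and its SUSPENDED marker are hypothetical; this is not the code the compiler generates, only a sketch of the idea.

```kotlin
// Hypothetical, simplified model of a label-based state machine.
// It is NOT compiler output; it only mimics how `label` selects
// the code to run each time the coroutine is resumed.
class LabelMachine {
    private var label = 0
    val log = mutableListOf<String>()

    // Runs the code for the current state. Returns SUSPENDED if the
    // "coroutine" paused, otherwise the final result.
    fun invokeSuspend(input: Any?): Any? = when (label) {
        0 -> {
            log += "state 0: before the suspension point"
            label = 1 // remember where to continue next time
            SUSPENDED
        }
        1 -> {
            log += "state 1: resumed with $input"
            label = 2
            "done"
        }
        else -> throw IllegalStateException("already completed")
    }

    companion object {
        val SUSPENDED = Any() // stand-in for COROUTINE_SUSPENDED
    }
}

fun main() {
    val machine = LabelMachine()
    println(machine.invokeSuspend(null) === LabelMachine.SUSPENDED)
    println(machine.invokeSuspend("value"))
    machine.log.forEach(::println)
}
```

Each call runs exactly one state and then either reports suspension or returns the result, which is the same contract that resumeWith in BaseContinuationImpl relies on.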


Coroutine context and dispatchers

A coroutine's context is represented by the CoroutineContext type, which is defined in the Kotlin standard library.

The context is a set of various elements. The main element is the coroutine's Job.

We already touched on Job when looking at the concurrency and scheduling of coroutines in the earlier article "Kotlin coroutine combines the suspend function and the async keyword to implement concurrent operations of the coroutine" (zinyan.com).

This article looks at Job in more detail.

Dispatchers and threads

What is a dispatcher? A dispatcher is a control object that determines which thread or threads a coroutine executes on.

It can confine the coroutine to a specific thread, dispatch it to a thread pool, or let it run unconfined.

Coroutine context object: CoroutineContext.

Coroutine dispatcher object: CoroutineDispatcher.

When using launch or async, we can pass a CoroutineContext through an optional parameter, and this context can explicitly specify the dispatcher for the new coroutine. The standard dispatchers are available through the Dispatchers object.

Example:

import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
    // Runs in the context of the parent coroutine, that is, the runBlocking main coroutine
    launch {
        println("main runBlocking : My working thread ${Thread.currentThread().name}")
    }
    // Not confined to any thread: starts in the caller thread (main here)
    launch(Dispatchers.Unconfined) {
        println("Unconfined : My working thread ${Thread.currentThread().name}")
    }
    // Dispatched to the default shared thread pool
    launch(Dispatchers.Default) {
        println("Default : My working thread ${Thread.currentThread().name}")
    }
    // Dispatched to a new dedicated thread
    launch(newSingleThreadContext("ZinyanThread")) {
        println("ZinyanThreadContext: My working thread ${Thread.currentThread().name}")
    }
}
//Output (the order of the last three lines may vary)
Unconfined : My working thread main
Default : My working thread DefaultDispatcher-worker-1
ZinyanThreadContext: My working thread ZinyanThread
main runBlocking : My working thread main
The four dispatching behaviors above work as follows.

launch { … }: When called without parameters, it inherits the context (and therefore the dispatcher) from the coroutine scope it is launched in.

In the example above, the context is inherited from the runBlocking coroutine running in the main thread, and the output shows that the coroutine runs in the main thread.

Dispatchers.Unconfined: A special dispatcher. In the example above it also runs in the main thread, but it works differently, which is why it is called the unconfined dispatcher. It starts the coroutine immediately in the caller thread, so its output appears first, but only runs it there until the first suspension point. After suspension, the coroutine resumes in a thread that is entirely determined by the suspending function that was called. The unconfined dispatcher is appropriate for coroutines that neither consume CPU time nor update shared data (such as the UI) that is confined to a specific thread.

By contrast, the dispatcher is inherited from the outer coroutine scope by default. The default dispatcher of a runBlocking coroutine is confined to the caller thread, so inheriting it confines execution to that thread with predictable FIFO scheduling.

Example:

fun main() = runBlocking<Unit> {
    // Unconfined: starts in the main thread
    launch(Dispatchers.Unconfined) {
        println("Unconfined: worker thread ${Thread.currentThread().name}")
        delay(500)
        println("Unconfined: after delay, thread ${Thread.currentThread().name}")
    }
    launch { // Context of the parent, the main runBlocking coroutine
        println("main runBlocking: worker thread ${Thread.currentThread().name}")
        delay(1000)
        println("main runBlocking: after delay, thread ${Thread.currentThread().name}")
    }
}
//Output
Unconfined: worker thread main
main runBlocking: worker thread main
Unconfined: after delay, thread kotlinx.coroutines.DefaultExecutor
main runBlocking: after delay, thread main

So the coroutine that inherits its context from runBlocking { … } keeps running in the main thread, while the unconfined coroutine resumes in the default executor thread that the delay function uses.

The unconfined dispatcher is an advanced mechanism that can help in certain corner cases where dispatching a coroutine for later execution is unnecessary or produces undesirable side effects, because some operation in the coroutine must be performed right away. It should not be used in ordinary code.

Dispatchers.Default: The default dispatcher, which uses a shared background thread pool. launch(Dispatchers.Default) { … } therefore uses the same dispatcher as GlobalScope.launch { … }.

newSingleThreadContext("MyOwnThread"): Creates a dedicated thread for the coroutine to run on. A dedicated thread is a very expensive resource. In real applications it must either be released with the close function when it is no longer needed, or be stored in a top-level variable and reused throughout the application. Otherwise threads will pile up.

Jumping between threads

The following example jumps between two threads within a single coroutine:

fun main() {
    // use closes each single-threaded context when its block ends
    newSingleThreadContext("Ctx1").use { ctx1 ->
        newSingleThreadContext("Ctx2").use { ctx2 ->
            runBlocking(ctx1) {
                println("Open Ctx1")
                withContext(ctx2) {
                    println("Ctx2 starts working")
                }
                println("Return to Ctx1")
            }
        }
    }
}
//Output
Open Ctx1
Ctx2 starts working
Return to Ctx1

In this example, runBlocking is given an explicitly specified context, and withContext then changes the context of the coroutine while still staying in the same coroutine.

The example also uses the use function from the standard library to release the threads created by newSingleThreadContext() once they are no longer needed, which avoids leaking threads.

Job in context

The Job in a coroutine is part of the context and can be retrieved from the context using the coroutineContext[Job] expression.

Example:

fun main() = runBlocking<Unit> {
    println("My job is ${coroutineContext[Job]}")
}
//Output
My job is BlockingCoroutine{Active}@1de0aca6

What is this useful for? For example, we can query whether the coroutine is active.

Example:

fun main() = runBlocking<Unit> {
    println("My job is ${coroutineContext[Job]}")
    val active = coroutineContext[Job]?.isActive
    println(active)
}
//Output
My job is BlockingCoroutine{Active}@1de0aca6
true

This shows that the current coroutine is active.

Why the "?" in the call? Because coroutineContext[Job] returns null when the context contains no Job.
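As a small illustration of the nullable lookup, the sketch below queries a context that has no Job and one that does. It assumes the kotlinx.coroutines library is on the classpath; the helper name describeJob is made up for this example.

```kotlin
import kotlinx.coroutines.Job
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

// Hypothetical helper: reports whether a context carries a Job.
fun describeJob(context: CoroutineContext): String {
    val job = context[Job] // null when the context has no Job element
    return if (job == null) "no Job in this context" else "Job active: ${job.isActive}"
}

fun main() {
    println(describeJob(EmptyCoroutineContext)) // an empty context has no Job
    println(describeJob(Job()))                 // a freshly created Job is active
}
```

Inside a running coroutine the Job is normally present, so in practice the null case mostly shows up when a context is built by hand.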

Child coroutines

When a coroutine is launched in the CoroutineScope of another coroutine, it inherits the parent's context through CoroutineScope.coroutineContext, and the Job of the new coroutine becomes a child of the parent coroutine's Job.

When a parent coroutine is cancelled, all of its children are cancelled recursively as well.

However, when a coroutine is launched with GlobalScope, the new coroutine's Job has no parent. It is therefore not tied to the scope it was launched from and runs independently.

Example:

fun main() = runBlocking<Unit> {
    // Launch a coroutine to handle some kind of incoming request
    val request = launch {
        // It launches two other jobs, one of them through GlobalScope
        GlobalScope.launch {
            println("job1: I run in the coroutine started by GlobalScope")
            delay(1000)
            println("job1: After waiting for 1 second, you will find that I am not affected by the cancellation")
        }
        // The other one inherits the context of the parent coroutine
        launch {
            delay(100)
            println("job2: I am a child coroutine started by the parent coroutine")
            delay(1000)
            println("job2: If the parent coroutine is cancelled, I am cancelled too. This line should not be printed")
        }
    }
    delay(500)
    request.cancel() // Cancel processing of the request
    delay(1000) // Delay for one second to see what happens
    println("main: after the request is cancelled")
}
//Output
job1: I run in the coroutine started by GlobalScope
job2: I am a child coroutine started by the parent coroutine
job1: After waiting for 1 second, you will find that I am not affected by the cancellation
main: after the request is cancelled

The output confirms this. Both lines of job1 are printed, while job2 prints only its first line and is then cancelled together with its parent.

Parent coroutine

With child coroutines understood, the behavior of the parent coroutine becomes clearer.

A parent coroutine always waits for all of its children to complete. The parent does not have to explicitly track the children it launches, and it does not have to use Job.join to wait for them at the end:

Example:

fun main() = runBlocking<Unit> {
    // Launch a coroutine to handle some kind of incoming request
    val request = launch {
        repeat(3) { i -> // Launch a few child coroutines
            launch {
                delay((i + 1) * 200L) // Delay 200 milliseconds, 400 milliseconds, 600 milliseconds
                println("Coroutine $i ended")
            }
        }
        println("request: my own code is done, and I do not explicitly join my children, which are still active")
    }
    request.join() // Wait for the completion of the request, including all its children
    println("All coroutines ended")
}
//Output
request: my own code is done, and I do not explicitly join my children, which are still active
Coroutine 0 ended
Coroutine 1 ended
Coroutine 2 ended
All coroutines ended

We can see that the parent coroutine's own code has finished and produced its output, but its children are still active, so the parent as a whole is not yet complete.

Of course, if we call .cancel() on the parent, the children are cancelled before they have finished running.

This is the relationship between parent and child coroutines.
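The cancellation case can be sketched as follows, assuming the kotlinx.coroutines library is available. The function name cancelParentDemo and the event strings are illustrative; the timings are chosen so that only the first child finishes before the parent is cancelled.

```kotlin
import kotlinx.coroutines.*

// Illustrative sketch: cancelling the parent cancels children that are still running.
fun cancelParentDemo(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    val parent = launch {
        repeat(3) { i ->
            launch {
                try {
                    delay((i + 1) * 200L) // 200 ms, 400 ms, 600 ms
                    events += "child $i finished"
                } catch (e: CancellationException) {
                    events += "child $i cancelled"
                    throw e // rethrow so that cancellation completes normally
                }
            }
        }
    }
    delay(300)              // child 0 has finished, children 1 and 2 are still waiting
    parent.cancelAndJoin()  // cancels the parent and, recursively, its children
    events
}

fun main() {
    cancelParentDemo().forEach(::println)
}
```

Everything here runs on the single runBlocking thread, so the shared list needs no synchronization in this sketch.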

Name the coroutine for easy debugging

When a coroutine writes logs, it is identified by an automatically assigned ID. But when a coroutine handles a specific request or a specific piece of logic, giving it a name makes debugging more convenient.

Coroutines are named with the CoroutineName context element.

Example:

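import kotlinx.coroutines.*

// Helper that prefixes each message with the name of the current thread
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main() = runBlocking(CoroutineName("main")) {
    val v1 = async(CoroutineName("v1coroutine")) {
        delay(500)
        log("Computing v1")
        252
    }
    val v2 = async(CoroutineName("v2coroutine")) {
        delay(1000)
        log("Computing v2")
        6
    }
    log("The answer for v1 / v2 = ${v1.await() / v2.await()}")
}

Here two coroutines are given names. The names only show up in the log output, as part of the thread name, when debugging mode is enabled with the -Dkotlinx.coroutines.debug JVM option.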

Adding multiple elements when launching a coroutine

We have seen how to specify a dispatcher when launching a coroutine, and in the previous step how to give the coroutine a name.

What if we need to set both of these at launch time?

They can be combined with the + operator. Example:

fun main() = runBlocking<Unit> {
    // Start a coroutine to handle some kind of incoming request (request)
    launch(Dispatchers.Default + CoroutineName("test")) {
        println("My working thread: ${Thread.currentThread().name}")
    }
}
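The + operator and the indexing operator are defined on CoroutineContext itself, so they can be tried without launching anything. The sketch below uses only the standard library; the element classes RequestId and UserName are hypothetical and exist only for this illustration.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

// Hypothetical context elements, each identified by its companion Key.
class RequestId(val value: String) : AbstractCoroutineContextElement(RequestId) {
    companion object Key : CoroutineContext.Key<RequestId>
}

class UserName(val value: String) : AbstractCoroutineContextElement(UserName) {
    companion object Key : CoroutineContext.Key<UserName>
}

// Looks up both elements by key; a missing element yields null.
fun describe(context: CoroutineContext): String =
    "requestId=${context[RequestId]?.value}, user=${context[UserName]?.value}"

fun main() {
    val combined = RequestId("r-1") + UserName("alice")
    println(describe(combined))
    // Adding an element with an existing key replaces the earlier one
    println(describe(combined + RequestId("r-2")))
    println(describe(EmptyCoroutineContext))
}
```

Dispatchers.Default + CoroutineName("test") works the same way: each element is stored under its own key, and an element on the right-hand side replaces any element with the same key on the left.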

Coroutine scope (CoroutineScope)

A scope defines the region within which something takes effect. In Android development, for example, an Activity may launch coroutines to perform network requests or asynchronous IO. All of these coroutines should be cancelled when the Activity is destroyed, to avoid memory leaks.

Instead of tracking and cancelling each coroutine by hand, we can declare the scope in which the coroutines are launched and cancel the whole scope at once.

Example:

import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.*

class DemoActivity : AppCompatActivity() {
    // MainScope() is a factory function from the kotlinx.coroutines library.
    // It creates a scope for UI components that uses Dispatchers.Main as its default dispatcher.
    private val mainScope = MainScope()

    // Cancel the scope when the Activity is destroyed
    override fun onDestroy() {
        super.onDestroy()
        mainScope.cancel()
    }

    fun doSomething() {
        // Launch 10 coroutines, each working for a different length of time
        repeat(10) { i ->
            mainScope.launch {
                delay((i + 1) * 200L) // Delay 200 milliseconds, 400 milliseconds, 600 milliseconds, and so on
                println("Coroutine $i is done")
            }
        }
    }
}

Now when the Activity is destroyed, all coroutines launched in mainScope are cancelled automatically.

Android now provides first-class support for coroutine scopes in all entities with a lifecycle (Activity, Fragment, and so on).

Thread-local data

When parent and child coroutines are mixed and may run on different threads, it is often convenient to be able to pass some thread-local data to or between coroutines.

For ThreadLocal, Kotlin provides the asContextElement extension function. It creates an additional context element that keeps the value of the given ThreadLocal and restores it every time the coroutine switches its context.

Example:

import kotlinx.coroutines.*

val threadLocal = ThreadLocal<String?>() // declare thread-local variable

fun main() = runBlocking<Unit> {
    threadLocal.set("main")
    println("Pre-main, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
    val job = launch(Dispatchers.Default + threadLocal.asContextElement(value = "launch")) {
        println("Launch start, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
        yield()
        println("After yield, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
    }
    job.join()
    println("Post-main, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
}
//Output
Pre-main, current thread: Thread[main,5,main], thread local value: 'main'
Launch start, current thread: Thread[DefaultDispatcher-worker-1,5,main], thread local value: 'launch'
After yield, current thread: Thread[DefaultDispatcher-worker-1,5,main], thread local value: 'launch'
Post-main, current thread: Thread[main,5,main], thread local value: 'main'

In this example, a new coroutine is launched in the background thread pool using Dispatchers.Default, so it may run on different threads of the pool. Whichever thread it runs on, it still sees the thread-local value that was specified with threadLocal.asContextElement(value = "launch"). Outside the coroutine, the value seen by the main thread remains "main".