Core understanding of Kotlin coroutines

1. What is a coroutine?

1.1 Understanding of basic concepts

We know that thread implementation in the JVM is determined by the operating system it runs on. The JVM only wraps this in an upper-layer API that covers the usual ways of starting a thread and managing its state. For example, Java defines six thread states and provides the start() method to start a thread.
But once a thread has called start() and begun executing, it is hard for us to stop it. Although the JDK provides the interrupt() method, interrupt() only marks the thread as interrupted. Whether and when the thread actually stops still depends on the code running in that thread and on operating-system scheduling, which cannot be controlled directly from the language level.

// The states of a Java thread are defined in the java.lang.Thread.State enum
public enum State {
        /**
         * Thread state for a thread which has not yet started.
         */
        NEW,

        /**
         * Thread state for a runnable thread. A thread in the runnable
         * state is executing in the Java virtual machine but it may
         * be waiting for other resources from the operating system
         * such as processor.
         */
        RUNNABLE,

        /**
         * Thread state for a thread blocked waiting for a monitor lock.
         * A thread in the blocked state is waiting for a monitor lock
         * to enter a synchronized block/method or
         * reenter a synchronized block/method after calling
         * {@link Object#wait() Object.wait}.
         */
        BLOCKED,

        /**
         * Thread state for a waiting thread.
         * A thread is in the waiting state due to calling one of the
         * following methods:
         * <ul>
         * <li>{@link Object#wait() Object.wait} with no timeout</li>
         * <li>{@link #join() Thread.join} with no timeout</li>
         * <li>{@link LockSupport#park() LockSupport.park}</li>
         *</ul>
         *
         * <p>A thread in the waiting state is waiting for another thread to
         * perform a particular action.
         *
         * For example, a thread that has called {@code Object.wait()}
         * on an object is waiting for another thread to call
         * {@code Object.notify()} or {@code Object.notifyAll()} on
         * that object. A thread that has called {@code Thread.join()}
         * is waiting for a specified thread to terminate.
         */
        WAITING,

        /**
         * Thread state for a waiting thread with a specified waiting time.
         * A thread is in the timed waiting state due to calling one of
         * the following methods with a specified positive waiting time:
         * <ul>
         * <li>{@link #sleep Thread.sleep}</li>
         * <li>{@link Object#wait(long) Object.wait} with timeout</li>
         * <li>{@link #join(long) Thread.join} with timeout</li>
         * <li>{@link LockSupport#parkNanos LockSupport.parkNanos}</li>
         * <li>{@link LockSupport#parkUntil LockSupport.parkUntil}</li>
         *</ul>
         */
        TIMED_WAITING,

        /**
         * Thread state for a terminated thread.
         * The thread has completed execution.
         */
        TERMINATED;
    }
  • Six state transition relationships of Java threads:

    Code inside a coroutine still executes on a thread, because the thread remains the basic unit of CPU scheduling; that is an operating-system-level concept and does not change. However, by using state machines, coroutines implement a language-level task scheduling framework on top of threads, whose state and life cycle are easier to control from the language. A coroutine can also be understood as a lightweight thread. A thread, by contrast, is implemented directly by the operating system: once started, it stops running only when its task completes or an interrupt is requested.
1.2 The relationship between coroutines, threads and processes

Schematic diagram of the relationship between processes, threads and coroutines
Start a thread to perform the task:

val task1 = Thread {
   val result = requestUserInfo()
   println("task1 finished, result = $result")
}
task1.start()
// Alternatively, use ThreadPoolExecutor.execute(Runnable),
// which ultimately calls Thread.start() to start the thread.

Start a coroutine to perform tasks:

// launch must be called on a CoroutineScope (for example, inside runBlocking)
val task1 = launch {
   val result = requestUserInfo()
   println("task1 finished, result = $result")
}
// requestUserInfo() switches the thread the coroutine runs on,
// so it must be declared with the suspend modifier.
suspend fun requestUserInfo(): UserInfo = withContext(Dispatchers.IO) {
    delay(500)
    return@withContext UserInfo("10000", "zhangsan")
}
  • To summarize, the differences between coroutines and threads:
    • Once a thread starts executing, it cannot be paused from the language level until its task ends; the process is continuous.
    • A coroutine can suspend and resume itself. Suspension and resumption are implemented at the language level, which enables cooperative scheduling.
1.3 Key APIs for using coroutines
1.3.1 Coroutine scope: CoroutineScope

A coroutine must be created in a coroutine scope, and a suspend function can only be called from a coroutine or from another suspend function. Kotlin offers three ways to obtain a scope: GlobalScope, runBlocking, and the CoroutineScope() function.
Usage of three coroutine scopes
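The three ways of obtaining a scope can be sketched roughly as follows. This is a minimal illustration rather than the original listing: loadGreeting and scopeDemo are hypothetical names introduced for the example, and it assumes kotlinx-coroutines-core is on the classpath.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper used only for this illustration.
suspend fun loadGreeting(): String {
    delay(100) // simulate a short asynchronous operation
    return "hello"
}

@OptIn(DelicateCoroutinesApi::class)
fun scopeDemo(): List<String> = runBlocking {
    // runBlocking: blocks the calling thread until its body (and children) complete.
    val results = mutableListOf<String>()

    // GlobalScope: not bound to any job; the coroutine lives as long as the application.
    val global = GlobalScope.async { "GlobalScope: " + loadGreeting() }

    // CoroutineScope(): a custom scope whose lifetime we control through its Job.
    val scope = CoroutineScope(Job() + Dispatchers.Default)
    val custom = scope.async { "CoroutineScope(): " + loadGreeting() }

    results += "runBlocking: " + loadGreeting()
    results += global.await()
    results += custom.await()

    scope.cancel() // release the custom scope once it is no longer needed
    results
}

fun main() {
    scopeDemo().forEach(::println)
}
```

GlobalScope is marked as a delicate API in current kotlinx.coroutines releases, which is why the sketch opts in explicitly; in application code a scope with a bounded lifetime is usually the safer choice.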

  • The coroutine scopes provided in Android are:
    • MainScope()

    • lifecycleScope

      Coroutines launched in lifecycleScope are cancelled when the owning Lifecycle (for example, an Activity) is destroyed.

    • viewModelScope

1.3.2 Coroutine object: Job
public interface Job : CoroutineContext.Element {
    // Note (1)
    public companion object Key : CoroutineContext.Key<Job>
    // If the coroutine has not been started yet (for example, it was created with
    // CoroutineStart.LAZY), call start() to start it explicitly
    public fun start(): Boolean
    // Note (2)
    public fun cancel(cause: CancellationException? = null)
    // Child coroutines of the current coroutine
    public val children: Sequence<Job>
    // Attaches a child coroutine, making the current coroutine its parent
    @InternalCoroutinesApi
    public fun attachChild(child: ChildJob): ChildHandle
    // Suspends until the current coroutine completes. For example, call join()
    // after cancel() to wait until the cancellation has finished
    public suspend fun join()
    // Registers a handler that is invoked synchronously, once, when this job is cancelled or completes
    public fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle
    @InternalCoroutinesApi
    public fun invokeOnCompletion(
        onCancelling: Boolean = false,
        invokeImmediately: Boolean = true,
        handler: CompletionHandler): DisposableHandle
}
  • (1) Key: Because Key is declared as the companion object of the Job interface, every Job instance shares the same key no matter which concrete implementation created it. This is why the job of a coroutine can always be looked up as coroutineContext[Job].
  • (2) cancel(): cancel the coroutine
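As a rough sketch of how several of the Job members listed above fit together, the example below starts a lazy coroutine, inspects its children, and waits for it with join(). The function name jobDemo and the event strings are hypothetical, chosen only for this illustration.

```kotlin
import kotlinx.coroutines.*

fun jobDemo(): List<String> = runBlocking {
    val events = mutableListOf<String>()

    // LAZY: the coroutine is created but does not run until start() or join() is called.
    val parent = launch(start = CoroutineStart.LAZY) {
        launch { delay(50) } // first child
        launch { delay(50) } // second child
        events += "parent body finished"
    }

    // The handler runs once, when the job (including its children) has completed.
    parent.invokeOnCompletion { cause -> events += "completed, cause = $cause" }

    events += "isActive before start = ${parent.isActive}"
    parent.start()
    yield() // give the parent a chance to run and launch its children

    events += "children = ${parent.children.count()}"
    parent.join() // suspends until the parent and both children are done
    events
}

fun main() {
    jobDemo().forEach(::println)
}
```

A parent job only reaches its completed state after all of its children have finished, which is why the completion handler fires after the parent body has already returned.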
1.3.3 Coroutine context: CoroutineContext

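Stores the information related to a coroutine, such as its Job, its dispatcher, and its name. The elements are combined with the + operator and looked up by key.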
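A small sketch of what a coroutine context holds: a context is built from a dispatcher and a name, and its elements are read back by key inside the coroutine. The name "loader" and the function contextDemo are assumptions made for this example.

```kotlin
import kotlin.coroutines.ContinuationInterceptor
import kotlinx.coroutines.*

fun contextDemo(): String = runBlocking {
    // Context elements are combined with the + operator.
    val context = Dispatchers.Default + CoroutineName("loader")

    val deferred = async(context) {
        // Each element is looked up by its key (the companion object of its type).
        val name = coroutineContext[CoroutineName]?.name
        val hasJob = coroutineContext[Job] != null
        val usesDefault = coroutineContext[ContinuationInterceptor] === Dispatchers.Default
        "name=$name, hasJob=$hasJob, usesDefault=$usesDefault"
    }
    deferred.await()
}

fun main() {
    println(contextDemo())
}
```

The dispatcher is stored under the ContinuationInterceptor key, while the Job is added automatically by the coroutine builder even though it was not part of the context passed in.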

1.3.4 Coroutine Dispatcher: CoroutineDispatcher
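  • Dispatchers.Main: executes coroutine code on the main (UI) thread. On Android it is provided by the kotlinx-coroutines-android module. If no main dispatcher is available on the platform, using it throws an exception.
  • Dispatchers.IO: for IO-intensive coroutine tasks
  • Dispatchers.Default: for CPU-intensive coroutine tasks
  • Dispatchers.Unconfined: the coroutine is not confined to any particular thread. It starts in the caller's thread and, after a suspension, resumes in whichever thread the suspending function resumed it.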
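To make the dispatcher list above more concrete, the following sketch records which thread each dispatcher uses on the JVM. Dispatchers.Main is left out because it needs a UI platform; dispatcherDemo and the map keys are hypothetical names for this illustration.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

fun dispatcherDemo(): Map<String, String> = runBlocking {
    val caller = Thread.currentThread()
    mapOf(
        // Default and IO both run on worker threads of a shared pool.
        "Default" to withContext(Dispatchers.Default) { Thread.currentThread().name },
        "IO" to withContext(Dispatchers.IO) { Thread.currentThread().name },
        // Unconfined starts in the caller's thread, so no thread switch happens here.
        "Unconfined" to withContext(Dispatchers.Unconfined) {
            (Thread.currentThread() === caller).toString()
        }
    )
}

fun main() {
    dispatcherDemo().forEach { (dispatcher, value) -> println("$dispatcher -> $value") }
}
```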
1.3.5 launch()

Starts a new coroutine and returns its Job. A dispatcher can be passed as the context parameter to specify the thread or thread pool the coroutine runs on.

1.3.6 withContext()

Switches the coroutine's dispatcher, that is, the thread on which the coroutine executes, for the duration of the given block, and then switches back. The context parameter is usually Dispatchers.IO, Dispatchers.Main, or Dispatchers.Default.

1.3.7 async()

A dispatcher can be passed as the context parameter to specify the thread (pool) on which the coroutine runs. async() returns a Deferred, a subtype of Job that plays a role similar to Java's Callable and Future. Calling await() on it suspends the calling coroutine, without blocking the thread, until the result is available.

public interface Deferred<out T> : Job {
    public suspend fun await(): T
}
  • The Kotlin documentation explains the await function as follows:
    Awaits the completion of this value without blocking the thread, and resumes when the deferred computation completes, returning the resulting value or throwing the corresponding exception if the deferred was cancelled.
    This suspending function is cancellable. If the Job of the current coroutine is cancelled or completed while this suspending function is waiting, this function immediately resumes with a CancellationException. There is a prompt cancellation guarantee: if the job was cancelled while this function was suspended, it will not resume successfully.
1.3.8 join()

2. Common uses of coroutines in Android

2.1 After executing time-consuming tasks in the sub-thread, switch to the main thread to update the UI
// Scenario 1: run a time-consuming task on a background thread, then switch back to the main thread
coroutineScope.launch {
    // withContext is a suspend function: the coroutine leaves the current thread, the block runs on a
    // thread chosen by the dispatcher, and execution switches back to the original thread afterwards.
    // While suspended, the next line of this coroutine waits until the suspend function completes.
    val result = withContext(Dispatchers.IO) {
        // The following code runs on a background thread provided by Dispatchers.IO
        delay(5000)
        100
    }
    Log.d(TAG, "onCreate: main 2 =========> $coroutineContext")
    binding.tvNews.text = result.toString()
}

The usage above is specific to Android. Coroutines are a framework for executing asynchronous code. Compared with the Thread + Handler approach, they are more concise and spare developers from writing thread-switching code by hand.

2.2 Multiple time-consuming tasks are executed in parallel to merge the results [common business model]

In Android business code, we often need to issue several interface requests in parallel and then merge them into one result for subsequent business logic and UI display. CountDownLatch from the JDK and RxJava's zip operator can implement similar logic.
The following shows how this pattern is implemented with Kotlin coroutines:

coroutineScope.launch {
    // Run task 1 on a thread provided by Dispatchers.IO
    val async1Result = async(Dispatchers.IO) {
        Log.d(TAG, "onCreate: async1 $coroutineContext")
        executeTask1()
    }
    // Run task 2 on a thread provided by Dispatchers.IO
    val async2Result = async(Dispatchers.IO) {
        Log.d(TAG, "onCreate: async2 $coroutineContext")
        executeTask2()
    }
    // Both coroutines are already running in parallel once async returns. await() then suspends until each result is ready.
    val result = async1Result.await() + async2Result.await()
    Log.d(TAG, "onCreate: async result = $result")
}
  • Kotlin uses async to implement coroutine tasks similar to Callable in Java. The await method suspends the calling coroutine (it does not block the thread) until the result is ready, and it has no timeout parameter; a time limit can be imposed by wrapping the call in withTimeout or withTimeoutOrNull.
  • The async() method creates a coroutine object that can return a value. Its type is Deferred, which inherits from Job.
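Since await() has no timeout parameter, one possible way to bound the wait is to wrap the parallel calls in withTimeoutOrNull. The sketch below is a JVM-only variant of the pattern above; executeTask1 and executeTask2 are hypothetical stand-ins that merely delay, and loadBoth is a name introduced for this example.

```kotlin
import kotlinx.coroutines.*

// Hypothetical stand-ins for two independent requests.
suspend fun executeTask1(): Int { delay(300); return 1 }
suspend fun executeTask2(): Int { delay(300); return 2 }

// Runs both tasks in parallel and merges the results.
// Returns null if the combined work does not finish within timeoutMs.
suspend fun loadBoth(timeoutMs: Long): Int? = withTimeoutOrNull(timeoutMs) {
    val first = async { executeTask1() }
    val second = async { executeTask2() }
    first.await() + second.await()
}

fun main() = runBlocking {
    println(loadBoth(timeoutMs = 2_000)) // enough time for both tasks
    println(loadBoth(timeoutMs = 100))   // too short, so the tasks are cancelled
}
```

When the timeout expires, both async coroutines are cancelled as children of the timeout scope, so no work is left running in the background.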

3. Understanding the suspend function

3.1 The nature of the suspend function
  • The core of a coroutine is that a function or program can be suspended, resumed from the suspension point once the awaited work has completed, and then continue executing the subsequent code.
  • On the JVM, Kotlin coroutines are implemented on top of threads; they are a framework that wraps threads. Starting a coroutine with launch or async actually starts the code block in the closure.
  • When a suspend function suspends, the coroutine code stops executing for the time being and is detached from the current thread. The logic inside the function is transferred to the thread specified by the coroutine dispatcher. After the suspend function completes, execution returns to the suspension point and the subsequent logic continues.

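So in summary, a suspend function as used here is a thread-scheduling operation that switches to another thread and automatically switches back later. The switch itself is performed by a call such as withContext inside the function; the suspend modifier only marks the function as one that may suspend.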
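The switch-and-return behaviour summarized above can be observed with a short sketch that records the thread name before, inside, and after a suspend call. readOnIo and switchDemo are hypothetical names, and the example assumes a plain JVM program started from runBlocking.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// The body runs on a thread from Dispatchers.IO and reports that thread's name.
suspend fun readOnIo(): String = withContext(Dispatchers.IO) {
    Thread.currentThread().name
}

fun switchDemo(): Triple<String, String, String> = runBlocking {
    val before = Thread.currentThread().name
    val inside = readOnIo() // suspends here; the caller thread is not blocked by the IO work
    val after = Thread.currentThread().name
    Triple(before, inside, after)
}

fun main() {
    val (before, inside, after) = switchDemo()
    println("before=$before, inside=$inside, after=$after")
}
```

The first and last names are the same because the coroutine resumes on the dispatcher it started with, while the middle one belongs to a worker thread.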

3.2 Why must the suspension function be called in the coroutine or suspension function?

After a suspending function has switched to the dispatcher's thread, the coroutine framework must call resumeWith on the continuation to switch back and resume the pending logic. If a suspend function were called from an ordinary function outside a coroutine, there would be no coroutine environment and therefore no continuation, so switching back and resuming the pending logic would be impossible.
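The role of the continuation can be illustrated with suspendCoroutine from the standard library, which hands the current continuation to ordinary code. This is a simplified sketch, not the framework's internal implementation; computeOnWorker and the thread name "worker" are assumptions made for the example.

```kotlin
import kotlin.concurrent.thread
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine
import kotlinx.coroutines.runBlocking

// Suspends the caller, does the work on a separate thread, then resumes the caller.
suspend fun computeOnWorker(): Int = suspendCoroutine { continuation ->
    thread(name = "worker") {
        // resume(value) is a shorthand for continuation.resumeWith(Result.success(value)).
        continuation.resume(42)
    }
}

fun main() = runBlocking {
    // Only a coroutine (or another suspend function) can supply the continuation
    // that computeOnWorker needs in order to be resumed.
    val value = computeOnWorker()
    println("resumed with $value on ${Thread.currentThread().name}")
}
```

Calling computeOnWorker() from a regular function does not compile, precisely because there would be no continuation to pass to it.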

4. Cancellation of the coroutine

The coroutine provides the cancel method for cancellation.

public interface Job {
    public fun cancel(cause: CancellationException? = null)
}

The cancel() method actually has two other overloaded methods, but they are annotated with @Deprecated, so they are no longer used.
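Cancellation is cooperative: it takes effect at suspension points such as delay(). A minimal sketch, assuming a looping coroutine that needs to clean up when it is cancelled (cancelDemo and the event strings are hypothetical names):

```kotlin
import kotlinx.coroutines.*

fun cancelDemo(): List<String> = runBlocking {
    val events = mutableListOf<String>()

    val job = launch {
        try {
            while (true) {
                delay(50) // suspension point: throws CancellationException once the job is cancelled
            }
        } finally {
            // Runs when the coroutine is cancelled, so resources can be released here.
            events += "cleanup in finally"
        }
    }

    delay(120)          // let the loop run for a short while
    job.cancelAndJoin() // cancel() followed by join(): wait until cancellation has finished
    events += "isCancelled = ${job.isCancelled}"
    events
}

fun main() {
    cancelDemo().forEach(::println)
}
```

A loop that never reaches a suspension point would have to check isActive itself, otherwise cancel() would have no visible effect until the loop ends.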

5. Coroutine concurrent data synchronization issues

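When multiple threads access shared variables in Java, the memory model distinguishes main memory from each thread's working memory, so unsynchronized updates from different threads can overwrite each other. Coroutines that run on several threads face the same problem.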

class Test {
    private var count = 0
    private val mutex = Mutex() // declared here, but not used yet
    suspend fun test() = withContext(Dispatchers.IO) {
        repeat(100) {
            launch {
                repeat(1000) {
                    count++ // unsynchronized read-modify-write
                }
            }
        }
        launch {
            delay(3000)
            println("finished----> count = $count")
        }
    }
}

fun main(): Unit = runBlocking {
    Test().test()
}

Output result:

finished----> count = 97861

As the code above shows, 100 coroutines are created in the loop, and each one increments count 1,000 times. The final value of count should be 100000, but the output here is 97861 (the exact number varies between runs). Concurrent modification has left the data inconsistent.
With Java threads, we can use synchronized or ReentrantLock to synchronize multiple threads and guarantee that the final data is correct. For coroutines, Kotlin provides the Mutex class as the synchronization lock. Call Mutex's withLock method to guard the shared variable with the lock.

  • Mutex is an interface that provides the following methods for acquiring and releasing the lock:
public interface Mutex {
    // lock and unlock take an optional owner object, which defaults to null.
    public suspend fun lock(owner: Any? = null)
    public fun unlock(owner: Any? = null)

    // Tries to lock this mutex without suspending; returns false if it is already locked.
    // Implemented with a compare-and-swap (CAS) operation.
    public fun tryLock(owner: Any? = null): Boolean
    // Returns true when locked
    public val isLocked: Boolean
    public val onLock: SelectClause2<Any?, Mutex>
    public fun holdsLock(owner: Any): Boolean
}
// withLock is an extension function on Mutex, declared outside the interface. It runs the action in a
// try block and calls unlock in finally to release the lock, the same pattern as with Java locks.
public suspend inline fun <T> Mutex.withLock(owner: Any? = null, action: () -> T): T

Although the owner parameter of lock and unlock defaults to null, a null owner carries no meaning by itself. In the MutexImpl implementation quoted here, the default marker objects LOCKED and UNLOCKED are provided for this purpose. When owner is null, these two objects are used as the owner values for lock and unlock.

The code is as follows:

@SharedImmutable
private val LOCKED = Symbol("LOCKED")
@SharedImmutable
private val UNLOCKED = Symbol("UNLOCKED")
@SharedImmutable
private val EMPTY_LOCKED = Empty(LOCKED)
@SharedImmutable
private val EMPTY_UNLOCKED = Empty(UNLOCKED)
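The non-suspending part of the Mutex API can be tried out without starting a coroutine. The sketch below uses only tryLock, holdsLock, unlock, and isLocked from the interface listed earlier; mutexApiDemo and the owner object are hypothetical names for the illustration.

```kotlin
import kotlinx.coroutines.sync.Mutex

fun mutexApiDemo(): List<Boolean> {
    val mutex = Mutex()
    val owner = Any() // an arbitrary object identifying who holds the lock

    val firstAttempt = mutex.tryLock(owner)  // the mutex is free, so this succeeds
    val secondAttempt = mutex.tryLock()      // already locked: fails instead of suspending
    val heldByOwner = mutex.holdsLock(owner) // the owner passed to tryLock is remembered

    mutex.unlock(owner)                      // must be unlocked with the same owner
    val lockedAfterUnlock = mutex.isLocked

    return listOf(firstAttempt, secondAttempt, heldByOwner, lockedAfterUnlock)
}

fun main() {
    println(mutexApiDemo())
}
```

Passing an owner is optional, but it lets unlock verify that the lock is released by the same party that acquired it.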
  • Mutex provides the lock and unlock methods. You can also use withLock in place of the lock(); try { ... } finally { unlock() } pattern. The earlier code, whose result was inconsistent under concurrency, should use Mutex to add a synchronization lock as follows:
class Test {
    private var count = 0
    private val mutex = Mutex()
    suspend fun test() = withContext(Dispatchers.IO) {
        repeat(100) {
            launch {
                repeat(1000) {
                    // Mutex is used much like ReentrantLock:
                    // mutex.withLock {} synchronizes access to count across coroutines
                    mutex.withLock {
                        count++
                    }
                }
            }
        }
        launch {
            delay(3000)
            println("finished----> count = $count")
        }
    }
}

After modification, the correct result can be output:

finished----> count = 100000
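The version above prints the result after a fixed three-second delay. As an alternative sketch, the counter can wait for the workers explicitly, which makes the result deterministic without relying on timing. Counter and incrementAll are hypothetical names, and coroutineScope is used here as one possible way to wait for all children.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

class Counter {
    private var count = 0
    private val mutex = Mutex()

    suspend fun incrementAll(): Int {
        // coroutineScope returns only after every coroutine launched inside it has completed.
        coroutineScope {
            repeat(100) {
                launch(Dispatchers.Default) {
                    repeat(1000) {
                        // Only one coroutine at a time can execute the guarded block.
                        mutex.withLock { count++ }
                    }
                }
            }
        }
        return count
    }
}

fun main() = runBlocking {
    println("finished----> count = ${Counter().incrementAll()}")
}
```

Waiting on the scope instead of a fixed delay also avoids reading count while some workers are still running on a slow machine.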
  • Note: Mutex() is not a constructor call but a function. CoroutineScope() in Kotlin follows the same code style and is also a function. Pay attention to this distinction to avoid misunderstanding how they are implemented.
    • The Mutex() function simply creates a new MutexImpl object:
    public fun Mutex(locked: Boolean = false): Mutex = MutexImpl(locked)