Kotlin — asynchronous using coroutines

Communication between coroutines

Coroutines cannot directly access data through variables, which will lead to data atomicity problems, so coroutines provide a set of Channel mechanisms to transfer data between coroutines.

Channel is a concept very similar to BlockingQueue. One of the differences is that it replaces the blocking put operation and provides a pending send, and also replaces the blocking take operation and provides receive pending.

Channel send and receive operations are fair and respect the multiple coroutines that call them. They obey the first-in-first-out principle, and you can see that the first coroutine calls receive and gets the element

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
    val channel = Channel<Int>()
    launch {
        // Here may be asynchronous logic that consumes a lot of CPU operations, we will only do the square of the integer 5 times and send
        for (x in 1..5) channel. send(x * x)
        channel. close()
    }
    // Here we print the received integer 5 times:
    // channel.receive() is blocked, waiting for the sent data to be sent
    repeat(5) { println(channel. receive()) }
    println("Done!")
}

After sending, remember to call channel.close(), the close() operation is like sending a special close command to the channel. This iteration stops indicating that the close command has been received. So here it is guaranteed that all previously sent elements are received before the channel is closed.

Coroutine-based producer\consumer

In the coroutine, you can use produce to simulate the producer to produce data. And use consume to simulate the consumer situation.

Currently, in version 1.3.11 of Kotlin, produce and consume are only experimental functions, there is no official release, remember to use when using Functions used by the @ExperimentalCoroutinesApi tag

runBlocking {
      val receiveChannel: ReceiveChannel<Int> = produce {
          for (x in 1..5) send(x * x)
      }
      receiveChannel. consumeEach {
          println(it)
      }
 }

Fan-in

The concept of fan-in is similar to IO multiplexing in file systems.

Fan-in allows multiple coroutines to send to the same channel.

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
    val channel = Channel<String>()
    launch { sendString(channel, "foo", 200L) }
    launch { sendString(channel, "BAR!", 500L) }
    repeat(6) { // receive the first six
        println("${channel.receive()}...${Thread.currentThread().name}")
    }
    coroutineContext.cancelChildren() // cancel all child coroutines to let the main coroutine end
}

suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
    while (true) {
        delay(time)
        channel. send(s)
    }
}

operation result:

foo...main @coroutine#1
foo...main @coroutine#1
BAR!...main @coroutine #1
foo...main @coroutine#1
foo...main @coroutine#1
foo...main @coroutine#1

Using async concurrency

async is similar to launch. It starts a single coroutine, which is a lightweight thread that works concurrently with all other coroutines. The difference is that launch returns a Job without any result value, while async returns a Deferred— A lightweight non-blocking future that represents a promise that will deliver a result later. You can use .await() on a deferred value to get its final result, but Deferred is also a Job, so if needed , you can cancel it.

In fact, the difference is that async can obtain execution results, while launch simply executes tasks.

import kotlinx.coroutines.*
import kotlin.system.*

fun main() = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = async { doSomethingUsefulOne() }
        val two = async { doSomethingUsefulTwo() }
        println("The answer is ${one. await() + two. await()}")
    }
    println("Completed in $time ms")
}

suspend fun doSomethingUsefulOne(): Int {
    delay(1000L) // assume we're doing something useful here
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L) // assume we do something useful here too
    return 29
}

Results of the:

The answer is 42
Completed in 1017 ms

However, async{} will directly start the coroutine. If you need to wait for an event to start, you need to use CoroutineStart.LAZY:

val time = measureTimeMillis {
    val one = async(start = CoroutineStart. LAZY) { doSomethingUsefulOne() }
    val two = async(start = CoroutineStart. LAZY) { doSomethingUsefulTwo() }
    // perform some calculations
    one.start() // start the first one
    two.start() // start the second
    println("The answer is ${one. await() + two. await()}")
}
println("Completed in $time ms")

Coroutine security

Coroutines, like threads, cannot maintain atomicity in data operations, so in coroutines, you need to use atomic data structures, such as AotimicInteger, etc., or use mutex.withLock code>, to handle the atomicity of data

import kotlinx.coroutines.*
import kotlinx.coroutines.sync.*
import kotlin.system.*

suspend fun CoroutineScope. massiveRun(action: suspend () -> Unit) {
    val n = 100 // number of coroutines to start
    val k = 1000 // the number of times each coroutine repeats the same action
    val time = measureTimeMillis {
        val jobs = List(n) {
            launch {
                repeat(k) { action() }
            }
        }
        jobs. forEach { it. join() }
    }
    println("Completed ${n * k} actions in $time ms")
}

val mutex = Mutex()
var counter = 0

fun main() = runBlocking<Unit> {
    GlobalScope. massiveRun {
        mutex. withLock {
            counter ++
        }
    }
    println("Counter = $counter")
}

Output result:

Completed 100000 actions in 104 ms
Counter = 100000

Actor

An Actor is an entity composed of a coroutine, state bounded and encapsulated into that coroutine, and a channel to communicate with other coroutines. A simple actor can simply be written as a function, but an actor with complex state is better represented by a class.

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlin.system.*

suspend fun CoroutineScope. massiveRun(action: suspend () -> Unit) {
    val n = 100 // number of coroutines to start
    val k = 1000 // the number of times each coroutine repeats the same action
    val time = measureTimeMillis {
        val jobs = List(n) {
            launch {
                repeat(k) { action() }
            }
        }
        jobs. forEach { it. join() }
    }
    println("Completed ${n * k} actions in $time ms")
}

// Various types of counter actors
sealed class CounterMsg
object IncCounter : CounterMsg() // one-way message to increment the counter
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // request with reply

// This function starts a new counter actor
fun CoroutineScope. counterActor() = actor<CounterMsg> {
    var counter = 0 // actor state
    for (msg in channel) { // iterator over incoming messages
        when (msg) {
            is IncCounter -> counter ++
            is GetCounter -> msg. response. complete(counter)
        }
    }
}

fun main() = runBlocking<Unit> {
    val counter = counterActor() // create the actor
    GlobalScope. massiveRun {
        counter. send(IncCounter)
    }
    // send a message to get the counter value from an actor
    val response = CompletableDeferred<Int>()
    counter. send(GetCounter(response))
    println("Counter = ${response. await()}")
    counter.close() // close the actor
}

The context in which the actor itself executes does not matter (for correctness). An actor is a coroutine, and a coroutine is executed sequentially, so restricting state to a specific coroutine solves the problem of shared mutable state. In fact, actors can modify their own private state, but can only interact with each other via messages (avoiding any locking).

An actor is more efficient than a lock under high load, because in this case it always has work to do and doesn’t need to switch to a different context at all.

You can scan the QR code to get more learning materials about Kotlin for free!

Advanced Kotlin Strengthening Practice

Chapter 1 Introduction to Kotlin

? ● Kotlin overview

? Kotlin vs. Java

? ● Smart use of Android Studio

? ● Know Kotlin basic types

? ● Walking into arrays in Kotlin

? ● Walk into Kotlin collections

? ● full code

? ● basic grammar

img

Chapter 2 Kotlin Practical Pit Avoidance Guide

? ● Method input parameters are constants and cannot be modified

? ● No Companion, INSTANCE?

? ● Java overloading, how to make a clever transition in Kotlin?

? ● Null detection gesture in Kotlin

? ● Kotlin overrides methods in Java parent classes

? ● Kotlin becomes “ruthless”, even TODO is not spared!

? ● Pit in is, as`

? ● Understanding of Property in Kotlin

? also keyword

? ● takeIf keyword

? ● How to write the singleton pattern

img

Chapter 3 Project Combat “Kotlin Jetpack Combat”

? ● Start with a demo that worships a great god

? ● What is the experience of writing Gradle scripts in Kotlin?

? ● The Triple Realm of Kotlin Programming

? ● Kotlin higher-order functions

? ● Kotlin Generics

? ● Kotlin extensions

? Kotlin delegates

? ● Coroutine “unknown” debugging skills

? ● Graphical coroutine: suspend

img