Kotlin coroutines: basics and usage of coroutines

1. Overview of coroutines

1. Concept

The word coroutine combines co (collaboration, cooperation) and routine (program), so a coroutine can be understood as one of several programs that cooperate with each other. A coroutine is often described as a lightweight thread. Its lightness shows in startup and switching: starting a coroutine does not require allocating a separate thread stack, and switching between coroutines happens in user mode rather than kernel mode, which avoids the cost of system calls.

2. Features

1) It is more lightweight and takes up fewer resources.
2) It avoids “callback hell” and improves code readability.
3) The suspension of the coroutine does not block the thread.

3. Principle

The core of the Kotlin coroutine principle lies in two parts: “continuation passing” and the “state machine”.

1) Continuation passing

Continuation-passing style, or CPS for short, is a coding style. Continuation passing is essentially a callback plus the handing over of a result. Suppose sequentially executed code is split into two parts. The first part runs and produces a result (which may be empty, an object reference, or a concrete value). The second part is then run through a callback that receives the result of the first part. Code written in this form is in continuation-passing style.

For example, a calculation would normally be written like this:

fun calculate(a: Int, b: Int): Int = a + b

fun main() {
    val result = calculate(1, 2)
    Log.d("liduo", "$result")
}

Now transform the code above into continuation-passing style. First, define a continuation interface:

interface Continuation {
    fun next(result: Int)
}

Then transform the calculate method:

fun calculate(a: Int, b: Int, continuation: Continuation) =
    continuation.next(a + b)

fun main() {
    calculate(1, 2, object : Continuation {
        override fun next(result: Int) {
            Log.d("liduo", "$result")
        }
    })
}

After this transformation, the log statement is wrapped in the Continuation and depends on the calculation calling it back. If calculate never invokes its continuation parameter, the log statement is never executed.

Originally, a piece of code (logic) was executed sequentially, but after a continuation transformation, it became two pieces of code (logic).

2) State machine

When the Kotlin compiler processes coroutine code, it transforms it into a state machine. Each piece of code has three states: not executed, suspended, and resumed (completed). Code in the not-executed state can be run. If it suspends while running, it enters the suspended state. After it is resumed, or after it finishes running, it enters the resumed (completed) state. When several such pieces of code cooperate, they form a more complex state machine.
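
As a rough illustration of the state machine idea, the hand-written class below mimics a function with a single suspension point. It is only a sketch: the class name, the label field, and the Boolean return convention are hypothetical and much simpler than what the Kotlin compiler actually generates.

```kotlin
// Hypothetical, hand-written analogue of a compiled suspend function with
// one suspension point. This is NOT the compiler's real output.
class CalculateStateMachine(private val log: MutableList<String>) {
    // Which piece of code runs on the next call:
    // 0 = not executed, 1 = suspended, 2 = completed
    private var label = 0

    // Runs the next piece of code. Returns true once the whole body has finished.
    fun resume(value: Int?): Boolean = when (label) {
        0 -> {
            log += "part 1: request the calculation, then suspend"
            label = 1
            false
        }
        1 -> {
            log += "part 2: resumed with result $value"
            label = 2
            true
        }
        else -> error("already completed")
    }
}
```

The first call to resume runs the code before the suspension point, and the second call (carrying the result) runs the code after it, which is the same split that the continuation-passing example makes by hand.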

2. Coroutine basics

1. The context of the coroutine

A coroutine context is a set of persistent user-defined objects that can be attached to a coroutine. The code is as follows:

interface CoroutineContext {
    // Overload the "[]" operation
    operator fun <E : Element> get(key: Key<E>): E?
    // Accumulate the elements into a single value
    fun <R> fold(initial: R, operation: (R, Element) -> R): R
    // Overload the "+" operation
    operator fun plus(context: CoroutineContext): CoroutineContext
    // Get the context without the element for the specified key
    fun minusKey(key: Key<*>): CoroutineContext

    interface Element : CoroutineContext {
        val key: Key<*>
    }

    interface Key<E : Element>
}

The Element interface inherits from the CoroutineContext interface. The interceptor, the scheduler, the exception handler, and the important classes that represent the life cycle of the coroutine itself all implement the Element interface.

The Element interface requires every implementing object to have a unique Key, so that it can be found quickly in the coroutine context when needed. The coroutine context can therefore be understood as an indexed set of Element objects, a structure somewhere between a Set and a Map.
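
The Set-and-Map character of the context can be tried out with the standard library alone. The sketch below defines two hypothetical elements (UserName and RequestId are made-up names for illustration) and uses each companion object as the unique Key, which is the same pattern the library uses for its own elements.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext

// Two hypothetical elements; each companion object serves as the unique Key.
class UserName(val name: String) : AbstractCoroutineContextElement(UserName) {
    companion object Key : CoroutineContext.Key<UserName>
}

class RequestId(val id: Int) : AbstractCoroutineContextElement(RequestId) {
    companion object Key : CoroutineContext.Key<RequestId>
}

fun describe(context: CoroutineContext): String {
    val name = context[UserName]?.name                    // Map-like lookup by key
    val id = context[RequestId]?.id
    val size = context.fold(0) { count, _ -> count + 1 }  // count the elements
    return "name=$name id=$id size=$size"
}
```

With `val context = UserName("alice") + RequestId(7)`, describe(context) returns "name=alice id=7 size=2". Adding UserName("bob") replaces the existing element with the same key instead of adding a second one (Set-like behavior), and context.minusKey(RequestId) leaves only the UserName element.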

2. The scope of the coroutine

The coroutine scope is used to manage the life cycle of the coroutine in the scope, the code is as follows:

interface CoroutineScope {
    // Default context for launching coroutines in scope
    val coroutineContext: CoroutineContext
}

Coroutines provide two commonly used functions for creating a new coroutine scope: coroutineScope and supervisorScope. The scope created by either function automatically inherits the context of the parent coroutine. GlobalScope is also a coroutine scope, but its context is the empty context, so a coroutine started in it has no parent.

When the parent coroutine is cancelled or fails with an exception, all of its child coroutines are cancelled automatically. When a child coroutine fails with an exception, the parent coroutine is cancelled as well under coroutineScope, but is not affected under supervisorScope. Plain cancellation of a child (a CancellationException) does not cancel the parent in either case.

The choice of scope only applies to direct parent-child relationships, not to further descendants. For example: start a parent coroutine and start a child coroutine inside supervisorScope. When the child starts a grandchild coroutine, the grandchild follows coroutineScope behavior by default unless supervisorScope is specified again.
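
The difference between the two scopes can be observed with a small experiment. The sketch below assumes kotlinx.coroutines on a plain JVM and records events in a list instead of calling Log.d; the function names and the delays are arbitrary choices for illustration.

```kotlin
import java.io.IOException
import kotlinx.coroutines.*

// Child 1 fails inside coroutineScope: the scope fails and child 2 is cancelled.
fun coroutineScopeDemo(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    try {
        coroutineScope {
            launch { delay(10); throw IOException("child 1 failed") }
            launch {
                try {
                    delay(100)
                    events += "child 2 finished"
                } catch (e: CancellationException) {
                    events += "child 2 cancelled"
                    throw e // always rethrow cancellation
                }
            }
        }
    } catch (e: IOException) {
        events += "scope failed: ${e.message}"
    }
    events
}

// Child 1 fails inside supervisorScope: child 2 keeps running.
fun supervisorScopeDemo(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    // Under a supervisor, a failing child reports to its own exception handler
    val handler = CoroutineExceptionHandler { _, e -> events += "handled: ${e.message}" }
    supervisorScope {
        launch(handler) { delay(10); throw IOException("child 1 failed") }
        launch { delay(100); events += "child 2 finished" }
    }
    events
}
```

coroutineScopeDemo() returns ["child 2 cancelled", "scope failed: child 1 failed"], while supervisorScopeDemo() returns ["handled: child 1 failed", "child 2 finished"].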

3. Coroutine scheduler

The coroutine scheduler is used to switch the thread that executes the coroutine. There are four common schedulers:

  • Dispatchers.Default: Default scheduler. It uses a shared thread pool on the JVM. Its maximum parallelism equals the number of CPU cores, with a minimum of 2.
  • Dispatchers.Unconfined: Unconfined scheduler. It does not confine execution to any specific thread. After a suspending function returns, the code that follows is not dispatched back to the thread it ran on before the suspension; it continues on whichever thread the suspending function resumed on.
  • Dispatchers.IO: IO scheduler. It offloads blocking IO tasks to a shared thread pool. This scheduler shares threads with Dispatchers.Default.
  • Dispatchers.Main: Main thread scheduler. Generally used to operate on and update the UI.
    Note: The threads allocated by the Dispatchers.Default and Dispatchers.IO schedulers are daemon threads.
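
To see which thread a scheduler actually uses, the following sketch captures the current thread inside withContext. It assumes kotlinx.coroutines on a plain JVM, so Dispatchers.Main is left out because it needs a UI platform such as Android. ThreadInfo and the function names are hypothetical helpers.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper type holding what we want to know about a thread.
data class ThreadInfo(val name: String, val isDaemon: Boolean)

fun currentThreadInfo(): ThreadInfo =
    Thread.currentThread().let { ThreadInfo(it.name, it.isDaemon) }

fun dispatcherThreads(): Map<String, ThreadInfo> = runBlocking {
    mapOf(
        // runBlocking without a scheduler runs on the blocked calling thread
        "caller" to currentThreadInfo(),
        "Default" to withContext(Dispatchers.Default) { currentThreadInfo() },
        "IO" to withContext(Dispatchers.IO) { currentThreadInfo() }
    )
}
```

Printing dispatcherThreads() shows the thread names chosen by each scheduler. The isDaemon flag of the "Default" and "IO" entries is true, which matches the note above.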

4. The startup mode of the coroutine

Coroutines have the following four startup modes:

  • CoroutineStart.DEFAULT: Schedule the coroutine for execution immediately. It can be cancelled at any time, including before it starts running.
  • CoroutineStart.LAZY: Create the coroutine without running it, and trigger execution manually when needed.
  • CoroutineStart.ATOMIC: Schedule the coroutine for execution immediately; it cannot be cancelled before it starts running. Experimental at the time of writing.
  • CoroutineStart.UNDISPATCHED: Run the coroutine immediately on the current thread until its first suspension point. Experimental at the time of writing.
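
The ordering effect of the start modes can be made visible by recording when each body runs. This is a minimal sketch assuming kotlinx.coroutines on a plain JVM; ATOMIC is omitted because its difference from DEFAULT only shows up under cancellation.

```kotlin
import kotlinx.coroutines.*

fun startModeDemo(): List<String> {
    val events = mutableListOf<String>()
    runBlocking {
        // DEFAULT: queued on the runBlocking event loop, runs later
        launch { events += "DEFAULT body" }
        // UNDISPATCHED: runs right here, before launch returns
        launch(start = CoroutineStart.UNDISPATCHED) { events += "UNDISPATCHED body" }
        // LAZY: created but not scheduled until start() is called
        val lazyJob = launch(start = CoroutineStart.LAZY) { events += "LAZY body" }
        events += "all three launched"
        // Without this call runBlocking would wait forever for the lazy child
        lazyJob.start()
    }
    return events
}
```

startModeDemo() returns ["UNDISPATCHED body", "all three launched", "DEFAULT body", "LAZY body"].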

5. The life cycle of the coroutine

Every coroutine returns an object of the Job interface when it is created. A Job object represents a coroutine and is used to control its life cycle. The code is as follows:

interface Job : CoroutineContext.Element {
    ...
    // three status flags
    val isActive: Boolean
    val isCompleted: Boolean
    val isCancelled: Boolean
    // Get the specific cancellation exception
    fun getCancellationException(): CancellationException
    // start coroutine
    fun start(): Boolean
    // cancel coroutine
    fun cancel(cause: CancellationException? = null)
    ...
    // Wait for the coroutine execution to end
    suspend fun join()
    // for select statement
    val onJoin: SelectClause0
    // Callback used to register the end of coroutine execution
    fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle
    ...
}

1) Coroutine state transition


In the DEFAULT, ATOMIC, and UNDISPATCHED modes, a started coroutine enters the Active state. A coroutine started in LAZY mode enters the New state and only becomes Active after the start method is called manually.

Completing is an internal state that cannot be perceived externally.

2) Changes of the status flags

State       isActive   isCompleted   isCancelled
New         false      false         false
Active      true       false         false
Completing  true       false         false
Cancelling  false      false         true
Cancelled   false      true          true
Completed   false      true          false
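
The flags can be sampled at each stage of a job's life. The sketch below assumes kotlinx.coroutines on a plain JVM and uses a lazily started job so that the New state is observable; Flags and flags() are hypothetical helpers.

```kotlin
import kotlinx.coroutines.*

// Hypothetical snapshot of the three status flags.
data class Flags(val isActive: Boolean, val isCompleted: Boolean, val isCancelled: Boolean)

fun Job.flags() = Flags(isActive, isCompleted, isCancelled)

fun lifecycleDemo(): List<Flags> = runBlocking {
    // LAZY keeps the job in the New state until start() is called
    val job = launch(start = CoroutineStart.LAZY) { delay(Long.MAX_VALUE) }
    val seen = mutableListOf(job.flags()) // New
    job.start()
    seen += job.flags()                   // Active
    job.cancel()
    seen += job.flags()                   // Cancelling
    job.join()
    seen += job.flags()                   // Cancelled
    seen
}
```

The four snapshots correspond to the New, Active, Cancelling, and Cancelled rows of the table.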

3. Coroutine use

1. Start of coroutine

1) runBlocking method

fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T

This method starts a coroutine from outside any coroutine scope and runs the code of the lambda expression in that coroutine. Calling it blocks the current thread until the lambda expression has finished. The method should not be called from inside a coroutine. Its purpose is to let code written with suspend be called from ordinary blocking code. If no coroutine scheduler is set, the coroutine runs on the blocked calling thread. The sample code is as follows:

private fun main() {
    // Do not specify a scheduler, execute in the thread of the method call
    runBlocking {
        // This is the scope of the coroutine
        Log.d("liduo", "123")
    }
}

private fun main() {
    // Specify the scheduler and execute it in the IO thread
    runBlocking(Dispatchers.IO) {
        // This is the scope of the coroutine
        Log.d("liduo", "123")
    }
}

2) launch method

fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job

This method starts a new coroutine asynchronously in the coroutine scope. Calling it does not block the thread. The sample code is as follows:

private fun test() {
    // The scope is GlobalScope
    // Lazy start, runs on the main thread
    val job = GlobalScope.launch(
        context = Dispatchers.Main,
        start = CoroutineStart.LAZY
    ) {
        Log.d("liduo", "123")
    }
    // start the coroutine
    job.start()
}

3) async method

fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T>

This method also starts a new coroutine asynchronously in the coroutine scope without blocking the thread. Unlike launch, async can carry a return value: it returns an object of the Deferred interface, through which the result of the coroutine can be obtained. The Deferred interface inherits from the Job interface, so the life cycle of the coroutine can still be controlled. The sample code is as follows:

// marked with suspend
private suspend fun test(): Int {
    // The scope is GlobalScope; the return type is Int, and the generic type can be omitted because it is inferred
    val deferred = GlobalScope.async {
        Log.d("liduo", "123")
        // delay 1 s
        delay(1000)
        1
    }
    // get the return value
    return deferred.await()
}

The return value is obtained by calling the await method of the returned Deferred object. If the coroutine has already finished when await is called, the return value is available immediately. If the coroutine is still running, await suspends the calling coroutine until execution ends or an exception occurs.
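
A common use of async is to run independent pieces of work concurrently and combine their results. The sketch below assumes kotlinx.coroutines on a plain JVM; loadPart is a hypothetical stand-in for slow work.

```kotlin
import kotlinx.coroutines.*

// Hypothetical stand-in for a slow operation such as a network call.
suspend fun loadPart(value: Int): Int {
    delay(100)
    return value
}

fun sumConcurrently(): Int = runBlocking {
    // Both coroutines start right away and overlap in time
    val first: Deferred<Int> = async { loadPart(1) }
    val second: Deferred<Int> = async { loadPart(2) }
    // await suspends until each result is ready
    first.await() + second.await()
}
```

sumConcurrently() returns 3. Because the two delays overlap, the call takes roughly one delay period rather than two.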

4) suspend keyword

The suspend keyword is used to modify a method (or a lambda expression). A method modified by suspend is called a suspend method, which means that it may suspend during execution. Why only "may"? Because the following code, for example, is modified by suspend but never actually suspends:

private suspend fun test() {
    Log.d("liduo", "123")
}

Because suspension may occur, a suspend method can only be used in coroutines. A suspend method can call other suspend methods as well as non-suspend methods, but a suspend method itself can only be called from another suspend method or from a coroutine.

5) withContext method

suspend fun <T> withContext(
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T
): T

While the current coroutine is running, this method switches to the thread specified by the scheduler, runs the code in the block parameter there, and returns a result. Calling it may suspend the current coroutine, which is resumed when the block has finished. The sample code is as follows:

private suspend fun test() {
    // Start on the main thread, start mode DEFAULT
    GlobalScope.launch(Dispatchers.Main) {
        Log.d("liduo", "start")
        // Switch to an IO thread and suspend; the generic type can be omitted because it is inferred
        val result = withContext(Dispatchers.IO) {
            // network request
            "json data"
        }
        // Back on the main thread
        Log.d("liduo", result)
    }
}

6) suspend method

inline fun <R> suspend(noinline block: suspend () -> R): suspend () -> R = block

This method wraps a suspending lambda so that it can be created inside a non-suspend method. It has to be combined with the createCoroutine method to start the coroutine. The sample code is as follows:

// Returns the continuation that holds the coroutine code
val continuation = suspend {
    // Execute the coroutine code
    // The generic type of Continuation below can be changed to the type this block returns
}.createCoroutine(object : Continuation<Unit> {
    override val context: CoroutineContext
        get() = EmptyCoroutineContext + Dispatchers.Main

    override fun resumeWith(result: Result<Unit>) {
        // get the final result
    }
})
// Run the continuation
continuation.resume(Unit)

In everyday development, coroutines are not started this way, but this method shows the start, suspension, and resumption of a coroutine at a more fundamental level.
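
The same mechanism can be run with the standard library alone, without any scheduler. In the sketch below the coroutine body suspends once through suspendCoroutine and is resumed at once with a value; the function name and the arithmetic are made up for illustration.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.createCoroutine
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine

fun runWithStdlibOnly(): Int {
    var outcome = -1
    val body: suspend () -> Int = suspend {
        // A suspension point: the lambda receives the continuation of the code below
        val sum = suspendCoroutine<Int> { cont -> cont.resume(1 + 2) }
        sum * 10
    }
    // The completion continuation receives the final result of the body
    val start: Continuation<Unit> = body.createCoroutine(object : Continuation<Int> {
        override val context: CoroutineContext = EmptyCoroutineContext
        override fun resumeWith(result: Result<Int>) {
            outcome = result.getOrThrow()
        }
    })
    // Nothing has run yet; resuming the returned continuation starts the coroutine
    start.resume(Unit)
    return outcome
}
```

runWithStdlibOnly() returns 30. Since the context contains no interceptor, everything runs synchronously on the calling thread.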

2. Communication between coroutines

1) Channel

Channel is used for communication between coroutines. A Channel is essentially a concurrency-safe queue, similar to BlockingQueue. Coroutines communicate by calling the send and receive methods of the same Channel object. The sample code is as follows:

suspend fun main() {
    // create
    val channel = Channel<Int>()

    val producer = GlobalScope.launch {
        var i = 0
        while (i < 10) {
            // send
            channel.send(i++)
            delay(1000)
        }
        // The channel should be closed in time when it is no longer needed
        channel.close()
    }

    // Writing methods 1 to 3 are alternatives, shown side by side for comparison

    // Writing method 1: conventional
    val consumer1 = GlobalScope.launch {
        while (true) {
            // receive (throws ClosedReceiveChannelException once the channel is closed)
            val element = channel.receive()
            Log.d("liduo", "$element")
        }
    }

    // Writing method 2: iterator
    val consumer2 = GlobalScope.launch {
        val iterator = channel.iterator()
        while (iterator.hasNext()) {
            // receive
            val element = iterator.next()
            Log.d("liduo", "$element")
        }
    }

    // Writing method 3: enhanced for loop
    val consumer3 = GlobalScope.launch {
        for (element in channel) {
            Log.d("liduo", "$element")
        }
    }

    // Since the coroutines above are not lazy, they start running as soon as they are created
    // In other words, by the time execution reaches this point, they are already working
    // The join method suspends the current coroutine, not the coroutines started above
    // In the Android environment, the following two lines of code do not need to be added
    // producer.join()
    // consumer1.join()
}

The above example is a classic producer-consumer model. In writing method 1, send and receive are suspending methods, so by default, when the production speed does not match the consumption speed, calling them suspends the coroutine.
In addition, Channel supports receiving through an iterator. The hasNext method may also suspend the coroutine.

The Channel object should be closed in time when it is not in use. It can be closed by the sender or the receiver, depending on the business scenario.
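
For experimenting outside Android, the producer-consumer pattern can be reduced to a few lines. This sketch assumes kotlinx.coroutines on a plain JVM and collects the received elements into a list instead of logging them.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun channelDemo(): List<Int> = runBlocking {
    val channel = Channel<Int>() // rendezvous channel, no buffer
    // Producer: send three elements, then close the channel
    launch {
        for (i in 1..3) channel.send(i)
        channel.close()
    }
    // Consumer: the for loop ends normally once the channel is closed and drained
    val received = mutableListOf<Int>()
    for (element in channel) received += element
    received
}
```

channelDemo() returns [1, 2, 3]. Closing the channel on the sending side is what lets the for loop on the receiving side finish.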

2) Channel capacity

Channel() here is not a constructor of Channel but a factory function. The code is as follows:

fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E> =
    when (capacity) {
        RENDEZVOUS -> RendezvousChannel()
        UNLIMITED -> LinkedListChannel()
        CONFLATED -> ConflatedChannel()
        BUFFERED -> ArrayChannel(CHANNEL_DEFAULT_CAPACITY)
        else -> ArrayChannel(capacity)
    }

The capacity can be specified when creating a Channel:

  • RENDEZVOUS: Create a Channel with a capacity of 0, similar to SynchronousQueue. A call to send suspends until the element is received. The constant value is 0.
  • UNLIMITED: Create a Channel with unlimited capacity, implemented internally with a linked list. The constant value is Int.MAX_VALUE.
  • CONFLATED: Create a Channel with a capacity of 1, in which a newly sent element overwrites the previous one. The constant value is -1.
  • BUFFERED: Create a Channel with the default capacity. The default capacity is the value of the kotlinx.coroutines.channels.defaultBuffer configuration variable. If it is not configured, the default is 64. The constant value is -2.
  • If capacity is none of the constants above, create a Channel with the specified capacity.
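
The effect of the capacity can be checked without any coroutine by using trySend, the non-suspending variant of send, which reports whether the element was accepted. This is a sketch assuming a kotlinx.coroutines version that provides trySend; the function name and the map keys are arbitrary.

```kotlin
import kotlinx.coroutines.channels.Channel

// Try to put three elements into a channel that nobody receives from
// and record which attempts succeed.
fun attempt(channel: Channel<Int>): List<Boolean> =
    List(3) { channel.trySend(it).isSuccess }

fun capacityDemo(): Map<String, List<Boolean>> = mapOf(
    "RENDEZVOUS" to attempt(Channel<Int>(Channel.RENDEZVOUS)), // no buffer, no receiver
    "capacity 2" to attempt(Channel<Int>(2)),                  // buffer fills after two
    "CONFLATED" to attempt(Channel<Int>(Channel.CONFLATED)),   // newest element replaces the old one
    "UNLIMITED" to attempt(Channel<Int>(Channel.UNLIMITED))    // never full
)
```

capacityDemo() maps "RENDEZVOUS" to [false, false, false], "capacity 2" to [true, true, false], and both "CONFLATED" and "UNLIMITED" to [true, true, true]. Wherever trySend fails, a regular send would suspend instead.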

3) produce method and actor method

fun <E> CoroutineScope.produce(
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = 0,
    @BuilderInference block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E>

fun <E> CoroutineScope.actor(
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = 0,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    onCompletion: CompletionHandler? = null,
    block: suspend ActorScope<E>.() -> Unit
): SendChannel<E>

Like launch and async, the produce and actor methods can also start a coroutine. The difference is that a Channel can be used more concisely inside produce and actor. The sample code is as follows:

// Start the coroutine and return a receiving Channel
val receiveChannel: ReceiveChannel<Int> = GlobalScope.produce {
    while (true) {
        delay(100)
        // send
        send(1)
    }
}

// Start the coroutine and return a sending Channel
val sendChannel: SendChannel<Int> = GlobalScope.actor<Int> {
    while (true) {
        // receive
        val element = receive()
        Log.d("liduo", "$element")
    }
}

The produce and actor methods manage the Channel object internally. When the coroutine finishes, the Channel object is closed automatically.

At the time of writing, however, the produce method is still experimental (marked with the ExperimentalCoroutinesApi annotation), and the actor method is obsolete (marked with the ObsoleteCoroutinesApi annotation). It is therefore best not to use them in real development.

4) BroadcastChannel

BroadcastChannel can be used when a sender corresponds to multiple receivers. The code is as follows:

fun <E> BroadcastChannel(capacity: Int): BroadcastChannel<E> =
    when (capacity) {
        0 -> throw IllegalArgumentException("Unsupported 0 capacity for BroadcastChannel")
        UNLIMITED -> throw IllegalArgumentException("Unsupported UNLIMITED capacity for BroadcastChannel")
        CONFLATED -> ConflatedBroadcastChannel()
        BUFFERED -> ArrayBroadcastChannel(CHANNEL_DEFAULT_CAPACITY)
        else -> ArrayBroadcastChannel(capacity)
    }

When creating a BroadcastChannel object, the capacity must be specified. The receiver obtains the ReceiveChannel object to receive messages by calling the openSubscription method of the BroadcastChannel object. The sample code is as follows:

// Create a BroadcastChannel with a capacity of 5
val broadcastChannel = BroadcastChannel<Int>(5)

// Subscribe first: a subscription only receives elements sent after it was opened
val receiveChannel1 = broadcastChannel.openSubscription()
val receiveChannel2 = broadcastChannel.openSubscription()

// create receiver 1 coroutine
val receiver1 = GlobalScope.launch {
    // receive
    for (element in receiveChannel1) {
        Log.d("receiver_1: ", "$element")
    }
}

// create receiver 2 coroutine
val receiver2 = GlobalScope.launch {
    // receive
    for (element in receiveChannel2) {
        Log.d("receiver_2: ", "$element")
    }
}

// create sender coroutine
GlobalScope.launch {
    // send 1
    broadcastChannel.send(1)
    delay(100)
    // send 2
    broadcastChannel.send(2)
    // close
    broadcastChannel.close()
}.join()

// wait until both receivers have drained their subscriptions
receiver1.join()
receiver2.join()

Every receiver can receive every message sent by the sender. Use the extension method broadcast to directly convert the Channel object into a BroadcastChannel object, the sample code is as follows:

val channel = Channel<Int>()
val broadcastChannel = channel.broadcast(10)

Many methods of BroadcastChannel were still experimental at the time of writing (marked with the ExperimentalCoroutinesApi annotation), so use them with care. Later kotlinx.coroutines releases deprecate BroadcastChannel in favor of SharedFlow.

3. Multiplexing

Coroutines provide a select method for multiplexing, similar to NIO in Java. The code is as follows:

suspend inline fun <R> select(crossinline builder: SelectBuilder<R>.() -> Unit): R

Taking the multiplexing of Channel as an example, let’s take a look at the use of the select method. The sample code is as follows:

private suspend fun test() {
    // Create a list of Channels
    val channelList = mutableListOf<Channel<Int>>()
    // Suppose there are 5 Channels
    channelList.add(Channel<Int>())
    channelList.add(Channel<Int>())
    channelList.add(Channel<Int>())
    channelList.add(Channel<Int>())
    channelList.add(Channel<Int>())

    // Call the select method, the coroutine suspends
    val result = select<Int> {
        // Register a receive clause on each of the 5 Channels
        channelList.forEach { channel ->
            channel.onReceive { value -> value }
        }
    }
    // As soon as any one of the 5 Channels delivers a message, select resumes
    // and the received value is assigned to result
    Log.d("liduo", "$result")
}

In addition, many coroutine interfaces define members named “onXXX”, such as onJoin of the Job interface and onAwait of the Deferred interface. They are meant to be used together with the select method for multiplexing.
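
A runnable variant with two channels shows that select returns the result of whichever clause becomes ready first. The sketch assumes kotlinx.coroutines on a plain JVM; the channel names and delays are arbitrary.

```kotlin
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.select

fun selectDemo(): String = runBlocking {
    val fast = Channel<String>()
    val slow = Channel<String>()
    launch { delay(200); slow.send("slow") }
    launch { delay(50); fast.send("fast") }

    // Suspends until one of the two channels has an element
    val winner = select<String> {
        fast.onReceive { it }
        slow.onReceive { it }
    }
    // Nobody will receive from the slow channel any more, so cancel its sender;
    // otherwise runBlocking would wait for it forever
    coroutineContext.cancelChildren()
    winner
}
```

selectDemo() returns "fast", because the fast channel delivers its element first.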

4. Sequence Generator

The sequence method builds a lazily evaluated sequence on top of coroutines. The sample code is as follows:

private suspend fun test() {
    // Create a sequence that outputs odd numbers; the generic type can be omitted because it is inferred
    val singleNumber = sequence {
        var i = 0
        while (true) {
            // Call the yield method where output is required
            yield(2 * i + 1)
            i++
        }
    }

    // Pull values one at a time through the iterator
    // (the sequence is infinite, so never iterate it to the end)
    val iterator = singleNumber.iterator()
    Log.d("liduo", "${iterator.next()}")

    // Get the first five items of the sequence, and iterate the output
    singleNumber.take(5).forEach {
        Log.d("liduo", "$it")
    }
}

Calling the yield method suspends the coroutine of the sequence and outputs the value currently generated. In addition, you can call the yieldAll method to output a whole collection of values at once. The sample code is as follows:

private suspend fun test() {
    // Create a sequence that can output odd numbers, the generic type can be omitted, and it will be automatically inferred
    val singleNumber = sequence {
        yieldAll(listOf(1,3,5,7))
        yieldAll(listOf(9,11,13))
        yieldAll(listOf(15,17))
    }
    
    // Call the iterator to get the output of the sequence, up to 9 items
    singleNumber.iterator().forEach {
        Log.d("liduo", "$it")
    }

    // Get the first five items of the sequence, and iterate the output
    singleNumber.take(5).forEach {
        // 1, 3, 5, 7, 9
        Log.d("liduo", "$it")
    }
}
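
Because the sequence body is suspended at every yield, elements are only computed when a consumer asks for them. The sketch below uses only the standard library and records each computation in a caller-supplied list; lazyOdds is a hypothetical name.

```kotlin
// An infinite sequence of odd numbers that records every element it computes.
fun lazyOdds(log: MutableList<String>): Sequence<Int> = sequence {
    var i = 0
    while (true) {
        log += "computing element $i"
        yield(2 * i + 1) // suspends here until the next element is requested
        i++
    }
}
```

With `val log = mutableListOf<String>()`, the call lazyOdds(log).take(3).toList() returns [1, 3, 5] and leaves exactly three entries in log: the infinite loop only advances as far as the consumer pulls it.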

5. Coroutine asynchronous flow

Coroutines provide Flow, a reactive programming API similar to RxJava (officially an asynchronous cold data stream; official ways to create hot data streams are provided as well).

1) Basic usage

// called on the main thread
GlobalScope.launch(Dispatchers.Main) {
    // create stream
    flow {
        // suspend and emit a value
        emit(1)
    // Set the thread the stream runs on, then consume the stream
    }.flowOn(Dispatchers.IO).collect {
        Log.d("liduo", "$it")
    }
}.join()

The emit method is a suspending method, similar to the yield method of a sequence, and is used to output a value. The flowOn method corresponds to subscribeOn in RxJava and switches the thread on which the flow runs. To avoid confusion, Flow does not provide a counterpart to RxJava's observeOn; the thread that consumes the flow is determined by the context of the coroutine in which the flow is collected. The collect method corresponds to subscribe in RxJava and both triggers and consumes the stream.

A stream can be consumed multiple times. The sample code is as follows:

GlobalScope.launch(Dispatchers.IO) {
    val mFlow = flow {
        emit(1)
    }.flowOn(Dispatchers.Main)

    mFlow.collect { Log.d("liduo1", "$it") }
    mFlow.collect { Log.d("liduo2", "$it") }
}.join()
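
That a flow is cold can be verified by counting how often its builder runs. This sketch assumes kotlinx.coroutines on a plain JVM and therefore uses Dispatchers.Default in flowOn; the counter is an AtomicInteger because the builder runs on another thread.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

fun coldFlowDemo(): Triple<List<Int>, List<Int>, Int> = runBlocking {
    val builderRuns = AtomicInteger(0)
    val numbers = flow {
        builderRuns.incrementAndGet() // counts how often the builder is executed
        for (i in 1..3) emit(i)
    }.flowOn(Dispatchers.Default)

    val first = numbers.toList()                    // first collection
    val doubled = numbers.map { it * 2 }.toList()   // second, independent collection
    Triple(first, doubled, builderRuns.get())
}
```

coldFlowDemo() returns ([1, 2, 3], [2, 4, 6], 2): nothing runs until a terminal operation such as toList or collect is called, and every collection executes the builder again from the start.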

2) Exception handling

Flow supports exception handling similar to try-catch-finally. The sample code is as follows:

flow {
    emit(1)
    // Throw an exception
    throw NullPointerException()
}.catch { cause: Throwable ->
    Log.d("liduo", "${cause.message}")
}.onCompletion { cause: Throwable? ->
    Log.d("liduo", "${cause?.message}")
}

The catch method is used to catch exceptions. The onCompletion method is the equivalent of a finally block. Kotlin does not recommend wrapping emit in a try-catch-finally block inside the flow builder; use these operators instead.

Flow also provides operations similar to RxJava’s onErrorReturn method, the sample code is as follows:

flow {
    emit(1)
    // Throw an exception
    throw NullPointerException()
}.catch { cause: Throwable ->
    Log.d("liduo", "${cause.message}")
    emit(-1)
}
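
The order in which the operators fire becomes clearer when the flow is actually collected. The sketch below assumes kotlinx.coroutines on a plain JVM and records each step in a list; the exception type and messages are arbitrary.

```kotlin
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.runBlocking

fun catchDemo(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    flow {
        emit(1)
        throw IllegalStateException("boom")
    }.catch { e ->
        events += "caught ${e.message}"
        emit(-1) // fallback value, like onErrorReturn
    }.onCompletion { cause ->
        // cause is null here because catch already handled the exception upstream
        events += "completed, cause=$cause"
    }.collect { value ->
        events += "value $value"
    }
    events
}
```

catchDemo() returns ["value 1", "caught boom", "value -1", "completed, cause=null"].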

3) Trigger separation

Flow lets you write the consumption logic ahead of time (with onEach) and trigger the actual collection later, when needed. The sample code is as follows:

// Method of creating Flow
fun myFlow() = flow {
    // production process
    emit(1)
}.onEach {
    // consumption process
    Log.d("liduo", "$it")
}

suspend fun main() {
    // Writing method 1
    GlobalScope.launch {
        // trigger consumption
        myFlow().collect()
    }.join()

    // Writing method 2
    myFlow().launchIn(GlobalScope).join()
}

4) Note

  • Flow does not provide a method to cancel collect. To stop a flow, cancel the coroutine that collects it.
  • The emit method is not thread-safe, so do not call withContext or similar methods inside flow { } to switch the scheduler around emit. If you need to switch, use channelFlow.
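
Both notes can be tried out in a few lines. The sketch assumes kotlinx.coroutines on a plain JVM; ticks is a hypothetical endless flow, and the collector cancels its own coroutine after the third value.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// An endless flow: it only stops when the collecting coroutine is cancelled.
fun ticks(): Flow<Int> = flow {
    var i = 0
    while (true) {
        emit(++i)
        delay(10)
    }
}

fun cancelCollectorDemo(): Int = runBlocking {
    var last = 0
    val job = launch {
        ticks().collect { value ->
            last = value
            if (value == 3) this@launch.cancel() // cancel the collecting coroutine
        }
    }
    job.join()
    last
}

fun channelFlowDemo(): List<Int> = runBlocking {
    channelFlow {
        // Switching the scheduler is allowed here because send is thread-safe
        withContext(Dispatchers.Default) { send(1) }
        send(2)
    }.toList()
}
```

cancelCollectorDemo() returns 3, since no further value is delivered after the cancellation, and channelFlowDemo() returns [1, 2].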

6. Global context

This article uses GlobalScope to start coroutines, but GlobalScope should not be used in real development. A coroutine started in GlobalScope lives in a scope we do not control. If the Activity is closed while its coroutines are still running, we have no way to cancel them, which may cause memory leaks. In real development, define your own coroutine scope, or at least write code as follows:

// Implement the CoroutineScope interface
class MainActivity : AppCompatActivity(), CoroutineScope by MainScope() {

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        // Start the coroutine directly
        launch {
            Log.d("liduo", "launch")
        }
    }

    override fun onDestroy() {
        super.onDestroy()
        // Cancel the top-level parent coroutine
        cancel()
    }
}

The code of MainScope is as follows:

public fun MainScope(): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main)

Dispatchers.Main means that coroutines are scheduled on the main thread, and SupervisorJob() means that the failure or cancellation of a child coroutine does not affect the parent coroutine.
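
The same ownership pattern works outside Android. The sketch below assumes kotlinx.coroutines on a plain JVM and replaces Dispatchers.Main with Dispatchers.Default; Worker, startWork, and close are hypothetical names.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch

// Hypothetical non-Android component that owns the coroutines it starts.
class Worker : CoroutineScope by CoroutineScope(SupervisorJob() + Dispatchers.Default) {
    // Starts a coroutine in this component's scope; it runs until cancelled
    fun startWork(): Job = launch { delay(Long.MAX_VALUE) }

    // Plays the role of onDestroy: cancels every coroutine started in this scope
    fun close() = cancel()
}
```

After `val worker = Worker()` and `val job = worker.startWork()`, calling worker.close() makes job.isCancelled true, so no coroutine outlives its owner.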
