It’s time to unravel the truth about Kotlin coroutine scheduling and switching threads

Foreword

Coroutine series articles:

  • A short story to explain the relationship between processes, threads, and Kotlin coroutines?
  • Young man, do you know what Kotlin coroutines looked like at first?
  • To be honest, the suspend/resume of Kotlin coroutines is not so mysterious (story)
  • To be honest, the suspend/resume of Kotlin coroutines is not so mysterious (principle)
  • It’s time to unravel the truth about Kotlin coroutine scheduling and switching threads
  • Kotlin coroutine thread pool exploration tour (with Java thread pool PK)
  • Kotlin Coroutine Cancellation and Exception Handling Exploration Tour (Part 1)
  • Kotlin Coroutine Cancellation and Exception Handling Exploration Tour (Part 2)
  • Come with me to learn the principles and use of Kotlin runBlocking/launch/join/async/delay
  • Come with me into the deep waters of Kotlin Channel
  • Kotlin coroutine Select: see how I multiplex
  • Kotlin Sequence is time to come in handy
  • Kotlin Flow backpressure and thread switching are so similar
  • Kotlin Flow, where are you going?
  • How hot is the Kotlin SharedFlow&StateFlow heat flow?

In the previous article, we showed how to start, suspend, and resume coroutines using fairly basic methods. It did not cover how to switch the thread of execution, and a coroutine that cannot switch threads has no soul.
This article focuses on how coroutines switch threads and how they return to the original thread.
Through this article, you will learn:

  1. How to specify the thread on which a coroutine runs
  2. Principle of the coroutine dispatcher
  3. Thread selection when a coroutine resumes

1. How to specify the thread on which the coroutine runs?

Common methods for switching threads in Android

Conventional means

The usual ways to switch to the main thread are simple calls such as Activity.runOnUiThread(xx), View.post(xx), and Handler.sendMessage(xx). There are also frameworks and utilities such as AsyncTask, RxJava, and thread pools.
When they deliver work to the main thread, they essentially rely on Looper+Handler.
Let’s look at a demo first: fetch student information on a child thread, then switch to the main thread to display the result:

    private inner class MyHandler : Handler(Looper.getMainLooper()) {
        override fun handleMessage(msg: Message) {
            // The main thread shows the toast
            Toast.makeText(context, msg.obj.toString(), Toast.LENGTH_SHORT).show()
        }
    }

    // Get student information
    fun showStuInfo() {
        thread {
            // Simulate a network request
            Thread.sleep(3000)
            val handler = MyHandler()
            val msg = Message.obtain()
            msg.obj = "I am a murloc"
            // Send to the main thread for execution
            handler.sendMessage(msg)
        }
    }

We know that Android UI refresh is event-driven: the main thread keeps trying to take pending events from an event queue. Polling that queue is the core job of Looper, while putting events into the queue is done through Handler.

The child thread puts an event into the queue through the Handler, and the main thread, which is looping over the queue, picks it up and runs it. That is how execution switches from the child thread to the main thread.

Of course, this works because the main thread has a message queue. If you want to hand an event to a child thread for execution, just set up a message queue on that child thread.
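
The queue-plus-loop idea can be sketched without any Android classes. This is only a minimal illustration, not how Looper is actually implemented: MiniLooper, post(), and postFromChildThread() are hypothetical names standing in for Looper, Handler.post(), and the demo above.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.LinkedBlockingQueue
import kotlin.concurrent.thread

// Hypothetical stand-in for Looper + Handler: one thread drains a queue of Runnables.
class MiniLooper(name: String) {
    private val queue = LinkedBlockingQueue<Runnable>()

    // Roughly what Looper.loop() does: block until a task arrives, then run it.
    private val worker = thread(name = name, isDaemon = true) {
        while (true) queue.take().run()
    }

    // Roughly what Handler.post() does: callable from any thread.
    fun post(task: Runnable) {
        queue.put(task)
    }
}

// Produces a result on a child thread, then hands it to the looper thread.
// Returns a description of which thread finally handled the result.
fun postFromChildThread(): String {
    val looper = MiniLooper("mini-main")
    val done = CountDownLatch(1)
    var handled = ""
    thread(name = "child") {
        val result = "I am a murloc" // pretend this came from the network
        looper.post(Runnable {
            handled = "$result handled on ${Thread.currentThread().name}"
            done.countDown()
        })
    }
    done.await()
    return handled
}
```

Calling postFromChildThread() returns "I am a murloc handled on mini-main": the Runnable is created on the child thread but runs on the thread that owns the queue.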

The coroutine switches to the main thread

The same function can be achieved with coroutines:

    fun showStuInfoV2() {
        GlobalScope.launch(Dispatchers.Main) {
            val stuInfo = withContext(Dispatchers.IO) {
                // Simulate a network request
                Thread.sleep(3000)
                "I'm a little murloc"
            }

            Toast.makeText(context, stuInfo, Toast.LENGTH_SHORT).show()
        }
    }

Obviously, the coroutine version is far more concise.
Compared with the conventional approach, the coroutine does not need to create a thread explicitly, send a message through a Handler explicitly, or receive and display the result inside a Handler.
We have reason to guess that, internally, coroutines also switch to the main thread through Handler+Looper.

Coroutine switching thread

Of course, coroutines can not only switch from sub-threads to main threads, but also from main threads to sub-threads, and even switch between sub-threads.

    fun switchThread() {
        println("I am in some thread, ready to switch to the main thread")
        GlobalScope.launch(Dispatchers.Main) {
            println("I am in the main thread, ready to switch to the child thread")
            withContext(Dispatchers.IO) {
                println("I am in the child thread, ready to switch to another child thread")
                withContext(Dispatchers.Default) {
                    println("I am in the child thread, ready to switch to the main thread")
                    withContext(Dispatchers.Main) {
                        println("I'm on the main thread")
                    }
                }
            }
        }
    }

Whether it is the launch() function or the withContext() function, as long as we specify the thread to run on, the coroutine runs on that thread.

2. Principle of coroutine scheduler

Specify the thread that the coroutine runs

Next, starting from the source code of launch(), we will explore step by step how the coroutine switches threads.
The simplest way to call launch():

    fun launch1() {
        GlobalScope.launch {
            println("launch default")
        }
    }

The launch() function has three parameters. The first two have default values, and the third is our coroutine body, that is, the content inside the curly braces of GlobalScope.launch.

#Builders.common.kt
public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    // Construct a new context
    val newContext = newCoroutineContext(context)
    // Construct the coroutine object; it later serves as the completion
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    // Start the coroutine
    coroutine.start(start, coroutine, block)
    return coroutine
}

Then look at the implementation of newCoroutineContext:

#CoroutineContext.kt
actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
    // In the Demo environment coroutineContext = EmptyCoroutineContext
    val combined = coroutineContext + context
    // DEBUG = false
    val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
    // If no dispatcher is specified, the default dispatcher is used: Dispatchers.Default
    // If a dispatcher is specified, use the specified one
    return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
        debug + Dispatchers.Default else debug
}

This part relies on some overloaded operators of CoroutineContext. We won’t go deep into CoroutineContext this time; it is enough to understand what the code means.

All you need to know is:
CoroutineContext stores the coroutine dispatcher.
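
To make "stores the dispatcher" concrete, here is a small sketch that uses only the Kotlin standard library. NamedInterceptor, fallbackInterceptor, withFallback(), and interceptorName() are hypothetical names; withFallback() only imitates the shape of the fallback branch in newCoroutineContext() and is not the library code.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

// Hypothetical dispatcher stand-in. A real dispatcher would wrap the continuation;
// this one returns it unchanged because only the context lookup matters here.
class NamedInterceptor(val name: String) :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        continuation
}

val fallbackInterceptor = NamedInterceptor("Default")

// Same shape as newCoroutineContext(): add a fallback only when the context
// has no element stored under the ContinuationInterceptor key.
fun withFallback(context: CoroutineContext): CoroutineContext =
    if (context[ContinuationInterceptor] == null) context + fallbackInterceptor else context

// Looks up the element by key, the same lookup intercepted() performs later.
fun interceptorName(context: CoroutineContext): String? =
    (withFallback(context)[ContinuationInterceptor] as? NamedInterceptor)?.name
```

interceptorName(EmptyCoroutineContext) gives "Default", and interceptorName(NamedInterceptor("Main")) gives "Main". Because `+` replaces an element that has the same key, NamedInterceptor("Main") + NamedInterceptor("IO") resolves to "IO".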

What dispatchers do coroutines have?

Dispatchers.Main

The UI thread; on Android, the main thread

Dispatchers.IO

IO threads, mainly for IO operations

Dispatchers.Default

Mainly for CPU-intensive operations, such as computational tasks

Dispatchers.Unconfined

Does not specify which thread to use

Specify the coroutine to run on the main thread

Instead of the default parameters, specify the coroutine’s dispatcher:

    fun launch1() {
        GlobalScope.launch(Dispatchers.Main) {
            println("I am executing on the main thread")
        }
    }

Take this as an example and continue to analyze its source code.
As mentioned above, the coroutine is started through the coroutine.start(start, coroutine, block) function:


    #AbstractCoroutine.kt
    public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
        // start is a CoroutineStart value;
        // calling it resolves to CoroutineStart.invoke()
        start(block, receiver, this)
    }
    #CoroutineStart.kt
    public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
        when (this) {
            // this is the CoroutineStart value itself; DEFAULT unless another mode is passed
            CoroutineStart.DEFAULT -> block.startCoroutineCancellable(receiver, completion)
            CoroutineStart.ATOMIC -> block.startCoroutine(receiver, completion)
            CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
            CoroutineStart.LAZY -> Unit // will start lazily
        }

CoroutineStart.DEFAULT and CoroutineStart.ATOMIC are coroutine start modes. DEFAULT schedules the coroutine for execution right away and is the default start mode.

The next step calls a series of startup functions on the block. We analyzed this part in detail before, so here is a brief recap:

block represents the coroutine body. It is actually compiled into an anonymous inner class that inherits from SuspendLambda, and SuspendLambda indirectly implements the Continuation interface.

Let’s continue with the call on the block:

#Cancellable.kt
// Extension function on the block
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
    receiver: R, completion: Continuation<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
) =
    // runSafely is a higher-order function that runs the content in "{}"
    runSafely(completion) {
        createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
    }

Control then passes to the createCoroutineUnintercepted() function. Its key points were analyzed in "Young man, do you know what Kotlin coroutines looked like at first?": this function is where the coroutine body is actually created.

Here is the code:

#IntrinsicsJvm.kt
actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
    receiver: R,
    completion: Continuation<T>
): Continuation<Unit> {
    // Wrap completion
    val probeCompletion = probeCoroutineCreated(completion)
    return if (this is BaseContinuationImpl)
        // Create the coroutine body class
        // receiver and completion are both the StandaloneCoroutine object
        create(receiver, probeCompletion)
    else {
        createCoroutineFromSuspendFunction(probeCompletion) {
            (this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)
        }
    }
}

The function of this function is to create a coroutine body class, which we call MyAnnoy for the time being.

class MyAnnoy extends SuspendLambda implements Function2 {
    @Nullable
    @Override
    protected Object invokeSuspend(@NotNull Object o) {
        //...coroutine body logic
        return null;
    }
    @NotNull
    @Override
    public Continuation create(@NotNull Continuation completion) {
        //... create MyAnnoy
        return null;
    }
    @Override
    public Object invoke(Object o, Object o2) {
        return null;
    }
}

After the MyAnnoy instance is created, the intercepted(xx) function is called on it. This step is very important:

#IntrinsicsJvm.kt
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
    // If this is a ContinuationImpl, cast it to ContinuationImpl
    // and call its intercepted() function
    (this as? ContinuationImpl)?.intercepted() ?: this

Why is MyAnnoy cast to ContinuationImpl here?
Because the intercepted() function being called is defined in ContinuationImpl:

#ContinuationImpl.kt
public fun intercepted(): Continuation<Any?> =
    intercepted
        // 1. If intercepted is null, look up the interceptor in the context
        // 2. If the context has no interceptor, fall back to this; either way, cache the result in intercepted
        ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
            .also { intercepted = it }
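
The caching behaviour can be observed with the standard-library intrinsics alone. The sketch below is an illustration under stated assumptions: CountingInterceptor and countInterceptCalls() are hypothetical names, and the coroutine is created but never started.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.intrinsics.createCoroutineUnintercepted
import kotlin.coroutines.intrinsics.intercepted

// Hypothetical interceptor that only counts how often it is asked to wrap a continuation.
class CountingInterceptor :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    var calls = 0
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> {
        calls++
        return continuation
    }
}

// Creates a coroutine without starting it, calls intercepted() twice,
// and returns how many times the interceptor was consulted.
fun countInterceptCalls(): Int {
    val interceptor = CountingInterceptor()
    val completion = object : Continuation<Unit> {
        // The context holds just one element: the interceptor.
        override val context: CoroutineContext = interceptor
        override fun resumeWith(result: Result<Unit>) {}
    }
    val body: suspend () -> Unit = { }
    val coroutine = body.createCoroutineUnintercepted(completion)
    val first = coroutine.intercepted()
    val second = coroutine.intercepted()
    check(first === second) // the second call returns the cached object
    return interceptor.calls
}
```

countInterceptCalls() returns 1: the context lookup and interceptContinuation() happen on the first intercepted() call only, and the cached value is reused afterwards.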

Look at the type of the intercepted variable first:

#ContinuationImpl.kt
    private var intercepted: Continuation<Any?>? = null

It is still a Continuation, and initially intercepted = null.
context[ContinuationInterceptor] takes the Element whose key is ContinuationInterceptor out of the CoroutineContext.
Since it can be taken out, it must have been put in at some point. When was that?

The answer is:

newCoroutineContext(context) constructs a new CoroutineContext, which stores the dispatcher.

Because we specified the main-thread dispatcher, Dispatchers.Main, context[ContinuationInterceptor] returns Dispatchers.Main.

Dispatchers.Main definition:

#Dispatchers.kt
    public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
#MainCoroutineDispatcher.kt
public abstract class MainCoroutineDispatcher : CoroutineDispatcher() {}

MainCoroutineDispatcher inherits from CoroutineDispatcher, which has this function:

#CoroutineDispatcher.kt
    public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        DispatchedContinuation(this, continuation)

So Dispatchers.Main is the object on which interceptContinuation(xx) is called.
The function’s parameter is of type Continuation, which here is the MyAnnoy object. The function body is very simple:

  • It constructs a DispatchedContinuation object, passing in Dispatchers.Main and the MyAnnoy object.
  • Dispatchers.Main and MyAnnoy are assigned to the member variables dispatcher and continuation respectively.

DispatchedContinuation inherits from DispatchedTask, which inherits from SchedulerTask, which is essentially Task. Task implements the Runnable interface:

#Tasks.kt
internal abstract class Task(
    @JvmField var submissionTime: Long,
    @JvmField var taskContext: TaskContext
) : Runnable {
    //...
}

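From here on, the thing to focus on is its implementation of the run() function from the Runnable interface.

Back to the DispatchedContinuation: once it has been constructed, its resumeCancellableWith() function is called. Its dispatch logic has the same shape as resumeWith(), shown here: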

#DispatchedContinuation.kt
    override fun resumeWith(result: Result<T>) {
        val context = continuation.context
        val state = result.toState()
        // Dispatch is needed
        if (dispatcher.isDispatchNeeded(context)) {
            _state = state
            resumeMode = MODE_ATOMIC
            // Ask the dispatcher to dispatch
            dispatcher.dispatch(context, this)
        } else {
            executeUnconfined(state, MODE_ATOMIC) {
                withCoroutineContext(this.context, countOrElement) {
                    continuation.resumeWith(result)
                }
            }
        }
    }

The dispatcher here in the Demo is Dispatchers.Main.

Okay, let’s summarize the function of the launch() function:

Dispatchers.Main implementation

Next, let’s look at how Dispatchers.Main dispatches tasks. First, its implementation:

#MainDispatchers.kt
internal object MainDispatcherLoader {

    // true by default
    private val FAST_SERVICE_LOADER_ENABLED = systemProp(FAST_SERVICE_LOADER_PROPERTY_NAME, true)

    @JvmField
    val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()

    // Construct the main-thread dispatcher
    private fun loadMainDispatcher(): MainCoroutineDispatcher {
        return try {
            val factories = if (FAST_SERVICE_LOADER_ENABLED) {
                // Load the dispatcher factory ①
                FastServiceLoader.loadMainDispatcherFactory()
            } else {
                ...
            }
            // Use the factory class to create a dispatcher ②
            factories.maxByOrNull { it.loadPriority }?.tryCreateDispatcher(factories)
                ?: createMissingDispatcher()
        } catch (e: Throwable) {
            ...
        }
    }
}

Look at ① first:

#FastServiceLoader.kt
    internal fun loadMainDispatcherFactory(): List<MainDispatcherFactory> {
        val clz = MainDispatcherFactory::class.java
        //...
        return try {
            // Construct the factory class by reflection: AndroidDispatcherFactory
            val result = ArrayList<MainDispatcherFactory>(2)
            FastServiceLoader.createInstanceOf(clz,
                "kotlinx.coroutines.android.AndroidDispatcherFactory")?.apply { result.add(this) }
            FastServiceLoader.createInstanceOf(clz,
                "kotlinx.coroutines.test.internal.TestMainDispatcherFactory")?.apply { result.add(this) }
            result
        } catch (e: Throwable) {
            //...
        }
    }

The factory class returned by this function is: AndroidDispatcherFactory.

Now look at ②: once we have the factory class, it is used to create the concrete dispatcher:

#HandlerDispatcher.kt
    internal class AndroidDispatcherFactory : MainDispatcherFactory {
        // Override createDispatcher and return a HandlerContext
        override fun createDispatcher(allFactories: List<MainDispatcherFactory>) =
            HandlerContext(Looper.getMainLooper().asHandler(async = true), "Main")
        //...
    }

// Definition
internal class HandlerContext private constructor(
    private val handler: Handler,
    private val name: String?,
    private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
}

The HandlerContext is finally created.
HandlerContext inherits from class: HandlerDispatcher

#HandlerDispatcher.kt
sealed class HandlerDispatcher : MainCoroutineDispatcher(), Delay {
    // Override the dispatch function
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        // Post to the main thread for execution; handler is the main thread's Handler
        handler.post(block)
    }
}

Obviously, DispatchedContinuation dispatches with dispatcher.dispatch(). The dispatcher is Dispatchers.Main, whose final implementation is HandlerContext.
So the dispatch() call ends up in HandlerDispatcher.dispatch(), which posts the block to the main thread for execution.
What is the block?
The block is actually the DispatchedContinuation object. As the analysis above shows, it indirectly implements the Runnable interface.
Here is its implementation:

#DispatchedTask.kt
override fun run() {
    val taskContext = this.taskContext
    var fatalException: Throwable? = null
    try {
        // delegate is the DispatchedContinuation itself
        val delegate = delegate as DispatchedContinuation<T>
        // delegate.continuation is our coroutine body MyAnnoy
        val continuation = delegate.continuation
        withContinuationContext(continuation, delegate.countOrElement) {
            val context = continuation.context
            //...
            val job = if (exception == null && resumeMode.isCancellableMode) context[Job] else null
            if (job != null && !job.isActive) {
                //...
            } else {
                if (exception != null) {
                    continuation.resumeWithException(exception)
                } else {
                    // Execute the coroutine body
                    continuation.resume(getSuccessfulResult(state))
                }
            }
        }
    } catch (e: Throwable) {
        //...
    } finally {
        //...
    }
}

The continuation variable is our coroutine body: MyAnnoy.
MyAnnoy.resume(xx) ends up in resumeWith(), a function we already know well. Let’s go over it once more:

#ContinuationImpl.kt
override fun resumeWith(result: Result<Any?>) {
    // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
    var current = this
    var param = result
    while (true) {
        with(current) {
            // completion is the StandaloneCoroutine defined at the beginning
            val completion = completion!! // fail fast when trying to resume continuation without completion
            val outcome: Result<Any?> =
                try {
                    // Execute the code in the coroutine body
                    val outcome = invokeSuspend(param)
                    if (outcome === kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED) return
                    kotlin.Result.success(outcome)
                } catch (exception: Throwable) {
                    kotlin.Result.failure(exception)
                }
            //...
        }
    }
}

invokeSuspend(param) calls the code in the coroutine body, that is, the content in the curly braces of launch, so the content inside is executed by the main thread.
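
The early return on COROUTINE_SUSPENDED and the later re-entry into the body can be watched with a single-threaded, standard-library-only sketch. suspendAndResume() is a hypothetical name, and no dispatcher is involved here (the context is empty), so everything runs inline on the calling thread.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Starts a coroutine that suspends once, resumes it by hand,
// and returns the order in which things happened.
fun suspendAndResume(): List<String> {
    val log = mutableListOf<String>()
    var saved: Continuation<Int>? = null

    val body: suspend () -> Unit = {
        log += "before"
        // The body stops here; resumeWith() returns to whoever started the coroutine.
        val value = suspendCoroutine<Int> { saved = it }
        log += "after $value"
    }

    // The completion is resumed once the body has finished.
    body.startCoroutine(Continuation<Unit>(EmptyCoroutineContext) { log += "completed" })
    log += "suspended"

    // Resuming re-enters the body right after the suspension point.
    saved!!.resume(42)
    return log
}
```

suspendAndResume() returns [before, suspended, after 42, completed]: the first pass through the body stops at the suspension point, and the completion is only resumed after the second pass finishes the body.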

To recap, launch(Dispatchers.Main) executes the following steps:

  1. The dispatcher HandlerContext is stored in the CoroutineContext (coroutine context).
  2. A DispatchedContinuation is constructed, holding dispatcher=HandlerContext and continuation=MyAnnoy.
  3. DispatchedContinuation calls the dispatcher (HandlerContext) to dispatch itself.
  4. HandlerContext posts the Runnable (DispatchedContinuation) to the main thread.

After these steps, the work of launch(Dispatchers.Main) is done. When the Runnable actually runs is no longer its concern.

When the Runnable runs on the main thread, the continuation (MyAnnoy) is taken out of the DispatchedContinuation and continuation.resume() is called, which in turn runs MyAnnoy.invokeSuspend() and finally the content of the launch{} coroutine body.
So the coroutine happily executes on the main thread.
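
The four steps can be imitated with the Kotlin standard library and a plain single-thread executor in place of the Android main thread. This is a toy sketch, not the kotlinx.coroutines implementation: ToyDispatchedContinuation, ToyDispatcher, and launchOnToyMain() are hypothetical names, and the executor thread named "toy-main" stands in for the main Looper.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executor
import java.util.concurrent.Executors
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.startCoroutine

// Toy counterpart of DispatchedContinuation (steps 2 and 3): when resumed, it posts
// a Runnable to the executor, and that Runnable resumes the real coroutine body.
class ToyDispatchedContinuation<T>(
    private val executor: Executor,
    private val continuation: Continuation<T>
) : Continuation<T> {
    override val context: CoroutineContext get() = continuation.context
    override fun resumeWith(result: Result<T>) {
        executor.execute { continuation.resumeWith(result) } // step 4
    }
}

// Toy counterpart of HandlerContext (step 1): stored in the context
// under the ContinuationInterceptor key.
class ToyDispatcher(private val executor: Executor) :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        ToyDispatchedContinuation(executor, continuation)
}

// Toy launch(): starts the body and returns the name of the thread it ran on.
fun launchOnToyMain(): String {
    val mainExecutor = Executors.newSingleThreadExecutor { r ->
        Thread(r, "toy-main").apply { isDaemon = true }
    }
    val done = CountDownLatch(1)
    var ranOn = ""
    val body: suspend () -> Unit = { ranOn = Thread.currentThread().name }
    val completion = object : Continuation<Unit> {
        override val context: CoroutineContext = ToyDispatcher(mainExecutor)
        override fun resumeWith(result: Result<Unit>) { done.countDown() }
    }
    // startCoroutine creates the body, calls intercepted(), then resumes it.
    body.startCoroutine(completion)
    done.await()
    mainExecutor.shutdown()
    return ranOn
}
```

launchOnToyMain() returns "toy-main" even though it is called from another thread, because the first resume goes through the interceptor's wrapper rather than straight into the body.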

As usual, here are the code and the function call graph together:

3. Thread selection when coroutine resumes

Taking the main thread as an example, we now know how a coroutine is made to run on a specified thread.
Imagine another scenario:

If a coroutine switches to a child thread partway through, does it return to the main thread after the child thread’s work is done?

Let’s modify the demo above:

    fun launch2() {
        GlobalScope.launch(Dispatchers.Main) {
            println("I am executing on the main thread")
            withContext(Dispatchers.IO) {
                println("I am executing in the child thread") //②
            }
            println("Which thread am I executing in?") //③
        }
    }

Guess the answer to ③: is it the main thread or the child thread?

The withContext(xx) function was analyzed in depth in an earlier article (To be honest, the suspend/resume of Kotlin coroutines is not so mysterious (principle)). It is a suspend function, and its main job is:

Switch the thread that executes the coroutine.

MyAnnoy1 corresponds to coroutine body 1, the parent coroutine body (the launch block).
MyAnnoy2 corresponds to coroutine body 2, the child coroutine body (the withContext block).
When ② finishes, execution switches back to the parent coroutine. Let’s look at how that switch happens.
Every coroutine body is executed through the following function:

#BaseContinuationImpl.kt
override fun resumeWith(result: Result<Any?>) {
    //...
    while (true) {
        //...
        with(current) {
            val completion = completion!! // fail fast when trying to resume continuation without completion
            val outcome: Result<Any?> =
                try {
                    // Execute the coroutine body
                    val outcome = invokeSuspend(param)
                    if (outcome === kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED) return
                    kotlin.Result.success(outcome)
                } catch (exception: Throwable) {
                    kotlin.Result.failure(exception)
                }
            releaseIntercepted() // this state machine instance is terminating
            if (completion is BaseContinuationImpl) {
                //...
            } else {
                // The coroutine body finished without suspending again, so resume its completion
                completion.resumeWith(outcome)
                return
            }
        }
    }
}

Take the coroutine body of the withContext(xx) function as an example. What is its completion?
As mentioned above, when launch() starts a coroutine, the completion of its coroutine body is StandaloneCoroutine; that is, MyAnnoy1.completion = StandaloneCoroutine.
From the source code of withContext(xx), the completion of its coroutine body is DispatchedCoroutine. DispatchedCoroutine inherits from ScopeCoroutine, and ScopeCoroutine has a member variable: uCont: Continuation.
When DispatchedCoroutine is constructed, the parent coroutine body that was passed in is assigned to uCont.
That is, DispatchedCoroutine.uCont = MyAnnoy1 and MyAnnoy2.completion = DispatchedCoroutine.

At this point, the child coroutine body is linked to the parent coroutine through DispatchedCoroutine.

Therefore completion.resumeWith(outcome) == DispatchedCoroutine.resumeWith(outcome).
Let’s look at the latter’s implementation:

#AbstractCoroutine.kt
    public final override fun resumeWith(result: Result<T>) {
        val state = makeCompletingOnce(result.toState())
        if (state === COMPLETING_WAITING_CHILDREN) return
        afterResume(state)
    }

#Builders.common.kt
    // In the DispatchedCoroutine class
    override fun afterResume(state: Any?) {
        // uCont is the body of the parent coroutine
        uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont))
    }

Now it all becomes clear. uCont.intercepted() looks up its interceptor. Because uCont is MyAnnoy1, whose interceptor is HandlerContext, the continuation is posted back to the main thread for execution.

Therefore, the answer to ③ in the demo above is:

It executes on the main thread.

In summary, there are just two steps:

  1. The parent coroutine runs on the main thread; when it reaches the suspend function, that part switches to a child thread (the child coroutine) for execution.
  2. When the child coroutine finishes, it finds the parent coroutine body and dispatches it again according to the parent’s original rules.
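
These two steps can be reproduced with the standard library alone. The sketch below is an assumption-laden toy, not withContext() itself: HopBackInterceptor, runOn(), and threadsVisited() are hypothetical names, and two single-thread executors named "toy-main" and "toy-io" stand in for the main thread and Dispatchers.IO. The important detail is that suspendCoroutine hands over the caller's intercepted continuation, so resuming it goes through the caller's interceptor again, much like uCont.intercepted() in afterResume().

```kotlin
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executor
import java.util.concurrent.Executors
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Toy interceptor: every resumption of the coroutine is posted to `executor`.
class HopBackInterceptor(private val executor: Executor) :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        object : Continuation<T> {
            override val context: CoroutineContext get() = continuation.context
            override fun resumeWith(result: Result<T>) {
                executor.execute { continuation.resumeWith(result) }
            }
        }
}

// Toy withContext(): run `block` on `worker`, then resume the caller.
// `cont` wraps the caller's intercepted continuation, so the caller
// continues on its own executor, not on `worker`.
suspend fun <T> runOn(worker: Executor, block: () -> T): T =
    suspendCoroutine { cont ->
        worker.execute { cont.resumeWith(runCatching(block)) }
    }

// Returns the thread names seen before, inside, and after the switch.
fun threadsVisited(): List<String> {
    val mainExecutor = Executors.newSingleThreadExecutor { r ->
        Thread(r, "toy-main").apply { isDaemon = true }
    }
    val ioExecutor = Executors.newSingleThreadExecutor { r ->
        Thread(r, "toy-io").apply { isDaemon = true }
    }
    val visited = CopyOnWriteArrayList<String>()
    val done = CountDownLatch(1)
    val body: suspend () -> Unit = {
        visited += Thread.currentThread().name                         // before the switch
        runOn(ioExecutor) { visited += Thread.currentThread().name }   // like ②
        visited += Thread.currentThread().name                         // like ③
    }
    body.startCoroutine(object : Continuation<Unit> {
        override val context: CoroutineContext = HopBackInterceptor(mainExecutor)
        override fun resumeWith(result: Result<Unit>) { done.countDown() }
    })
    done.await()
    mainExecutor.shutdown()
    ioExecutor.shutdown()
    return visited.toList()
}
```

threadsVisited() returns [toy-main, toy-io, toy-main]: the code after the switch lands back on the thread chosen by the parent's interceptor, which is the same conclusion as the answer to ③.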

As usual, code plus a picture tells the whole story:

So far, the principle of switching to the main thread has been analyzed.

Curious readers may ask: your examples all switch from a child thread to the main thread. What about switching from one child thread to another?
Switching to the main thread relies on the Handler, while switching to child threads relies on a thread pool.
Now that this has come up, here is another question:

    fun launch3() {
        GlobalScope.launch(Dispatchers.IO) {
            withContext(Dispatchers.Default) {
                println("Which thread am I running on")
                delay(2000)
                println("Which thread am I running on after delay")
            }
            println("Which thread am I running on")
        }
    }

Do you know the answers to the above?

The next article will focus on the scheduling principle of the coroutine thread pool, after which you will know the answers.

This article is based on Kotlin 1.5.3, please click here for the complete Demo

