Implementation principle of Kotlin coroutine
Foreword
This article analyzes the implementation principle of coroutines in Kotlin/JVM.
First look at the suspend keyword
The following example simulates a network request:
/** Sample class whose two suspend functions are decompiled in the rest of this article. */
class Temp {
// Calls netRequest (a suspend function) and reports whether its result equals 0.
suspend fun fetchData(argument: String): Boolean {
val result = netRequest(argument)
return result == 0
}
// Simulate a network request
suspend fun netRequest(argument: String): Int {
// delay(1000) is the call at which this function can suspend
delay(1000)
return argument. length
}
}
Both methods are marked with the suspend keyword. Decompiling the bytecode of this file gives the following equivalent Java code:
public final class Temp {
@Nullable
public final Object fetchData(@NotNullString argument, @NotNull Continuation var2) {
Object $continuation;
label25: {
if (var2 instanceof ) {
$continuation = ()var2;
if (((()$continuation).label & Integer.MIN_VALUE) != 0) {
(()$continuation).label -= Integer.MIN_VALUE;
break label25;
}
}
$continuation = new ContinuationImpl(var2) {
// $FF: synthetic field
Object result;
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
this.result = $result;
this.label |= Integer.MIN_VALUE;
return Temp.this.fetchData((String)null, this);
}
};
}
Object $result = (()$continuation).result;
Object var6 = IntrinsicsKt. getCOROUTINE_SUSPENDED();
Object var10000;
switch((()$continuation).label) {
case 0:
ResultKt.throwOnFailure($result);
(()$continuation).label = 1;
var10000 = this.netRequest(argument, (Continuation)$continuation);
if (var10000 == var6) {
return var6;
}
break;
case 1:
ResultKt.throwOnFailure($result);
var10000 = $result;
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
int result = ((Number)var10000).intValue();
return Boxing. boxBoolean(result == 0);
}
@Nullable
public final Object netRequest(@NotNull String argument, @NotNull Continuation var2) {
Object $continuation;
label20: {
if (var2 instanceof ) {
$continuation = ()var2;
if (((()$continuation).label & Integer.MIN_VALUE) != 0) {
(()$continuation).label -= Integer.MIN_VALUE;
break label20;
}
}
$continuation = new ContinuationImpl(var2) {
// $FF: synthetic field
Object result;
int label;
Object L$0;
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
this.result = $result;
this.label |= Integer.MIN_VALUE;
return Temp.this.netRequest((String)null, this);
}
};
}
Object $result = (()$continuation).result;
Object var5 = IntrinsicsKt. getCOROUTINE_SUSPENDED();switch((()$continuation).label) {
case 0:
ResultKt.throwOnFailure($result);
(()$continuation).L$0 = argument;
(()$continuation).label = 1;
if (DelayKt.delay(1000L, (Continuation)$continuation) == var5) {
return var5;
}
break;
case 1:
argument = (String)(()$continuation).L$0;
ResultKt.throwOnFailure($result);
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
return Boxing. boxInt(argument. length());
}
}
A few lines of coroutine code correspond to this much Java code, which shows how much work the Kotlin compiler does for us.
The decompiled code above is not very readable; for example, it contains undefined types (which the decompiler prints as `<undefinedtype>`). I therefore decompiled Temp.class again with jd-gui, merged the extra information obtained from jd-gui with the decompiled code above, and renamed some classes and variables to get a more complete and readable “Java code corresponding to Temp.class”. First, the part related to fetchData:
/**
 * Readable reconstruction of the compiled fetchData: the body is a switch over the
 * continuation's label, and the method returns COROUTINE_SUSPENDED when netRequest does.
 */
// NOTE(review): $continuation is declared as Object, so the field accesses below would need
// casts to compile; this listing reads as simplified pseudo-Java.
public final Object fetchData(@NotNull String argument,@NotNull Continuation completion) {
Object $continuation;
label25:
{
// Re-entry from FetchDataStateMachine.invokeSuspend: the continuation is already ours
// and its sign bit is set.
if (completion instanceof FetchDataStateMachine) {
$continuation = (FetchDataStateMachine) completion;
if (($continuation.label & Integer.MIN_VALUE) != 0) {
// Clear the sign bit so label holds the plain state number again.
$continuation.label -= Integer.MIN_VALUE;
break label25;
}
}
// Otherwise wrap the caller's continuation in a new state machine.
$continuation = new FetchDataStateMachine(completion);
}
Object $result = $continuation. result;
Object resultTemp;
switch ($continuation.label) {
case 0:
// State 0: record state 1 as the next state before calling netRequest.
ResultKt.throwOnFailure($result);$continuation.label = 1;
resultTemp = this.netRequest(argument, (Continuation) $continuation);
if (resultTemp == COROUTINE_SUSPENDED) {
return COROUTINE_SUSPENDED;
}
break;
case 1:
// State 1: the value stored in the continuation's result field is used as netRequest's result.
ResultKt.throwOnFailure($result);
resultTemp = $result;
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
int result = ((Number) resultTemp).intValue();
return Boxing. boxBoolean(result == 0);
}
/** Continuation used by fetchData; holds the state number (label) and the last result. */
// NOTE(review): Temp.this is only available in a non-static inner class, so the `static`
// modifier looks like an artifact of the manual renaming. Confirm against the bytecode.
static final class FetchDataStateMachine extends ContinuationImpl {
Object result;
int label;
FetchDataStateMachine(Continuation $completion) {
super($completion);
}
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
this.result = $result;
// Set the sign bit so fetchData recognizes this call as a re-entry.
this.label|= Integer.MIN_VALUE;
return Temp. this. fetchData(null, (Continuation) this);
}
}
The code related to netRequest has the same structure and form as the code related to fetchData:
/**
 * Readable reconstruction of the compiled netRequest: the body is a switch over the
 * continuation's label, and the method returns COROUTINE_SUSPENDED when delay does.
 */
// NOTE(review): $continuation is declared as Object, so the field accesses below would need
// casts to compile; this listing reads as simplified pseudo-Java.
public final Object netRequest(@NotNull String argument,@NotNull Continuation completion) {
Object $continuation;
label20:
{
// Re-entry from NetRequestStateMachine.invokeSuspend: the continuation is already ours
// and its sign bit is set.
if (completion instanceof NetRequestStateMachine) {
$continuation = (NetRequestStateMachine) completion;
if (($continuation.label & Integer.MIN_VALUE) != 0) {
// Clear the sign bit so label holds the plain state number again.
$continuation.label -= Integer.MIN_VALUE;
break label20;
}
}
// Otherwise wrap the caller's continuation in a new state machine.
$continuation = new NetRequestStateMachine(completion);
}
Object $result = $continuation. result;
switch ($continuation.label) {
case 0:
// State 0: save the argument in the continuation; case 1 reads it back.
ResultKt.throwOnFailure($result);$continuation. functionParameter = argument;
$continuation.label = 1;
if (DelayKt.delay(1000L, (Continuation) $continuation) == COROUTINE_SUSPENDED) {
return COROUTINE_SUSPENDED;
}
break;
case 1:
// State 1: on re-entry invokeSuspend passes null for argument, so restore the saved value.
argument = (String) ($continuation. functionParameter);
ResultKt.throwOnFailure($result);
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
return Boxing. boxInt(argument. length());
}
/** Continuation used by netRequest; holds the state number, the last result and the saved argument. */
// NOTE(review): Temp.this is only available in a non-static inner class, so the `static`
// modifier looks like an artifact of the manual renaming. Confirm against the bytecode.
static final class NetRequestStateMachine extends ContinuationImpl {
Object result;
int label;
// Set to netRequest's argument in state 0 and read back in state 1.
Object functionParameter;
NetRequestStateMachine(Continuation $completion) {
super($completion);
}
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
// Store the result and set the sign bit so netRequest recognizes this call as a re-entry.
this.result = $result;this.label |= Integer.MIN_VALUE;
return Temp.this.netRequest(null, (Continuation) this);
}
}
In the decompiled Java code, both the fetchData and netRequest methods have an additional `Continuation completion` parameter, which the Kotlin compiler adds for us. For every function marked with suspend, the Kotlin compiler adds a Continuation parameter at compile time and uses this parameter in place of the suspend modifier. So what does this parameter mean?
A first look at continuations
Continuations are the key to understanding how coroutines work.
Let us first look at a traditional network request:
/** User record delivered to [Callback.success]. */
data class User(val id: Long, val name: String)
/** Receives the outcome of a request: either a [User] or the [Throwable] that caused the failure. */
interface Callback {
fun success(user: User)
fun failure(t: Throwable)
}
/** Data layer of the callback example. */
class Model {
// Blocks the calling thread for one second, then reports a fixed user through
// callback.success; callback.failure is never invoked here.
fun getUserInfo(callback: Callback) {
Thread.sleep(1000) // simulate network request
callback.success(User(1, "giagor"))
}
}
/** Caller side of the callback example: requests the user from [Model] and shows the outcome. */
class Business {
val model = Model()
// Passes an anonymous Callback whose two methods forward a message to showMsg.
fun getUserInfo() {
model.getUserInfo(object : Callback {
override fun success(user: User) {
showMsg(user. toString())
}
override fun failure(t: Throwable) {
// Falls back to an empty string when the throwable has no message.
showMsg(t. message ?: "")
}
})
}
fun showMsg(msg: String) {
//...
}
}
When Model makes a network request, a Callback receives the result of that request. We can think of the Callback as a continuation: the continuation of the network request, used to receive its result.
In coroutines, the Continuation interface represents a continuation. It stands for what follows a suspension point, that is, the rest of the code that should be executed after that suspension point:
/**
 * A continuation after a suspension point that is resumed with a value of type [T]
 * (or with a failure) through [resumeWith].
 */
public interface Continuation<in T> {
    // The context of the coroutine corresponding to the continuation
    public val context: CoroutineContext

    // Resume the execution of the corresponding coroutine, and pass a result indicating success or failure as the return value of the last suspension point
    public fun resumeWith(result: Result<T>)
}
Since Kotlin 1.3 there are also extension functions that make it easier to call resumeWith:
/** Resumes this continuation, passing [value] wrapped as a successful [Result] to resumeWith. */
public inline fun <T> Continuation<T>.resume(value: T): Unit =
    resumeWith(Result.success(value))
/** Resumes this continuation, passing [exception] wrapped as a failed [Result] to resumeWith. */
public inline fun <T> Continuation<T>.resumeWithException(exception: Throwable): Unit =
    resumeWith(Result.failure(exception))
As mentioned earlier, for a function marked with suspend the Kotlin compiler adds a Continuation parameter to the function and uses it in place of the suspend modifier. Through this Continuation parameter, the Kotlin compiler converts our coroutine code into equivalent callback code; in other words, the compiler writes the callback code for us. How it does so is analyzed later. This way of using a Continuation to control the asynchronous call flow is called the CPS transformation (Continuation-Passing-Style Transformation).
State machine
When the fetchData function is compiled, the following static nested class (its continuation) is generated:
/** Continuation used by fetchData; holds the state number (label) and the last result. */
// NOTE(review): Temp.this is only available in a non-static inner class, so the `static`
// modifier looks like an artifact of the manual renaming. Confirm against the bytecode.
static final class FetchDataStateMachine extends ContinuationImpl {
Object result;
int label;
FetchDataStateMachine(Continuation $completion) {
super($completion);
}
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
this.result = $result;
// Set the sign bit so fetchData recognizes this call as a re-entry.
this.label |= Integer.MIN_VALUE;
return Temp. this. fetchData(null, (Continuation) this);
}
}
The inheritance chain of FetchDataStateMachine is as follows:
FetchDataStateMachine -> ContinuationImpl -> BaseContinuationImpl -> Continuation
FetchDataStateMachine receives a Continuation parameter named $completion, which is saved in the parent class BaseContinuationImpl:
// Excerpt of the class header only; the body is elided.
internal abstract class BaseContinuationImpl(
    public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {...}
Through $completion, the execution result of the fetchData function can be passed back to the function that called fetchData. Holding $completion is what gives the continuation the ability to call back.
The state machine FetchDataStateMachine declares two variables, result and label:
- result: the result of the previous continuation. For example, suppose there are functions A and B, with ContinuationA and ContinuationB declared inside them. A calls B, and ContinuationA is passed to B to be saved. In the subsequent callback process, ContinuationA can get the execution result of ContinuationB::invokeSuspend from its result variable.
- label: the Kotlin compiler can identify where a function may be suspended. Each suspension point is represented as one state of the state machine, and these states are expressed as the cases of a switch statement. label records the current state of the state machine, that is, which case should be entered next.
Look at the first half of the code of fetchData
:
// Excerpt: first half of fetchData, which selects the continuation to use.
public final Object fetchData(@NotNull String argument,@NotNull Continuation completion) {
Object $continuation;
label25:
{
// Reuse the incoming continuation when it is already a FetchDataStateMachine with its sign bit set.
if (completion instanceof FetchDataStateMachine) {
$continuation = (FetchDataStateMachine) completion;
if (($continuation.label & Integer.MIN_VALUE) != 0) {
// Clear the sign bit so label holds the plain state number again.
$continuation.label -= Integer.MIN_VALUE;
break label25;
}
}
// Otherwise wrap the caller's continuation in a new state machine.
$continuation = new FetchDataStateMachine(completion);
}
...
}
It checks whether the incoming completion is of type FetchDataStateMachine. If so, it performs some operations on its label variable; if not, it creates a new FetchDataStateMachine and passes completion in (so that completion is saved).
Look at the second half of the code of fetchData
:
// Excerpt: second half of fetchData, which dispatches on the continuation's label.
public final Object fetchData(@NotNull String argument,@NotNull Continuation completion) {
Object $continuation;
...
Object $result = $continuation. result;
Object resultTemp;
switch ($continuation.label) {
case 0:
// State 0: record state 1 as the next state before calling netRequest.
ResultKt.throwOnFailure($result);
$continuation.label = 1;
resultTemp = this.netRequest(argument, (Continuation) $continuation);
if (resultTemp == COROUTINE_SUSPENDED) {
return COROUTINE_SUSPENDED;
}
break;
case 1:
// State 1: the value stored in the continuation's result field is used as netRequest's result.
ResultKt.throwOnFailure($result);
resultTemp = $result;
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
int result = ((Number) resultTemp).intValue();
return Boxing. boxBoolean(result == 0);
}
The original statements of the fetchData method are split across several case branches of the switch, and the label variable in FetchDataStateMachine controls which case branch is executed.
It can be seen that the function and its continuation together form a finite-state machine (FSM) that controls the execution of the coroutine code.
What is “non-blocking suspend”?
In the netRequest method, delay(1000) is called to suspend the current coroutine. Here is a quick look at the decompiled code of the delay method:
/**
 * Decompiled form of delay: schedules a resume after timeMillis and returns either the
 * result that is already available or COROUTINE_SUSPENDED.
 */
public static final Object delay(long timeMillis, @NotNull Continuation $completion) {
    if (timeMillis <= 0L) {
        // Nothing to wait for: return immediately without suspending.
        return Unit.INSTANCE;
    } else {
        // implementation class
        CancellableContinuationImpl cancellableContinuationImpl = new CancellableContinuationImpl(IntrinsicsKt.intercepted($completion), 1);
        cancellableContinuationImpl.initCancellability();
        // upcast
        CancellableContinuation cont = (CancellableContinuation) cancellableContinuationImpl;
        if (timeMillis < Long.MAX_VALUE) {
            // delay operation
            getDelay(cont.getContext()).scheduleResumeAfterDelay(timeMillis, cont);
        }
        // get execution result
        Object result = cancellableContinuationImpl.getResult();
        if (result == COROUTINE_SUSPENDED) {
            DebugProbesKt.probeCoroutineSuspended($completion);
        }
        // return result
        return result;
    }
}
In this method, the delay operation will be performed. If it needs to be suspended, the COROUTINE_SUSPENDED
value will be returned to the caller.
Combining the decompiled code of fetchData
, netRequest
and delay
, we can get the following call graph:
The red line in the figure indicates that a function returns COROUTINE_SUSPENDED because it needs to suspend. When the delay method needs to suspend, it returns COROUTINE_SUSPENDED; then the netRequest method returns COROUTINE_SUSPENDED, then the fetchData method returns COROUTINE_SUSPENDED, and this repeats up to the top of the call stack.
By ending the method calls in this way, the coroutine temporarily stops executing on this thread, so the thread can handle other tasks (including running other coroutines). This is why suspending a coroutine does not block the current thread, and it is the origin of the term “non-blocking suspension”.
How to restore?
Since a coroutine can be suspended, it must also be resumed. The conclusion first: in essence, resuming a coroutine means calling back its continuation.
I have not yet studied the implementation of the delay function in detail, but it performs the wait on some other thread. When the delay time elapses, it calls the resumeWith method of the $completion passed to delay, that is, the resumeWith method of NetRequestStateMachine. The inheritance chain of NetRequestStateMachine is as follows:
NetRequestStateMachine -> ContinuationImpl -> BaseContinuationImpl -> Continuation
BaseContinuationImpl is the focus of our analysis here. It mainly does the following:
- Saves completion: in our example it saves the FetchDataStateMachine instance of the fetchData method, so that continuations can be called back level by level.
- Overrides resumeWith: BaseContinuationImpl overrides the resumeWith method of the Continuation interface. This method resumes the coroutine and is the core logic of coroutine recovery.
We look at the definition of the BaseContinuationImpl
class:
// Base class of the compiler-generated continuations. resumeWith runs invokeSuspend on the
// current continuation and then moves on to its completion, in a loop instead of recursively.
internal abstract class BaseContinuationImpl(
    public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
    // This implementation is final. This fact is used to unroll resumeWith recursion.
    public final override fun resumeWith(result: Result<Any?>) {
        // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
        var current = this
        var param = result
        while (true) {
            // Perform debug probes on each resumed continuation, allowing the debug library to track exactly which parts of the suspended call stack
            // has been restored.
            probeCoroutineResumed(current)
            with(current) {
                val completion = completion!! // fail fast when trying to resume continuation without completion
                val outcome: Result<Any?> =
                    try {
                        val outcome = invokeSuspend(param)
                        // The continuation suspended again: stop here, it will be resumed later.
                        if (outcome === COROUTINE_SUSPENDED) return
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        Result.failure(exception)
                    }
                releaseIntercepted() // this state machine instance is terminating
                if (completion is BaseContinuationImpl) {
                    // unrolling recursion via loop
                    current = completion
                    param = outcome
                } else {
                    // top-level completion reached -- invoke and return
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }

    protected abstract fun invokeSuspend(result: Result<Any?>): Any?

    protected open fun releaseIntercepted() {
        // does nothing here, overridden in ContinuationImpl
    }
    ...
}
The focus is the implementation of the resumeWith method, which executes the callback logic inside a while(true) loop. Using the decompiled code of fetchData and netRequest given above, let us see what happens after the delay time elapses and the resumeWith method of NetRequestStateMachine is called:
- The resumeWith method of BaseContinuationImpl, the parent class of NetRequestStateMachine, is executed.
- The invokeSuspend method of the current continuation, NetRequestStateMachine, is executed (NetRequestStateMachine implements this method; if you have forgotten it, look back at the decompiled code above).
- The invokeSuspend method of NetRequestStateMachine calls the netRequest method and passes in the continuation itself as a parameter.
- In the netRequest method, since completion is of type NetRequestStateMachine, the continuation can be used directly; there is no need to create a new continuation as on the first entry into netRequest. At this point the label of the continuation is 1, so the case 1 branch of netRequest is entered.
(In this process some calculations and conversions are performed on the label of the continuation, but the final value of label is 1. These operations do not affect our analysis, so they are not the focus here.)
- The parameter originally passed to the netRequest method, argument, is taken out of the continuation, and argument.length is returned. For convenience, this return value is referred to as netRequest-Return below.
- The netRequest method then finishes, and so does NetRequestStateMachine::invokeSuspend, with netRequest-Return as its return value. Control returns to the resumeWith method of BaseContinuationImpl, where netRequest-Return is wrapped in a Result and saved to the outcome variable.
- resumeWith checks whether the completion held by NetRequestStateMachine is of type BaseContinuationImpl. We know that the instance it holds is actually a FetchDataStateMachine, so it is a BaseContinuationImpl, and the variables are updated:
// Update current to FetchDataStateMachine instance
current = completion
// Update param to outcome (wraps the Result of netRequest-Return)
param = outcome
This is how the callback is actually realized. Let us continue.
- In the next iteration of the while loop, FetchDataStateMachine::invokeSuspend is executed inside the with block. invokeSuspend saves the incoming parameter param to the result variable (this is similar to a traditional callback, which also has to pass the result of the lower layer back to the upper layer) and then calls the fetchData method.
- In the fetchData method, since the incoming completion is already of type FetchDataStateMachine, there is no need to create a new continuation. The label of the continuation is 1 at this point, so the case 1 branch is entered and the result of the netRequest method is stored in the resultTemp variable. Finally the fetchData method finishes and returns result == 0. For convenience, the result of the fetchData method is referred to as fetchData-Return.
- FetchDataStateMachine::invokeSuspend also finishes and returns fetchData-Return, and in the resumeWith method of BaseContinuationImpl fetchData-Return is wrapped in a Result. resumeWith then checks whether the completion held by FetchDataStateMachine is of type BaseContinuationImpl.
- Where the code goes from here is not clear yet: we first have to know what is done when the fetchData method is called.
From the above process analysis, we have a basic understanding of the recovery of the coroutine, and the flow chart is given below to summarize:
In the process above, the invokeSuspend methods of the continuations are called layer by layer. This looks somewhat like a recursive call, but BaseContinuationImpl::resumeWith is not implemented recursively: inside the while(true) loop it calls the invokeSuspend method of one continuation, records the return value, and uses it as the argument for the invokeSuspend method of the next continuation.
In short, it calls the invokeSuspend method of one continuation and, once that call has completed, calls the invokeSuspend method of the next continuation. One reason for this is to keep the call stack from growing too deep; there is a related comment in BaseContinuationImpl::resumeWith:
This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
Start a coroutine
We call the fetchData
method in a coroutine:
/** Entry point of the example: starts a coroutine that calls Temp.fetchData. */
class Temp2 {
fun execute() {
// The lambda passed to launch is the coroutine body.
GlobalScope. launch(Dispatchers. Main) {
Temp(). fetchData("argument")
}
}
}
A coroutine can be started by the launch
method, and its source code is as follows:
// Creates a coroutine object for [block], starts it with the given [start] mode and returns it as a Job.
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart. DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
// Lazy start modes get a LazyStandaloneCoroutine; all others get a StandaloneCoroutine.
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
// The coroutine object is passed as the second argument of start.
coroutine. start(start, coroutine, block)
return coroutine
}
The code inside the coroutine is wrapped as block. By default, a StandaloneCoroutine is created, its start method is called, and the StandaloneCoroutine is returned.
StandaloneCoroutine indirectly implements the Job interface and the Continuation interface, as follows:
// Coroutine object created by launch for the non-lazy start modes.
private open class StandaloneCoroutine(
    parentContext: CoroutineContext,
    active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active) {
    // Forwards the exception to handleCoroutineException and reports it as handled.
    override fun handleJobException(exception: Throwable): Boolean {
        handleCoroutineException(context, exception)
        return true
    }
}
// Excerpt of the class header only; the body is elided.
// The class is at the same time a Job, a Continuation<T> and a CoroutineScope.
public abstract class AbstractCoroutine<in T>(
    /**
     * The context of the parent coroutine.
     */
    @JvmField
    protected val parentContext: CoroutineContext,
    active: Boolean = true
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {...}
It can be seen that StandaloneCoroutine plays several roles: it implements the Job, Continuation and CoroutineScope interfaces. The code tracing below leads to the conclusion that the topmost continuation is the coroutine itself. That is, when the coroutine resumes, the continuations are called back layer by layer, and the topmost continuation is the coroutine itself, i.e. StandaloneCoroutine (we take StandaloneCoroutine as the example here).
In addition, note the type of the block passed to the launch method:
block: suspend CoroutineScope.() -> Unit
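It is equivalent to the following function type:
// CoroutineScope: the receiver of the extension function, converted into the first parameter
// Continuation: converted from the suspend keyword; the Continuation parameter is passed in by the compiler
// The return type is Any? because the call may return COROUTINE_SUSPENDED instead of the result
block: (CoroutineScope, Continuation<Unit>) -> Any?
// Or expressed in the form of Function2
block: Function2<CoroutineScope, Continuation<Unit>, Any?>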
Then trace the calling process of starting the coroutine. In the launch
method, the AbstractCoroutine::start
method is called:
// Starts this coroutine with [block] and [receiver] according to the [start] mode;
// this coroutine itself is passed on as the completion continuation.
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
    initParentJob()
    // Syntactic sugar, actually calls the CoroutineStart.invoke method
    start(block, receiver, this)
}
CoroutineStart::invoke
method:
// Dispatches on the start mode: every mode except LAZY starts [block] right away
// through the matching start function.
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
    when (this) {
        DEFAULT -> block.startCoroutineCancellable(receiver, completion)
        ATOMIC -> block.startCoroutine(receiver, completion)
        UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
        LAZY -> Unit // will start lazily
    }
From the launch method we know that the default value of start is CoroutineStart.DEFAULT, so the startCoroutineCancellable method of block is called:
// Creates the coroutine for this suspend function, intercepts the resulting continuation
// and resumes it with Unit so that the body starts running.
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
    receiver: R, completion: Continuation<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
) =
    runSafely(completion) {
        createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
    }
When I followed the call to createCoroutineUnintercepted in AS, it jumped to the IntrinsicsKt.class file, which does not contain the source code of the method. I finally found the source code of the createCoroutineUnintercepted method in the IntrinsicsJvm.kt file, as follows:
// R: CoroutineScope
// T: Unit
// Creates, without interception, the continuation that runs this suspend function
// with [receiver] and reports to [completion].
@SinceKotlin("1.3")
public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
    receiver: R,
    completion: Continuation<T>
): Continuation<Unit> {
    // The probeCoroutineCreated method directly returns completion
    val probeCompletion = probeCoroutineCreated(completion)
    // A compiler-generated suspend lambda is itself a BaseContinuationImpl and creates its own instance.
    return if (this is BaseContinuationImpl)
        create(receiver, probeCompletion)
    else {
        createCoroutineFromSuspendFunction(probeCompletion) {
            (this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)
        }
    }
}
Here it checks whether this is of type BaseContinuationImpl. this is the lambda block we passed to launch earlier, so what is the type of this lambda block? To find out, we have to decompile the Kotlin code given at the beginning of this section:
/** Entry point of the example: starts a coroutine that calls Temp.fetchData. */
class Temp2 {
fun execute() {
// The lambda passed to launch is the coroutine body.
GlobalScope. launch(Dispatchers. Main) {
Temp(). fetchData("argument")
}
}
}
Properly rename and adjust the decompiled java
code to get:
// Renamed decompiler output for Temp2; LaunchLambda is the class generated for the lambda passed to launch.
public final class Temp2 {
...
static final class LaunchLambda extends SuspendLambda implements Function2<CoroutineScope, Continuation, Object> {
// Current state of the lambda body: 0 before the fetchData call, 1 after it.
int label;
LaunchLambda(Continuation $completion) {super(2, $completion);
}
// NOTE(review): SYNTHETIC_LOCAL_VARIABLE_1 is not declared in this listing; presumably it
// stands for $result and is a decompiler artifact. Confirm against the bytecode.
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
switch (this.label) {
case 0:
ResultKt.throwOnFailure(SYNTHETIC_LOCAL_VARIABLE_1);
this.label = 1;
if ((new Temp()). fetchData("argument", (Continuation) this) == COROUTINE_SUSPENDED)
return COROUTINE_SUSPENDED;
// NOTE(review): this second fetchData call right after the suspension check looks like a
// decompiler artifact rather than a real second call. Confirm against the bytecode.
(new Temp()). fetchData("argument", (Continuation) this);
return Unit. INSTANCE;
case 1:
ResultKt.throwOnFailure(SYNTHETIC_LOCAL_VARIABLE_1);
return Unit. INSTANCE;
}
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
// Returns a new LaunchLambda holding $completion; the value argument is not used.
@NotNull
public final Continuation create(@Nullable Object value, @NotNull Continuation $completion) {
return (Continuation) new LaunchLambda($completion);
}
// Creates a fresh instance through create and runs it from state 0.
@Nullable
public final Object invoke(@NotNull CoroutineScope p1, @Nullable Continuation p2) {
return ((LaunchLambda) create(p1, p2)). invokeSuspend(Unit. INSTANCE);
}
}
}
It can be seen that a static nested class LaunchLambda is automatically generated in Temp2; it corresponds to the lambda block passed to the launch method. The inheritance chain of LaunchLambda is as follows (from top to bottom, from subclass to parent class):
LaunchLambda
-> SuspendLambda // Lambda blocks modified with suspend will inherit from this class
-> ContinuationImpl
-> BaseContinuationImpl // rewrite resumeWith function
-> Continuation
Back to the createCoroutineUnintercepted method: we can now answer the question raised above. Is the lambda block passed in of type BaseContinuationImpl? According to the inheritance chain above, of course it is! So the create method of LaunchLambda is called. Note that completion (named probeCompletion in the code) is passed in as the second argument and is eventually saved in the completion variable of the parent class BaseContinuationImpl. This completion is the StandaloneCoroutine created in the launch method, i.e. the coroutine itself, which is the topmost continuation when the coroutine resumes.
Calling the create method yields a LaunchLambda instance, and the createCoroutineUnintercepted method returns it. Execution then returns to startCoroutineCancellable; here is the method again:
// Creates the coroutine for this suspend function, intercepts the resulting continuation
// and resumes it with Unit so that the body starts running.
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
    receiver: R, completion: Continuation<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
) =
    runSafely(completion) {
        createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
    }
Two calls are made here: first the intercepted method, then the resumeCancellableWith method. The intercepted method is related to the continuation interception mechanism, which will be introduced later, so we ignore it here and simply assume that the resumeCancellableWith method is called on the LaunchLambda instance. The method is as follows:
// Resumes this continuation with [result]: a DispatchedContinuation uses its own
// cancellable resume, any other continuation is resumed through plain resumeWith.
public fun <T> Continuation<T>.resumeCancellableWith(
    result: Result<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
    is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
    else -> resumeWith(result)
}
Execution then reaches the resumeWith method. As mentioned earlier, this method is implemented in the parent class BaseContinuationImpl and calls the invokeSuspend method, which is implemented in LaunchLambda as follows:
// LaunchLambda.invokeSuspend: state 0 calls fetchData, state 1 finishes the lambda.
// NOTE(review): SYNTHETIC_LOCAL_VARIABLE_1 is not declared in this listing; presumably it
// stands for $result and is a decompiler artifact. Confirm against the bytecode.
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
switch (this.label) {
case 0:
ResultKt.throwOnFailure(SYNTHETIC_LOCAL_VARIABLE_1);
this.label = 1;
if ((new Temp()). fetchData("argument", (Continuation) this) == COROUTINE_SUSPENDED)
return COROUTINE_SUSPENDED;
// NOTE(review): this second fetchData call right after the suspension check looks like a
// decompiler artifact rather than a real second call. Confirm against the bytecode.
(new Temp()). fetchData("argument", (Continuation) this);
return Unit. INSTANCE;
case 1:
ResultKt.throwOnFailure(SYNTHETIC_LOCAL_VARIABLE_1);
return Unit. INSTANCE;
}
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
At the beginning the value of label is 0, so the case 0 branch is entered. In this branch label is set to 1, then a Temp object is created and its fetchData method is called with LaunchLambda itself passed in as the argument. That is, the LaunchLambda instance is saved in the completion variable of the continuation created by the fetchData method, so that it can be called back when the coroutine resumes.
Current holding diagram of the continuation body:
At this point we can connect the whole path from starting a coroutine to the coroutine finally being suspended. In the "How to restore?" section we did not analyze the last few steps of coroutine recovery; let us analyze them here, so that the entire recovery process can be linked together as well.
The follow-up process of coroutine recovery:
- When FetchDataStateMachine::invokeSuspend has finished, the resumeWith method of BaseContinuationImpl checks whether the completion held by FetchDataStateMachine (i.e. LaunchLambda) is of type BaseContinuationImpl. From the inheritance chain of LaunchLambda the answer is clearly “yes”, so the next iteration of the while loop is entered and the invokeSuspend method of LaunchLambda is called.
- Because label = 1, the case 1 branch is entered, which directly returns Unit. Then resumeWith checks whether the completion held by LaunchLambda (i.e. StandaloneCoroutine) is of type BaseContinuationImpl. From the inheritance chain of StandaloneCoroutine the answer is clearly “no”, so the resumeWith method of StandaloneCoroutine is called.
- The resumeWith method of StandaloneCoroutine is implemented in its parent class AbstractCoroutine:
/**
 * Completes this coroutine with [result].
 *
 * Converts the result to a state via `makeCompletingOnce`; if that reports that child
 * coroutines are still being awaited, returns without further processing.
 */
public final override fun resumeWith(result: Result<T>) {
    val state = makeCompletingOnce(result.toState())
    // If waiting for the child coroutines to complete, return
    if (state === COMPLETING_WAITING_CHILDREN) return
    // Otherwise do the follow-up processing
    afterResume(state)
}
At this time, the topmost continuation (the coroutine itself) is also restored.
The execution of the BaseContinuationImpl::resumeWith
method is completed, and the recovery of the entire coroutine is also completed.
Complete and improve on the basis of the previous flow chart:
1. The flowchart of coroutine calling from top to bottom ( coroutine suspended)
The blue text and lines represent new additions, and the red text and lines represent pending processes.
2. The flow chart of coroutine bottom-up recovery (coroutine recovery)
The blue text and lines represent new additions, and the orange text and lines represent the end of method calls.
Coroutine context
The coroutine context CoroutineContext
defines the behavior of the coroutine. It records the information held by the current coroutine and is an important data object in the coroutine operation. CoroutineContext
is an interface:
// Declaration only; the interface body is elided here.
public interface CoroutineContext {...}
In the continuation, there is information about CoroutineContext
:
/**
 * A continuation: carries a coroutine context and can be resumed with a result of type [T].
 */
public interface Continuation<in T> {
    /** The context of the coroutine corresponding to this continuation. */
    public val context: CoroutineContext

    /**
     * Resumes the execution of the corresponding coroutine, passing a successful or failed
     * [result] as the return value of the last suspension point.
     */
    public fun resumeWith(result: Result<T>)
}
The following elements are all elements of "coroutine context":
Job
: Control the life cycle of the coroutine.CoroutineDispatcher
: Dispatches work to the appropriate thread.CoroutineName
: The name of the coroutine, which can be used for debugging.CoroutineExceptionHandler
: Handle uncaught exceptions.
CoroutineContext
can be regarded as a collection of CoroutineContext.Element
, and each element in the collection can be located using CoroutineContext.Key
, and the Key
of each element is different.
CoroutineContext.Element
definition:
// A context element is itself a CoroutineContext; the body is elided here.
public interface Element : CoroutineContext {...}
You can see that Element itself also implements the CoroutineContext interface, which looks strange: it is as if Int implemented the List interface. Why is an element itself a collection? In fact, this is mainly for the convenience of API design. With this arrangement, a single element such as Job can be used directly as a CoroutineContext, instead of having to create a List containing only one element, and multiple elements can also be spliced together with "+", such as:
// Joins a CoroutineName element and Dispatchers.Main with `+` and passes the combined context to launch.
scope.launch(CoroutineName("coroutine") + Dispatchers.Main) {...}
The "+" here is actually operator overloading, corresponding to the plus
method declared by CoroutineContext
:
// Operator overload behind `+`: takes another CoroutineContext and returns a CoroutineContext (body elided here).
public operator fun plus(context: CoroutineContext): CoroutineContext = ...
The "coroutine context" stores elements in a clever way. It doesn't create a collection internally, and each position of the collection stores an element. It uses a CombinedContext
structure to achieve data access, the definition of CombinedContext
and the get
method:
/**
 * A context made of a [left] part (any context, possibly another CombinedContext)
 * and a single right-most [element].
 */
internal class CombinedContext(
    private val left: CoroutineContext,
    private val element: Element
) : CoroutineContext, Serializable {
    /**
     * Looks up the element identified by [key].
     *
     * Walks the chain from right to left: the current `element` is checked first, then the
     * search continues into `left` while it is a CombinedContext; the final non-combined
     * `left` is queried directly.
     */
    override fun <E : Element> get(key: Key<E>): E? {
        var cur = this
        while (true) {
            // The right-most element is consulted first, so it wins on key conflicts.
            cur.element[key]?.let { return it }
            val next = cur.left
            if (next is CombinedContext) {
                cur = next
            } else {
                return next[key]
            }
        }
    }
    ...
}
It can be seen from the constructor that it contains two parts: left
and element
. That is to say, a CombinedContext
may contain multiple elements inside.
- left: It may be an ordinary context element (CoroutineContext.Element), or it may be another CombinedContext (which itself contains multiple context elements).
- element: A coroutine context element.
In the get
method of CombinedContext
, there is a while(true)
loop, the execution process is as follows:
- First check whether the current element matches the incoming key. If it does, return that element directly; otherwise take the left part.
- If left is a CombinedContext, repeat step 1 on left.
- If left is not a CombinedContext, directly call its get method to obtain the element (which returns null if there is no match).
In addition, it can also be seen that element is accessed before left, so the farther to the right a context element is, the higher its priority.
Key
is used to identify the coroutine context element, see its definition:
public interface CoroutineContext {
    ...
    /** Key type used to identify a context element of type [E]. */
    public interface Key<E : Element>

    /** A single context element; it is itself a CoroutineContext. */
    public interface Element : CoroutineContext {
        // Key used to identify the element
        public val key: Key<*>
        ...
    }
}
CoroutineContext.Element
has an abstract class implementation, which allows us to implement context elements more conveniently:
/** Base class for context elements: stores the [key] passed to the constructor as the element's key. */
public abstract class AbstractCoroutineContextElement(public override val key: Key<*>) : Element
Taking CoroutineName
as an example, analyze how to implement a coroutine context element:
/**
 * Context element holding the [name] of a coroutine.
 *
 * The companion object [Key] is passed to the parent class as this element's key.
 */
public data class CoroutineName(
    val name: String
    /* CoroutineName.Key can be abbreviated as CoroutineName */
) : AbstractCoroutineContextElement(CoroutineName) {
    /** Key that identifies a CoroutineName element in a context. */
    public companion object Key : CoroutineContext.Key<CoroutineName>
    ...
}
First declare that the parameter passed to the parent class AbstractCoroutineContextElement
is CoroutineName.Key
, but it can be abbreviated as CoroutineName
. In fact, this is also easy to understand. In Kotlin
, when we call the method of the companion object, we can omit the class name of the companion object. The same is true here.
CoroutineName
internally declares a companion object Key
inherited from CoroutineContext.Key
, and passes it to the parent class as a construction parameterAbstractCoroutineContextElement
, as the Key
of the coroutine context element.
The above is a common way to implement the coroutine context element, that is, define a companion object in the coroutine context element, and use the companion object as Key
to identify the context element.
Finally, look at the complete definition of CoroutineContext
:
public interface CoroutineContext {
    // Get an element by key
    public operator fun <E : Element> get(key: Key<E>): E?

    // "Fold": related to the accumulation of context elements
    public fun <R> fold(initial: R, operation: (R, Element) -> R): R

    // Accumulation of coroutine context elements
    public operator fun plus(context: CoroutineContext): CoroutineContext = ...

    // The remaining context elements (returned as a CoroutineContext) after removing
    // the element identified by the key from the current CoroutineContext
    public fun minusKey(key: Key<*>): CoroutineContext

    public interface Key<E : Element>

    public interface Element : CoroutineContext {
        // Key that identifies the context element
        public val key: Key<*>

        // If the key is the same, return the element itself, otherwise return null
        public override operator fun <E : Element> get(key: Key<E>): E? =
            @Suppress("UNCHECKED_CAST")
            if (this.key == key) this as E else null

        // Apply the incoming operation to the initial value and this element
        public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =
            operation(initial, this)

        // Removing this element's own key leaves the empty context; any other key leaves this element
        public override fun minusKey(key: Key<*>): CoroutineContext =
            if (this.key == key) EmptyCoroutineContext else this
    }
}
CoroutineContext
plus
method:
// Returns a context containing the elements of this context plus those of `context`.
// For each incoming element, the entry with the same key is removed from the accumulator first,
// and a ContinuationInterceptor, if present, is re-attached as the right-most element.
public operator fun plus(context: CoroutineContext): CoroutineContext =
    if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation
        context.fold(this) { acc, element ->
            // Drop any existing element with the same key so the incoming one replaces it.
            val removed = acc.minusKey(element.key)
            if (removed === EmptyCoroutineContext) element else {
                // make sure interceptor is always last in the context (and thus is fast to get when present)
                val interceptor = removed[ContinuationInterceptor]
                if (interceptor == null) CombinedContext(removed, element) else {
                    val left = removed. minusKey(ContinuationInterceptor)
                    if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
                        CombinedContext(CombinedContext(left, element), interceptor)}
            }
        }
For the convenience of later explanation, the calling form is A + B, assuming that A is a coroutine context containing multiple elements, and B is a single context element. The general execution flow of this method is as follows:
- If element B is empty, return to the original context A.
- In the lambda block of fold, it can be considered that acc is A and element is B.
- If the element.key element is subtracted from A (denoted as C), and C is an empty context, return B (equivalent to element B replacing context A).
- Check whether there is a ContinuationInterceptor element in C, if not, return C and B after splicing.
- C removes the ContinuationInterceptor and records it as D. If D is empty, splice B and ContinuationInterceptor and return.
- D is not empty, then splice D and B and ContinuationInterceptor and return.
To put it simply, this splices the “incoming coroutine context element” onto the “original coroutine context elements”. If there is a key conflict, the element with the conflicting key in the original collection is replaced by the incoming element. When splicing context elements, if there is a ContinuationInterceptor element, it is kept at the far right of the “coroutine context element collection”, so that it has the highest priority and can be obtained from the coroutine context faster (why an element on the right has higher priority and is faster to obtain has been explained in the introduction of CombinedContext).
It is difficult to describe the execution process of the plus method clearly in words. If you want to understand it in detail, try working through a few examples. Its exact execution process is not the focus of this analysis, though; a general impression is enough.
Continuation interception mechanism
This is the last link in the analysis of the principle of coroutine implementation. When we use coroutines, we use dispatchers such as Dispatchers.Main and Dispatchers.IO to schedule threads. The previous analysis did not mention how coroutines perform thread scheduling.
Thread scheduling is related to the continuation interceptor ContinuationInterceptor
, which is also a “coroutine context element”:
/** A context element that can wrap (intercept) continuations. */
public interface ContinuationInterceptor : CoroutineContext.Element {
    // Key corresponding to the continuation interceptor
    companion object Key : CoroutineContext.Key<ContinuationInterceptor>

    // Returns a continuation that wraps the original continuation (passed in as the parameter).
    // If the method does not want to intercept the incoming continuation, it can return it unchanged.
    // When the original continuation completes, if it was intercepted before, the coroutine framework
    // calls releaseInterceptedContinuation with the wrapper of the continuation.
    public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>

    // Only called if interceptContinuation actually intercepted the continuation.
    // When the original continuation has completed and is no longer used, this method is called
    // with the wrapper of the continuation.
    public fun releaseInterceptedContinuation(continuation: Continuation<*>) {
        /* do nothing by default */
    }
    ...
}
The continuation interceptor can be used to intercept a continuation. The most common continuation interceptor is the coroutine dispatcher CoroutineDispatcher, and the corresponding coroutine dispatcher can be obtained through the singleton class Dispatchers. View the implementation of CoroutineDispatcher:
/**
 * Base class of coroutine dispatchers: a context element registered under the
 * ContinuationInterceptor key that wraps continuations in DispatchedContinuation.
 */
public abstract class CoroutineDispatcher :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {

    @ExperimentalStdlibApi
    public companion object Key : AbstractCoroutineContextKey<ContinuationInterceptor, CoroutineDispatcher>(
        ContinuationInterceptor,
        { it as? CoroutineDispatcher })

    /** Whether [dispatch] should be used for this context; `true` by default. */
    public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true

    /** Dispatches execution of [block] in the given [context]. */
    public abstract fun dispatch(context: CoroutineContext, block: Runnable)

    /** Delegates to [dispatch] by default. */
    public open fun dispatchYield(context: CoroutineContext, block: Runnable): Unit = dispatch(context, block)

    /** Wraps [continuation] in a DispatchedContinuation bound to this dispatcher. */
    public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        DispatchedContinuation(this, continuation)

    @InternalCoroutinesApi
    public override fun releaseInterceptedContinuation(continuation: Continuation<*>) {
        (continuation as DispatchedContinuation<*>).reusableCancellableContinuation?.detachChild()
    }
    ...
}
- Interceptor: CoroutineDispatcher inherits from ContinuationInterceptor, so it is also a continuation interceptor.
- Identification of the context element: CoroutineDispatcher inherits from AbstractCoroutineContextElement, and passes in ContinuationInterceptor.Key as the construction parameter to identify itself.
- isDispatchNeeded: If the dispatch method needs to be used for scheduling, return true, otherwise return false. This method returns true by default. A coroutine dispatcher can override this method as a performance optimization to avoid unnecessary dispatch calls. For example, a main-thread dispatcher can determine whether the current coroutine is already on the UI thread; if it is, the method returns false and there is no need to execute the dispatch method for unnecessary thread scheduling (as the HandlerContext code below shows, that check only takes effect when its invokeImmediately flag is set).
- dispatch: In a given context and thread, execute the block.
Assume that the coroutine scheduler used is the main thread scheduler Dispatchers.Main
:
// Each read of Main returns MainDispatcherLoader.dispatcher.
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
View MainDispatcherLoader.dispatcher
:
// Initialized once from loadMainDispatcher().
@JvmField
val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()

/**
 * Finds the available MainDispatcherFactory implementations, picks the one with the highest
 * `loadPriority` and asks it to create the dispatcher.
 *
 * Falls back to `createMissingDispatcher` when no factory is found or when loading throws.
 */
private fun loadMainDispatcher(): MainCoroutineDispatcher {
    return try {
        val factories = if (FAST_SERVICE_LOADER_ENABLED) {
            FastServiceLoader.loadMainDispatcherFactory()
        } else {
            // We are explicitly using the
            // `ServiceLoader.load(MyClass::class.java, MyClass::class.java.classLoader).iterator()`
            // form of the ServiceLoader call to enable R8 optimization when compiled on Android.
            ServiceLoader.load(
                MainDispatcherFactory::class.java,
                MainDispatcherFactory::class.java.classLoader
            ).iterator().asSequence().toList()
        }
        // maxByOrNull yields null for an empty list, so the elvis fallback below handles
        // "no factory found"; the deprecated maxBy no longer guarantees a nullable result.
        @Suppress("ConstantConditionIf")
        factories.maxByOrNull { it.loadPriority }?.tryCreateDispatcher(factories)
            ?: createMissingDispatcher()
    } catch (e: Throwable) {
        // Service loader can throw an exception as well
        createMissingDispatcher(e)
    }
}
Called tryCreateDispatcher
:
/**
 * Calls `createDispatcher(factories)`; if that throws, returns the result of
 * `createMissingDispatcher` built from the failure and `hintOnError()` instead.
 */
public fun MainDispatcherFactory.tryCreateDispatcher(factories: List<MainDispatcherFactory>): MainCoroutineDispatcher =
    try {
        createDispatcher(factories)
    } catch (cause: Throwable) {
        createMissingDispatcher(cause, hintOnError())
    }
Continue to track and find that createDispatcher
is a method of MainDispatcherFactory
interface, one of which is implemented in AndroidDispatcherFactory
:
/** Factory whose dispatcher is a HandlerContext built on the main looper's handler. */
internal class AndroidDispatcherFactory : MainDispatcherFactory {
    // allFactories is not used by this implementation.
    override fun createDispatcher(allFactories: List<MainDispatcherFactory>) =
        HandlerContext(Looper.getMainLooper().asHandler(async = true))
    ...
}
HandlerContext
is actually the final implementation of the scheduler Dispatchers.Main
:
/**
 * Dispatcher that runs blocks by posting them to [handler].
 *
 * @param handler handler of the main thread
 * @param name optional name (not used in the excerpt shown)
 * @param invokeImmediately when `true`, [isDispatchNeeded] returns `false` if the current
 *   looper is already the handler's looper; the public constructor passes `false`
 */
internal class HandlerContext private constructor(
    private val handler: Handler,
    private val name: String?,
    private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
    public constructor(
        handler: Handler,
        name: String? = null
    ) : this(handler, name, false)
    ...
    // Always true when invokeImmediately is false; otherwise true only off the handler's looper.
    override fun isDispatchNeeded(context: CoroutineContext): Boolean {
        return !invokeImmediately || Looper.myLooper() != handler.looper
    }

    // Hands the block to the handler's message queue.
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        handler.post(block)
    }
    ...
}
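- isDispatchNeeded: Returns true whenever invokeImmediately is false, so a HandlerContext created through the public constructor above (which passes false) always dispatches. Only when invokeImmediately is true does it use the looper to determine whether the coroutine is currently on the main thread: if yes, it returns false, indicating that no more thread scheduling is needed; otherwise, it returns true to indicate that thread scheduling is required.
- dispatch: uses the main thread’s handler to post the incoming block.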
After having a certain understanding of the “continuation interceptor” and “coroutine scheduler”, let’s look back at how the coroutine scheduler works. We have previously analyzed the startCoroutineCancellable
method of the Cancellable
file:
/**
 * Starts this suspend lambda as a coroutine: creates the unintercepted continuation,
 * intercepts it, and resumes it with `Unit`, all inside `runSafely(completion)`.
 */
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
    receiver: R, completion: Continuation<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
) =
    runSafely(completion) {
        createCoroutineUnintercepted(receiver, completion)
            .intercepted()
            .resumeCancellableWith(Result.success(Unit), onCancellation)
    }
The createCoroutineUnintercepted method returns the LaunchLambda instance. In the previous analysis, we ignored the intercepted method and proceeded as if LaunchLambda called the resumeCancellableWith method directly. If no continuation interceptor is set for the coroutine, that is indeed what happens: LaunchLambda calls resumeCancellableWith directly. So what happens if a continuation interceptor is set for the coroutine?
Check the intercepted
method called by LaunchLambda
, which is in the IntrinsicsJVM
file:
/**
 * Returns the intercepted form of this continuation when it is a ContinuationImpl;
 * any other continuation is returned unchanged.
 */
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
    (this as? ContinuationImpl)?.intercepted() ?: this
LaunchLambda
is of type ContinuationImpl
, so it will call the parent class ContinuationImpl::intercepted
:
/** Continuation base class that caches its intercepted wrapper. */
internal abstract class ContinuationImpl(
    completion: Continuation<Any?>?,
    private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
    constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)
    ...
    // Cached result of intercepted(); null until the first call.
    @Transient
    private var intercepted: Continuation<Any?>? = null

    /**
     * Returns the cached wrapper if there is one. Otherwise asks the context's
     * ContinuationInterceptor (if any) to wrap this continuation, falls back to `this`
     * when there is no interceptor, and caches whichever value was produced.
     */
    public fun intercepted(): Continuation<Any?> =
        intercepted
            ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                .also { intercepted = it }
    ...
}
At the beginning, intercepted is null, so the method checks whether there is a ContinuationInterceptor element in the coroutine context. If there is not, it returns this (i.e. LaunchLambda itself, and sets the intercepted variable to LaunchLambda). If there is, it calls the interceptContinuation method. Assuming that the continuation interceptor used is Dispatchers.Main, this is the interceptContinuation method of CoroutineDispatcher, which returns a DispatchedContinuation (and that DispatchedContinuation is stored in the intercepted variable).
View CoroutineDispatcher::interceptContinuation
:
/** Wraps [continuation] in a DispatchedContinuation bound to this dispatcher. */
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
    DispatchedContinuation(this, continuation)
DispatchedContinuation
class:
/**
 * Pairs a [continuation] with the [dispatcher] that should run it.
 * Implements Continuation by delegating to the wrapped [continuation]; body elided here.
 */
internal class DispatchedContinuation<in T>(
    @JvmField val dispatcher: CoroutineDispatcher,
    @JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {...}
In the example here, dispatcher
is Dispatchers.Main
and continuation
is LaunchLambda
.
Go back to the startCoroutineCancellable
method of the Cancellable
file:
/**
 * Starts this suspend lambda as a coroutine: creates the unintercepted continuation,
 * intercepts it, and resumes it with `Unit`, all inside `runSafely(completion)`.
 */
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
    receiver: R, completion: Continuation<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
) =
    runSafely(completion) {
        createCoroutineUnintercepted(receiver, completion)
            .intercepted()
            .resumeCancellableWith(Result.success(Unit), onCancellation)
    }
In the case of a continuation interceptor (Dispatchers.Main
), the intercepted
method will return DispatchedContinuation
, and then call its resumeCancellableWith
method:
/**
 * Resumes this continuation with [result].
 *
 * A DispatchedContinuation is resumed through its own `resumeCancellableWith`
 * (which receives [onCancellation]); any other continuation is resumed with plain `resumeWith`.
 */
public fun <T> Continuation<T>.resumeCancellableWith(
    result: Result<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
    is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
    else -> resumeWith(result)
}
Call to another resumeCancellableWith
method, which is implemented in DispatchedContinuation
:
/**
 * Resumes the wrapped continuation with [result], going through the dispatcher when
 * `dispatcher.isDispatchNeeded(context)` says so.
 */
inline fun resumeCancellableWith(
    result: Result<T>,
    noinline onCancellation: ((cause: Throwable) -> Unit)?
) {
    val state = result.toState(onCancellation)
    if (dispatcher.isDispatchNeeded(context)) { // thread scheduling needed
        // Store the state first; it is read back when this task is run.
        _state = state
        resumeMode = MODE_CANCELLABLE
        // Thread scheduling: pass this object in as the Runnable block
        dispatcher.dispatch(context, this)
    } else { // no thread scheduling needed
        executeUnconfined(state, MODE_CANCELLABLE) {
            if (!resumeCancelled(state)) {
                // Finally, continuation.resumeWith will be called, namely LaunchLambda.resumeWith
                resumeUndispatchedWith(result)
            }
        }
    }
}
As you can see, it calls dispatcher.isDispatchNeeded to determine whether thread scheduling is required. Taking Dispatchers.Main as an example, the idea is to determine whether the current coroutine is already running on the main thread: if it is, no scheduling is required; otherwise, the coroutine needs to be scheduled to run on the main thread.
- No thread scheduling is required: it will eventually call LaunchLambda.resumeWith, and its subsequent execution process has been analyzed before.
- Thread scheduling is required: (taking the coroutine dispatcher of the main thread as an example) the incoming Runnable will eventually be executed on the main thread.
Where is the run method of this Runnable implemented? DispatchedTask, the parent class of DispatchedContinuation, provides the implementation of the run method:
/**
 * Runnable entry point executed by the dispatcher: takes the stored state and resumes the
 * wrapped continuation with a cancellation cause, an exception, or the successful result.
 */
public final override fun run() {
    ...
    try {
        // The obtained delegate is actually DispatchedContinuation
        val delegate = delegate as DispatchedContinuation<T>
        // The obtained continuation is actually LaunchLambda
        val continuation = delegate.continuation
        val context = continuation.context
        val state = takeState() // NOTE: Must take state in any case, even if canceled
        withCoroutineContext(context, delegate.countOrElement) {
            val exception = getExceptionalResult(state)
            /*
             * Check whether continuation was originally resumed with an exception.
             * If so, it dominates cancellation, otherwise the original exception
             * will be silently lost.
             */
            val job = if (exception == null && resumeMode.isCancellableMode) context[Job] else null
            if (job != null && !job.isActive) {
                // The job is no longer active: resume with its cancellation exception instead.
                val cause = job.getCancellationException()
                cancelCompletedResult(state, cause)
                continuation.resumeWithStackTrace(cause)
            } else {
                if (exception != null) {
                    continuation.resumeWithException(exception)
                } else {
                    // Under normal circumstances, it will execute here and call the resume method of LaunchLambda
                    continuation.resume(getSuccessfulResult(state))
                }
            }
        }
    } catch (e: Throwable) {
        ...
    } finally {
        ...
    }
}
In the run method, the resume method of LaunchLambda will eventually be called (which internally calls the resumeWith method). So the thread scheduling done here is actually to post the code to the main thread through the handler of the main thread, which completes the thread scheduling work.
In addition, there are a few unstudied places and my own conjectures:
1. The releaseIntercepted method: in BaseContinuationImpl::resumeWith, every time the invokeSuspend method of a continuation has been executed, the releaseIntercepted method of that continuation is called:
// Releases the cached intercepted wrapper (if one was created) through the context's
// ContinuationInterceptor, then overwrites the cache with CompletedContinuation.
protected override fun releaseIntercepted() {
    val intercepted = intercepted
    // intercepted is not null and not this object (that is, the continuation was successfully intercepted before): enter the if block
    if (intercepted != null && intercepted !== this) {
        // Call the releaseInterceptedContinuation method of the continuation interceptor and pass in the continuation wrapper.
        // The !! throws if the context holds no ContinuationInterceptor at this point.
        context[ContinuationInterceptor]!!.releaseInterceptedContinuation(
            intercepted)
    }
    // Set the intercepted variable to CompletedContinuation
    this. intercepted = CompletedContinuation // just in case
}
The releaseInterceptedContinuation
method of the continuation interceptor should do some resource cleaning.
2. Functions like withContext:
// Launches with Dispatchers.Main and, inside the coroutine, calls withContext with Dispatchers.IO and an empty block.
scope.launch(Dispatchers.Main) {
    withContext(Dispatchers.IO) {}
}
/**
 * Signature only (body elided here): takes a [context] and a suspending [block] with a
 * CoroutineScope receiver, and returns the block's result type [T].
 */
public suspend fun <T> withContext(
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T
): T {...}
After the block has been executed, the thread automatically switches back to the thread “specified by the coroutine dispatcher when starting the coroutine”. So how does it switch back? My personal guess: when the coroutine is called from top to bottom, the context of the coroutine is passed down layer by layer. When the block of withContext is executed, the coroutine context is saved somewhere, and when the execution of the block ends, the coroutine dispatcher is taken out of the previously saved coroutine context and the remaining code (coroutine recovery) is scheduled to execute on the corresponding thread. In this way, after the block has been executed, the thread automatically switches back to the thread “specified by the coroutine dispatcher when starting the coroutine”.
Reference
-
Coroutine Cafe – Construct Magic – Exploring Kotlin Coroutine Implementation Principles – M.D.
-
Suspend functions – Kotlin Vocabulary – YouTube.