Implementation principle of Kotlin coroutine

Foreword

This article analyzes the implementation principle of coroutines in Kotlin/JVM.

First look at the suspend keyword

The following example simulates a network request:

/**
 * Sample class whose two suspend functions are decompiled later in the article.
 */
class Temp {
    /** Suspends on [netRequest] and reports whether its result equals 0. */
    suspend fun fetchData(argument: String): Boolean {
        val result = netRequest(argument)
        return result == 0
    }

    /** Simulates a network request: calls `delay(1000)`, then returns the length of [argument]. */
    suspend fun netRequest(argument: String): Int {
        delay(1000)
        return argument.length
    }
}

These two methods are decorated with the suspend keyword. We will decompile the bytecode of this file into Java code with the same effect:

public final class Temp {
   @Nullable
   public final Object fetchData(@NotNullString argument, @NotNull Continuation var2) {
      Object $continuation;
      label25: {
         if (var2 instanceof ) {
            $continuation = ()var2;
            if (((()$continuation).label & Integer.MIN_VALUE) != 0) {
               (()$continuation).label -= Integer.MIN_VALUE;
               break label25;
            }
         }

         $continuation = new ContinuationImpl(var2) {
            // $FF: synthetic field
            Object result;
            int label;

            @Nullable
            public final Object invokeSuspend(@NotNull Object $result) {
               this.result = $result;
               this.label |= Integer.MIN_VALUE;
               return Temp.this.fetchData((String)null, this);
            }
         };
      }

      Object $result = (()$continuation).result;
      Object var6 = IntrinsicsKt. getCOROUTINE_SUSPENDED();
      Object var10000;
      switch((()$continuation).label) {
      case 0:
         ResultKt.throwOnFailure($result);
         (()$continuation).label = 1;
         var10000 = this.netRequest(argument, (Continuation)$continuation);
         if (var10000 == var6) {
            return var6;
         }
         break;
      case 1:
         ResultKt.throwOnFailure($result);
         var10000 = $result;
         break;
      default:
         throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
      }

      int result = ((Number)var10000).intValue();
      return Boxing. boxBoolean(result == 0);
   }

   @Nullable
   public final Object netRequest(@NotNull String argument, @NotNull Continuation var2) {
      Object $continuation;
      label20: {
         if (var2 instanceof ) {
            $continuation = ()var2;
            if (((()$continuation).label & Integer.MIN_VALUE) != 0) {
               (()$continuation).label -= Integer.MIN_VALUE;
               break label20;
            }
         }

         $continuation = new ContinuationImpl(var2) {
            // $FF: synthetic field
            Object result;
            int label;
            Object L$0;

            @Nullable
            public final Object invokeSuspend(@NotNull Object $result) {
               this.result = $result;
               this.label |= Integer.MIN_VALUE;
               return Temp.this.netRequest((String)null, this);
            }
         };
      }

      Object $result = (()$continuation).result;
      Object var5 = IntrinsicsKt. getCOROUTINE_SUSPENDED();switch((()$continuation).label) {
      case 0:
         ResultKt.throwOnFailure($result);
         (()$continuation).L$0 = argument;
         (()$continuation).label = 1;
         if (DelayKt.delay(1000L, (Continuation)$continuation) == var5) {
            return var5;
         }
         break;
      case 1:
         argument = (String)(()$continuation).L$0;
         ResultKt.throwOnFailure($result);
         break;
      default:
         throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
      }

      return Boxing. boxInt(argument. length());
   }
}

A few lines of coroutine-related codes correspond to so many Java codes. It can be seen that the kotlin compiler has done a lot for us.

The readability of the above code is not high; for example, it contains undefined types such as <undefinedtype>. I decompiled the Temp.class file again with jd-gui. After getting more information, I merged the decompiled code above with the information obtained from jd-gui and renamed some classes and variables appropriately, producing a more complete and readable “Java code corresponding to the decompiled Temp.class”. First, the code related to fetchData:

// Hand-cleaned decompilation of Temp.fetchData: the anonymous continuation is named
// FetchDataStateMachine and the decompiler temporaries are renamed.
public final Object fetchData(@NotNull String argument, @NotNull Continuation completion) {
    Object $continuation;
    label25:
    {
        // Re-entry from invokeSuspend: reuse the state machine and clear the sign-bit marker.
        if (completion instanceof FetchDataStateMachine) {
            $continuation = (FetchDataStateMachine) completion;
            if (($continuation.label & Integer.MIN_VALUE) != 0) {
                $continuation.label -= Integer.MIN_VALUE;
                break label25;
            }
        }
        // First entry: wrap the caller's continuation in a new state machine.
        $continuation = new FetchDataStateMachine(completion);
    }

    Object $result = $continuation.result;
    Object resultTemp;
    switch ($continuation.label) {
        case 0:
            ResultKt.throwOnFailure($result);
            $continuation.label = 1;
            resultTemp = this.netRequest(argument, (Continuation) $continuation);
            if (resultTemp == COROUTINE_SUSPENDED) {
                return COROUTINE_SUSPENDED;
            }
            break;
        case 1:
            // Resumed: the value passed to invokeSuspend is netRequest's result.
            ResultKt.throwOnFailure($result);
            resultTemp = $result;
            break;
        default:
            throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
    }

    int result = ((Number) resultTemp).intValue();
    return Boxing.boxBoolean(result == 0);
}

// Continuation generated for fetchData (renamed by the author for readability).
static final class FetchDataStateMachine extends ContinuationImpl {
    // Value most recently passed to invokeSuspend.
    Object result;
    // Current state; the sign bit is set by invokeSuspend and cleared again by fetchData.
    int label;

    FetchDataStateMachine(Continuation $completion) {
        super($completion);
    }

    @Nullable
    public final Object invokeSuspend(@NotNull Object $result) {
        this.result = $result;
        this.label |= Integer.MIN_VALUE;
        return Temp.this.fetchData(null, (Continuation) this);
    }
}

netRequest related codes and fetchData related codes are similar in structure and form:

// Hand-cleaned decompilation of Temp.netRequest: the anonymous continuation is named
// NetRequestStateMachine and its L$0 field is renamed functionParameter.
public final Object netRequest(@NotNull String argument, @NotNull Continuation completion) {
    Object $continuation;
    label20:
    {
        // Re-entry from invokeSuspend: reuse the state machine and clear the sign-bit marker.
        if (completion instanceof NetRequestStateMachine) {
            $continuation = (NetRequestStateMachine) completion;
            if (($continuation.label & Integer.MIN_VALUE) != 0) {
                $continuation.label -= Integer.MIN_VALUE;
                break label20;
            }
        }
        // First entry: wrap the caller's continuation in a new state machine.
        $continuation = new NetRequestStateMachine(completion);
    }

    Object $result = $continuation.result;
    switch ($continuation.label) {
        case 0:
            ResultKt.throwOnFailure($result);
            // Save the parameter so it can be read back in case 1.
            $continuation.functionParameter = argument;
            $continuation.label = 1;
            if (DelayKt.delay(1000L, (Continuation) $continuation) == COROUTINE_SUSPENDED) {
                return COROUTINE_SUSPENDED;
            }
            break;
        case 1:
            argument = (String) ($continuation.functionParameter);
            ResultKt.throwOnFailure($result);
            break;
        default:
            throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
    }

    return Boxing.boxInt(argument.length());
}

// Continuation generated for netRequest (renamed by the author for readability).
static final class NetRequestStateMachine extends ContinuationImpl {
    // Value most recently passed to invokeSuspend.
    Object result;
    // Current state; the sign bit is set by invokeSuspend and cleared again by netRequest.
    int label;
    // Holds netRequest's argument between case 0 and case 1.
    Object functionParameter;

    NetRequestStateMachine(Continuation $completion) {
        super($completion);
    }

    @Nullable
    public final Object invokeSuspend(@NotNull Object $result) {
        this.result = $result;
        this.label |= Integer.MIN_VALUE;
        return Temp.this.netRequest(null, (Continuation) this);
    }
}

It can be seen that, in the decompiled Java code, the fetchData and netRequest methods both have an extra Continuation completion parameter, which the Kotlin compiler adds for us. For a function modified by suspend, the Kotlin compiler passes a Continuation parameter into the function at compile time and uses that parameter in place of the suspend modifier. So what is the meaning of this parameter?

First knowledge continuation

Continuations are a key to understanding how coroutines work.

Look at traditional network requests first:

/** Result value delivered by the simulated network request. */
data class User(val id: Long, val name: String)

/** Traditional callback that receives either the requested [User] or the failure cause. */
interface Callback {
    fun success(user: User)
    fun failure(t: Throwable)
}

/** Simulated data source for the callback example. */
class Model {
    /** Sleeps for 1000 ms on the calling thread, then reports a fixed [User] through [callback]. */
    fun getUserInfo(callback: Callback) {
        Thread.sleep(1000) // simulate network request
        callback.success(User(1, "giagor"))
    }
}

/** Caller side of the callback example: requests a user and shows the outcome. */
class Business {
    val model = Model()

    /** Asks [model] for the user and forwards success or failure to [showMsg]. */
    fun getUserInfo() {
        model.getUserInfo(object : Callback {
            override fun success(user: User) {
                showMsg(user.toString())
            }

            override fun failure(t: Throwable) {
                // Fall back to an empty message when the throwable carries none.
                showMsg(t.message ?: "")
            }
        })
    }

    fun showMsg(msg: String) {
        //...
    }
}

When Model makes a network request, a Callback receives the result of the request. We can think of the Callback as a continuation, that is, the continuation of the network request, which is used to receive its result.

Use the Continuation interface in the coroutine to represent a continuation, which represents a continuation after a suspension point, that is, the rest of the code that should be executed after the suspension point:

/** A continuation after a suspension point that is resumed with a value of type [T]. */
public interface Continuation<in T> {
    // The context of the coroutine corresponding to the continuation
    public val context: CoroutineContext
    // Resume the execution of the corresponding coroutine, and pass a result indicating success or failure as the return value of the last suspension point
    public fun resumeWith(result: Result<T>)
}

In Kotlin 1.3, there is also an extension function that can easily call resumeWith:

/** Resumes the continuation with [value] wrapped in a successful [Result]. */
public inline fun <T> Continuation<T>.resume(value: T): Unit =
    resumeWith(Result.success(value))

/** Resumes the continuation with [exception] wrapped in a failed [Result]. */
public inline fun <T> Continuation<T>.resumeWithException(exception: Throwable): Unit =
    resumeWith(Result.failure(exception))

As mentioned earlier, for a function modified by suspend, the Kotlin compiler passes a Continuation parameter into the function and uses it in place of the suspend modifier. Through the Continuation parameter, the Kotlin compiler can convert our coroutine code into equivalent callback code; in other words, the compiler writes the callback code for us. How it does so is analyzed later. Using a Continuation to control the asynchronous call flow in this way is called the CPS transformation (Continuation-Passing-Style Transformation).

State machine

fetchData function will generate the following static internal class when compiled (continuation):

// Continuation generated for fetchData (renamed by the author for readability).
static final class FetchDataStateMachine extends ContinuationImpl {
    // Value most recently passed to invokeSuspend.
    Object result;
    // Current state; invokeSuspend sets the sign bit before re-entering fetchData.
    int label;

    FetchDataStateMachine(Continuation $completion) {
        super($completion);
    }

    @Nullable
    public final Object invokeSuspend(@NotNull Object $result) {
        this.result = $result;
        this.label |= Integer.MIN_VALUE;
        return Temp.this.fetchData(null, (Continuation) this);
    }
}

The inheritance relationship of FetchDataStateMachine is as follows:

FetchDataStateMachine -> ContinuationImpl -> BaseContinuationImpl -> Continuation

FetchDataStateMachine receives a Continuation parameter named $completion, which is saved in the parent class BaseContinuationImpl:

// Excerpt: the completion passed to the constructor is stored as a property.
internal abstract class BaseContinuationImpl(
    public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {...}

Through $completion, the execution result of the fetchData function can be passed back to the function that called fetchData; holding $completion is what gives the state machine the ability to implement the callback.

The state machine FetchDataStateMachine declares two variables result and label

  • result: Indicates the result of the previous Continuation, for example, there are functions A and B, ContinuationA and ContinuationB are declared inside the function, A calls B and ContinuationA is passed to B for saving. In the subsequent callback process, ContinuationA can get the execution result of ContinuationB::invokeSuspend from the result variable .
  • label: The Kotlin compiler can identify where a function may be suspended. Each suspension point is represented as a state of the state machine, and these states are expressed with a switch/case statement. The label variable records the current state of the state machine, that is, which case should be entered next.

Look at the first half of the code of fetchData:

// First half of fetchData: obtain the state machine to run with.
public final Object fetchData(@NotNull String argument,@NotNull Continuation completion) {
    Object $continuation;
    label25:
    {
        // Reuse the incoming continuation when it is already a FetchDataStateMachine whose
        // sign-bit marker is set; the marker is cleared before continuing.
        if (completion instanceof FetchDataStateMachine) {
            $continuation = (FetchDataStateMachine) completion;
            if (($continuation.label & Integer.MIN_VALUE) != 0) {
                $continuation.label -= Integer.MIN_VALUE;
                break label25;
            }
        }
        // Otherwise create a new state machine that wraps the incoming continuation.
        $continuation = new FetchDataStateMachine(completion);
    }
    ...
}

It checks whether the incoming completion is of type FetchDataStateMachine. If so, it performs some operations on its label variable; if not, it directly creates a FetchDataStateMachine and passes in completion (which is saved).

Look at the second half of the code of fetchData:

// Second half of fetchData: dispatch on the state machine's label.
public final Object fetchData(@NotNull String argument, @NotNull Continuation completion) {
    Object $continuation;
    ...
    Object $result = $continuation.result;
    Object resultTemp;
    switch ($continuation.label) {
        case 0:
            ResultKt.throwOnFailure($result);
            // Record the next state before calling netRequest.
            $continuation.label = 1;
            resultTemp = this.netRequest(argument, (Continuation) $continuation);
            if (resultTemp == COROUTINE_SUSPENDED) {
                return COROUTINE_SUSPENDED;
            }
            break;
        case 1:
            ResultKt.throwOnFailure($result);
            resultTemp = $result;
            break;
        default:
            throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
    }

    int result = ((Number) resultTemp).intValue();
    return Boxing.boxBoolean(result == 0);
}

The original statements of the fetchData method are split across multiple case branches under the switch. Here, the label variable in FetchDataStateMachine controls which case branch is currently executed.

It can be seen that the function and the continuation form a finite state machine (FSM, namely Finite-State Machine) to control the execution of the coroutine code.

What is “non-blocking suspend”?

In the netRequest method, delay(1000) is called to suspend the current coroutine. Let's take a brief look at the decompiled code of the delay method:

// Decompiled delay(): returns Unit immediately for non-positive delays; otherwise schedules
// a resume and returns whatever getResult() reports.
public static final Object delay(long timeMillis, @NotNull Continuation $completion) {
    if (timeMillis <= 0L) {
        return Unit.INSTANCE;
    } else {
        // implementation class
        CancellableContinuationImpl cancellableContinuationImpl = new CancellableContinuationImpl(IntrinsicsKt.intercepted($completion), 1);
        cancellableContinuationImpl.initCancellability();
        // upcast
        CancellableContinuation cont = (CancellableContinuation)cancellableContinuationImpl;
        if (timeMillis < Long.MAX_VALUE) {
            // delay operation
            getDelay(cont.getContext()).scheduleResumeAfterDelay(timeMillis, cont);
        }
        // get execution result
        Object result = cancellableContinuationImpl.getResult();
        if (result == COROUTINE_SUSPENDED) {
            DebugProbesKt.probeCoroutineSuspended($completion);
        }
        // return result
        return result;
    }
}

In this method, the delay operation will be performed. If it needs to be suspended, the COROUTINE_SUSPENDED value will be returned to the caller.

Combining the decompiled code of fetchData, netRequest and delay, we can get the following call graph:

The red line in the figure indicates that the function returns COROUTINE_SUSPENDED and needs to be suspended. When the delay method needs to be suspended, it returns COROUTINE_SUSPENDED, then the netRequest method returns COROUTINE_SUSPENDED, then fetchData method returns COROUTINE_SUSPENDED, repeating this process until the top of the call stack.

By ending the method calls in this way, the coroutine temporarily stops executing on this thread, so the thread can handle other tasks (including executing other coroutines). This is why suspending a coroutine does not block the current thread, and it is the origin of the term “non-blocking suspension”.

How to restore?

Since the coroutine is suspended, there is a corresponding recovery of the coroutine. Let me talk about the conclusion first: the essence of coroutine recovery is to call back the continuation.

I haven’t studied the specific implementation of the delay function yet, but the delay function performs the wait in some sub-thread, and when the delay time elapses it calls the resumeWith method of the $completion passed to delay, that is, the resumeWith method of NetRequestStateMachine. The inheritance relationship of NetRequestStateMachine is as follows:

NetRequestStateMachine -> ContinuationImpl -> BaseContinuationImpl -> Continuation

BaseContinuationImpl is currently a focus of our analysis, it mainly does the following things:

  1. Save completion: It saves the FetchDataStateMachine instance of the fetchData method, so that the continuation can be called back level by level .
  2. Rewrite the resumeWith method: BaseContinuationImpl rewrites the resumeWith method of the Continuation interface, This method is used to restore the coroutine and is also the core logic of coroutine recovery.

We look at the definition of the BaseContinuationImpl class:

// Excerpt of the base class of compiler-generated continuations; its resumeWith drives the
// chain of invokeSuspend calls.
internal abstract class BaseContinuationImpl(
    public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
    // This implementation is final. This fact is used to unroll resumeWith recursion.
    public final override fun resumeWith(result: Result<Any?>) {
        // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
        var current = this
        var param = result
        while (true) {
            // Perform debug probes on each resumed continuation, allowing the debug library to track exactly which parts of the suspended call stack
            // has been restored.
            probeCoroutineResumed(current)
            with(current) {
                val completion = completion!! // fail fast when trying to resume continuation without completion
                val outcome: Result<Any?> =
                    try {
                        val outcome = invokeSuspend(param)
                        // The state machine suspended again: stop here and wait for the next resume.
                        if (outcome === COROUTINE_SUSPENDED) return
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        Result.failure(exception)
                    }
                releaseIntercepted() // this state machine instance is terminating
                if (completion is BaseContinuationImpl) {
                    // unrolling recursion via loop
                    current = completion
                    param = outcome
                } else {
                    // top-level completion reached -- invoke and return
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }

    protected abstract fun invokeSuspend(result: Result<Any?>): Any?

    protected open fun releaseIntercepted() {
        // does nothing here, overridden in ContinuationImpl
    }

    ...
}

The focus is on the implementation of the resumeWith method, which executes the callback logic inside a while(true) loop. Combining the decompiled code of fetchData and netRequest given above, let’s see what happens after the delay time elapses and the delay function calls the resumeWith method of NetRequestStateMachine:

  1. Execute the resumeWith method of NetRequestStateMachine parent class BaseContinuationImpl.
  2. Execute the invokeSuspend method of the current continuation, which is NetRequestStateMachine (NetRequestStateMachine implements this method; if you have forgotten it, look back at the decompiled code above).
  3. The invokeSuspend method of NetRequestStateMachine calls the netRequest method, and passes in the continuation itself as a parameter.
  4. In the netRequest method, since the type of completion is NetRequestStateMachine, you can use the continuation directly , there is no need to create a new continuation body like the first time entering the netRequest method. At this time, the label value of the continuation is 1, so enter the case 1 statement branch of netRequest.

Actually, in this process, there are some calculation and conversion operations on the label of the continuation body, but the final value of label is 1, The calculation and conversion operation done does not affect our analysis, so it is not the focus

  1. From the continuation, take out the parameter that was first passed into the netRequest method, that is, argument, and return argument.length . For the convenience of later explanation, the return value argument.length is recorded as netRequest-Return here.
  2. The netRequest method then ends, and so does NetRequestStateMachine::invokeSuspend, with netRequest-Return as its return value. That value is returned to the resumeWith method of BaseContinuationImpl, where netRequest-Return is wrapped in a Result and saved to the outcome variable.
  3. Determine whether completion held by NetRequestStateMachine is of BaseContinuationImpl type, we know that the instance it holds is actually FetchDataStateMachine, so it must be BaseContinuationImpl, so the variable is updated
 // Update current to FetchDataStateMachine instance
current = completion
// Update param to outcome (wraps the Result of netRequest-Return)
    param = outcome

In this way, the callback can actually be realized, let’s continue to look back.

  1. Continue to the next iteration of the while loop. FetchDataStateMachine::invokeSuspend is executed in the with block; invokeSuspend saves the incoming parameter param to the result variable (this is similar to a traditional callback, which also passes the lower layer's execution result back to the upper layer) and then calls the fetchData method.
  2. In the fetchData method, since the incoming completion is already of type FetchDataStateMachine, there is no need to create a new continuation. Since the continuation's label is 1 at this point, execution enters the case 1 branch, where the execution result of the netRequest method is stored in the resultTemp variable. Finally the fetchData method ends and returns the result of result == 0. For convenience, the execution result of the fetchData method is recorded as fetchData-Return.
  3. FetchDataStateMachine::invokeSuspend then also ends and returns fetchData-Return, which the resumeWith method of BaseContinuationImpl wraps in a Result. It then checks whether the completion held by FetchDataStateMachine is of type BaseContinuationImpl.
  4. Where the code goes next is not yet clear; to find out, we first need to know what happens when the fetchData method is called.

From the above process analysis, we have a basic understanding of the recovery of the coroutine, and the flow chart is given below to summarize:

Looking at the calling process of the continuation above, it is actually calling the invokeSuspend method of the continuation layer by layer. From the process point of view, it is a bit like a recursive call, but BaseContinuationImpl:: The implementation of resumeWith is not the same as recursion. Its implementation is to call the invokeSuspend method on the continuation once in the while(true) loop, and then record Its return result is used as the method parameter of the next continuation invokeSuspend.

Simply speaking, it is calling the invokeSuspend method of a continuation, and after the execution of this method is completed, the invokeSuspend method of the next continuation is called. One reason for this is to avoid the call stack being too deep, and there are related comments in BaseContinuationImpl::resumeWith:

This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume

Start a coroutine

We call the fetchData method in a coroutine:

/** Example caller: launches a coroutine that invokes [Temp.fetchData]. */
class Temp2 {
    fun execute() {
        GlobalScope.launch(Dispatchers.Main) {
            Temp().fetchData("argument")
        }
    }
}

A coroutine can be started by the launch method, and its source code is as follows:

/**
 * Excerpt of `launch`: creates the coroutine object, starts it according to [start],
 * and returns it as a [Job].
 */
public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    // A lazy start gets a LazyStandaloneCoroutine holding the block; otherwise a StandaloneCoroutine.
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

The code in the coroutine will be packaged as a block. By default, a StandaloneCoroutine will be created and its start method will be called And return StandaloneCoroutine.

StandaloneCoroutine indirectly implements the Job interface and the Continuation interface, as follows:

// Coroutine object created by launch for a non-lazy start.
private open class StandaloneCoroutine(
    parentContext: CoroutineContext,
    active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active) {
    // Passes the exception to handleCoroutineException and reports it as handled.
    override fun handleJobException(exception: Throwable): Boolean {
        handleCoroutineException(context, exception)
        return true
    }
}

// Excerpt: the coroutine object is at once a Job, a Continuation and a CoroutineScope.
public abstract class AbstractCoroutine<in T>(
    /**
     * The context of the parent coroutine.
     */
    @JvmField
    protected val parentContext: CoroutineContext,
    active: Boolean = true
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {...}

It can be seen that StandaloneCoroutine plays multiple roles: it implements the Job, Continuation and CoroutineScope interfaces. The code tracing below leads to the conclusion that the topmost continuation is the coroutine itself. That is, when the coroutine resumes, the continuations are called back layer by layer, and the topmost continuation is the coroutine itself, namely StandaloneCoroutine (we take StandaloneCoroutine as an example here).

In addition, pay attention to the block block type passed in the launch method:

 block: suspend CoroutineScope.() -> Unit

It is equivalent to the following function type:

// CoroutineScope: converted from extension function
// Continuation: converted from the suspend keyword, the Continuation parameter is passed in by the compiler
block : (CoroutineScope,Continuation) -> Unit

// Or expressed in the form of Function2
block : Function2<CoroutineScope, Continuation, Unit>

Then trace the calling process of starting the coroutine. In the launch method, the AbstractCoroutine::start method is called:

 // AbstractCoroutine.start: T is the type parameter of the enclosing AbstractCoroutine<T>.
 public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
     initParentJob()
     // Syntactic sugar, actually calls the CoroutineStart.invoke method
     start(block, receiver, this)
 }

CoroutineStart::invoke method:

 // CoroutineStart.invoke: picks how the block is started based on the start mode.
 public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
     when (this) {
         DEFAULT -> block.startCoroutineCancellable(receiver, completion)
         ATOMIC -> block.startCoroutine(receiver, completion)
         UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
         LAZY -> Unit // will start lazily
     }

From the launch method, we know that the default value of CoroutineStart is CoroutineStart.DEFAULT, so the startCoroutineCancellable method of block is called:

// Creates the continuation for the block, intercepts it, and resumes it with Unit to start it.
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
    receiver: R, completion: Continuation<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
) =
    runSafely(completion) {
        createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
    }

When I followed the code call of createCoroutineUnintercepted in AS, I found that it would jump to the IntrinsicsKt.class file, which was not found in this file The source code of the method, finally found the IntrinsicsJvm.kt file, and found the source code of the createCoroutineUnintercepted method, as follows:

// R: CoroutineScope
// T: Unit
@SinceKotlin("1.3")
public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
    receiver: R,
    completion: Continuation<T>
): Continuation<Unit> {
    // The probeCoroutineCreated method directly returns completion
    val probeCompletion = probeCoroutineCreated(completion)
    // A block that is itself a BaseContinuationImpl is instantiated through its create method.
    return if (this is BaseContinuationImpl)
        create(receiver, probeCompletion)
    else {
        createCoroutineFromSuspendFunction(probeCompletion) {
            (this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)
        }
    }
}

Here it checks whether this is of type BaseContinuationImpl. this is the lambda block we passed to launch earlier, so what is the type of this lambda block? To find the answer, we have to decompile the code given at the beginning of this section.

Kotlin code:

/** Example caller: launches a coroutine that invokes [Temp.fetchData]. */
class Temp2 {
    fun execute() {
        GlobalScope.launch(Dispatchers.Main) {
            Temp().fetchData("argument")
        }
    }
}

Properly rename and adjust the decompiled java code to get:

public final class Temp2 {
    ...
    // State machine generated for the lambda passed to launch (renamed by the author).
    static final class LaunchLambda extends SuspendLambda implements Function2<CoroutineScope, Continuation, Object> {
        int label;

        LaunchLambda(Continuation $completion) {
            super(2, $completion);
        }

        @Nullable
        public final Object invokeSuspend(@NotNull Object $result) {
            switch (this.label) {
                case 0:
                    ResultKt.throwOnFailure($result);
                    this.label = 1;
                    // fetchData is invoked once; if it suspends, this method returns and is
                    // re-entered later with label == 1.
                    if ((new Temp()).fetchData("argument", (Continuation) this) == COROUTINE_SUSPENDED)
                        return COROUTINE_SUSPENDED;
                    return Unit.INSTANCE;
                case 1:
                    ResultKt.throwOnFailure($result);
                    return Unit.INSTANCE;
            }
            throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
        }

        // Builds a fresh state machine whose completion is the given continuation.
        @NotNull
        public final Continuation create(@Nullable Object value, @NotNull Continuation $completion) {
            return (Continuation) new LaunchLambda($completion);
        }

        @Nullable
        public final Object invoke(@NotNull CoroutineScope p1, @Nullable Continuation p2) {
            return ((LaunchLambda) create(p1, p2)).invokeSuspend(Unit.INSTANCE);
        }
    }
}

It can be seen that a static nested class LaunchLambda is automatically generated in Temp2, which corresponds to the lambda block passed to the launch method. The inheritance relationship of LaunchLambda (from top to bottom, ordered from subclass to parent class):

LaunchLambda
-> SuspendLambda // Lambda blocks modified with suspend will inherit from this class
-> ContinuationImpl
-> BaseContinuationImpl // rewrite resumeWith function
-> Continuation

OK, back to the createCoroutineUnintercepted method. Now we can answer the question just raised: is the incoming lambda block of type BaseContinuationImpl? According to the inheritance relationship above, of course it is! The create method of LaunchLambda is then called. Note that its second argument is completion (probeCompletion in the code), which is eventually saved in the completion variable of the parent class BaseContinuationImpl. This completion is the StandaloneCoroutine created in the launch method, i.e. the coroutine itself, which is the topmost continuation when the coroutine resumes.

A LaunchLambda instance is obtained by calling the create method, and the createCoroutineUnintercepted method is executed and the LaunchLambda instance is returned. Then the code execution returns to startCoroutineCancellable, review the method:

// Creates the continuation for the block, intercepts it, and resumes it with Unit to start it.
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
    receiver: R, completion: Continuation<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
) =
    runSafely(completion) {
        createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
    }

Two calls are chained here: first the intercepted method, then the resumeCancellableWith method. The intercepted method is related to the continuation interception mechanism, which will be introduced later, so we ignore it here and simply assume that the resumeCancellableWith method of the LaunchLambda instance is called. The method is as follows:

public fun  Continuation.resumeCancellableWith(
    result: Result,
    onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
    is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
    else -> resumeWith(result)
}

Then it will go to the resumeWith method. As mentioned earlier, this method is implemented in the parent class BaseContinuationImpl, and it calls the invokeSuspend method. The invokeSuspend method is implemented in LaunchLambda, as follows:

@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
    switch (this.label) {
        case 0:
            ResultKt.throwOnFailure(SYNTHETIC_LOCAL_VARIABLE_1);
            this.label = 1;
            if ((new Temp()). fetchData("argument", (Continuation) this) == COROUTINE_SUSPENDED)
                return COROUTINE_SUSPENDED;
            (new Temp()). fetchData("argument", (Continuation) this);
            return Unit. INSTANCE;
        case 1:
            ResultKt.throwOnFailure(SYNTHETIC_LOCAL_VARIABLE_1);
            return Unit. INSTANCE;
    }
    throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}

At the beginning, the value of label is 0, so it enters the case 0 branch, in which the value of label is set to 1. It then creates a Temp object and calls its fetchData method, passing in LaunchLambda itself as a parameter. That is, the LaunchLambda instance is saved in the completion variable of the continuation created by the fetchData method, so that it can be called back when the coroutine resumes.

Current holding diagram of the continuation body:

At this point, from starting a coroutine to how the coroutine is finally suspended, we can already connect it in series. In the "How to restore?" section, we haven't analyzed the last few steps of the coroutine recovery. Let's analyze it here, and then the entire process of coroutine recovery can also be linked together.

The follow-up process of coroutine recovery:

  1. When the execution of FetchDataStateMachine::invokeSuspend completes, the resumeWith method of BaseContinuationImpl checks whether the completion (i.e. LaunchLambda) held by FetchDataStateMachine is of type BaseContinuationImpl. From the inheritance relationship of LaunchLambda it is easy to see that the answer is “yes”, so it enters the next round of the while loop and calls the invokeSuspend method of LaunchLambda.
  2. Because label = 1, it enters the case 1 branch, which directly returns Unit. It then checks whether the completion (i.e. StandaloneCoroutine) held by LaunchLambda is of type BaseContinuationImpl. From the inheritance relationship of StandaloneCoroutine it is easy to see that the answer is "no", so the resumeWith method of StandaloneCoroutine is called.
  3. StandaloneCoroutine's resumeWith method is implemented in the parent class AbstractCoroutine:
 public final override fun resumeWith(result: Result) {
        val state = makeCompletingOnce(result.toState())
        // If waiting for the child coroutine to complete, return
        if (state === COMPLETING_WAITING_CHILDREN) return
        // should do some follow-up processing
        afterResume(state)
    }

At this time, the topmost continuation (the coroutine itself) is also restored.

  4. The execution of the BaseContinuationImpl::resumeWith method is completed, and the recovery of the entire coroutine is also completed.

Complete and improve on the basis of the previous flow chart:

1. The flowchart of coroutine calling from top to bottom ( coroutine suspended)

The blue text and lines represent new additions, and the red text and lines represent pending processes.

2. The flow chart of coroutine bottom-up recovery (coroutine recovery)

The blue text and lines represent new additions, and the orange text and lines represent the end of method calls.

Coroutine context

The coroutine context CoroutineContext defines the behavior of the coroutine. It records the information held by the current coroutine and is an important data object in the coroutine operation. CoroutineContext is an interface:

public interface CoroutineContext {...}

In the continuation, there is information about CoroutineContext:

public interface Continuation {
    // The context of the coroutine corresponding to the continuation
    public val context: CoroutineContext
    // Resume the execution of the corresponding coroutine, and pass a result indicating success or failure as the return value of the last suspension point
    public fun resumeWith(result: Result)
}

The following elements are all elements of "coroutine context":

  • Job: Control the life cycle of the coroutine.
  • CoroutineDispatcher: Dispatches work to the appropriate thread.
  • CoroutineName: The name of the coroutine, which can be used for debugging.
  • CoroutineExceptionHandler: Handle uncaught exceptions.

CoroutineContext can be regarded as a collection of CoroutineContext.Element, and each element in the collection can be located using CoroutineContext.Key , and the Key of each element is different.

CoroutineContext.Element definition:

 public interface Element : CoroutineContext {...}

You can see that Element itself also implements the CoroutineContext interface, which looks very strange: it is as if Int implemented the List interface. Why is an element itself a collection? In fact, this is mainly for the convenience of API design. This way, a single element such as Job can be used directly as a CoroutineContext instead of creating a List containing only one element, and multiple elements can also be spliced together with "+", such as:

scope.launch(CoroutineName("coroutine") + Dispatchers.Main) {...}

The "+" here is actually operator overloading, corresponding to the plus method declared by CoroutineContext:

 public operator fun plus(context: CoroutineContext): CoroutineContext = ...

The "coroutine context" stores elements in a clever way. Instead of creating a collection internally and storing one element in each position, it uses a CombinedContext structure to store and access its data. The definition of CombinedContext and its get method:

internal class CombinedContext(
    private val left: CoroutineContext,
    private val element: Element
) : CoroutineContext, Serializable {
    override fun  get(key: Key): E? {
        var cur = this
        while (true) {
            cur.element[key]?.let { return it }
            val next = cur. left
            if (next is CombinedContext) {cur = next
            } else {
                return next[key]
            }
        }
    }
    ...
}

It can be seen from the constructor that it contains two parts: left and element. That is to say, a CombinedContext may contain multiple elements inside.

  • left: It may be an ordinary context element (CoroutineContext.Element), or it may be another CombinedContext (which contains multiple context elements).
  • element: A coroutine context element.

In the get method of CombinedContext, there is a while(true) loop, the execution process is as follows:

  1. It first checks whether the current element matches the incoming key. If so, it returns that element directly; otherwise it takes the left part.
  2. If left is a CombinedContext, repeat step 1 on left.
  3. If left is not a CombinedContext, directly call its get method to get the element (which returns null if there is no match).

In addition, it can also be seen that element is accessed before left, so the farther to the right a context element is, the higher its priority.

Key is used to identify the coroutine context element, see its definition:

public interface CoroutineContext {
...
    public interface Key

    public interface Element : CoroutineContext {
        // Key used to identify the element
    public val key: Key
        ...
    }
}

CoroutineContext.Element has an abstract class implementation, which allows us to implement context elements more conveniently:

public abstract class AbstractCoroutineContextElement(public override val key: Key) : Element

Taking CoroutineName as an example, analyze how to implement a coroutine context element:

public data class CoroutineName(
    val name: String
    /* CoroutineName.Key can be abbreviated as CoroutineName */
) : AbstractCoroutineContextElement(CoroutineName) {
    
    public companion object Key : CoroutineContext.Key
...}

First declare that the parameter passed to the parent class AbstractCoroutineContextElement is CoroutineName.Key, but it can be abbreviated as CoroutineName. In fact, this is also easy to understand. In Kotlin, when we call the method of the companion object, we can omit the class name of the companion object. The same is true here.

CoroutineName internally declares a companion object Key that inherits from CoroutineContext.Key, and passes it to the parent class AbstractCoroutineContextElement as a constructor parameter, to serve as the Key of the coroutine context element.

The above is a common way to implement the coroutine context element, that is, define a companion object in the coroutine context element, and use the companion object as Key to identify the context element.

Finally, look at the complete definition of CoroutineContext:

public interface CoroutineContext {
    // get element by key
    public operator fun  get(key: Key): E?

    // translated as "folding", it is related to the accumulation of context elements
    public fun  fold(initial: R, operation: (R, Element) -> R): R

    // Accumulation of coroutine context elements
    public operator fun plus(context: CoroutineContext): CoroutineContext = ...
        
// In the current CoroutineContext, after removing the element identified by the key, the remaining context elements (returned in the form of CoroutineContext)
    public fun minusKey(key: Key): CoroutineContext

    public interface Key

    public interface Element : CoroutineContext {
        // Key that identifies the context element
        public val key: Key

        // If the key is the same, return the element itself, otherwise return null
        public override operator fun  get(key: Key): E? =
            @Suppress("UNCHECKED_CAST")
            if (this.key == key) this as E else null

        // Execute the incoming operation function
        public override fun  fold(initial: R, operation: (R, Element) -> R): R =
            operation(initial, this)

        public override fun minusKey(key: Key): CoroutineContext =
            if (this.key == key) EmptyCoroutineContext else this
    }
}

The plus method of CoroutineContext:

public operator fun plus(context: CoroutineContext): CoroutineContext =
    if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation
        context.fold(this) { acc, element ->
            val removed = acc.minusKey(element.key)
            if (removed === EmptyCoroutineContext) element else {
                // make sure interceptor is always last in the context (and thus is fast to get when present)
                val interceptor = removed[ContinuationInterceptor]
                if (interceptor == null) CombinedContext(removed, element) else {
                    val left = removed. minusKey(ContinuationInterceptor)
                    if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
                        CombinedContext(CombinedContext(left, element), interceptor)}
            }
        }

For the convenience of later explanation, the calling form is A + B, assuming that A is a coroutine context containing multiple elements, and B is a single context element. The general execution flow of this method is as follows:

  1. If B is the empty context, return the original context A.
  2. In the lambda block of fold, it can be considered that acc is A and element is B.
  3. If the element.key element is subtracted from A (denoted as C), and C is an empty context, return B (equivalent to element B replacing context A).
  4. Check whether there is a ContinuationInterceptor element in C, if not, return C and B after splicing.
  5. C removes the ContinuationInterceptor and records it as D. If D is empty, splice B and ContinuationInterceptor and return.
  6. D is not empty, then splice D and B and ContinuationInterceptor and return.

To put it simply, this method splices the “incoming coroutine context element” onto the “original coroutine context elements”. If there is a conflict in key, the element with the conflicting key in the original collection is replaced with the incoming element. When splicing context elements, if there is a ContinuationInterceptor element, the method makes sure it is at the far right of the “coroutine context element collection”, so that it has the highest priority and can be obtained from the coroutine context faster (why an element on the right has higher priority and is faster to obtain has been explained in the introduction of CombinedContext).

It is difficult to describe the execution of the plus method clearly in words. If you want to understand how it works, you can walk through a few examples. However, its exact execution process is not the focus of this analysis; a general impression is enough.

Continuation interception mechanism

This is the last link in the analysis of the principle of coroutine implementation. When we use coroutines, we use schedulers such as Dispatchers.Main and Dispatchers.IO to schedule threads. In the previous analysis, there was no mention of how coroutines perform thread scheduling.

Thread scheduling is related to the continuation interceptor ContinuationInterceptor, which is also a “coroutine context element”:

public interface ContinuationInterceptor : CoroutineContext.Element {
    // Key corresponding to the continuation interceptor
companion object Key : CoroutineContext.Key
    
    // Return a continuation that wraps the original continuation (the original continuation is passed in as a method parameter).
    // If the method does not want to intercept the incoming continuation, it can also return the original continuation directly.
    // When the original continuation is completed, if the continuation was intercepted before, the coroutine framework will call releaseInterceptedContinuation
    // method, the parameter passed in is the "wrapper class of the continuation".
    public fun  interceptContinuation(continuation: Continuation): Continuation
    
    // This function will only be called if the interceptContinuation is successfully intercepted.
    // If the original continuation is successfully intercepted, When the original continuation is completed and is no longer used, this method will be called, and the parameter passed in is the "wrapper class of the continuation".
    public fun releaseInterceptedContinuation(continuation: Continuation) {
        /* do nothing by default */
    }
    ...
}

The continuation interceptor can be used to intercept a continuation. The most common continuation interceptor is the coroutine dispatcher CoroutineDispatcher; the corresponding coroutine scheduler can be obtained through the singleton class Dispatchers. View the implementation of CoroutineDispatcher:

public abstract class CoroutineDispatcher :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    
    @ExperimentalStdlibApi
    public companion object Key : AbstractCoroutineContextKey(
        ContinuationInterceptor,
        { it as? CoroutineDispatcher })
    
    public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
        
    public abstract fun dispatch(context: CoroutineContext, block: Runnable)
        
    public open fun dispatchYield(context: CoroutineContext, block: Runnable): Unit = dispatch(context, block)
        
    public final override fun  interceptContinuation(continuation: Continuation): Continuation =
        DispatchedContinuation(this, continuation)
        
    @InternalCoroutinesApi
    public override fun releaseInterceptedContinuation(continuation: Continuation) {
        (continuation as DispatchedContinuation).reusableCancellableContinuation?.detachChild()
    }
        
...
}
  • Interceptor: CoroutineDispatcher inherits from ContinuationInterceptor, so it is also a continuation interceptor.
  • Identification of the context element: CoroutineDispatcher inherits from AbstractCoroutineContextElement, and passes in the ContinuationInterceptor.Key construction parameter to identify itself.
  • isDispatchNeeded: Returns true if the dispatch method needs to be used for scheduling, otherwise returns false. This method returns true by default. A coroutine scheduler can override this method as a performance optimization to avoid unnecessary dispatch. For example, the main thread scheduler Dispatchers.Main will determine whether the current coroutine is already on the UI thread; if it is, the method returns false, and there is no need to execute the dispatch method for unnecessary thread scheduling.
  • dispatch: In a given context and thread, execute the block block.

Assume that the coroutine scheduler used is the main thread scheduler Dispatchers.Main:

 public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher

View MainDispatcherLoader.dispatcher:

 @JvmField
    val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()

    private fun loadMainDispatcher(): MainCoroutineDispatcher {
        return try {
            val factories = if (FAST_SERVICE_LOADER_ENABLED) {
                FastServiceLoader. loadMainDispatcherFactory()
            } else {// We are explicitly using the
                // `ServiceLoader.load(MyClass::class.java, MyClass::class.java.classLoader).iterator()`
                // form of the ServiceLoader call to enable R8 optimization when compiled on Android.
                ServiceLoader.load(
                        MainDispatcherFactory::class.java,
                        MainDispatcherFactory::class.java.classLoader
                ).iterator().asSequence().toList()
            }
            @Suppress("ConstantConditionIf")
            factories.maxBy { it.loadPriority }?.tryCreateDispatcher(factories)
                ?: createMissingDispatcher()
        } catch (e: Throwable) {
            // Service loader can throw an exception as well
            createMissingDispatcher(e)
        }
    }

Called tryCreateDispatcher:

public fun MainDispatcherFactory.tryCreateDispatcher(factories: List): MainCoroutineDispatcher =
    try {
        createDispatcher(factories)
    } catch (cause: Throwable) {
        createMissingDispatcher(cause, hintOnError())
    }

Continue to track and find that createDispatcher is a method of MainDispatcherFactory interface, one of which is implemented in AndroidDispatcherFactory:

internal class AndroidDispatcherFactory : MainDispatcherFactory {

    override fun createDispatcher(allFactories: List) =
        HandlerContext(Looper. getMainLooper(). asHandler(async = true))
...
}

HandlerContext is actually the final implementation of the scheduler Dispatchers.Main:

// handler: Handler of the main thread
internal class HandlerContext private constructor(
    private val handler: Handler,
    private val name: String?,
    private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
	
    public constructor(
        handler: Handler,
        name: String? = null
    ) : this(handler, name, false)
    ...
    override fun isDispatchNeeded(context: CoroutineContext): Boolean {
        return !invokeImmediately || Looper.myLooper() != handler.looper
    }
    
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        handler. post(block)
    }
    ...
}
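  • isDispatchNeeded: Uses the looper to determine whether the coroutine is currently on the main thread. If yes, it returns false, indicating that no more thread scheduling is needed; otherwise it returns true to indicate that thread scheduling is required. Note that, per the code above, this looper check only takes effect when invokeImmediately is true; when invokeImmediately is false the method always returns true.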

  • dispatch: uses the main thread’s handler to post the incoming block.

After having a certain understanding of the “continuation interceptor” and “coroutine scheduler”, let’s look back at how the coroutine scheduler works. We have previously analyzed the startCoroutineCancellable method of the Cancellable file:

internal fun  (suspend (R) -> T).startCoroutineCancellable(
    receiver: R, completion: Continuation,
    onCancellation: ((cause: Throwable) -> Unit)? = null
) =
    runSafely(completion) {
        createCoroutineUnintercepted(receiver, completion). intercepted(). resumeCancellableWith(Result. success(Unit), onCancellation)
    }

The LaunchLambda instance is returned by the createCoroutineUnintercepted method. In the previous analysis, we ignored the intercepted method and analyzed the code as if LaunchLambda called the resumeCancellableWith method directly. If no continuation interceptor is set for the coroutine, then LaunchLambda does call the resumeCancellableWith method directly. Let’s see what happens if a continuation interceptor is set for the coroutine.

Check the intercepted method called by LaunchLambda, which is in the IntrinsicsJVM file:

public actual fun  Continuation. intercepted(): Continuation =
    (this as? ContinuationImpl)?.intercepted()?: this

LaunchLambda is of type ContinuationImpl, so it will call the parent class ContinuationImpl::intercepted:

internal abstract class ContinuationImpl(
    completion: Continuation?,
    private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
    constructor(completion: Continuation?) : this(completion, completion?.context)
...
    @Transient
    private var intercepted: Continuation? = null
    
    public fun intercepted(): Continuation =
        intercepted
            ?: (context[ContinuationInterceptor]?. interceptContinuation(this) ?: this)
                .also { intercepted = it }
    ...
}

At the beginning, intercepted is null, so it checks whether there is a ContinuationInterceptor element in the coroutine context. If not, it returns this (i.e. LaunchLambda itself) and sets the intercepted variable to LaunchLambda. If there is, it calls the interceptContinuation method. Assuming that the continuation interceptor used is Dispatchers.Main, this calls the interceptContinuation method of CoroutineDispatcher, which returns a DispatchedContinuation (and the DispatchedContinuation is stored in the intercepted variable).

View CoroutineDispatcher::interceptContinuation:

 public final override fun  interceptContinuation(continuation: Continuation): Continuation =
        DispatchedContinuation(this, continuation)

DispatchedContinuation class:

internal class DispatchedContinuation(
    @JvmField val dispatcher: CoroutineDispatcher,
    @JvmField val continuation: Continuation
) : DispatchedTask(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation by continuation {...}

In the example here, dispatcher is Dispatchers.Main and continuation is LaunchLambda.

Go back to the startCoroutineCancellable method of the Cancellable file:

internal fun  (suspend (R) -> T).startCoroutineCancellable(
    receiver: R, completion: Continuation,
    onCancellation: ((cause: Throwable) -> Unit)? = null
) =
    runSafely(completion) {
        createCoroutineUnintercepted(receiver, completion). intercepted(). resumeCancellableWith(Result. success(Unit), onCancellation)
    }

In the case of a continuation interceptor (Dispatchers.Main), the intercepted method will return DispatchedContinuation, and then call its resumeCancellableWith method:

public fun  Continuation.resumeCancellableWith(
    result: Result,
    onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
    is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
    else -> resumeWith(result)
}

Call to another resumeCancellableWith method, which is implemented in DispatchedContinuation:

 inline fun resumeCancellableWith(
        result: Result,
        noinline onCancellation: ((cause: Throwable) -> Unit)?
    ) {
        val state = result.toState(onCancellation)
        if (dispatcher.isDispatchNeeded(context)) { // need thread scheduling
            _state = state
            resumeMode = MODE_CANCELLABLE
            // Thread scheduling, passing itself in as a Runnable block
            dispatcher. dispatch(context, this)
        } else { // No need for thread scheduling
            executeUnconfined(state, MODE_CANCELLABLE) {
                if (!resumeCancelled(state)) {
                    // Finally, continuation.resumeWith will be called, namely LaunchLambda.resumeWith
                    resumeUndispatchedWith(result)
                }
            }
        }
    }

As you can see, it calls dispatcher.isDispatchNeeded to determine whether thread scheduling is required. Taking Dispatchers.Main as an example, it determines whether the current coroutine is running in the main thread. If it is, no scheduling is required; otherwise, the coroutine needs to be scheduled to run in the main thread.

  • No thread scheduling is required: it will eventually call LaunchLambda.resumeWith, and its subsequent execution process has been analyzed before.
  • Thread scheduling is required: (Taking the coroutine scheduler of the main thread as an example) will eventually execute the incoming Runnable in the main thread.

Where is the run method of the Runnable implemented? There is an implementation of the run method in DispatchedTask, the parent class of DispatchedContinuation:

 public final override fun run() {
        ...
        try {
            // The obtained delegate is actually DispatchedContinuation
            val delegate = delegate as DispatchedContinuation
            // The obtained continuation is actually LaunchLambda
            val continuation = delegate. continuation
            val context = continuation.context
            val state = takeState() // NOTE: Must take state in any case, even if canceled
            withCoroutineContext(context, delegate.countOrElement) {
                val exception = getExceptionalResult(state)
                /*
                 * Check whether continuation was originally resumed with an exception.
                 * If so, it dominates cancellation, otherwise the original exception
                 * will be silently lost.
                 */
                val job = if (exception == null && resumeMode.isCancellableMode) context[Job] else null
                if (job != null && !job.isActive) {
                    val cause = job.getCancellationException()
                    cancelCompletedResult(state, cause)
                    continuation. resumeWithStackTrace(cause)
                } else {
                    if (exception != null) {
                        continuation. resumeWithException(exception)
                    } else {
                        // Under normal circumstances, it will execute here and call the resume method of LaunchLambda
                        continuation. resume(getSuccessfulResult(state))
                    }
                }
            }
        } catch (e: Throwable) {
            ...
        } finally {
            ...
        }
    }

In the run method, the resume method of LaunchLambda is eventually called (which internally calls the resumeWith method). So the thread scheduling done here is actually posting the code to the main thread through the main thread's handler so that it runs there, which completes the thread scheduling work.

In addition, there are a few unstudied places and my own conjectures:

1. The releaseIntercepted method: in BaseContinuationImpl::resumeWith, every time the invokeSuspend method of a continuation has been executed, the releaseIntercepted method of that continuation is called:

 protected override fun releaseIntercepted() {
        val intercepted = intercepted
        // intercepted is not null and not itself (that is, the continuation was successfully intercepted before), enter the If block
        if (intercepted != null && intercepted !== this) {
            // Call the releaseInterceptedContinuation method of the continuation interceptor and pass in the continuation wrapper class
 context[ContinuationInterceptor]!!.releaseInterceptedContinuation(
            intercepted)
        }
        // Set the intercepted variable to CompletedContinuation
        this. intercepted = CompletedContinuation // just in case
    }

The releaseInterceptedContinuation method of the continuation interceptor should do some resource cleaning.

2. Functions like withContext:

scope.launch(Dispatchers.Main) {
    withContext(Dispatchers.IO) {}
}
public suspend fun  withContext(
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T
): T {...}

After the block is executed, the thread automatically switches back to the thread “specified by the coroutine scheduler when starting the coroutine”, so how does it switch back? My personal guess: when the coroutine is called from top to bottom, the context of the coroutine is passed down layer by layer. When the block of withContext is executed, the coroutine context is saved somewhere, and when the execution of the block ends, the coroutine scheduler is taken out of the previously saved coroutine context, and the remaining code (coroutine recovery) is scheduled to execute in the corresponding thread. In this way, after the execution of the block, the thread automatically switches back to the thread “specified by the coroutine scheduler when starting the coroutine”.

Reference

  1. Coroutine Cafe – Construct Magic – Exploring Kotlin Coroutine Implementation Principles – M.D.

  2. Suspend functions – Kotlin Vocabulary – YouTube.

Leave a Reply

Your email address will not be published. Required fields are marked *