浅谈Kotlin协程(4)

异常处理

Posted by SpirytusZ on 2022-08-29

前言

挂起函数提供了切换调用栈后自动切回来的能力。那么

  1. 这对挂起函数异常的捕获有何影响?
  2. 当协程发生异常时,协程内如何传递异常?
  3. 协程间如何传播异常?

本文是我对《深入理解Kotlin协程》的读书笔记,记录我对协程异常处理的理解。

协程内的异常捕获

在实践中我们可以使用try-catch语句包裹需要捕获异常的代码,但如果被包裹的代码涉及到调用栈的切换,则捕获不了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
fun crashAtEventLoop() {
try {
handler.post { 1 / 0 }
} catch(t: Throwable) {
t.printStackTrace()
}
}

fun crashAtThreadSwitch() {
try {
threadPool.execute { 1 / 0 }
} catch(t: Throwable) {
t.printStackTrace()
}
}

我们知道挂起函数内部有可能切换调用栈。如果挂起函数内部切换了调用栈,并且在另一个调用栈发生了异常,会如何表现?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
fun testCrash() {
GlobalScope.launch {
try {
suspendFunc()
} catch (t: Throwable) {
Log.e(TAG, "testCrash: $t")
}
}
}

private suspend fun suspendFunc() = suspendCoroutine<Int> {
thread {
it.resume(1/0)
}
}

调用testCrash后必会抛出未被捕获的异常而崩溃,崩溃点在第13行。这是合理的,因为崩溃发生在挂起函数切换调用栈后,切回调用栈前,是协程框架所干预不到的地方。

考虑到ContinuationresumeWith方法接受的是一个kotlin.Result参数,我们不妨使用kotlin.runCatching包裹住另一个调用栈,发生异常时返回Result.Failure

1
2
3
4
5
6
7
8
9
private suspend fun suspendFunc() = suspendCoroutine<Int> { cont ->
thread {
runCatching {
cont.resumeWith(1/0)
}.onFailure {
cont.resumeWithException(it)
}
}
}

改造完毕后,再执行一遍,发现异常在testCrash方法中被捕获了。因此我们可以得出一个结论,在另一个调用栈的异常需要通过Continuation.resumeWith方法传递,才能在恢复时挂起点处抛出并捕获。

协程内的异常传递

那么原因是什么呢?通过上篇文章浅谈Kotlin协程(3)-非阻塞式挂起与恢复,我们知道suspendCoroutine所暴露出来的Continuation是这么一个结构:

SafeContinuation的作用是保证suspendCoroutine所暴露的Continuation只调用一次;

DispatchedContinuation的作用是切线程环境;

那么焦点就来到了BaseContinuationImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class BaseContinuationImpl {

override fun resumeWith(result: Result<Any?>) {
...
try {
val outcome = invokeSuspend(param)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
...
completion.resumeWith(outcome)
...
}
}

class CoroutineExceptionCatchTest$1 {

private var label = 0

override fun invokeSuspend(result: Result<Any?>): Any? {
...
if (label == 1) {
Result.throwOnFailure(result)
label = 2
}
...
}
}

BaseContinuationImpl.resumeWith捕获了invokeSuspend中主动抛出的异常,将异常传递给了StandaloneCoroutine

查看StandaloneCoroutine的代码,发现它只重写了handleJobException方法。从这个方法的名字看来,就是它处理BaseContinuationImpl传递过来的异常。跟进这个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
private open class StandaloneCoroutine(
parentContext: CoroutineContext,
active: Boolean
) : AbstractCoroutine<Unit>(...) {
override fun handleJobException(exception: Throwable): Boolean {
handleCoroutineException(context, exception)
return true
}
}

fun handleCoroutineException(
context: CoroutineContext,
exception: Throwable
) {
try {
context[CoroutineExceptionHandler]?.let {
it.handleException(context, exception)
return
}
} catch (t: Throwable) {
handleCoroutineExceptionImpl(context, handlerException(exception, t))
return
}
handleCoroutineExceptionImpl(context, exception)
}

val handlers: List<CoroutineExceptionHandler> = ServiceLoader.load(
CoroutineExceptionHandler::class.java,
CoroutineExceptionHandler::class.java.classLoader
).iterator().asSequence().toList()

fun handleCoroutineExceptionImpl(context: CoroutineContext, exception: Throwable) {
for (handler in handlers) {
handler.handleException(context, exception)
}

val currentThread = Thread.currentThread()
currentThread.uncaughtExceptionHandler.uncaughtException(currentThread, exception)
}

捕获的异常会按顺序传递给以下处理器处理。如果对应的处理器能够处理,则直接返回,否则就会传递给下一个处理器继续处理:

  1. 注册在协程上下文中的CoroutineExceptionHandler
  2. 注册在resources/META-INF/services目录下,通过服务发现机制获取的CoroutineExceptionHandler
  3. Thread.UncaughtExceptionHandler

由于此时我们并没有注册CoroutineExceptionHandler,所以最终异常会交给Thread.UncaughtExceptionHandler处理,其结果就是抛出异常。而后在try-catch语句中再次被捕获,并打印出来。

协程的结构

父子协程关系构建

要想了解异常如何在协程间传播,我们需要了解协程之间的结构关系。协程是一个抽象的概念,在Kotlin中的具象化表达为Job,并且Kotlin协程标准库提供了使用CoroutineScope为receiver的启动方法。我们可以这么启动子协程:

1
2
3
4
5
6
7
8
9
val job0 = coroutineScope.launch {
val job1 = launch {
delay(3000)
}
val job2 = launch {
delay(2000)
}
delay(1000)
}

从直觉来看,很自然的就能知道三个job之间的关系,job1、job2是job0的子协程:

事实上在标准库中的实现也确实是这个结构。父子协程关系的建立被标准库抽象成了Job.attachChild方法,在启动一个Job(即协程)时会与父Job建立父子关系。

CoroutineScope.launch启动的协程为例,内部新建了一个协程AbstractCoroutine,并在其构造方法中构建父子协程关系:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class AbstractCoroutine : JobSupport(...) {
init {
if (initParentJob) initParentJob(parentContext[Job])
}
}

open class JobSupport : Job, ChildJob, ParentJob {
var parentHandle: ChildHandle?

protected fun initParentJob(parent: Job?) {
if (parent == null) {
parentHandle = NonDisposableHandle
return
}

...
parentHandle = parent.attachChild(this)
...
}
}

attachChild的真正实现在JobSupport内,它返回的是一个双向链表的节点;而这个双向链表的作用,是维护父Job下所有子协程的Job取消回调和Job完成回调。由于此时是子Job调用父JobattachChild方法,所以我们把视角切换到父Job,跟进代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
open class JobSupport  : Job, ChildJob, ParentJob {
override fun attachChild(child: ChildJob): ChildHandle {
return invokeOnCompletion(
onCancelling = true,
handler = ChildHandleNode(child).asHandler
) as ChildHandle
}

override fun invokeOnCompletion(
onCancelling: Boolean,
invokeImmediately: Boolean,
handler: CompletionHandler
): DisposableHandle {
...
}
}

由于协程标准库使用无锁算法实现双向链表,所以在代码实现中充斥着大量的自旋操作。这里简化了部分代码,只保留关键部分。

首先,attachChild方法其实是将传入的childJob包装成为了ChildHandleNode后,又去调用了invokeOnComplete方法。所谓的ChildHandleNode,我认为其实是父Job与子Job建立双向关系的连接点,具体其实现便能明白:

1
2
3
4
5
6
7
internal class ChildHandleNode(
@JvmField val childJob: ChildJob
) : JobCancellingNode(), ChildHandle {
override val parent: Job get() = job
override fun invoke(cause: Throwable?) = childJob.parentCancelled(job)
override fun childCancelled(cause: Throwable): Boolean = job.childCancelled(cause)
}

其次,在invokeOnComplete方法内会构建一个双向链表的节点,然后根据当前Job的状态决定是否插入到双向链表中和返回该节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
open class JobSupport  : Job, ChildJob, ParentJob {
override fun invokeOnCompletion(
onCancelling: Boolean,
invokeImmediately: Boolean,
handler: CompletionHandler
): DisposableHandle {
val node: JobNode = makeNode(handler, onCancelling)
loopOnState { state ->
when (state) {
is Empty -> {
_state.compareAndSet(state, node)
return node
}
is Incomplete -> {
val list = state.list
addLastAtomic(state, list, node)
return node
}
else -> {
// is complete
return NonDisposableHandle
}
}
}
}

private fun addLastAtomic(expect: Any, list: NodeList, node: JobNode) =
list.addLastIf(node) { this.state === expect }
}
  • 如果当前Job处于Empty状态(代表未完成,且没有任何回调,链表为空),就以创建的回调节点为链表头,并返回创建的节点;
  • 如果当前的Job处于Incomplete状态(代表未完成,但有回调,链表不为空),就调用addLastAtomic将新建的回调节点插入到维护在状态state内的双向链表中,并返回新建的节点;
  • 否则,当前Job已经完成了,则返回一个NonDisposableHandle,代表不会再对任何操作做出响应。

此时Job父子关系已经构建完毕,其关键在于Job内部维护的状态_state。状态内部存储着一个双向链表,每个链表节点代表着父子Job沟通的桥梁ChildHandleNode,内部同时存储了父Job和子Job

光阅读代码略显抽象,不妨以上文的job0job1job2为例,使用GlobalScope.launch启动job0时,由于job0并没有父Job,所以Job构建父子关系的过程相对较为简单,其结果为:

由于job0的上下文CoroutineContext中没有Job,因此其parentHandle被初始化为NonDisposableHandle,然后直接返回。而_state则使用默认值Empty,代表没有任何的取消、完成回调。

job0运行的时候,启动了job1。由于启动job1的协程作用域CoroutineScope中存在Job(即job0),所以会执行父子关系构建的逻辑。parentHandle被设置为ChildHandle,内部指向了双向链表表头ChildHandleNode——维护job0job1关系的桥梁。

由于job1并没有子Job,所以其parentHandle被初始化为NonDisposableHandle,其_state为默认值,即Empty

因此,在job1启动后,协程间的关系为:

job1启动完毕后,job2的启动过程也类似,我们可以如法炮制。当涉及到双向链表的插入操作时,job0job2的桥梁——ChildHandleNode会被插入到表头之后。最终,在job2启动后,协程间的关系为:

小结

协程在启动时,会去检查传入的协程上下文CoroutineContext中是否有Job的存在。如果存在Job,则认为是父Job,进而去初始化父子Job的关系;反之则跳过;

在初始化父子Job关系时,子Job会调用父JobattachChild,主动让父Job来attach自己。在attachChild的过程中,父Job会构建一个双向链表,每个节点为ChildHandleNode,存储父Job自己和对应的子Job,使得父Job和子Job能够双向通信。

最终,父Job会将这个双向链表包装成ChildHandle,保存在parentHandle中。

异常传播

了解完协程间的结构后,或许我们就能轻松理解异常在协程间传播的逻辑。在一个协程内,异常的传递类似于剥洋葱的方式,在被包了多层Continuation的洋葱结构内进行传播,最终被捕获的异常传递到了AbstractCoroutineresumeWith方法中:

1
2
3
4
5
6
class AbstractCoroutine {
override fun resumeWith(result: Result<T>) {
makeCompletingOnce(result.toState())
...
}
}

异常传播的关键全在makeCompletingOnce方法中,也是异常传播、协程取消等庞大逻辑的入口函数。其中的Result.toState方法,会将成功和失败包装成对应的类,方便后续对结果成功和结果失败的判断。我们进一步追踪makeCompletingOnce方法看看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class JobSupport {

internal fun makeCompletingOnce(proposedUpdate: Any?): Any? {
tryMakeCompleting(state, proposedUpdate)
...
}

private fun tryMakeCompleting(state: Any?, proposedUpdate: Any?): Any? {
if ((state is Empty || state is JobNode) && state !is ChildHandleNode && proposedUpdate !is CompletedExceptionally) {
tryFinalizeSimpleState(state, proposedUpdate)
return proposedUpdate
}
return tryMakeCompletingSlowPath(state, proposedUpdate)
}
}

贴出的代码省略了各种中间状态,只保留主流程;其调用流程如下:

可以看到,在tryMakeCompleting方法内,会对当前Job的状态进行一次判断,以决定走快路径还是走慢路径。所谓快路径和慢路径,区别在于慢路径需要处理一些中间状态,例如多个异常合并为一个异常、挂起等待子Job完成等状态,但结果都是殊途同归,和快路径一样,最终会扭转Job自身的状态,并通知子Job和父Job

先来看看慢路径,慢路径主要处理了多个异常合并为一个异常、挂起等待子Job完成等逻辑。总体流程如下:

这里有几个比较关键的方法,nofityCancellingtryWaitForChildcancelParent

nofityCancelling(list, cause)

如其命名以及参数列表,这个方法最主要的职责是将异常通知给当前Job的子Job。查看代码:

1
2
3
4
5
6
7
8
9
class JobSupprot {
private fun notifyCancelling(list: NodeList, cause: Throwable) {
...
// 取消子Job
notifyHandlers<JobCancellingNode>(list, cause)
// 取消当前Job
cancelParent(cause)
}
}

tryWaitForChild

cancelParent