当所有协程都已经用 CouroutineExceptionHandler 包装时,如何发现 "Job was cancelled" 异常来自哪里?

How to spot where "Job was cancelled" exception comes from when all your coroutines are already wrapped with a CouroutineExceptionHandler?

我阅读了所有 kotlinx UI docs 并实现了那里描述的 ScopedActivity(参见下面的代码)。

在我的 ScopedActivity 实现中,我还添加了一个 CouroutineExceptionHandler,尽管我将我的异常处理程序传递给了我的所有协程,但我的用户遇到了崩溃,我在堆栈跟踪中获得的唯一信息是 "Job was cancelled"。

我搜索了几天,但没有找到解决方案,我的用户仍然随机崩溃,但我不明白为什么...

这是我的 ScopedActivity 实现

abstract class ScopedActivity : BaseActivity(), CoroutineScope by MainScope() {

    val errorHandler by lazy { CoroutineExceptionHandler { _, throwable -> onError(throwable) } }

    open fun onError(e: Throwable? = null) {
        e ?: return
        Timber.i(e)
    }

    override fun onDestroy() {
        super.onDestroy()
        cancel()
    }
}

这是一个 activity 实现它的例子:

class ManageBalanceActivity : ScopedActivity() {

    @Inject
    lateinit var viewModel: ManageBalanceViewModel

    private var stateJob: Job? = null

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_manage_balance)
        AndroidInjection.inject(this)

        init()
    }

    private fun init() {
        SceneManager.create(
            SceneCreator.with(this)
                .add(Scene.MAIN, R.id.activity_manage_balance_topup_view)
                .add(Scene.MAIN, R.id.activity_manage_balance_topup_bt)
                .add(Scene.SPINNER, R.id.activity_manage_balance_spinner)
                .add(Scene.SPINNER, R.id.activity_manage_balance_info_text)
                .add(Scene.PLACEHOLDER, R.id.activity_manage_balance_error_text)
                .first(Scene.SPINNER)
        )

        // Setting some onClickListeners ...
        bindViewModel()
    }

    private fun bindViewModel() {
        showProgress()
        stateJob = launch(errorHandler) {
            viewModel.state.collect { manageState(it) }
        }
    }

    private fun manageState(state: ManageBalanceState) = when (state) {
        is ManageBalanceState.NoPaymentMethod -> viewModel.navigateToManagePaymentMethod()
        is ManageBalanceState.HasPaymentMethod -> onPaymentMethodAvailable(state.balance)
    }

    private fun onPaymentMethodAvailable(balance: Cash) {
        toolbarTitle.text = formatCost(balance)
        activity_manage_balance_topup_view.currency = balance.currency
        SceneManager.scene(this, Scene.MAIN)
    }

    override fun onError(e: Throwable?) {
        super.onError(e)
        when (e) {
            is NotLoggedInException -> loadErrorScene(R.string.error_pls_signin)
            else -> loadErrorScene()
        }
    }

    private fun loadErrorScene(@StringRes textRes: Int = R.string.generic_error) {

   activity_manage_balance_error_text.setOnClickListener(this::reload)
        SceneManager.scene(this, Scene.PLACEHOLDER)
    }

    private fun reload(v: View) {
        v.setOnClickListener(null)
        stateJob.cancelIfPossible()
        bindViewModel()
    }

    private fun showProgress(@StringRes textRes: Int = R.string.please_wait_no_dot) {
        activity_manage_balance_info_text.setText(textRes)
        SceneManager.scene(this, Scene.SPINNER)
    }

    override fun onDestroy() {
        super.onDestroy()
        SceneManager.release(this)
    }
}
fun Job?.cancelIfPossible() {
    if (this?.isActive == true) cancel()
}

这是 ViewModel

class ManageBalanceViewModel @Inject constructor(
    private val userGateway: UserGateway,
    private val paymentGateway: PaymentGateway,
    private val managePaymentMethodNavigator: ManagePaymentMethodNavigator
) {

    val state: Flow<ManageBalanceState>
        get() = paymentGateway.collectSelectedPaymentMethod()
            .combine(userGateway.collectLoggedUser()) { paymentMethod, user ->
                when (paymentMethod) {
                    null -> ManageBalanceState.NoPaymentMethod
                    else -> ManageBalanceState.HasPaymentMethod(Cash(user.creditBalance.toInt(), user.currency!!))
                }
            }
            .flowOn(Dispatchers.Default)

    // The navigator just do a startActivity with a clear task
    fun navigateToManagePaymentMethod() = managePaymentMethodNavigator.navigate(true)
}

最有可能出现问题的原因是您将协程异常处理程序(我们将其命名为 CEH)直接传递给您的启动块。这些启动块正在创建新作业(重要 - 普通作业,而不是主管作业),它们成为范围内作业的子作业(Scoped Activity 中的 MainScope)。

普通作业将取消其所有子项和自身,如果它的任何子项引发异常。 CEH 不会阻止这种行为。它将获取这些异常并执行它被告知要对它们执行的操作,但它仍然不会阻止范围内的 Job 及其所有子项的取消。最重要的是,它也会在层次结构中向上传播异常。 TLDR - 不会处理崩溃。

为了让您的 CEH 发挥作用,您需要将其安装在具有 SuperVisorJob(或 NonCancellable)的上下文中。 SupervisorJob 假定您在其范围内监督异常,因此在引发异常时它不会取消自身或其子级(但是,如果根本没有处理异常,它无论如何都会将其传播到层次结构中)。

例如在您的 ScopedActivity 范围中:

abstract class ScopedActivity : BaseActivity(), CoroutineScope {

override val coroutineContext = Dispatchers.Main + SupervisorJob() + CoroutineExceptionHandler { _, error -> 
...
    }

如果您确实需要,可以将 CEH 安装在协程层次结构的更深处。但是它看起来很笨拙,不推荐这样做:

launch {
    val supervisedJob = SupervisorJob(coroutineContext[Job])
    launch(supervisedJob + CEH) { 
        throw Exception() 
    }
    yield()
    println("I am still alive, exception was catched by CEH")
}

但是,如果您想启动一些即发即弃的不可取消的副作用,上述做法可能会有用:

launch(NonCancellable + CEH) {
        throw Exception()
    }

问题出在 Kotlin Flow 试图在取消后发出,这里是我创建的扩展,用于消除生产中发生的崩溃:

/**
 * Check if the channel is not closed and try to emit a value, catching [CancellationException] if the corresponding
 * has been cancelled. This extension is used in call callbackFlow.
 */
@ExperimentalCoroutinesApi
fun <E> SendChannel<E>.safeOffer(value: E): Boolean {
    if (isClosedForSend) return false
    return try {
        offer(value)
    } catch (e: CancellationException) {
        false
    }
}

/**
 * Terminal flow operator that collects the given flow with a provided [action] and catch [CancellationException]
 */
suspend inline fun <T> Flow<T>.safeCollect(crossinline action: suspend (value: T) -> Unit): Unit =
    collect { value ->
        try {
            action(value)
        } catch (e: CancellationException) {
            // Do nothing
        }
    }

/**
 * Terminal flow operator that [launches][launch] the [collection][collect] of the given flow in the [scope] and catch
 * [CancellationException]
 * It is a shorthand for `scope.launch { flow.safeCollect {} }`.
 */
fun <T> Flow<T>.safeLaunchIn(scope: CoroutineScope) = scope.launch {
    this@safeLaunchIn.safeCollect { /* Do nothing */ }
}

希望对您有所帮助