当所有协程都已经用 CouroutineExceptionHandler 包装时,如何发现 "Job was cancelled" 异常来自哪里?
How to spot where "Job was cancelled" exception comes from when all your coroutines are already wrapped with a CouroutineExceptionHandler?
我阅读了所有 kotlinx UI docs 并实现了那里描述的 ScopedActivity(参见下面的代码)。
在我的 ScopedActivity 实现中,我还添加了一个 CouroutineExceptionHandler,尽管我将我的异常处理程序传递给了我的所有协程,但我的用户遇到了崩溃,我在堆栈跟踪中获得的唯一信息是 "Job was cancelled"。
我搜索了几天,但没有找到解决方案,我的用户仍然随机崩溃,但我不明白为什么...
这是我的 ScopedActivity 实现
abstract class ScopedActivity : BaseActivity(), CoroutineScope by MainScope() {
val errorHandler by lazy { CoroutineExceptionHandler { _, throwable -> onError(throwable) } }
open fun onError(e: Throwable? = null) {
e ?: return
Timber.i(e)
}
override fun onDestroy() {
super.onDestroy()
cancel()
}
}
这是一个 activity 实现它的例子:
class ManageBalanceActivity : ScopedActivity() {
@Inject
lateinit var viewModel: ManageBalanceViewModel
private var stateJob: Job? = null
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_manage_balance)
AndroidInjection.inject(this)
init()
}
private fun init() {
SceneManager.create(
SceneCreator.with(this)
.add(Scene.MAIN, R.id.activity_manage_balance_topup_view)
.add(Scene.MAIN, R.id.activity_manage_balance_topup_bt)
.add(Scene.SPINNER, R.id.activity_manage_balance_spinner)
.add(Scene.SPINNER, R.id.activity_manage_balance_info_text)
.add(Scene.PLACEHOLDER, R.id.activity_manage_balance_error_text)
.first(Scene.SPINNER)
)
// Setting some onClickListeners ...
bindViewModel()
}
private fun bindViewModel() {
showProgress()
stateJob = launch(errorHandler) {
viewModel.state.collect { manageState(it) }
}
}
private fun manageState(state: ManageBalanceState) = when (state) {
is ManageBalanceState.NoPaymentMethod -> viewModel.navigateToManagePaymentMethod()
is ManageBalanceState.HasPaymentMethod -> onPaymentMethodAvailable(state.balance)
}
private fun onPaymentMethodAvailable(balance: Cash) {
toolbarTitle.text = formatCost(balance)
activity_manage_balance_topup_view.currency = balance.currency
SceneManager.scene(this, Scene.MAIN)
}
override fun onError(e: Throwable?) {
super.onError(e)
when (e) {
is NotLoggedInException -> loadErrorScene(R.string.error_pls_signin)
else -> loadErrorScene()
}
}
private fun loadErrorScene(@StringRes textRes: Int = R.string.generic_error) {
activity_manage_balance_error_text.setOnClickListener(this::reload)
SceneManager.scene(this, Scene.PLACEHOLDER)
}
private fun reload(v: View) {
v.setOnClickListener(null)
stateJob.cancelIfPossible()
bindViewModel()
}
private fun showProgress(@StringRes textRes: Int = R.string.please_wait_no_dot) {
activity_manage_balance_info_text.setText(textRes)
SceneManager.scene(this, Scene.SPINNER)
}
override fun onDestroy() {
super.onDestroy()
SceneManager.release(this)
}
}
fun Job?.cancelIfPossible() {
if (this?.isActive == true) cancel()
}
这是 ViewModel
class ManageBalanceViewModel @Inject constructor(
private val userGateway: UserGateway,
private val paymentGateway: PaymentGateway,
private val managePaymentMethodNavigator: ManagePaymentMethodNavigator
) {
val state: Flow<ManageBalanceState>
get() = paymentGateway.collectSelectedPaymentMethod()
.combine(userGateway.collectLoggedUser()) { paymentMethod, user ->
when (paymentMethod) {
null -> ManageBalanceState.NoPaymentMethod
else -> ManageBalanceState.HasPaymentMethod(Cash(user.creditBalance.toInt(), user.currency!!))
}
}
.flowOn(Dispatchers.Default)
// The navigator just do a startActivity with a clear task
fun navigateToManagePaymentMethod() = managePaymentMethodNavigator.navigate(true)
}
最有可能出现问题的原因是您将协程异常处理程序(我们将其命名为 CEH)直接传递给您的启动块。这些启动块正在创建新作业(重要 - 普通作业,而不是主管作业),它们成为范围内作业的子作业(Scoped Activity 中的 MainScope)。
普通作业将取消其所有子项和自身,如果它的任何子项引发异常。 CEH 不会阻止这种行为。它将获取这些异常并执行它被告知要对它们执行的操作,但它仍然不会阻止范围内的 Job 及其所有子项的取消。最重要的是,它也会在层次结构中向上传播异常。 TLDR - 不会处理崩溃。
为了让您的 CEH 发挥作用,您需要将其安装在具有 SuperVisorJob(或 NonCancellable)的上下文中。 SupervisorJob 假定您在其范围内监督异常,因此在引发异常时它不会取消自身或其子级(但是,如果根本没有处理异常,它无论如何都会将其传播到层次结构中)。
例如在您的 ScopedActivity 范围中:
abstract class ScopedActivity : BaseActivity(), CoroutineScope {
override val coroutineContext = Dispatchers.Main + SupervisorJob() + CoroutineExceptionHandler { _, error ->
...
}
如果您确实需要,可以将 CEH 安装在协程层次结构的更深处。但是它看起来很笨拙,不推荐这样做:
launch {
val supervisedJob = SupervisorJob(coroutineContext[Job])
launch(supervisedJob + CEH) {
throw Exception()
}
yield()
println("I am still alive, exception was catched by CEH")
}
但是,如果您想启动一些即发即弃的不可取消的副作用,上述做法可能会有用:
launch(NonCancellable + CEH) {
throw Exception()
}
问题出在 Kotlin Flow 试图在取消后发出,这里是我创建的扩展,用于消除生产中发生的崩溃:
/**
* Check if the channel is not closed and try to emit a value, catching [CancellationException] if the corresponding
* has been cancelled. This extension is used in call callbackFlow.
*/
@ExperimentalCoroutinesApi
fun <E> SendChannel<E>.safeOffer(value: E): Boolean {
if (isClosedForSend) return false
return try {
offer(value)
} catch (e: CancellationException) {
false
}
}
/**
* Terminal flow operator that collects the given flow with a provided [action] and catch [CancellationException]
*/
suspend inline fun <T> Flow<T>.safeCollect(crossinline action: suspend (value: T) -> Unit): Unit =
collect { value ->
try {
action(value)
} catch (e: CancellationException) {
// Do nothing
}
}
/**
* Terminal flow operator that [launches][launch] the [collection][collect] of the given flow in the [scope] and catch
* [CancellationException]
* It is a shorthand for `scope.launch { flow.safeCollect {} }`.
*/
fun <T> Flow<T>.safeLaunchIn(scope: CoroutineScope) = scope.launch {
this@safeLaunchIn.safeCollect { /* Do nothing */ }
}
希望对您有所帮助
我阅读了所有 kotlinx UI docs 并实现了那里描述的 ScopedActivity(参见下面的代码)。
在我的 ScopedActivity 实现中,我还添加了一个 CouroutineExceptionHandler,尽管我将我的异常处理程序传递给了我的所有协程,但我的用户遇到了崩溃,我在堆栈跟踪中获得的唯一信息是 "Job was cancelled"。
我搜索了几天,但没有找到解决方案,我的用户仍然随机崩溃,但我不明白为什么...
这是我的 ScopedActivity 实现
abstract class ScopedActivity : BaseActivity(), CoroutineScope by MainScope() {
val errorHandler by lazy { CoroutineExceptionHandler { _, throwable -> onError(throwable) } }
open fun onError(e: Throwable? = null) {
e ?: return
Timber.i(e)
}
override fun onDestroy() {
super.onDestroy()
cancel()
}
}
这是一个 activity 实现它的例子:
class ManageBalanceActivity : ScopedActivity() {
@Inject
lateinit var viewModel: ManageBalanceViewModel
private var stateJob: Job? = null
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_manage_balance)
AndroidInjection.inject(this)
init()
}
private fun init() {
SceneManager.create(
SceneCreator.with(this)
.add(Scene.MAIN, R.id.activity_manage_balance_topup_view)
.add(Scene.MAIN, R.id.activity_manage_balance_topup_bt)
.add(Scene.SPINNER, R.id.activity_manage_balance_spinner)
.add(Scene.SPINNER, R.id.activity_manage_balance_info_text)
.add(Scene.PLACEHOLDER, R.id.activity_manage_balance_error_text)
.first(Scene.SPINNER)
)
// Setting some onClickListeners ...
bindViewModel()
}
private fun bindViewModel() {
showProgress()
stateJob = launch(errorHandler) {
viewModel.state.collect { manageState(it) }
}
}
private fun manageState(state: ManageBalanceState) = when (state) {
is ManageBalanceState.NoPaymentMethod -> viewModel.navigateToManagePaymentMethod()
is ManageBalanceState.HasPaymentMethod -> onPaymentMethodAvailable(state.balance)
}
private fun onPaymentMethodAvailable(balance: Cash) {
toolbarTitle.text = formatCost(balance)
activity_manage_balance_topup_view.currency = balance.currency
SceneManager.scene(this, Scene.MAIN)
}
override fun onError(e: Throwable?) {
super.onError(e)
when (e) {
is NotLoggedInException -> loadErrorScene(R.string.error_pls_signin)
else -> loadErrorScene()
}
}
private fun loadErrorScene(@StringRes textRes: Int = R.string.generic_error) {
activity_manage_balance_error_text.setOnClickListener(this::reload)
SceneManager.scene(this, Scene.PLACEHOLDER)
}
private fun reload(v: View) {
v.setOnClickListener(null)
stateJob.cancelIfPossible()
bindViewModel()
}
private fun showProgress(@StringRes textRes: Int = R.string.please_wait_no_dot) {
activity_manage_balance_info_text.setText(textRes)
SceneManager.scene(this, Scene.SPINNER)
}
override fun onDestroy() {
super.onDestroy()
SceneManager.release(this)
}
}
fun Job?.cancelIfPossible() {
if (this?.isActive == true) cancel()
}
这是 ViewModel
class ManageBalanceViewModel @Inject constructor(
private val userGateway: UserGateway,
private val paymentGateway: PaymentGateway,
private val managePaymentMethodNavigator: ManagePaymentMethodNavigator
) {
val state: Flow<ManageBalanceState>
get() = paymentGateway.collectSelectedPaymentMethod()
.combine(userGateway.collectLoggedUser()) { paymentMethod, user ->
when (paymentMethod) {
null -> ManageBalanceState.NoPaymentMethod
else -> ManageBalanceState.HasPaymentMethod(Cash(user.creditBalance.toInt(), user.currency!!))
}
}
.flowOn(Dispatchers.Default)
// The navigator just do a startActivity with a clear task
fun navigateToManagePaymentMethod() = managePaymentMethodNavigator.navigate(true)
}
最有可能出现问题的原因是您将协程异常处理程序(我们将其命名为 CEH)直接传递给您的启动块。这些启动块正在创建新作业(重要 - 普通作业,而不是主管作业),它们成为范围内作业的子作业(Scoped Activity 中的 MainScope)。
普通作业将取消其所有子项和自身,如果它的任何子项引发异常。 CEH 不会阻止这种行为。它将获取这些异常并执行它被告知要对它们执行的操作,但它仍然不会阻止范围内的 Job 及其所有子项的取消。最重要的是,它也会在层次结构中向上传播异常。 TLDR - 不会处理崩溃。
为了让您的 CEH 发挥作用,您需要将其安装在具有 SuperVisorJob(或 NonCancellable)的上下文中。 SupervisorJob 假定您在其范围内监督异常,因此在引发异常时它不会取消自身或其子级(但是,如果根本没有处理异常,它无论如何都会将其传播到层次结构中)。
例如在您的 ScopedActivity 范围中:
abstract class ScopedActivity : BaseActivity(), CoroutineScope {
override val coroutineContext = Dispatchers.Main + SupervisorJob() + CoroutineExceptionHandler { _, error ->
...
}
如果您确实需要,可以将 CEH 安装在协程层次结构的更深处。但是它看起来很笨拙,不推荐这样做:
launch {
val supervisedJob = SupervisorJob(coroutineContext[Job])
launch(supervisedJob + CEH) {
throw Exception()
}
yield()
println("I am still alive, exception was catched by CEH")
}
但是,如果您想启动一些即发即弃的不可取消的副作用,上述做法可能会有用:
launch(NonCancellable + CEH) {
throw Exception()
}
问题出在 Kotlin Flow 试图在取消后发出,这里是我创建的扩展,用于消除生产中发生的崩溃:
/**
* Check if the channel is not closed and try to emit a value, catching [CancellationException] if the corresponding
* has been cancelled. This extension is used in call callbackFlow.
*/
@ExperimentalCoroutinesApi
fun <E> SendChannel<E>.safeOffer(value: E): Boolean {
if (isClosedForSend) return false
return try {
offer(value)
} catch (e: CancellationException) {
false
}
}
/**
* Terminal flow operator that collects the given flow with a provided [action] and catch [CancellationException]
*/
suspend inline fun <T> Flow<T>.safeCollect(crossinline action: suspend (value: T) -> Unit): Unit =
collect { value ->
try {
action(value)
} catch (e: CancellationException) {
// Do nothing
}
}
/**
* Terminal flow operator that [launches][launch] the [collection][collect] of the given flow in the [scope] and catch
* [CancellationException]
* It is a shorthand for `scope.launch { flow.safeCollect {} }`.
*/
fun <T> Flow<T>.safeLaunchIn(scope: CoroutineScope) = scope.launch {
this@safeLaunchIn.safeCollect { /* Do nothing */ }
}
希望对您有所帮助