单元测试回调流程

unit testing callbackFlow

我有一个基于 API 的回调,如下所示:

  class CallbackApi {
    fun addListener(callback: Callback) {
      // todo
    }

    fun removeListener(callback: Callback) {
      // todo
    }

    interface Callback {
      fun onResult(result: Int)
    }
  }

和一个将 API 转换为 hot 冷流的扩展函数:

  fun CallbackApi.toFlow() = callbackFlow<Int> {
    val callback = object : CallbackApi.Callback {
      override fun onResult(result: Int) {
        trySendBlocking(result)
      }
    }
    addListener(callback)
    awaitClose { removeListener(callback) }
  }

您介意建议如何编写单元测试以确保 API 正确转换为热流吗?

这是我的尝试。通过反复试验,我想出了这个解决方案。

  @Test
  fun callbackFlowTest() = runBlocking {
    val callbackApi = mockk<CallbackApi>()
    val callbackSlot = slot<CallbackApi.Callback>()
    every { callbackApi.addListener(capture(callbackSlot)) } just Runs
    every { callbackApi.removeListener(any()) } just Runs
    val list = mutableListOf<Int>()
    val flow: Flow<Int> = callbackApi.toFlow().onEach { list.add(it) }
    val coroutineScope = CoroutineScope(this.coroutineContext + SupervisorJob())
    flow.launchIn(coroutineScope)
    yield()
    launch {
      callbackSlot.captured.onResult(10)
      callbackApi.removeListener(mockk()) // this was a misunderstanding
    }.join()
    assert(list.single() == 10)
  }

但是我不明白这个解决方案的两个部分。

1- 如果没有 SupervisorJob(),测试似乎永远不会结束。也许出于某种原因收集流量永远不会结束,我不明白。我在单独的协程中提供捕获的回调。

2- 如果我删除其中 callbackSlot.captured.onResult(10)launch 主体,测试将失败并出现此错误 UninitializedPropertyAccessException: lateinit property captured has not been initialized。我认为 yield 应该开始流程。

and an extension function which converts the API into a hot flow

这个扩展看起来是正确的,但是流量并不热(也不应该)。它仅在实际收集开始时注册回调,并在收集器取消时取消注册(这包括收集器使用限制项目数量的终端运算符时,例如 .first().take(n))。

对于您的其他问题,请牢记这一点。

In the absence of that SupervisorJob(), it appears that test will never end. Maybe collecting the flow never ends for some reason, which I don't understand

如上所述,由于流的构造方式(以及 CallbackApi 的工作方式),流收集不能由生产者决定结束(回调 API)。它只能通过取消收集器来停止,这也会注销相应的回调(这很好)。

您的自定义作业允许测试结束的原因可能是因为您通过不将当前作业作为父作业的自定义作业覆盖上下文中的作业来逃避结构化并发。但是,您可能仍在从永不取消的范围中泄漏永无止境的协程。

I'm feeding captured callback in a separate coroutine.

没错,虽然我不明白你为什么要从这个单独的协同程序中调用 removeListener。你在这里注销什么回调?请注意,这也不会对流程产生任何影响,因为即使您可以取消注册在 callbackFlow 构建器中创建的回调,它也不会神奇地关闭 callbackFlow 的通道,因此flow 无论如何都不会结束(我假设这就是你在这里尝试做的)。

此外,从外部取消注册回调会阻止您检查它实际上是否已被您的生产代码取消注册。

2- If I remove the launch body which callbackSlot.captured.onResult(10) is inside it, test will fail with this error UninitializedPropertyAccessException: lateinit property captured has not been initialized. I would think that yield should start the flow.

yield() 相当脆弱。如果你使用它,你必须非常清楚当前每个并发协程的代码是如何编写的。它脆弱的原因是它只会将线程让给其他协程,直到下一个挂起点。你无法预知 yield 时会执行哪个协程,也无法预知线程到达挂起点后会恢复哪个协程。如果有几次停赛,所有的赌注都会被取消。如果还有其他 运行ning 协程,则所有赌注也都取消了。

更好的方法是使用 kotlinx-coroutines-test,它提供像 advanceUntilIdle 这样的实用程序,确保其他协程全部完成或等待挂起点。


现在如何解决这个测试?我现在无法测试任何东西,但我可能会这样处理:

  • 使用 kotlinx-coroutines-test 中的 runTest 而不是 runBlocking 来更好地控制其他协同程序 运行 (并等待流集合做某事)
  • 在协程中启动流收集(只是 launch/launchIn(this),没有自定义范围)并保留启动的 Job 的句柄(return [的值=22=]/launchIn)
  • 用一个值调用捕获的回调,advanceUntilIdle()确保流收集器的协程可以处理它,然后断言列表得到了元素(注意:因为一切都是单线程的,回调不是挂起,如果没有缓冲区,这会死锁,但是 callbackFlow 使用默认缓冲区,所以应该没问题)
  • 可选:用不同的值重复以上几次并确认它们被流收集
  • 取消收集作业,advanceUntilIdle(),然后测试回调是否未注册(我不是 Mockk 专家,但应该有一些东西可以检查 removeListener 是否被调用)

注意:也许我是老派,但如果您的 CallbackApi 是一个界面(在您的示例中是 class,但我不确定它在多大程度上反映了现实) ,我宁愿使用通道手动实现模拟来模拟事件并断言期望。我发现推理和调试更容易。这里是an example of what I mean

这是我根据@Joffrey 有用的指南找到的解决方案:

  @Test
  fun callbackFlowTestSolution() = runTest {
    val callbackApi = mockk<CallbackApi>()
    val callbackSlot = slot<CallbackApi.Callback>()
    every { callbackApi.addListener(capture(callbackSlot)) } just Runs
    every { callbackApi.removeListener(any()) } just Runs
    val itemsToSend = Array(9) { index -> index } // [0, 1, 2, 3, 4, 5, 6, 7, 8]
    val collectedItems = mutableListOf<Int>()

    val flow: Flow<Int> = callbackApi.toFlow().onEach { collectedItems.add(it) }
    val collectJob = launch { flow.collect() }
    advanceUntilIdle() // wait for callbackFlow builder to call addListener
    itemsToSend.forEach { callbackSlot.captured.onResult(it) }
    advanceUntilIdle() // wait for flow collection
    collectJob.cancel()
    advanceUntilIdle() // wait for awaitClose

    verify { callbackApi.removeListener(callbackSlot.captured) }
    assertArrayEquals(itemsToSend.toIntArray(), collectedItems.toIntArray())
  }

请考虑升级到 kotlinx-coroutines-test:1.6.0 以使用 runTest。在旧版本中,我们可以使用 runBlockingTest,但它已被弃用。


更新: 如果你想检查流是否可以一个一个收集物品

  @Test
  fun callbackFlowTestSolution() = runTest {
    val callbackApi = mockk<CallbackApi>()
    val callbackSlot = slot<CallbackApi.Callback>()
    every { callbackApi.addListener(capture(callbackSlot)) } just Runs
    every { callbackApi.removeListener(any()) } just Runs
    val itemsToSend = Array(9) { index -> index } // [0, 1, 2, 3, 4, 5, 6, 7, 8]
    var lastCollectedItem: Int? = null

    val flow: Flow<Int> = callbackApi.toFlow().onEach { lastCollectedItem = it }
    val collectJob = launch { flow.collect() }
    advanceUntilIdle() // wait for callbackFlow builder to call addListener
    itemsToSend.forEach {
      callbackSlot.captured.onResult(it)
      advanceUntilIdle() // wait for flow collection
      
      assertEquals(it, lastCollectedItem)
    }
    collectJob.cancel()
    advanceUntilIdle() // wait for awaitClose

    verify { callbackApi.removeListener(callbackSlot.captured) }
  }