测试以无限流完成的 akka-http 路由

Testing akka-http route that completes with infinite stream

我有一个 akka-http 路由,它返回一个包含无限实体流的 Source。我如何使用路由测试包测试它?我只想检查流的前 n 个元素,但我查看了测试包代码,看起来没有直接的方法来访问响应中的 Source。它总是转换为 ByteString 的序列,在我的例子中这只会导致 TimeoutException,因为流永远不会终止。

作为参考,问题可以通过如下所示的路线重现:

case class Bar(wibble: String, wobble: String)

path("stream") {
  get {
    complete {
      import JsonSupport._
      implicit val streamingSupport = EntityStreamingSupport.json()
      Source.unfold(1) { i =>
        Thread.sleep(10)
        Some((i + 1, Bar(i.toString, (i + 1).toString)))
      }
    }
  }
}

看起来每当您访问测试包边界内的响应和实体时,测试包都会尝试完全提取它。

您可以查看 RouteTestResultComponent class 中的私有方法 awaitAllElements 以进一步了解。

您可以尝试简单地使用 akka-streams 组合器。我认为它不会使您的测试代码膨胀太多。示例如下:

def firstNElements(n: Int) = Source.single(yourRequest)
  .via(RouteResult.route2HandlerFlow(route))
  .flatMapConcat(_.entity.dataBytes)
  .take(n)
  .runWith(Sink.seq)
  .futureValue

// assertions