Akka 流图 - 如何从 akka.stream.contrib 测试 PartitionWith

Akka Stream Graphs - How to test PartitionWith from akka.stream.contrib

我编写了一个简单的测试来探索 akka.stream.contrib 中的 PartitionWith 图形功能。这是代码片段:

class Scratch
    extends TestKit(ActorSystem("PartitionWith"))
    with WordSpecLike
    with ScalaFutures
    with Eventually
    with Matchers {

  "PartitionWith" should {
    "split source" in {
      val source: Source[Either[Int, String], NotUsed] = Source(List(Left(1), Right("one")))
      val leftHeadSink = Sink.head[Int]
      val rightHeadSink = Sink.head[String]

      val flow = RunnableGraph.fromGraph(GraphDSL.create(source, leftHeadSink, rightHeadSink)((_, _, _)) {
        implicit builder: GraphDSL.Builder[(NotUsed, Future[Int], Future[String])] => (s, l, r) =>
          import GraphDSL.Implicits._

          val pw = builder.add(PartitionWith.apply[Either[Int, String], Int, String](identity))

          s ~> pw.in
          pw.out0 ~> l.in
          pw.out1 ~> r.in

          ClosedShape
      })

      val event = flow.run()
      event._2.futureValue shouldBe 1     // first check
      event._3.futureValue shouldBe "one" // second check
    }

当我运行上面的测试时,它抛出这个错误:

The future returned an exception of type: java.util.NoSuchElementException, with message: head of empty stream.
org.scalatest.exceptions.TestFailedException: The future returned an exception of type: java.util.NoSuchElementException, with message: head of empty stream.

第二次检查似乎失败了,因为 rightHeadSink 是空的。我想知道 Source(List(Left(1), Right("one"))) 中的 Right("one") 是否从未被处理过??

我该如何解决这个问题?

随着第一个 Sink.head 完成,似乎整个流都已完成,因此第二个接收器无法检索其元素。

尝试使用 2 Sink.seq 秒而不是 2 Sink.head 秒进行测试,以避免您的流过早终止。