Akka 流图 - 如何从 akka.stream.contrib 测试 PartitionWith
Akka Stream Graphs - How to test PartitionWith from akka.stream.contrib
我编写了一个简单的测试来探索 akka.stream.contrib
中的 PartitionWith
图形功能。这是代码片段:
class Scratch
extends TestKit(ActorSystem("PartitionWith"))
with WordSpecLike
with ScalaFutures
with Eventually
with Matchers {
"PartitionWith" should {
"split source" in {
val source: Source[Either[Int, String], NotUsed] = Source(List(Left(1), Right("one")))
val leftHeadSink = Sink.head[Int]
val rightHeadSink = Sink.head[String]
val flow = RunnableGraph.fromGraph(GraphDSL.create(source, leftHeadSink, rightHeadSink)((_, _, _)) {
implicit builder: GraphDSL.Builder[(NotUsed, Future[Int], Future[String])] => (s, l, r) =>
import GraphDSL.Implicits._
val pw = builder.add(PartitionWith.apply[Either[Int, String], Int, String](identity))
s ~> pw.in
pw.out0 ~> l.in
pw.out1 ~> r.in
ClosedShape
})
val event = flow.run()
event._2.futureValue shouldBe 1 // first check
event._3.futureValue shouldBe "one" // second check
}
当我运行上面的测试时,它抛出这个错误:
The future returned an exception of type: java.util.NoSuchElementException, with message: head of empty stream.
org.scalatest.exceptions.TestFailedException: The future returned an exception of type: java.util.NoSuchElementException, with message: head of empty stream.
第二次检查似乎失败了,因为 rightHeadSink
是空的。我想知道 Source(List(Left(1), Right("one")))
中的 Right("one")
是否从未被处理过??
我该如何解决这个问题?
随着第一个 Sink.head
完成,似乎整个流都已完成,因此第二个接收器无法检索其元素。
尝试使用 2 Sink.seq
秒而不是 2 Sink.head
秒进行测试,以避免您的流过早终止。
我编写了一个简单的测试来探索 akka.stream.contrib
中的 PartitionWith
图形功能。这是代码片段:
class Scratch
extends TestKit(ActorSystem("PartitionWith"))
with WordSpecLike
with ScalaFutures
with Eventually
with Matchers {
"PartitionWith" should {
"split source" in {
val source: Source[Either[Int, String], NotUsed] = Source(List(Left(1), Right("one")))
val leftHeadSink = Sink.head[Int]
val rightHeadSink = Sink.head[String]
val flow = RunnableGraph.fromGraph(GraphDSL.create(source, leftHeadSink, rightHeadSink)((_, _, _)) {
implicit builder: GraphDSL.Builder[(NotUsed, Future[Int], Future[String])] => (s, l, r) =>
import GraphDSL.Implicits._
val pw = builder.add(PartitionWith.apply[Either[Int, String], Int, String](identity))
s ~> pw.in
pw.out0 ~> l.in
pw.out1 ~> r.in
ClosedShape
})
val event = flow.run()
event._2.futureValue shouldBe 1 // first check
event._3.futureValue shouldBe "one" // second check
}
当我运行上面的测试时,它抛出这个错误:
The future returned an exception of type: java.util.NoSuchElementException, with message: head of empty stream.
org.scalatest.exceptions.TestFailedException: The future returned an exception of type: java.util.NoSuchElementException, with message: head of empty stream.
第二次检查似乎失败了,因为 rightHeadSink
是空的。我想知道 Source(List(Left(1), Right("one")))
中的 Right("one")
是否从未被处理过??
我该如何解决这个问题?
随着第一个 Sink.head
完成,似乎整个流都已完成,因此第二个接收器无法检索其元素。
尝试使用 2 Sink.seq
秒而不是 2 Sink.head
秒进行测试,以避免您的流过早终止。