Akka Streams 为什么分区在不使用 builder.add 时已经连接?
Akka Streams why partition is already connected when not using builder.add?
我正在试用 Akka Stream API,但我不知道为什么会抛出 java.lang.IllegalArgumentException:[Partition.in] 已在第 5 行连接
val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
val intSource = Source.fromIterator(() => Iterator.continually(Random.nextInt(10).toString))
val validateInput: Flow[String, Message, NotUsed] = Flow[String].map(Message.fromString)
val validationPartitioner = Partition[Message](2, { // #5 error here
case _: Data => 0
case _ => 1
})
val outputStream = Sink.foreach[Message](println(_))
val errorStream = Sink.ignore
intSource ~> validateInput ~> validationPartitioner.in
validationPartitioner.out(0) ~> outputStream
validationPartitioner.out(1) ~> errorStream
ClosedShape
})
但是如果我将 validationPartitioner 更改为包装在 builder.add(...) 中并从
中删除 .in
intSource ~> validateInput ~> validationPartitioner.in
一切正常。如果我只是删除 .in,代码将无法编译。为什么强制使用生成器,我是漏掉了什么还是错误?
我正在试用 Akka Stream API,但我不知道为什么会抛出 java.lang.IllegalArgumentException:[Partition.in] 已在第 5 行连接
val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
val intSource = Source.fromIterator(() => Iterator.continually(Random.nextInt(10).toString))
val validateInput: Flow[String, Message, NotUsed] = Flow[String].map(Message.fromString)
val validationPartitioner = Partition[Message](2, { // #5 error here
case _: Data => 0
case _ => 1
})
val outputStream = Sink.foreach[Message](println(_))
val errorStream = Sink.ignore
intSource ~> validateInput ~> validationPartitioner.in
validationPartitioner.out(0) ~> outputStream
validationPartitioner.out(1) ~> errorStream
ClosedShape
})
但是如果我将 validationPartitioner 更改为包装在 builder.add(...) 中并从
中删除 .inintSource ~> validateInput ~> validationPartitioner.in
一切正常。如果我只是删除 .in,代码将无法编译。为什么强制使用生成器,我是漏掉了什么还是错误?