如何从操作列表中创建接收器
How can I create a sink from a list of operations
我想在由许多操作组成的 akka 流中创建一个接收器。
例如映射、过滤、折叠然后下沉。
我现在能做的最好的是以下内容。
我不喜欢它,因为我必须指定广播,即使我只让一个值通过。
有谁知道这样做的更好方法吗?
def kafkaSink(): Sink[PartialBatchProcessedResult, NotUsed] = {
Sink.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val broadcast = b.add(Broadcast[PartialBatchProcessedResult](1))
broadcast.out(0)
.fold(new BatchPublishingResponseCollator()) { (c, e) => c.consume(e) }
.map(_.build())
.map(a =>
FunctionalTesterResults(sampleProjectorConfig, 0, a)) ~> Sink.foreach(new KafkaTestResultsReporter().report)
SinkShape(broadcast.in)
})
}
要记住 akka-stream
的一个关键点是任何数量的 Flow
值加上一个 Sink
值仍然是一个 Sink。
几个例子证明了这一点属性:
val intSink : Sink[Int, _] = Sink.head[Int]
val anotherSink : Sink[Int, _] =
Flow[Int].filter(_ > 0)
.to(intSink)
val oneMoreSink : Sink[Int, _] =
Flow[Int].filter(_ > 0)
.map(_ + 4)
.to(intSink)
因此,您可以将 map
和 filter
作为流程来实施。你问的 fold
可以用 Sink.fold
.
来实现
我想在由许多操作组成的 akka 流中创建一个接收器。 例如映射、过滤、折叠然后下沉。 我现在能做的最好的是以下内容。 我不喜欢它,因为我必须指定广播,即使我只让一个值通过。 有谁知道这样做的更好方法吗?
def kafkaSink(): Sink[PartialBatchProcessedResult, NotUsed] = {
Sink.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val broadcast = b.add(Broadcast[PartialBatchProcessedResult](1))
broadcast.out(0)
.fold(new BatchPublishingResponseCollator()) { (c, e) => c.consume(e) }
.map(_.build())
.map(a =>
FunctionalTesterResults(sampleProjectorConfig, 0, a)) ~> Sink.foreach(new KafkaTestResultsReporter().report)
SinkShape(broadcast.in)
})
}
要记住 akka-stream
的一个关键点是任何数量的 Flow
值加上一个 Sink
值仍然是一个 Sink。
几个例子证明了这一点属性:
val intSink : Sink[Int, _] = Sink.head[Int]
val anotherSink : Sink[Int, _] =
Flow[Int].filter(_ > 0)
.to(intSink)
val oneMoreSink : Sink[Int, _] =
Flow[Int].filter(_ > 0)
.map(_ + 4)
.to(intSink)
因此,您可以将 map
和 filter
作为流程来实施。你问的 fold
可以用 Sink.fold
.