自定义姐妹流
Custom Akka Streams
我有一组流阶段(源、流和汇),我想向其中添加一些元数据信息。
因此,而不是源产生 A -> (A, StreamMetaData)。我已经设法使用自定义流阶段做到这一点,从而在抓取(输入)元素时,我推出(输出,(elem,StreamMetaData))。实际上,它不是 'converting' 现有源,而是将其传递给流以重新创建新源。
现在我正在尝试实现以下 MetaStream 阶段:
因此,假设源正在生成 (A, StreamMetaData) 的元组,我想将 A 传递给现有的 Flow 以进行一些计算,然后将生成的输出 'B' 与 StreamMetaData 合并.然后这些将被传递到接受(B,StreamMetaData)的接收器。
你建议我怎么做。我被告知部分图表是最好的选择,并且有助于完成这样的任务。 UniformFanOut 和 UniformFanIn 使用 Unzip((A streamMetaData), A, StreamMetaData) 和 Zip(A,B)
val fanOut = GraphDSL.create() { implicit b =>
val unzip = b.add(Unzip[T, StreamMetaData])
UniformFanOutShape(unzip.in, unzip.out0, unzip.out1)
}
val fanIn = GraphDSL.create() { implicit b =>
val zip = b.add(Zip[T ,StreamMetaData]())
UniformFanInShape(zip)
}
如何连接 fanIn 和 fanOut 以实现与图片中相同的行为?
我有这样的想法;
def metaFlow[T, B, Mat](flow: Flow[T, B, Mat]): Unit = {
val wrappedFlow =
Flow.fromGraph(GraphDSL.create(){ implicit b =>
import GraphDSL.Implicits._
val unzip: FanOutShape2[(T, StreamMetaData), T, StreamMetaData] = b.add(Unzip[T, StreamMetaData])
val existingFlow = b.add(flow)
val zip: FanInShape2[B,StreamMetaData,(B,StreamMetaData)] = b.add(Zip[B, StreamMetaData])
unzip.out0 ~> existingFlow ~> zip.in0
unzip.out1 ~> zip.in1
FlowShape(unzip.in, zip.out)
})
}
提前致谢。
这个创建新的 SourceShape 堆叠流程图的方法可以工作,与您的 flowShape 实现有点不同。
def sourceGraph[A, B](f: A => B, source: Source[(A, StreamMetaData), NotUsed]) = Source.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
val unzip = builder.add(Unzip[A, StreamMetaData]())
val zip = builder.add(Zip[B, StreamMetaData]())
val flow0 = builder.add(Flow[A].map { f(_) })
val flow1 = source ~> unzip.in
unzip.out0 ~> flow0 ~> zip.in0
unzip.out1 ~> zip.in1
SourceShape(zip.out)
})
def flowGraph[A, B](f: A => B) = Flow.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val unzip = builder.add(Unzip[A, StreamMetaData]())
val zip = builder.add(Zip[B, StreamMetaData]())
val flow0 = builder.add(Flow[A].map { f(_) })
unzip.out0 ~> flow0 ~> zip.in0
unzip.out1 ~> zip.in1
FlowShape(unzip.in, zip.out)
})
我有一组流阶段(源、流和汇),我想向其中添加一些元数据信息。
因此,而不是源产生 A -> (A, StreamMetaData)。我已经设法使用自定义流阶段做到这一点,从而在抓取(输入)元素时,我推出(输出,(elem,StreamMetaData))。实际上,它不是 'converting' 现有源,而是将其传递给流以重新创建新源。
现在我正在尝试实现以下 MetaStream 阶段:
因此,假设源正在生成 (A, StreamMetaData) 的元组,我想将 A 传递给现有的 Flow 以进行一些计算,然后将生成的输出 'B' 与 StreamMetaData 合并.然后这些将被传递到接受(B,StreamMetaData)的接收器。
你建议我怎么做。我被告知部分图表是最好的选择,并且有助于完成这样的任务。 UniformFanOut 和 UniformFanIn 使用 Unzip((A streamMetaData), A, StreamMetaData) 和 Zip(A,B)
val fanOut = GraphDSL.create() { implicit b =>
val unzip = b.add(Unzip[T, StreamMetaData])
UniformFanOutShape(unzip.in, unzip.out0, unzip.out1)
}
val fanIn = GraphDSL.create() { implicit b =>
val zip = b.add(Zip[T ,StreamMetaData]())
UniformFanInShape(zip)
}
如何连接 fanIn 和 fanOut 以实现与图片中相同的行为?
我有这样的想法;
def metaFlow[T, B, Mat](flow: Flow[T, B, Mat]): Unit = {
val wrappedFlow =
Flow.fromGraph(GraphDSL.create(){ implicit b =>
import GraphDSL.Implicits._
val unzip: FanOutShape2[(T, StreamMetaData), T, StreamMetaData] = b.add(Unzip[T, StreamMetaData])
val existingFlow = b.add(flow)
val zip: FanInShape2[B,StreamMetaData,(B,StreamMetaData)] = b.add(Zip[B, StreamMetaData])
unzip.out0 ~> existingFlow ~> zip.in0
unzip.out1 ~> zip.in1
FlowShape(unzip.in, zip.out)
})
}
提前致谢。
这个创建新的 SourceShape 堆叠流程图的方法可以工作,与您的 flowShape 实现有点不同。
def sourceGraph[A, B](f: A => B, source: Source[(A, StreamMetaData), NotUsed]) = Source.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
val unzip = builder.add(Unzip[A, StreamMetaData]())
val zip = builder.add(Zip[B, StreamMetaData]())
val flow0 = builder.add(Flow[A].map { f(_) })
val flow1 = source ~> unzip.in
unzip.out0 ~> flow0 ~> zip.in0
unzip.out1 ~> zip.in1
SourceShape(zip.out)
})
def flowGraph[A, B](f: A => B) = Flow.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val unzip = builder.add(Unzip[A, StreamMetaData]())
val zip = builder.add(Zip[B, StreamMetaData]())
val flow0 = builder.add(Flow[A].map { f(_) })
unzip.out0 ~> flow0 ~> zip.in0
unzip.out1 ~> zip.in1
FlowShape(unzip.in, zip.out)
})