Akka Streams,将元组项目分开?
Akka Streams, break tuple item apart?
正在使用superPool
from akka-http
, I have a stream that passes down a tuple. I would like to pipeline it to the Alpakka Google Pub/Sub connector。在 HTTP 处理结束时,我对 pub/sub 连接器的所有内容进行编码并以
结束
(PublishRequest, Long) // long is a timestamp
但是连接器的接口是
Flow[PublishRequest, Seq[String], NotUsed]
第一种方法是杀死一个部分:
.map{ case(publishRequest, timestamp) => publishRequest }
.via(publishFlow)
有没有一种优雅的方法可以在保留 Long
信息的同时创建此管道?
编辑:在答案中添加了我不太优雅的解决方案。欢迎更多回答。
我不太优雅的解决方案是使用重新组合事物的自定义流程:
val publishAndRecombine = Flow.fromGraph(GraphDSL.create() { implicit b =>
val bc = b.add(Broadcast[(PublishRequest, Long)](2))
val publisher = b.add(Flow[(PublishRequest, Long)]
.map { case (pr, _) => pr }
.via(publishFlow))
val zipper = b.add(Zip[Seq[String], Long]).
bc.out(0) ~> publisher ~> zipper.in0
bc.out(1).map { case (pr, long) => long } ~> zipper.in1
FlowShape(bc.in, zipper.out)
})
我没有发现您使用 GraphDSL.create()
的解决方案有任何不雅之处,我认为它具有通过图解 ~>
子句可视化流结构的优势。我确实在您的代码中看到了问题。例如,我认为 publisher
不应定义为 add
-ing 流向构建器。
下面是我认为 publishAndRecombine
应该看起来像的框架版本(经过简要测试):
val publishFlow: Flow[PublishRequest, Seq[String], NotUsed] = ???
val publishAndRecombine = Flow.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val bcast = b.add(Broadcast[(PublishRequest, Long)](2))
val zipper = b.add(Zip[Seq[String], Long])
val publisher = Flow[(PublishRequest, Long)].
map{ case (pr, _) => pr }.
via(publishFlow)
val timestamp = Flow[(PublishRequest, Long)].
map{ case (_, ts) => ts }
bcast.out(0) ~> publisher ~> zipper.in0
bcast.out(1) ~> timestamp ~> zipper.in1
FlowShape(bcast.in, zipper.out)
})
现在有一个更好的解决方案,将在 Akka 2.6.19 中发布(参见 https://github.com/akka/akka/pull/31123)。
为了使用前面提到的 unsafeViaData
,您首先必须使用 FlowWithContext
/SourceWithContext
表示 (PublishRequest, Long)
。 FlowWithContext
/SourceWithContext
是专门为解决这个问题而设计的抽象(参见 https://doc.akka.io/docs/akka/current/stream/stream-context.html)。问题是你有一个流,其中的数据部分通常是你想要操作的(在你的情况下是 ByteString
),然后你有上下文(又名元数据)部分,你通常只是不加修改地传递(在你的情况下 Long
).
所以最后你会得到这样的东西
val myFlow: FlowWithContext[PublishRequest, Long, PublishRequest, Long, NotUsed] =
FlowWithContext.fromTuples(originalFlowAsTuple) // Original flow that has `(PublishRequest, Long)` as an output
myFlow.unsafeViaData(publishFlow)
与 相比,此解决方案不仅因为它是 akka 的一部分而涉及的样板更少,而且它还保留了具体化的价值而不是丢失它并总是以 NotUsed
结尾.
对于那些想知道为什么方法 unsafeViaData
的名称中有 unsafe
的人来说,这是因为您传递给此方法的 Flow
不能添加、删除或重新排序任何流中的元素(这样做意味着上下文不再正确对应于流的数据部分)。理想情况下,我们会使用 Scala 的类型系统在编译时捕获此类错误,但这样做需要对 akka-stream 进行大量更改,特别是如果更改需要保持向后兼容性(在处理 akka 时我们这样做)。更多细节在前面提到的 PR 中。
正在使用superPool
from akka-http
, I have a stream that passes down a tuple. I would like to pipeline it to the Alpakka Google Pub/Sub connector。在 HTTP 处理结束时,我对 pub/sub 连接器的所有内容进行编码并以
(PublishRequest, Long) // long is a timestamp
但是连接器的接口是
Flow[PublishRequest, Seq[String], NotUsed]
第一种方法是杀死一个部分:
.map{ case(publishRequest, timestamp) => publishRequest }
.via(publishFlow)
有没有一种优雅的方法可以在保留 Long
信息的同时创建此管道?
编辑:在答案中添加了我不太优雅的解决方案。欢迎更多回答。
我不太优雅的解决方案是使用重新组合事物的自定义流程:
val publishAndRecombine = Flow.fromGraph(GraphDSL.create() { implicit b =>
val bc = b.add(Broadcast[(PublishRequest, Long)](2))
val publisher = b.add(Flow[(PublishRequest, Long)]
.map { case (pr, _) => pr }
.via(publishFlow))
val zipper = b.add(Zip[Seq[String], Long]).
bc.out(0) ~> publisher ~> zipper.in0
bc.out(1).map { case (pr, long) => long } ~> zipper.in1
FlowShape(bc.in, zipper.out)
})
我没有发现您使用 GraphDSL.create()
的解决方案有任何不雅之处,我认为它具有通过图解 ~>
子句可视化流结构的优势。我确实在您的代码中看到了问题。例如,我认为 publisher
不应定义为 add
-ing 流向构建器。
下面是我认为 publishAndRecombine
应该看起来像的框架版本(经过简要测试):
val publishFlow: Flow[PublishRequest, Seq[String], NotUsed] = ???
val publishAndRecombine = Flow.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val bcast = b.add(Broadcast[(PublishRequest, Long)](2))
val zipper = b.add(Zip[Seq[String], Long])
val publisher = Flow[(PublishRequest, Long)].
map{ case (pr, _) => pr }.
via(publishFlow)
val timestamp = Flow[(PublishRequest, Long)].
map{ case (_, ts) => ts }
bcast.out(0) ~> publisher ~> zipper.in0
bcast.out(1) ~> timestamp ~> zipper.in1
FlowShape(bcast.in, zipper.out)
})
现在有一个更好的解决方案,将在 Akka 2.6.19 中发布(参见 https://github.com/akka/akka/pull/31123)。
为了使用前面提到的 unsafeViaData
,您首先必须使用 FlowWithContext
/SourceWithContext
表示 (PublishRequest, Long)
。 FlowWithContext
/SourceWithContext
是专门为解决这个问题而设计的抽象(参见 https://doc.akka.io/docs/akka/current/stream/stream-context.html)。问题是你有一个流,其中的数据部分通常是你想要操作的(在你的情况下是 ByteString
),然后你有上下文(又名元数据)部分,你通常只是不加修改地传递(在你的情况下 Long
).
所以最后你会得到这样的东西
val myFlow: FlowWithContext[PublishRequest, Long, PublishRequest, Long, NotUsed] =
FlowWithContext.fromTuples(originalFlowAsTuple) // Original flow that has `(PublishRequest, Long)` as an output
myFlow.unsafeViaData(publishFlow)
与 NotUsed
结尾.
对于那些想知道为什么方法 unsafeViaData
的名称中有 unsafe
的人来说,这是因为您传递给此方法的 Flow
不能添加、删除或重新排序任何流中的元素(这样做意味着上下文不再正确对应于流的数据部分)。理想情况下,我们会使用 Scala 的类型系统在编译时捕获此类错误,但这样做需要对 akka-stream 进行大量更改,特别是如果更改需要保持向后兼容性(在处理 akka 时我们这样做)。更多细节在前面提到的 PR 中。