Akka Streams,将元组项目分开?

Akka Streams, break tuple item apart?

正在使用superPool from akka-http, I have a stream that passes down a tuple. I would like to pipeline it to the Alpakka Google Pub/Sub connector。在 HTTP 处理结束时,我对 pub/sub 连接器的所有内容进行编码并以

结束
(PublishRequest, Long) // long is a timestamp

但是连接器的接口是

Flow[PublishRequest, Seq[String], NotUsed]

第一种方法是杀死一个部分:

  .map{ case(publishRequest, timestamp) => publishRequest }
  .via(publishFlow)

有没有一种优雅的方法可以在保留 Long 信息的同时创建此管道?

编辑:在答案中添加了我不太优雅的解决方案。欢迎更多回答。

我不太优雅的解决方案是使用重新组合事物的自定义流程:

val publishAndRecombine = Flow.fromGraph(GraphDSL.create() { implicit b =>

  val bc = b.add(Broadcast[(PublishRequest, Long)](2))
  val publisher = b.add(Flow[(PublishRequest, Long)]
    .map { case (pr, _) => pr }
    .via(publishFlow))
  val zipper = b.add(Zip[Seq[String], Long]). 

  bc.out(0) ~> publisher ~> zipper.in0
  bc.out(1).map { case (pr, long) => long } ~> zipper.in1

  FlowShape(bc.in, zipper.out)

})

我没有发现您使用 GraphDSL.create() 的解决方案有任何不雅之处,我认为它具有通过图解 ~> 子句可视化流结构的优势。我确实在您的代码中看到了问题。例如,我认为 publisher 不应定义为 add-ing 流向构建器。

下面是我认为 publishAndRecombine 应该看起来像的框架版本(经过简要测试):

val publishFlow: Flow[PublishRequest, Seq[String], NotUsed] = ???

val publishAndRecombine = Flow.fromGraph(GraphDSL.create() { implicit b =>
  import GraphDSL.Implicits._

  val bcast = b.add(Broadcast[(PublishRequest, Long)](2))
  val zipper = b.add(Zip[Seq[String], Long])

  val publisher = Flow[(PublishRequest, Long)].
    map{ case (pr, _) => pr }.
    via(publishFlow)
  val timestamp = Flow[(PublishRequest, Long)].
    map{ case (_, ts) => ts }

  bcast.out(0) ~> publisher ~> zipper.in0
  bcast.out(1) ~> timestamp ~> zipper.in1

  FlowShape(bcast.in, zipper.out)
})

现在有一个更好的解决方案,将在 Akka 2.6.19 中发布(参见 https://github.com/akka/akka/pull/31123)。

为了使用前面提到的 unsafeViaData,您首先必须使用 FlowWithContext/SourceWithContext 表示 (PublishRequest, Long)FlowWithContext/SourceWithContext 是专门为解决这个问题而设计的抽象(参见 https://doc.akka.io/docs/akka/current/stream/stream-context.html)。问题是你有一个流,其中的数据部分通常是你想要操作的(在你的情况下是 ByteString),然后你有上下文(又名元数据)部分,你通常只是不加修改地传递(在你的情况下 Long).

所以最后你会得到这样的东西

val myFlow: FlowWithContext[PublishRequest, Long, PublishRequest, Long, NotUsed] =
    FlowWithContext.fromTuples(originalFlowAsTuple) // Original flow that has `(PublishRequest, Long)` as an output

myFlow.unsafeViaData(publishFlow)

相比,此解决方案不仅因为它是 akka 的一部分而涉及的样板更少,而且它还保留了具体化的价值而不是丢失它并总是以 NotUsed 结尾.

对于那些想知道为什么方法 unsafeViaData 的名称中有 unsafe 的人来说,这是因为您传递给此方法的 Flow 不能添加、删除或重新排序任何流中的元素(这样做意味着上下文不再正确对应于流的数据部分)。理想情况下,我们会使用 Scala 的类型系统在编译时捕获此类错误,但这样做需要对 akka-stream 进行大量更改,特别是如果更改需要保持向后兼容性(在处理 akka 时我们这样做)。更多细节在前面提到的 PR 中。