对 Flow 的进出端口使用单个参与者

Using a single actor for in and out ports of a Flow

我开始探索 akka-stream API 并希望对其进行测试。我有一个特定的用例,我似乎找不到合适的流媒体 API 来使用。

这个想法类似于下面的想法,但有点不同:

Source(1 to 10).mapAsync(num => actorRef ? Request(num)).to(Sink.foreach(println))

在这种情况下,询问将 return 通过未来的单个消息。对于发送给演员的每个请求,我想 return 多条消息。例如:

  1. ActorRef 被发送 Request(2)
  2. ActorRef 从流中发送 N 条消息。
  3. ActorRef 被发送 Request(3)
  4. ActorRef 从流中发送 M 条消息。

我可以在 Actor 中添加额外的逻辑 aggregate/buffer 这些额外的消息进入某种 Iterable 但我想知道我是否在 API 中遗漏了一些东西来处理一个案例像这样。

我在浏览文档时最接近的是使用 Sink.actorRefWithAck 和 Source.queue 以及 Flow.fromSinkAndSourceMat 并在确认 onInitMessage 之前将物化的 SourceQueue 传递给 actorRef。这允许在处理流中的一对多消息时控制上游和下游背压。使图形易于理解似乎适得其反。

我相信您正在寻找 flatMapConcat

Source(1 to 10).flatMapConcat(num => 
  // Dummy example - outputs first 5 successors for each incoming element
  Source(num to (num + 5))
).to(Sink.foreach(println))

通过这种方式,您可以让 Akka 提升所有反应流的重量,并且不会让裸露的 actor 污染您的管道。

文档 here.

很遗憾,您要查找的内容似乎不存在。 ask pattern,例如?方法,只能return1个值。因此,当你做类似

的事情时
type Input = ???
type Outupt = ???

def askActorFlow(actorRef : ActorRef) : Flow[Input, Output, NotUsed] = 
  Flow[Input] mapAsync { input => (actorRef ? input).mapTo[Output] }

您一次只能从 ActorRef 返回 1 个值。

您建议的工作,例如"add extra logic into the Actor to aggregate/buffer these extra messages into a Iterable",无论如何是一个更好的解决方案。它使您的 Actor 用户更容易知道他们对任何查询只会得到 1 个响应。否则,用户永远不会真正知道他们是否在查询后从 Actor 那里得到了 "complete" 答复,或者是否有更多数据即将到来。