使用 Akka Streams 询问与告知或转发 Actors

Ask vs Tell or forward for Actors using Akka Streams

嗨,我正在与 akka streamsakka-stream-kafka 一起工作。我正在使用以下设置设置流:

Source (Kafka) --> | Akka Actor Flow | --> Sink (MongoDB)

Actor Flow 基本上由将处理数据的 Actors 组成,下面是层次结构:

                                      System
                                         | 
                                     Master Actor  
                                      /       \
                          URLTypeHandler     SerializedTypeHandler
                             /       \                   |
                     Type1Handler   Type2Handler     SomeOtherHandler

所以 Kafka 有消息,我在 atMostOnceSource 配置中写下消费者和 运行 它并使用

Consumer.Control control =
            Consumer.atMostOnceSource(consumerSettings, Subscriptions.topics(TOPIC))
                    .mapAsyncUnordered(10, record -> processAccessLog(rootHandler, record.value()))
                    .to(Sink.foreach(it -> System.out.println("FinalReturnedString--> " + it)))
                    .run(materializer);

我最初使用打印作为接收器,只是为了获得流量 运行ning。

并且 processAccessLog 定义为:

private static CompletionStage<String> processAccessLog(ActorRef handler, byte[] value) {

    handler.tell(value, ActorRef.noSender());

    return CompletableFuture.completedFuture("");
}

现在,根据定义 ask 必须在演员期待响应时使用,在这种情况下有意义,因为我想 return 值写入接收器。

但是每个人(包括文档)都提到要避免 ask 而是使用 tellforward,上面写了一个很棒的博客 Don't Ask, Tell

在他提到的博客中,如果是嵌套的actors,第一条消息使用tell,然后使用forward让消息到达目的地,然后在处理后直接发送消息回到根演员。

现在问题来了,

  1. 如何将消息从 D 发送回 A,以便我仍然可以使用接收器。
  2. 开放式流是一种好习惯吗?例如Sink 无关紧要的流,因为演员已经完成了工作。 (我认为不建议这样做,似乎有缺陷)。

ask 仍然是正确的模式

从链接的博客文章中,ask 中的一篇 "drawback" 是:

blocking an actor itself, which cannot pick any new messages until the response arrives and processing finishes.

然而,在 akka-stream 中,这正是我们正在寻找的特征,a.k.a。 "back-pressure"。如果 FlowSink 需要很长时间来处理数据,那么我们希望 Source 放慢速度。

附带说明一下,我认为博客 post 中关于附加侦听器 Actor 导致实现 "dozens times heavier" 的说法有些夸张。显然,中间 Actor 会增加一些延迟开销,但不会增加 12x

背压消除

您正在寻找的任何实现都将有效地消除背压。仅使用 tell 的中间流会不断将需求传播回源,而不管处理程序 Actors 中的处理逻辑是否以与源生成数据相同的速度完成计算。 考虑一个极端的例子:如果您的 Source 每秒可以产生 100 万条消息,但是通过 tell 接收这些消息的 Actor 每秒只能处理 1 条消息,那会怎样?那个 Actor 的邮箱会怎样?

通过 ,您有意将处理程序的速度与您的源生成数据的速度联系起来。

如果您愿意移除从 Sink 到 Source 的背压信号,那么您最好一开始就不要使用 akka-stream。您可以使用背压或非阻塞消息传递,但不能同时使用两者。

Ramon J Romero y Vigil 是对的,但我会尝试扩展响应。

1) 我认为 "Don't ask, tell" 教条主要针对 Actor 系统架构。这里你需要 return 一个 Future 以便流可以解析处理后的结果,你有两个选择:

  • 使用询问
  • 为每个事件创建一个 actor 并传递给他们 Promise,这样当这个 actor 接收到数据时 Future 就会完成(您可以使用 getSender 方法,这样 D 就可以将响应发送给 A)。无法在消息中发送 Promise 或 Future(它们不可序列化)因此无法避免创建这种短命的 actor。

最后你做的基本上是一样的...

2) 使用空的 Sink 来完成流是完全没问题的(实际上 akka 提供了 Sink.ignore() 方法来这样做)。

似乎您没有找到使用流的原因,它们是提供可组合性、并发性和背压的很酷的抽象。另一方面,Actor 无法组合并且难以处理背压。如果您不需要此功能并且您的演员可以轻松完成工作,那么您不应该首先使用 akka-streams。