使用 Akka Streams 询问与告知或转发 Actors
Ask vs Tell or forward for Actors using Akka Streams
嗨,我正在与 akka streams
和 akka-stream-kafka
一起工作。我正在使用以下设置设置流:
Source (Kafka) --> | Akka Actor Flow | --> Sink (MongoDB)
Actor Flow
基本上由将处理数据的 Actors 组成,下面是层次结构:
System
|
Master Actor
/ \
URLTypeHandler SerializedTypeHandler
/ \ |
Type1Handler Type2Handler SomeOtherHandler
所以 Kafka 有消息,我在 atMostOnceSource
配置中写下消费者和 运行 它并使用
Consumer.Control control =
Consumer.atMostOnceSource(consumerSettings, Subscriptions.topics(TOPIC))
.mapAsyncUnordered(10, record -> processAccessLog(rootHandler, record.value()))
.to(Sink.foreach(it -> System.out.println("FinalReturnedString--> " + it)))
.run(materializer);
我最初使用打印作为接收器,只是为了获得流量 运行ning。
并且 processAccessLog
定义为:
private static CompletionStage<String> processAccessLog(ActorRef handler, byte[] value) {
handler.tell(value, ActorRef.noSender());
return CompletableFuture.completedFuture("");
}
现在,根据定义 ask
必须在演员期待响应时使用,在这种情况下有意义,因为我想 return 值写入接收器。
但是每个人(包括文档)都提到要避免 ask
而是使用 tell
和 forward
,上面写了一个很棒的博客 Don't Ask, Tell。
在他提到的博客中,如果是嵌套的actors,第一条消息使用tell
,然后使用forward
让消息到达目的地,然后在处理后直接发送消息回到根演员。
现在问题来了,
- 如何将消息从 D 发送回 A,以便我仍然可以使用接收器。
- 开放式流是一种好习惯吗?例如Sink 无关紧要的流,因为演员已经完成了工作。 (我认为不建议这样做,似乎有缺陷)。
ask
仍然是正确的模式
从链接的博客文章中,ask
中的一篇 "drawback" 是:
blocking an actor itself, which cannot pick any new messages until the
response arrives and processing finishes.
然而,在 akka-stream
中,这正是我们正在寻找的特征,a.k.a。 "back-pressure"。如果 Flow
或 Sink
需要很长时间来处理数据,那么我们希望 Source
放慢速度。
附带说明一下,我认为博客 post 中关于附加侦听器 Actor
导致实现 "dozens times heavier" 的说法有些夸张。显然,中间 Actor 会增加一些延迟开销,但不会增加 12x
。
背压消除
您正在寻找的任何实现都将有效地消除背压。仅使用 tell
的中间流会不断将需求传播回源,而不管处理程序 Actors 中的处理逻辑是否以与源生成数据相同的速度完成计算。
考虑一个极端的例子:如果您的 Source 每秒可以产生 100 万条消息,但是通过 tell
接收这些消息的 Actor 每秒只能处理 1 条消息,那会怎样?那个 Actor 的邮箱会怎样?
通过 ,您有意将处理程序的速度与您的源生成数据的速度联系起来。
如果您愿意移除从 Sink 到 Source 的背压信号,那么您最好一开始就不要使用 akka-stream。您可以使用背压或非阻塞消息传递,但不能同时使用两者。
Ramon J Romero y Vigil 是对的,但我会尝试扩展响应。
1) 我认为 "Don't ask, tell" 教条主要针对 Actor 系统架构。这里你需要 return 一个 Future 以便流可以解析处理后的结果,你有两个选择:
- 使用询问
- 为每个事件创建一个 actor 并传递给他们 Promise,这样当这个 actor 接收到数据时 Future 就会完成(您可以使用
getSender
方法,这样 D 就可以将响应发送给 A)。无法在消息中发送 Promise 或 Future(它们不可序列化)因此无法避免创建这种短命的 actor。
最后你做的基本上是一样的...
2) 使用空的 Sink 来完成流是完全没问题的(实际上 akka 提供了 Sink.ignore()
方法来这样做)。
似乎您没有找到使用流的原因,它们是提供可组合性、并发性和背压的很酷的抽象。另一方面,Actor 无法组合并且难以处理背压。如果您不需要此功能并且您的演员可以轻松完成工作,那么您不应该首先使用 akka-streams。
嗨,我正在与 akka streams
和 akka-stream-kafka
一起工作。我正在使用以下设置设置流:
Source (Kafka) --> | Akka Actor Flow | --> Sink (MongoDB)
Actor Flow
基本上由将处理数据的 Actors 组成,下面是层次结构:
System
|
Master Actor
/ \
URLTypeHandler SerializedTypeHandler
/ \ |
Type1Handler Type2Handler SomeOtherHandler
所以 Kafka 有消息,我在 atMostOnceSource
配置中写下消费者和 运行 它并使用
Consumer.Control control =
Consumer.atMostOnceSource(consumerSettings, Subscriptions.topics(TOPIC))
.mapAsyncUnordered(10, record -> processAccessLog(rootHandler, record.value()))
.to(Sink.foreach(it -> System.out.println("FinalReturnedString--> " + it)))
.run(materializer);
我最初使用打印作为接收器,只是为了获得流量 运行ning。
并且 processAccessLog
定义为:
private static CompletionStage<String> processAccessLog(ActorRef handler, byte[] value) {
handler.tell(value, ActorRef.noSender());
return CompletableFuture.completedFuture("");
}
现在,根据定义 ask
必须在演员期待响应时使用,在这种情况下有意义,因为我想 return 值写入接收器。
但是每个人(包括文档)都提到要避免 ask
而是使用 tell
和 forward
,上面写了一个很棒的博客 Don't Ask, Tell。
在他提到的博客中,如果是嵌套的actors,第一条消息使用tell
,然后使用forward
让消息到达目的地,然后在处理后直接发送消息回到根演员。
现在问题来了,
- 如何将消息从 D 发送回 A,以便我仍然可以使用接收器。
- 开放式流是一种好习惯吗?例如Sink 无关紧要的流,因为演员已经完成了工作。 (我认为不建议这样做,似乎有缺陷)。
ask
仍然是正确的模式
从链接的博客文章中,ask
中的一篇 "drawback" 是:
blocking an actor itself, which cannot pick any new messages until the response arrives and processing finishes.
然而,在 akka-stream
中,这正是我们正在寻找的特征,a.k.a。 "back-pressure"。如果 Flow
或 Sink
需要很长时间来处理数据,那么我们希望 Source
放慢速度。
附带说明一下,我认为博客 post 中关于附加侦听器 Actor
导致实现 "dozens times heavier" 的说法有些夸张。显然,中间 Actor 会增加一些延迟开销,但不会增加 12x
。
背压消除
您正在寻找的任何实现都将有效地消除背压。仅使用 tell
的中间流会不断将需求传播回源,而不管处理程序 Actors 中的处理逻辑是否以与源生成数据相同的速度完成计算。
考虑一个极端的例子:如果您的 Source 每秒可以产生 100 万条消息,但是通过 tell
接收这些消息的 Actor 每秒只能处理 1 条消息,那会怎样?那个 Actor 的邮箱会怎样?
通过
如果您愿意移除从 Sink 到 Source 的背压信号,那么您最好一开始就不要使用 akka-stream。您可以使用背压或非阻塞消息传递,但不能同时使用两者。
Ramon J Romero y Vigil 是对的,但我会尝试扩展响应。
1) 我认为 "Don't ask, tell" 教条主要针对 Actor 系统架构。这里你需要 return 一个 Future 以便流可以解析处理后的结果,你有两个选择:
- 使用询问
- 为每个事件创建一个 actor 并传递给他们 Promise,这样当这个 actor 接收到数据时 Future 就会完成(您可以使用
getSender
方法,这样 D 就可以将响应发送给 A)。无法在消息中发送 Promise 或 Future(它们不可序列化)因此无法避免创建这种短命的 actor。
最后你做的基本上是一样的...
2) 使用空的 Sink 来完成流是完全没问题的(实际上 akka 提供了 Sink.ignore()
方法来这样做)。
似乎您没有找到使用流的原因,它们是提供可组合性、并发性和背压的很酷的抽象。另一方面,Actor 无法组合并且难以处理背压。如果您不需要此功能并且您的演员可以轻松完成工作,那么您不应该首先使用 akka-streams。