限制来自演员的异步调用
Throttling async calls made from actors
我们有一个相当高吞吐量的 actor 系统,它通过 http 对外部系统进行异步调用。我们发现下游系统由于收到来自我们的呼叫数量而不堪重负。
对下游系统的调用是使用此处描述的 "pipe to" 模式进行的:https://petabridge.com/blog/akkadotnet-async-actors-using-pipeto/
之所以对下游系统进行如此多的调用是因为参与者在处理其邮箱中的下一条消息之前不等待异步调用的响应返回(它已完成消息当异步调用开始时)。显然这是设计使然,但在这些情况下,它会导致对外部服务进行大量异步调用。
我们需要一种方法来限制正在进行的调用。我可以想到几个可能的解决方案。
通过等待任务完成同步执行对外部服务的调用。为参与者设置一个池路由器,这基本上是一种限制对该外部服务的调用量的方法。
使用 ReceiveAsync 方法而不是 Receive。这基本上与选项 1 完全相同。在我上面发布的 petabride 页面上,虽然它说了这个方法 - "just don’t do it" :)
在进行异步调用之前,开始存储所有传入消息,然后在异步任务完成后取消存储它们。显然使用这种方法吞吐量要有限得多。
我想知道是否有人在使用 akka 时遇到过类似的问题并且能够解决它?
编辑:
所以最后只有选项 1 对我们有用。 IE。有一个带有 Receive() 的池路由器,专门等待它需要进行的 IO 调用(对外部系统的 api 调用)。这似乎工作得很好,我们可以通过设置池大小来控制 'throttling'。
我们尝试了选项 2 (ReceiveAsync),但我们发现在某些时候系统会逐渐停止并变得无响应而不会引发任何错误。我们怀疑它正在陷入僵局。这将是由于异步键工作的方式与简单地使用 .Wait() 或 .Result 等待任务的方式有关。我现在明白了为什么 Petabridge 建议不要使用 ReceiveAsync :)
我们没有尝试选项 3,因为这意味着更重大的变化。
至于我,我已经通过使用路由器创建子角色来解决这个问题,它一次只能处理一条消息。因此,您可以配置具有多个 worker 的外部系统负载。此外,这可以让您能够使用一致性哈希来避免并行处理某些消息。
至于工作人员 - 在一个项目中我使用了第一种方式,但对工作人员使用了固定调度程序 - 所以他们总是有相同的线程来处理消息并且不会影响其他系统部分。如果你有一个相当恒定的负载,这很好。
实际上,第二个选项 (ReceiveAsync
) 是解决您的问题的完全有效的解决方案。唯一的风险是,在这种情况下,您会减慢发送者的速度,因为 actor 现在将异步等待 HTTP 请求完成。这意味着 actor 本身可能会不堪重负,如果高速率的消息将继续不断地推送给它。
如果是这种情况,您可以:
- 增加消费者(HTTP 连接另一端的侦听器)数量以跟上步伐。
- 使用 Akka 流而不是演员为您的问题建模。 Streams 内置了对背压的支持,可以向上游应用背压,直到它到达请求跟踪的原始源。
我们有一个相当高吞吐量的 actor 系统,它通过 http 对外部系统进行异步调用。我们发现下游系统由于收到来自我们的呼叫数量而不堪重负。
对下游系统的调用是使用此处描述的 "pipe to" 模式进行的:https://petabridge.com/blog/akkadotnet-async-actors-using-pipeto/
之所以对下游系统进行如此多的调用是因为参与者在处理其邮箱中的下一条消息之前不等待异步调用的响应返回(它已完成消息当异步调用开始时)。显然这是设计使然,但在这些情况下,它会导致对外部服务进行大量异步调用。
我们需要一种方法来限制正在进行的调用。我可以想到几个可能的解决方案。
通过等待任务完成同步执行对外部服务的调用。为参与者设置一个池路由器,这基本上是一种限制对该外部服务的调用量的方法。
使用 ReceiveAsync 方法而不是 Receive。这基本上与选项 1 完全相同。在我上面发布的 petabride 页面上,虽然它说了这个方法 - "just don’t do it" :)
在进行异步调用之前,开始存储所有传入消息,然后在异步任务完成后取消存储它们。显然使用这种方法吞吐量要有限得多。
我想知道是否有人在使用 akka 时遇到过类似的问题并且能够解决它?
编辑:
所以最后只有选项 1 对我们有用。 IE。有一个带有 Receive() 的池路由器,专门等待它需要进行的 IO 调用(对外部系统的 api 调用)。这似乎工作得很好,我们可以通过设置池大小来控制 'throttling'。
我们尝试了选项 2 (ReceiveAsync),但我们发现在某些时候系统会逐渐停止并变得无响应而不会引发任何错误。我们怀疑它正在陷入僵局。这将是由于异步键工作的方式与简单地使用 .Wait() 或 .Result 等待任务的方式有关。我现在明白了为什么 Petabridge 建议不要使用 ReceiveAsync :)
我们没有尝试选项 3,因为这意味着更重大的变化。
至于我,我已经通过使用路由器创建子角色来解决这个问题,它一次只能处理一条消息。因此,您可以配置具有多个 worker 的外部系统负载。此外,这可以让您能够使用一致性哈希来避免并行处理某些消息。
至于工作人员 - 在一个项目中我使用了第一种方式,但对工作人员使用了固定调度程序 - 所以他们总是有相同的线程来处理消息并且不会影响其他系统部分。如果你有一个相当恒定的负载,这很好。
实际上,第二个选项 (ReceiveAsync
) 是解决您的问题的完全有效的解决方案。唯一的风险是,在这种情况下,您会减慢发送者的速度,因为 actor 现在将异步等待 HTTP 请求完成。这意味着 actor 本身可能会不堪重负,如果高速率的消息将继续不断地推送给它。
如果是这种情况,您可以:
- 增加消费者(HTTP 连接另一端的侦听器)数量以跟上步伐。
- 使用 Akka 流而不是演员为您的问题建模。 Streams 内置了对背压的支持,可以向上游应用背压,直到它到达请求跟踪的原始源。