使用 Vertx HttpClient 和 RxJava 进行背压

Back pressure with Vertx HttpClient and RxJava

我在创建 "back pressure system" 时遇到问题。我使用 Vertx HttpClient 和 RxJava。 我需要向一个外部服务做 6000 个请求,为了避免 waitingForQueue 满,因为这个外部服务不能处理得像我发送的那么快,我在 request/response.

之间设置了一个延迟

由于此旅程是以批处理方式进行的,因此无需担心需要一分钟。

这是我的代码

return from(subGroups)
        .flatMap(subGroup -> getProductIdsForSubGroup(subGroup))
        .delay(50, TimeUnit.MILLISECONDS)

此方法是从 Observable 间隔调用的,该间隔 运行 每 24 小时传递此子组列表 (6000)

但是在检查我的日志后,我看不到我的请求之间有 50 毫秒的延迟

这是我的 3 篇日志

{"@timestamp":"2016-11-30T10:32:48.973+00:00","event":"started","requestHost":"localhost","requestMethod":"GET","requestUri":"/v3/comercial?category=T15EB&clientId=ERROR_NOT_SUPPLIED","requestHash":189630582,"level":"INFO","thread_name":"vert.x-eventloop-thread-5"}
{"@timestamp":"2016-11-30T10:32:48.978+00:00","event":"started","requestHost":"localhost","requestMethod":"GET","requestUri":"/v3/commercial?category=T15EE&clientId=ERROR_NOT_SUPPLIED","requestHash":1296199359,"level":"INFO","thread_name":"vert.x-eventloop-thread-5"}
{"@timestamp":"2016-11-30T10:32:48.981+00:00","event":"started","requestHost":"localhost","requestMethod":"GET","requestUri":"/v3/commercial?category=T15EG&clientId=ERROR_NOT_SUPPLIED","requestHash":228306365,"level":"INFO","thread_name":"vert.x-eventloop-thread-5"}

知道我需要做什么才能实现这个目标吗?

此致。

解决方案

我最终使用concatMap如果你有更好的解决方案请告诉我

return from(subGroups)
        .concatMap(subGroup -> Observable.just(subGroup).delay(50, TimeUnit.MILLISECONDS))
        .flatMap(subGroup -> getProductIdsForSubGroup(subGroup))

请注意,delay 只会延迟发射,所以这实际上是在浪费时间。

如果您可以使用最多 10 个并发请求/连接来查询远程系统,则可以使用 2 参数 flatMap:

return from(subGroups)
    .flatMap(subGroup -> getProductIdsForSubGroup(subGroup), 10);