使用 Vertx HttpClient 和 RxJava 进行背压
Back pressure with Vertx HttpClient and RxJava
我在创建 "back pressure system" 时遇到问题。我使用 Vertx HttpClient 和 RxJava。
我需要向一个外部服务做 6000 个请求,为了避免 waitingForQueue 满,因为这个外部服务不能处理得像我发送的那么快,我在 request/response.
之间设置了一个延迟
由于此旅程是以批处理方式进行的,因此无需担心需要一分钟。
这是我的代码
return from(subGroups)
.flatMap(subGroup -> getProductIdsForSubGroup(subGroup))
.delay(50, TimeUnit.MILLISECONDS)
此方法是从 Observable 间隔调用的,该间隔 运行 每 24 小时传递此子组列表 (6000)
但是在检查我的日志后,我看不到我的请求之间有 50 毫秒的延迟
这是我的 3 篇日志
{"@timestamp":"2016-11-30T10:32:48.973+00:00","event":"started","requestHost":"localhost","requestMethod":"GET","requestUri":"/v3/comercial?category=T15EB&clientId=ERROR_NOT_SUPPLIED","requestHash":189630582,"level":"INFO","thread_name":"vert.x-eventloop-thread-5"}
{"@timestamp":"2016-11-30T10:32:48.978+00:00","event":"started","requestHost":"localhost","requestMethod":"GET","requestUri":"/v3/commercial?category=T15EE&clientId=ERROR_NOT_SUPPLIED","requestHash":1296199359,"level":"INFO","thread_name":"vert.x-eventloop-thread-5"}
{"@timestamp":"2016-11-30T10:32:48.981+00:00","event":"started","requestHost":"localhost","requestMethod":"GET","requestUri":"/v3/commercial?category=T15EG&clientId=ERROR_NOT_SUPPLIED","requestHash":228306365,"level":"INFO","thread_name":"vert.x-eventloop-thread-5"}
知道我需要做什么才能实现这个目标吗?
此致。
解决方案
我最终使用concatMap
如果你有更好的解决方案请告诉我
return from(subGroups)
.concatMap(subGroup -> Observable.just(subGroup).delay(50, TimeUnit.MILLISECONDS))
.flatMap(subGroup -> getProductIdsForSubGroup(subGroup))
请注意,delay
只会延迟发射,所以这实际上是在浪费时间。
如果您可以使用最多 10 个并发请求/连接来查询远程系统,则可以使用 2 参数 flatMap
:
return from(subGroups)
.flatMap(subGroup -> getProductIdsForSubGroup(subGroup), 10);
我在创建 "back pressure system" 时遇到问题。我使用 Vertx HttpClient 和 RxJava。 我需要向一个外部服务做 6000 个请求,为了避免 waitingForQueue 满,因为这个外部服务不能处理得像我发送的那么快,我在 request/response.
之间设置了一个延迟由于此旅程是以批处理方式进行的,因此无需担心需要一分钟。
这是我的代码
return from(subGroups)
.flatMap(subGroup -> getProductIdsForSubGroup(subGroup))
.delay(50, TimeUnit.MILLISECONDS)
此方法是从 Observable 间隔调用的,该间隔 运行 每 24 小时传递此子组列表 (6000)
但是在检查我的日志后,我看不到我的请求之间有 50 毫秒的延迟
这是我的 3 篇日志
{"@timestamp":"2016-11-30T10:32:48.973+00:00","event":"started","requestHost":"localhost","requestMethod":"GET","requestUri":"/v3/comercial?category=T15EB&clientId=ERROR_NOT_SUPPLIED","requestHash":189630582,"level":"INFO","thread_name":"vert.x-eventloop-thread-5"}
{"@timestamp":"2016-11-30T10:32:48.978+00:00","event":"started","requestHost":"localhost","requestMethod":"GET","requestUri":"/v3/commercial?category=T15EE&clientId=ERROR_NOT_SUPPLIED","requestHash":1296199359,"level":"INFO","thread_name":"vert.x-eventloop-thread-5"}
{"@timestamp":"2016-11-30T10:32:48.981+00:00","event":"started","requestHost":"localhost","requestMethod":"GET","requestUri":"/v3/commercial?category=T15EG&clientId=ERROR_NOT_SUPPLIED","requestHash":228306365,"level":"INFO","thread_name":"vert.x-eventloop-thread-5"}
知道我需要做什么才能实现这个目标吗?
此致。
解决方案
我最终使用concatMap
如果你有更好的解决方案请告诉我
return from(subGroups)
.concatMap(subGroup -> Observable.just(subGroup).delay(50, TimeUnit.MILLISECONDS))
.flatMap(subGroup -> getProductIdsForSubGroup(subGroup))
请注意,delay
只会延迟发射,所以这实际上是在浪费时间。
如果您可以使用最多 10 个并发请求/连接来查询远程系统,则可以使用 2 参数 flatMap
:
return from(subGroups)
.flatMap(subGroup -> getProductIdsForSubGroup(subGroup), 10);