Vertx with Futures 中任意数量调用的顺序组合
Sequential composition for arbitrary number of calls in Vertx with Futures
我们在 vertx 中使用 Futures,例如:
Future<JsonObject> fetchVehicle = getUserBookedVehicle(routingContext, client);
fetchVehicle.compose(vehicleJson -> vehicleDoor(routingContext, client, vehicleJson, lock)).setHandler(
asyncResult -> {
if (asyncResult.succeeded()) {
LOG.info("Door operation succeeded with result {}", asyncResult.result().encode());
handler.handle(Future.succeededFuture(new AsyncReply(200, "OK")));
}
else {
handler.handle(Future.failedFuture(asyncResult.cause()));
}
});
例如,我们处理 2 个调用的地方。
或者我有另一个片段,我可以在其中处理任意数量的方法:
List<Future> futures = new ArrayList<>();
conversation.getRequestList().forEach(req -> {
Future<Message<Object>> senderFuture = Future.future();
vertx.eventBus().send(AbstractOEMClientVerticle.ADDRESS, JsonObject.mapFrom(req), deliveryOptions, senderFuture.completer());
// sent successfully. save the replyAddress and the conversation for later/callback
log.info("Saving the conversation for the request.", conversation.getReplyAddress());
pendingCommands.put(req.getBody().getString(MSG_ID), conversation);
futures.add(senderFuture);
});
CompositeFuture.all(futures).setHandler(ar -> {
if (ar.succeeded()) {
handler.handle(Future.succeededFuture());
} else {
log.error("forwardToVWClient VW got result : {}", ar.cause());
handler.handle(Future.failedFuture(ar.cause()));
}
});
在这里,我们将 conversation.getRequestList()
中的所有请求链接在一起,但事先不知道它们的计数。
但是.all()
方法的缺点是,我们无法控制顺序。
如何将任意数量的方法与 Vertx Futures 链接起来(不知道确切的调用次数)?
编辑:
官方指南讲的是顺序组合,但给出的例子有3次调用。它没有解释如何针对任意数量的调用执行此操作。
参见 http://vertx.io/docs/vertx-core/java/
中的 "Sequential composition"
我希望这是清楚的。
如果您想将上一个请求的响应提供给下一个请求,并且假设您对每个响应都有不同的处理程序。您可以添加辅助方法
private <T> Future<T> chain(Future<T> init, List<Function<T, Future<T>>> handlers) {
Future<T> result = init;
for (Function<T, Future<T>> handler : handlers) {
result = result.compose(handler);
}
return result;
}
然后像这样更改代码
Future<JsonObject> fetchVehicle = getUserBookedVehicle(routingContext, client);
Function<JsonObject, Future<JsonObject>> vehicleResponseHandler = vehicleJson ->
vehicleDoor(routingContext, client, vehicleJson, lock);
Function<JsonObject, Future<JsonObject>> anotherTrivialHandler = someJsonObj -> {
// add here new request by using information from someJsonObj
LOG.info("Hello from trivial handler {} ", someJsonObj);
return Future.succeededFuture(someJsonObj);
};
List<Function<JsonObject, Future<JsonObject>>> handlers = new ArrayList<>();
handlers.add(vehicleResponseHandler);
handlers.add(anotherTrivialHandler);
chain(fetchVehicle, handlers).setHandler( asyncResult -> {
if (asyncResult.succeeded()) {
handler.handle(Future.succeededFuture(new AsyncReply(200, "OK")));
} else {
handler.handle(Future.failedFuture(asyncResult.cause()));
}
});
但是这个实现有一个限制,它要求每个链接的 Future
必须有相同的类型参数 T
。
这是一个使用 map & reduce
的解决方案,它以有序的方式执行方法,returns 以 Future<String>
的形式累积结果
public static <T> Future<String> chainCall(List<T> list, Function<T, Future<String>> method){
return list.stream().reduce(Future.succeededFuture(),// the initial "future"
(acc, item) -> acc.compose(v -> method.apply(item)), // we return the compose of the previous "future" with "future" returned by next item processing
(a,b) -> Future.future()); // not used! only useful for parallel stream.
}
可以在下面的例子中使用:
chainCall(conversation.getRequestList(), this::sendApiRequestViaBus);
其中 sendApiRequestViaBus
是:
/**
* @param request The request to process
* @return The result of the request processing.
*/
Future<String> sendApiRequestViaBus(ApiRequest request) {
Future<String> future = Future.future();
String address = CommandUtilsFactory.getInstance(request.getImplementation()).getApiClientAddress();
log.debug("Chain call start msgId {}", request.getId());
vertx.eventBus().send(address, JsonObject.mapFrom(request), deliveryOptions, res -> {
log.debug("Chain call returns {}", request.getId());
if (res.succeeded()) {
future.complete("OK");
} else {
future.fail("KO");
}
});
return future;
}
希望对您有所帮助。
这里有一些方便的东西。希望能帮助到你。
public static <R> Future<List<R>> allOfFutures(List<Future<R>> futures) {
return CompositeFutureImpl.all(futures.toArray(new Future[futures.size()]))
.map(v -> futures.stream()
.map(Future::result)
.collect(Collectors.toList())
);
}
我们在 vertx 中使用 Futures,例如:
Future<JsonObject> fetchVehicle = getUserBookedVehicle(routingContext, client);
fetchVehicle.compose(vehicleJson -> vehicleDoor(routingContext, client, vehicleJson, lock)).setHandler(
asyncResult -> {
if (asyncResult.succeeded()) {
LOG.info("Door operation succeeded with result {}", asyncResult.result().encode());
handler.handle(Future.succeededFuture(new AsyncReply(200, "OK")));
}
else {
handler.handle(Future.failedFuture(asyncResult.cause()));
}
});
例如,我们处理 2 个调用的地方。
或者我有另一个片段,我可以在其中处理任意数量的方法:
List<Future> futures = new ArrayList<>();
conversation.getRequestList().forEach(req -> {
Future<Message<Object>> senderFuture = Future.future();
vertx.eventBus().send(AbstractOEMClientVerticle.ADDRESS, JsonObject.mapFrom(req), deliveryOptions, senderFuture.completer());
// sent successfully. save the replyAddress and the conversation for later/callback
log.info("Saving the conversation for the request.", conversation.getReplyAddress());
pendingCommands.put(req.getBody().getString(MSG_ID), conversation);
futures.add(senderFuture);
});
CompositeFuture.all(futures).setHandler(ar -> {
if (ar.succeeded()) {
handler.handle(Future.succeededFuture());
} else {
log.error("forwardToVWClient VW got result : {}", ar.cause());
handler.handle(Future.failedFuture(ar.cause()));
}
});
在这里,我们将 conversation.getRequestList()
中的所有请求链接在一起,但事先不知道它们的计数。
但是.all()
方法的缺点是,我们无法控制顺序。
如何将任意数量的方法与 Vertx Futures 链接起来(不知道确切的调用次数)?
编辑:
官方指南讲的是顺序组合,但给出的例子有3次调用。它没有解释如何针对任意数量的调用执行此操作。
参见 http://vertx.io/docs/vertx-core/java/
中的 "Sequential composition"我希望这是清楚的。
如果您想将上一个请求的响应提供给下一个请求,并且假设您对每个响应都有不同的处理程序。您可以添加辅助方法
private <T> Future<T> chain(Future<T> init, List<Function<T, Future<T>>> handlers) {
Future<T> result = init;
for (Function<T, Future<T>> handler : handlers) {
result = result.compose(handler);
}
return result;
}
然后像这样更改代码
Future<JsonObject> fetchVehicle = getUserBookedVehicle(routingContext, client);
Function<JsonObject, Future<JsonObject>> vehicleResponseHandler = vehicleJson ->
vehicleDoor(routingContext, client, vehicleJson, lock);
Function<JsonObject, Future<JsonObject>> anotherTrivialHandler = someJsonObj -> {
// add here new request by using information from someJsonObj
LOG.info("Hello from trivial handler {} ", someJsonObj);
return Future.succeededFuture(someJsonObj);
};
List<Function<JsonObject, Future<JsonObject>>> handlers = new ArrayList<>();
handlers.add(vehicleResponseHandler);
handlers.add(anotherTrivialHandler);
chain(fetchVehicle, handlers).setHandler( asyncResult -> {
if (asyncResult.succeeded()) {
handler.handle(Future.succeededFuture(new AsyncReply(200, "OK")));
} else {
handler.handle(Future.failedFuture(asyncResult.cause()));
}
});
但是这个实现有一个限制,它要求每个链接的 Future
必须有相同的类型参数 T
。
这是一个使用 map & reduce
的解决方案,它以有序的方式执行方法,returns 以 Future<String>
public static <T> Future<String> chainCall(List<T> list, Function<T, Future<String>> method){
return list.stream().reduce(Future.succeededFuture(),// the initial "future"
(acc, item) -> acc.compose(v -> method.apply(item)), // we return the compose of the previous "future" with "future" returned by next item processing
(a,b) -> Future.future()); // not used! only useful for parallel stream.
}
可以在下面的例子中使用:
chainCall(conversation.getRequestList(), this::sendApiRequestViaBus);
其中 sendApiRequestViaBus
是:
/**
* @param request The request to process
* @return The result of the request processing.
*/
Future<String> sendApiRequestViaBus(ApiRequest request) {
Future<String> future = Future.future();
String address = CommandUtilsFactory.getInstance(request.getImplementation()).getApiClientAddress();
log.debug("Chain call start msgId {}", request.getId());
vertx.eventBus().send(address, JsonObject.mapFrom(request), deliveryOptions, res -> {
log.debug("Chain call returns {}", request.getId());
if (res.succeeded()) {
future.complete("OK");
} else {
future.fail("KO");
}
});
return future;
}
希望对您有所帮助。
这里有一些方便的东西。希望能帮助到你。
public static <R> Future<List<R>> allOfFutures(List<Future<R>> futures) {
return CompositeFutureImpl.all(futures.toArray(new Future[futures.size()]))
.map(v -> futures.stream()
.map(Future::result)
.collect(Collectors.toList())
);
}