如何使用 CompletableFuture 并行化 web 服务请求?
How to parallelize webservice requests with CompletableFuture?
我有一个 servlet 请求,基本上请求输入日期给出的数据。因为我有多个日期,所以我必须发送多个请求,然后汇总结果。例如:
List<Result> results = new ArrayList<>();
for (LocalDate date : dates) {
ServletReq req = new ServletReq(date);
try {
ServletRsp rsp = webservice.send(req);
results.addAll(rsp.getResults());
} catch (SpecificException e) {
//just ignore this result and continue
}
}
问题:如何并行化上面的代码?意思是:发送多个ServletReq
异步,并将结果收集到列表中。等待所有请求完成(可能超时),并忽略 SpecificException
.
我是这样开始的,但是我也不知道这个方向对不对,也没有把上面的代码完全转过来。特别是关于要忽略的异常。
ExecutorService service = Executors.newCachedThreadPool();
List<CompletableFuture<ServletRsp>> futures = new ArrayList<>();
for (LocalDate date : dates) {
ServletReq req = new ServletReq(date);
CompletableFuture future = CompletableFuture.supplyAsync(() -> webservice.send(req), service);
futures.add(future);
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).join();
到目前为止,但是:如何在异步结果上调用 rsp.getResults()
,并将所有内容放入 list
。以及如何在异步执行期间忽略 SpecificException
? (我无法修改 webservice.send()
方法!)。
我认为你在这方面做得很好。
问题是,除了自己收集外,没有任何机制可以很好地收集结果:
ExecutorService service = Executors.newCachedThreadPool();
List<CompletableFuture<Void>> futures = new ArrayList<>(); // these are only references to tell you when the request finishes
Queue<ServletRsp> results = new ConcurrentLinkedQueue<>(); // this has to be thread-safe
for (LocalDate date : dates) {
ServletReq req = new ServletReq(date);
CompletableFuture future = CompletableFuture
.supplyAsync(() -> webservice.send(req), service)
.thenAcceptAsync(results::add);
futures.add(future);
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).join();
// do stuff with results
我尽量保留您编写的大部分代码。也许流更干净一些:
List<CompletableFuture<Void>> collect = dates
.map(date -> CompletableFuture
.supplyAsync(() -> webservice.send(new ServletReq(date)), service)
.thenAcceptAsync(results::add))
.collect(Collectors.toList());
// wait for all requests to finish
CompletableFuture.allOf(collect.toArray(new CompletableFuture[collect.size()])).thenAcceptAsync(ignored -> {
//you can also handle the response async.
});
- 在供应商中捕获它们,return 例如
null
。只有在你真的什么都不做的情况下才这样做。要在 future.get()
处获得结果,您必须处理 null
和 ExecutionException
s.
例如
CompletableFuture<ServletRsp> future = CompletableFuture.supplyAsync(() -> {
try {
return webservice.send(new ServletReq(date));
} catch (SpecificException e) {
return null;
}
});
- 将它们作为(自定义?)
RuntimeException
重新抛出,这样您就不会丢失它们。现在你最后只处理异常,但有些是双重包装的。
- 手动完成未来。
例如
CompletableFuture<ServletRsp> future = new CompletableFuture<>();
service.execute(() -> {
try {
future.complete(webservice.send(new ServletReq(date));
} catch (SpecificException e) {
future.completeExceptionally(e);
}
});
futures.add(future);
除了 ExecutionException
之外不再换行。 CompletableFuture.supplyAsync
正是这样做的,但没有代码来处理已检查的异常。
- 只需使用接受抛出代码的旧
ExecutorService#submit(Callable<T> callable)
方法:
例如
List<Callable<String>> tasks = dates.stream()
.map(d -> (Callable<ServletRsp>) () -> send(new ServletReq(d)))
.collect(Collectors.toList());
List<Future<ServletRsp>> completed = service.invokeAll(tasks);
我有一个 servlet 请求,基本上请求输入日期给出的数据。因为我有多个日期,所以我必须发送多个请求,然后汇总结果。例如:
List<Result> results = new ArrayList<>();
for (LocalDate date : dates) {
ServletReq req = new ServletReq(date);
try {
ServletRsp rsp = webservice.send(req);
results.addAll(rsp.getResults());
} catch (SpecificException e) {
//just ignore this result and continue
}
}
问题:如何并行化上面的代码?意思是:发送多个ServletReq
异步,并将结果收集到列表中。等待所有请求完成(可能超时),并忽略 SpecificException
.
我是这样开始的,但是我也不知道这个方向对不对,也没有把上面的代码完全转过来。特别是关于要忽略的异常。
ExecutorService service = Executors.newCachedThreadPool();
List<CompletableFuture<ServletRsp>> futures = new ArrayList<>();
for (LocalDate date : dates) {
ServletReq req = new ServletReq(date);
CompletableFuture future = CompletableFuture.supplyAsync(() -> webservice.send(req), service);
futures.add(future);
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).join();
到目前为止,但是:如何在异步结果上调用 rsp.getResults()
,并将所有内容放入 list
。以及如何在异步执行期间忽略 SpecificException
? (我无法修改 webservice.send()
方法!)。
我认为你在这方面做得很好。
问题是,除了自己收集外,没有任何机制可以很好地收集结果:
ExecutorService service = Executors.newCachedThreadPool();
List<CompletableFuture<Void>> futures = new ArrayList<>(); // these are only references to tell you when the request finishes
Queue<ServletRsp> results = new ConcurrentLinkedQueue<>(); // this has to be thread-safe
for (LocalDate date : dates) {
ServletReq req = new ServletReq(date);
CompletableFuture future = CompletableFuture
.supplyAsync(() -> webservice.send(req), service)
.thenAcceptAsync(results::add);
futures.add(future);
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).join();
// do stuff with results
我尽量保留您编写的大部分代码。也许流更干净一些:
List<CompletableFuture<Void>> collect = dates
.map(date -> CompletableFuture
.supplyAsync(() -> webservice.send(new ServletReq(date)), service)
.thenAcceptAsync(results::add))
.collect(Collectors.toList());
// wait for all requests to finish
CompletableFuture.allOf(collect.toArray(new CompletableFuture[collect.size()])).thenAcceptAsync(ignored -> {
//you can also handle the response async.
});
- 在供应商中捕获它们,return 例如
null
。只有在你真的什么都不做的情况下才这样做。要在future.get()
处获得结果,您必须处理null
和ExecutionException
s.
例如
CompletableFuture<ServletRsp> future = CompletableFuture.supplyAsync(() -> {
try {
return webservice.send(new ServletReq(date));
} catch (SpecificException e) {
return null;
}
});
- 将它们作为(自定义?)
RuntimeException
重新抛出,这样您就不会丢失它们。现在你最后只处理异常,但有些是双重包装的。 - 手动完成未来。
例如
CompletableFuture<ServletRsp> future = new CompletableFuture<>();
service.execute(() -> {
try {
future.complete(webservice.send(new ServletReq(date));
} catch (SpecificException e) {
future.completeExceptionally(e);
}
});
futures.add(future);
除了 ExecutionException
之外不再换行。 CompletableFuture.supplyAsync
正是这样做的,但没有代码来处理已检查的异常。
- 只需使用接受抛出代码的旧
ExecutorService#submit(Callable<T> callable)
方法:
例如
List<Callable<String>> tasks = dates.stream()
.map(d -> (Callable<ServletRsp>) () -> send(new ServletReq(d)))
.collect(Collectors.toList());
List<Future<ServletRsp>> completed = service.invokeAll(tasks);