Using CompletableFuture to execute multiple DB queries asynchronously
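I want to run several database queries in parallel and store the results in a map. I am attempting this as shown below, but the map is not fully populated when I read it.
Am I doing something wrong?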
public Map<MapKeyEnums, Set<String>> doDBCalls(String phoneNumber, long timestamp) {
    Map<MapKeyEnums, Set<String>> instrumentsEdgesMap = new EnumMap<>(MapKeyEnums.class);
    CompletableFuture.supplyAsync(() -> dbReadService.getCall(phoneNumber, PhoneNumber.class, "ABC", timestamp))
        .thenApply(x -> instrumentsEdgesMap.put(MapKeyEnums.ABC, x));
    CompletableFuture.supplyAsync(() -> dbReadService.getCall(phoneNumber, PhoneNumber.class, "XYZ", timestamp))
        .thenApply(x -> instrumentsEdgesMap.put(MapKeyEnums.XYZ, x));
    CompletableFuture.supplyAsync(() -> dbReadService.getCall(phoneNumber, PhoneNumber.class, "DEF", timestamp))
        .thenApply(x -> instrumentsEdgesMap.put(MapKeyEnums.DEF, x));
    return instrumentsEdgesMap;
}
Any help would be appreciated. Thanks in advance.
You have to wait for the futures to complete before you return the result.
Try:
public Map<MapKeyEnums, Set<String>> doDBCalls(String phoneNumber, long timestamp) {
    // The callbacks may run on different pool threads and EnumMap is not thread-safe,
    // so wrap it with java.util.Collections.synchronizedMap.
    Map<MapKeyEnums, Set<String>> instrumentsEdgesMap =
        Collections.synchronizedMap(new EnumMap<>(MapKeyEnums.class));
    CompletableFuture.allOf(
        CompletableFuture.supplyAsync(() -> dbReadService.getCall(phoneNumber, PhoneNumber.class, "ABC", timestamp))
            .thenAccept(x -> instrumentsEdgesMap.put(MapKeyEnums.ABC, x)),
        CompletableFuture.supplyAsync(() -> dbReadService.getCall(phoneNumber, PhoneNumber.class, "XYZ", timestamp))
            .thenAccept(x -> instrumentsEdgesMap.put(MapKeyEnums.XYZ, x)),
        CompletableFuture.supplyAsync(() -> dbReadService.getCall(phoneNumber, PhoneNumber.class, "DEF", timestamp))
            .thenAccept(x -> instrumentsEdgesMap.put(MapKeyEnums.DEF, x)))
        .join(); // wait for all three subtasks; unlike get(), join() throws no checked exceptions
    return instrumentsEdgesMap;
}
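As a variation on the snippet above, here is a minimal self-contained sketch that avoids writing to a shared map from callbacks: it keeps the futures, waits with allOf, and then builds the result on the calling thread. The Key enum and fakeQuery method are hypothetical stand-ins for MapKeyEnums and dbReadService.getCall, which are not shown in the question, and Set.of assumes Java 9 or later.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

public class AllOfSketch {
    // Hypothetical stand-in for the question's MapKeyEnums.
    enum Key { ABC, XYZ, DEF }

    // Hypothetical stub replacing dbReadService.getCall(...).
    static Set<String> fakeQuery(String phoneNumber, String table) {
        return Set.of(table + ":" + phoneNumber);
    }

    static Map<Key, Set<String>> doDBCalls(String phoneNumber) {
        // Submit one query per key; all are started before we wait on any of them.
        Map<Key, CompletableFuture<Set<String>>> futures = new EnumMap<>(Key.class);
        for (Key key : Key.values()) {
            futures.put(key, CompletableFuture.supplyAsync(() -> fakeQuery(phoneNumber, key.name())));
        }

        // Block until every query has finished.
        // join() throws an unchecked CompletionException if any query failed.
        CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0])).join();

        // Only the calling thread touches the result map, so a plain EnumMap is enough.
        Map<Key, Set<String>> result = new EnumMap<>(Key.class);
        futures.forEach((key, future) -> result.put(key, future.join()));
        return result;
    }

    public static void main(String[] args) {
        System.out.println(doDBCalls("555-0100"));
    }
}
```

Because the map is filled only after allOf has completed, no synchronization is needed and the caller never sees a partially populated map.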
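In the method above, supplyAsync is executed by a thread from the ForkJoinPool, but thenApply is not guaranteed to run on a pool thread. A non-async dependent action such as thenApply or thenAccept runs on whichever thread completes the future, or on the calling thread if the future is already complete when the callback is registered. The queries themselves still start concurrently, but in that second case the callbacks that fill the map run on the caller rather than asynchronously.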
All async methods without an explicit Executor argument are performed using the ForkJoinPool.commonPool() (unless it does not support a parallelism level of at least two, in which case, a new Thread is created to run each task).
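Since the documentation quoted above mentions the explicit Executor argument, here is a small sketch of passing one to supplyAsync, which may be preferable for blocking database calls so that they do not occupy the common pool. The pool size of 3 and the thread name "db-pool-thread" are arbitrary choices for illustration, not something the question specifies.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExplicitExecutorSketch {
    // Runs a task on a dedicated pool and returns the name of the thread that ran it.
    static String runOnPool() {
        // Illustrative pool: three daemon threads, all named "db-pool-thread".
        ExecutorService dbPool = Executors.newFixedThreadPool(3, runnable -> {
            Thread thread = new Thread(runnable, "db-pool-thread");
            thread.setDaemon(true);
            return thread;
        });
        try {
            // The second argument makes supplyAsync use dbPool instead of the common pool.
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), dbPool)
                    .join();
        } finally {
            dbPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnPool());
    }
}
```

In a real service the pool would normally be created once and shared rather than built and shut down on every call.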
For example:
CompletableFuture.supplyAsync(() -> {
    System.out.println(Thread.currentThread().getName());
    return "SupplyAsync";
}).thenAccept(i -> {
    System.out.println(Thread.currentThread().getName() + "--" + i);
});
Output:
ForkJoinPool.commonPool-worker-3
main--SupplyAsync
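Which thread prints the second line depends on timing, so the output above can differ between runs. The sketch below tries to make the two cases reproducible; the class and method names are made up for illustration. Note that the CompletableFuture documentation only says a non-async callback "may be" performed by the completing thread, so the second case reflects how current JDKs behave when a single thread completes the future, not a formal guarantee.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class CallbackThreadSketch {
    // Case 1: the future is already complete when thenAccept is registered,
    // so the callback runs immediately on the registering (calling) thread.
    static boolean alreadyCompleteRunsOnCaller() {
        AtomicReference<Thread> ranOn = new AtomicReference<>();
        CompletableFuture.completedFuture("done")
                .thenAccept(value -> ranOn.set(Thread.currentThread()));
        return ranOn.get() == Thread.currentThread();
    }

    // Case 2: the callback is registered while the future is still pending,
    // so it runs later on the thread that calls complete().
    static String pendingRunsOnCompleter() {
        AtomicReference<String> ranOn = new AtomicReference<>();
        CompletableFuture<String> pending = new CompletableFuture<>();
        pending.thenAccept(value -> ranOn.set(Thread.currentThread().getName()));

        Thread completer = new Thread(() -> pending.complete("done"), "completer");
        completer.start();
        try {
            completer.join(); // wait until the completer thread has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println("already complete, ran on caller: " + alreadyCompleteRunsOnCaller());
        System.out.println("pending, ran on: " + pendingRunsOnCompleter());
    }
}
```

If the callback must always run off the calling thread, thenAcceptAsync is the variant that submits it to an executor instead.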
So, if you want the work to be asynchronous, first start all three database queries with supplyAsync and capture each result in a CompletableFuture:
CompletableFuture<Set<String>> first = CompletableFuture.supplyAsync(() -> dbReadService.getCall(phoneNumber, PhoneNumber.class, "ABC", timestamp));
CompletableFuture<Set<String>> second = CompletableFuture.supplyAsync(() -> dbReadService.getCall(phoneNumber, PhoneNumber.class, "XYZ", timestamp));
CompletableFuture<Set<String>> third = CompletableFuture.supplyAsync(() -> dbReadService.getCall(phoneNumber, PhoneNumber.class, "DEF", timestamp));
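Then create a stream of the three futures, store each result in the Map, and wait for each callback to finish before the map is read:
Stream.of(new AbstractMap.SimpleEntry<MapKeyEnums, CompletableFuture<Set<String>>>(MapKeyEnums.ABC, first),
          new AbstractMap.SimpleEntry<MapKeyEnums, CompletableFuture<Set<String>>>(MapKeyEnums.XYZ, second),
          new AbstractMap.SimpleEntry<MapKeyEnums, CompletableFuture<Set<String>>>(MapKeyEnums.DEF, third))
    // Attach the callback that stores the result and keep the future it returns...
    .map(entry -> entry.getValue().thenAccept(val -> instrumentsEdgesMap.put(entry.getKey(), val)))
    // ...then block until that callback has run. The queries were already started above,
    // so they still execute in parallel; only the waiting happens one at a time.
    .forEach(CompletableFuture::join);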