异步填充 Java 地图并将其作为未来返回
Asynchronously populating a Java Map and returning it as a future
我有一个对象映射,创建起来很昂贵,所以我想创建对象并与我的应用程序中的其他进程并行填充映射。仅当主线程实际需要访问地图时,应用程序才应等待填充地图的异步任务完成。我怎样才能最优雅地做到这一点?
当前方法
目前,我可以使用 CompletableFuture.runAsync(Runnable, Executor)
在地图中异步创建每个单独的对象,类似于下面的示例代码,但我不确定如何构建 Future
/CompletableFuture
类型的机制,用于在准备就绪时返回 Map
本身:
public static class AsynchronousMapPopulator {
private final Executor backgroundJobExecutor;
public AsynchronousMapPopulator(final Executor backgroundJobExecutor) {
this.backgroundJobExecutor = backgroundJobExecutor;
}
public ConcurrentMap<String, Integer> apply(final Map<String,Integer> input) {
final ConcurrentMap<String, Integer> result = new ConcurrentHashMap<>(input.size());
final Stream.Builder<CompletableFuture<Void>> incrementingJobs = Stream.builder();
for (final Entry<String, Integer> entry : input.entrySet()) {
final String className = entry.getKey();
final Integer oldValue = entry.getValue();
final CompletableFuture<Void> incrementingJob = CompletableFuture.runAsync(() -> {
result.put(className, oldValue + 1);
}, backgroundJobExecutor);
incrementingJobs.add(incrementingJob);
}
// TODO: This blocks until the training is done; Instead, return a
// future to the caller somehow
CompletableFuture.allOf(incrementingJobs.build().toArray(CompletableFuture[]::new)).join();
return result;
}
}
但是,对于上面的代码,当代码调用 AsynchronousTest.create(Map<String,Integer)
时,它已经阻塞,直到方法 returns 完全填充 ConcurrentMap<String,Integer>
;我怎样才能把它变成类似 Future<Map<String,Integer>>
的东西,以便我以后可以使用它?:
Executor someExecutor = ForkJoinPool.commonPool();
Future<Map<String,Integer>> futureClassModels = new AsynchronousMapPopulator(someExecutor).apply(wordClassObservations);
...
// Do lots of other stuff
...
Map<String,Integer> completedModels = futureClassModels.get();
正如@Holger 在他的评论中指出的那样,您必须避免调用 .join()
并改为依赖 thenApply()
,例如像这样:
public static class AsynchronousMapPopulator {
private final Executor backgroundJobExecutor;
public AsynchronousMapPopulator(final Executor backgroundJobExecutor) {
this.backgroundJobExecutor = backgroundJobExecutor;
}
public Future<Map<String, Integer>> apply(final Map<String,Integer> input) {
final ConcurrentMap<String, Integer> result = new ConcurrentHashMap<>(input.size());
final Stream.Builder<CompletableFuture<Void>> incrementingJobs = Stream.builder();
for (final Entry<String, Integer> entry : input.entrySet()) {
final String className = entry.getKey();
final Integer oldValue = entry.getValue();
final CompletableFuture<Void> incrementingJob = CompletableFuture.runAsync(() -> {
result.put(className, oldValue + 1);
}, backgroundJobExecutor);
incrementingJobs.add(incrementingJob);
}
// using thenApply instead of join here:
return CompletableFuture.allOf(
incrementingJobs.build().toArray(
CompletableFuture[]::new
)
).thenApply(x -> result);
}
}
我有一个对象映射,创建起来很昂贵,所以我想创建对象并与我的应用程序中的其他进程并行填充映射。仅当主线程实际需要访问地图时,应用程序才应等待填充地图的异步任务完成。我怎样才能最优雅地做到这一点?
当前方法
目前,我可以使用 CompletableFuture.runAsync(Runnable, Executor)
在地图中异步创建每个单独的对象,类似于下面的示例代码,但我不确定如何构建 Future
/CompletableFuture
类型的机制,用于在准备就绪时返回 Map
本身:
public static class AsynchronousMapPopulator {
private final Executor backgroundJobExecutor;
public AsynchronousMapPopulator(final Executor backgroundJobExecutor) {
this.backgroundJobExecutor = backgroundJobExecutor;
}
public ConcurrentMap<String, Integer> apply(final Map<String,Integer> input) {
final ConcurrentMap<String, Integer> result = new ConcurrentHashMap<>(input.size());
final Stream.Builder<CompletableFuture<Void>> incrementingJobs = Stream.builder();
for (final Entry<String, Integer> entry : input.entrySet()) {
final String className = entry.getKey();
final Integer oldValue = entry.getValue();
final CompletableFuture<Void> incrementingJob = CompletableFuture.runAsync(() -> {
result.put(className, oldValue + 1);
}, backgroundJobExecutor);
incrementingJobs.add(incrementingJob);
}
// TODO: This blocks until the training is done; Instead, return a
// future to the caller somehow
CompletableFuture.allOf(incrementingJobs.build().toArray(CompletableFuture[]::new)).join();
return result;
}
}
但是,对于上面的代码,当代码调用 AsynchronousTest.create(Map<String,Integer)
时,它已经阻塞,直到方法 returns 完全填充 ConcurrentMap<String,Integer>
;我怎样才能把它变成类似 Future<Map<String,Integer>>
的东西,以便我以后可以使用它?:
Executor someExecutor = ForkJoinPool.commonPool();
Future<Map<String,Integer>> futureClassModels = new AsynchronousMapPopulator(someExecutor).apply(wordClassObservations);
...
// Do lots of other stuff
...
Map<String,Integer> completedModels = futureClassModels.get();
正如@Holger 在他的评论中指出的那样,您必须避免调用 .join()
并改为依赖 thenApply()
,例如像这样:
public static class AsynchronousMapPopulator {
private final Executor backgroundJobExecutor;
public AsynchronousMapPopulator(final Executor backgroundJobExecutor) {
this.backgroundJobExecutor = backgroundJobExecutor;
}
public Future<Map<String, Integer>> apply(final Map<String,Integer> input) {
final ConcurrentMap<String, Integer> result = new ConcurrentHashMap<>(input.size());
final Stream.Builder<CompletableFuture<Void>> incrementingJobs = Stream.builder();
for (final Entry<String, Integer> entry : input.entrySet()) {
final String className = entry.getKey();
final Integer oldValue = entry.getValue();
final CompletableFuture<Void> incrementingJob = CompletableFuture.runAsync(() -> {
result.put(className, oldValue + 1);
}, backgroundJobExecutor);
incrementingJobs.add(incrementingJob);
}
// using thenApply instead of join here:
return CompletableFuture.allOf(
incrementingJobs.build().toArray(
CompletableFuture[]::new
)
).thenApply(x -> result);
}
}