将每个 return 列表的两个 CompletableFutures 链接在一起
Chain together two CompletableFutures that each return Lists
我很难弄清楚这个问题,可以向比我更有经验和知识的人寻求帮助。
基本问题是我需要获取对象列表,然后为每个返回的对象获取一些细节,并将细节拼接到对象中。我想对此有效率;我需要先获取 DataFiles 列表,但一旦我有了它,我就可以同时调用以获取它们的标签,然后等待所有 getTags 响应返回,然后再处理它们。
public class DataFile {
// DataFileDao returns all DataFile properties, except Tags
private List<Tags> tags;
...
}
我只是想不出如何使用 CompletableFutures 和流在功能上做到这一点。不过,这是我正在使用的基本代码,如果有人能帮助我完成任务,我将不胜感激:
public CompletableFuture<List<DataFile>> getDataFilesWithTags() {
final CompletableFuture<List<DataFile>> dataFileFutures = this.dataFileDao.getDataFiles()
.thenApply(HttpResponse::body).thenApply(this::toDataFileList);
final CompletableFuture<List<List<Tag>>> tagFutures = dataFileFutures
.thenCompose(dataFiles -> HttpUtils.allAsList(dataFiles.stream()
.map(file -> this.tagDao.getLatestTagsForDataFile(file.getId())).collect(toList())));
final CompletableFuture<List<DataFile>> filesWithTags = dataFileFutures.thenCombine(tagFutures,
(files, tags) -> {
for (int i = 0; i < files.size(); i++) {
files.get(i).setTags(tags.get(i));
}
return files;
});
return fileWithTags;
}
/**
* Transforms a generic {@link List} of {@link CompletableFuture}s to a {@link CompletableFuture} containing a
* generic {@link List}.
*
* @param futures the {@code List} of {@code CompletableFuture}s to transform
* @param <T> the type of {@link CompletableFuture} to be applied to the {@link List}
* @return a {@code CompletableFuture} containing a {@code List}
* @throws NullPointerException if {@code futures} is null
*/
public static <T> CompletableFuture<List<T>> allAsList(final List<CompletableFuture<T>> futures) {
Validate.notNull(futures, "futures cannot be null");
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
.thenApply(ignored -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
}
必须有一种更简洁、更实用的方法来执行此操作,对吧?
我想做的事情的抽象表示:
public class ParentObject {
// RETURNED BY ParentObjectDao.getAllParentObjects()
private String id;
// *NOT* RETURNED BY ParentObjectDao.getAllParentObjects()
// MUST BE RETRIEVED BY MAKING SUPPLEMENTAL CALL TO ParentObjectDao.getParentsChildren(String parentObjectId)
private List<ChildObject> details;
}
public class ChildObject {
private String id;
private String description;
}
public class ParentObjectDao {
public CompletableFuture<List<ParentObject>> getAllParentObjects();
public CompletableFuture<List<ChildObject>> getChildrenForParent(String parentObjectId);
}
public class Something {
private final ParentObjectDao dao;
public List<ParentObject> getParentObjectsWithChildren(){
// PSEUDO-LOGIC
final List<ParentObject> parentsWithChildren = dao.getAllParentObjects()
.thenApply(List::stream)
.thenCompose(parentObject -> dao.getChildrenForParent(parentObject.getId()))
.thenApply(parentObject::setChildren)
.collect(toList);
return parentsWithChildren;
}
}
你的代码并没有真正并行化。您一次只处理一个 CompletableFuture
,将操作链接到它。所以如果你有1000个数据文件,它们仍然会被顺序处理。
此外,从设计和可读性的角度来看,CompletableFuture
的运行级别太低(您真的需要链接 thenApply(HttpResponse::body).thenApply(this::toDataFileList)
不能封装转换吗?正确并让 CompletableFuture
只代表一种方法?)
使用你的伪代码,像这样的东西怎么样:
CompletableFuture<List<ParentObject>> populateAsync(List<ParentObject> parents) {
//get the children of each parent in parallel, store the futures in a list
List<CompletableFuture<ParentObject>> futures =
parents.stream()
.map(parent ->
parentObjectDao.getChildrenForParent(parent.getId())
.thenApply(parent::setChildren)) //assuming setChildren returns the parent object
.collect(Collectors.toList()); //we need this stream terminal operation to start all futures before we join on the first one
//wait for all of them to finish and then put the result in a list
return CompletableFuture.supplyAsync(() ->
futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
}
然后你就可以做这样的事情了:
CompletableFuture<List<ParentObject>> getAllParentObjects()
.thenApply(this::populateAsync)
(我直接写到这里可能会有一些语法错误,但你应该明白了)。
我很难弄清楚这个问题,可以向比我更有经验和知识的人寻求帮助。
基本问题是我需要获取对象列表,然后为每个返回的对象获取一些细节,并将细节拼接到对象中。我想对此有效率;我需要先获取 DataFiles 列表,但一旦我有了它,我就可以同时调用以获取它们的标签,然后等待所有 getTags 响应返回,然后再处理它们。
public class DataFile {
// DataFileDao returns all DataFile properties, except Tags
private List<Tags> tags;
...
}
我只是想不出如何使用 CompletableFutures 和流在功能上做到这一点。不过,这是我正在使用的基本代码,如果有人能帮助我完成任务,我将不胜感激:
public CompletableFuture<List<DataFile>> getDataFilesWithTags() {
final CompletableFuture<List<DataFile>> dataFileFutures = this.dataFileDao.getDataFiles()
.thenApply(HttpResponse::body).thenApply(this::toDataFileList);
final CompletableFuture<List<List<Tag>>> tagFutures = dataFileFutures
.thenCompose(dataFiles -> HttpUtils.allAsList(dataFiles.stream()
.map(file -> this.tagDao.getLatestTagsForDataFile(file.getId())).collect(toList())));
final CompletableFuture<List<DataFile>> filesWithTags = dataFileFutures.thenCombine(tagFutures,
(files, tags) -> {
for (int i = 0; i < files.size(); i++) {
files.get(i).setTags(tags.get(i));
}
return files;
});
return fileWithTags;
}
/**
* Transforms a generic {@link List} of {@link CompletableFuture}s to a {@link CompletableFuture} containing a
* generic {@link List}.
*
* @param futures the {@code List} of {@code CompletableFuture}s to transform
* @param <T> the type of {@link CompletableFuture} to be applied to the {@link List}
* @return a {@code CompletableFuture} containing a {@code List}
* @throws NullPointerException if {@code futures} is null
*/
public static <T> CompletableFuture<List<T>> allAsList(final List<CompletableFuture<T>> futures) {
Validate.notNull(futures, "futures cannot be null");
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
.thenApply(ignored -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
}
必须有一种更简洁、更实用的方法来执行此操作,对吧?
我想做的事情的抽象表示:
public class ParentObject {
// RETURNED BY ParentObjectDao.getAllParentObjects()
private String id;
// *NOT* RETURNED BY ParentObjectDao.getAllParentObjects()
// MUST BE RETRIEVED BY MAKING SUPPLEMENTAL CALL TO ParentObjectDao.getParentsChildren(String parentObjectId)
private List<ChildObject> details;
}
public class ChildObject {
private String id;
private String description;
}
public class ParentObjectDao {
public CompletableFuture<List<ParentObject>> getAllParentObjects();
public CompletableFuture<List<ChildObject>> getChildrenForParent(String parentObjectId);
}
public class Something {
private final ParentObjectDao dao;
public List<ParentObject> getParentObjectsWithChildren(){
// PSEUDO-LOGIC
final List<ParentObject> parentsWithChildren = dao.getAllParentObjects()
.thenApply(List::stream)
.thenCompose(parentObject -> dao.getChildrenForParent(parentObject.getId()))
.thenApply(parentObject::setChildren)
.collect(toList);
return parentsWithChildren;
}
}
你的代码并没有真正并行化。您一次只处理一个 CompletableFuture
,将操作链接到它。所以如果你有1000个数据文件,它们仍然会被顺序处理。
此外,从设计和可读性的角度来看,CompletableFuture
的运行级别太低(您真的需要链接 thenApply(HttpResponse::body).thenApply(this::toDataFileList)
不能封装转换吗?正确并让 CompletableFuture
只代表一种方法?)
使用你的伪代码,像这样的东西怎么样:
CompletableFuture<List<ParentObject>> populateAsync(List<ParentObject> parents) {
//get the children of each parent in parallel, store the futures in a list
List<CompletableFuture<ParentObject>> futures =
parents.stream()
.map(parent ->
parentObjectDao.getChildrenForParent(parent.getId())
.thenApply(parent::setChildren)) //assuming setChildren returns the parent object
.collect(Collectors.toList()); //we need this stream terminal operation to start all futures before we join on the first one
//wait for all of them to finish and then put the result in a list
return CompletableFuture.supplyAsync(() ->
futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
}
然后你就可以做这样的事情了:
CompletableFuture<List<ParentObject>> getAllParentObjects()
.thenApply(this::populateAsync)
(我直接写到这里可能会有一些语法错误,但你应该明白了)。