将每个 return 列表的两个 CompletableFutures 链接在一起

Chain together two CompletableFutures that each return Lists

我很难弄清楚这个问题,可以向比我更有经验和知识的人寻求帮助。

基本问题是我需要获取对象列表,然后为每个返回的对象获取一些细节,并将细节拼接到对象中。我想对此有效率;我需要先获取 DataFiles 列表,但一旦我有了它,我就可以同时调用以获取它们的标签,然后等待所有 getTags 响应返回,然后再处理它们。

public class DataFile {
    // DataFileDao returns all DataFile properties, except Tags
    private List<Tags> tags;
    ...
}

我只是想不出如何使用 CompletableFutures 和流在功能上做到这一点。不过,这是我正在使用的基本代码,如果有人能帮助我完成任务,我将不胜感激:

public CompletableFuture<List<DataFile>> getDataFilesWithTags() {

    final CompletableFuture<List<DataFile>> dataFileFutures = this.dataFileDao.getDataFiles()
        .thenApply(HttpResponse::body).thenApply(this::toDataFileList);

    final CompletableFuture<List<List<Tag>>> tagFutures = dataFileFutures
        .thenCompose(dataFiles -> HttpUtils.allAsList(dataFiles.stream()
            .map(file -> this.tagDao.getLatestTagsForDataFile(file.getId())).collect(toList())));

    final CompletableFuture<List<DataFile>> filesWithTags = dataFileFutures.thenCombine(tagFutures,
        (files, tags) -> {
            for (int i = 0; i < files.size(); i++) {
                files.get(i).setTags(tags.get(i));
            }

            return files;
        });

    return fileWithTags;
}

/**
 * Transforms a generic {@link List} of {@link CompletableFuture}s to a {@link CompletableFuture} containing a
 * generic {@link List}.
 *
 * @param futures the {@code List} of {@code CompletableFuture}s to transform
 * @param         <T> the type of {@link CompletableFuture} to be applied to the {@link List}
 * @return a {@code CompletableFuture} containing a {@code List}
 * @throws NullPointerException if {@code futures} is null
 */
public static <T> CompletableFuture<List<T>> allAsList(final List<CompletableFuture<T>> futures) {
    Validate.notNull(futures, "futures cannot be null");
    return CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
            .thenApply(ignored -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
}

必须有一种更简洁、更实用的方法来执行此操作,对吧?

我想做的事情的抽象表示:

public class ParentObject {

    // RETURNED BY ParentObjectDao.getAllParentObjects()
    private String id;

    // *NOT* RETURNED BY ParentObjectDao.getAllParentObjects()
    // MUST BE RETRIEVED BY MAKING SUPPLEMENTAL CALL TO ParentObjectDao.getParentsChildren(String parentObjectId)
    private List<ChildObject> details;
}

public class ChildObject {

    private String id;
    private String description;
}

public class ParentObjectDao {

    public CompletableFuture<List<ParentObject>> getAllParentObjects();

    public CompletableFuture<List<ChildObject>> getChildrenForParent(String parentObjectId);
}

public class Something {

    private final ParentObjectDao dao;

    public List<ParentObject> getParentObjectsWithChildren(){

        // PSEUDO-LOGIC
        final List<ParentObject> parentsWithChildren = dao.getAllParentObjects()
            .thenApply(List::stream)
            .thenCompose(parentObject -> dao.getChildrenForParent(parentObject.getId()))
            .thenApply(parentObject::setChildren)
            .collect(toList);

        return parentsWithChildren;
    }
}

你的代码并没有真正并行化。您一次只处理一个 CompletableFuture,将操作链接到它。所以如果你有1000个数据文件,它们仍然会被顺序处理。

此外,从设计和可读性的角度来看,CompletableFuture 的运行级别太低(您真的需要链接 thenApply(HttpResponse::body).thenApply(this::toDataFileList) 不能封装转换吗?正确并让 CompletableFuture 只代表一种方法?)

使用你的伪代码,像这样的东西怎么样:

CompletableFuture<List<ParentObject>> populateAsync(List<ParentObject> parents) {

  //get the children of each parent in parallel, store the futures in a list
  List<CompletableFuture<ParentObject>> futures = 
    parents.stream() 
           .map(parent ->
                   parentObjectDao.getChildrenForParent(parent.getId())
                                  .thenApply(parent::setChildren))  //assuming setChildren returns the parent object
           .collect(Collectors.toList()); //we need this stream terminal operation to start all futures before we join on the first one

  //wait for all of them to finish and then put the result in a list
  return CompletableFuture.supplyAsync(() -> 
                                     futures.stream()
                                            .map(CompletableFuture::join)
                                            .collect(Collectors.toList());    
}

然后你就可以做这样的事情了:

CompletableFuture<List<ParentObject>> getAllParentObjects()
          .thenApply(this::populateAsync)

(我直接写到这里可能会有一些语法错误,但你应该明白了)。