在过滤器函数中使用 CompletableFuture

Using CompletableFuture within Filter Function

我有一个用例,我想根据对元素执行的网络调用过滤掉列表中的几个元素。为此,我使用了流、过滤器和 Completable Future。目标是进行异步执行,以便操作变得高效。下面提到了伪代码。

public List<Integer> afterFilteringList(List<Integer> initialList){
   List<Integer> afterFilteringList =initialList.stream().filter(element -> {
        boolean valid = true;
        try{
            valid = makeNetworkCallAndCheck().get();
        } catch (Exception e) {

        }
        return valid;
    }).collect(Collectors.toList());

    return afterFilteringList;
}
public CompletableFuture<Boolean> makeNetworkCallAndCheck(Integer value){
   return CompletableFuture.completedFuture(resultOfNetWorkCall(value);
 }

我在这里遇到的问题是,我自己是否以异步方式执行此操作?(因为我在过滤器中使用 'get' 函数,它会阻止执行并使其仅按顺序执行)或者在 Java 8.

中使用 Completable Future 和 Filters 以异步方式执行此操作是否有更好的方法

如果使用get(),则不会是Async

get()如有必要,等待此未来完成,然后returns其结果。

如果你想异步处理所有的请求。您可以使用 CompletetableFuture.allOf()

public List<Integer> filterList(List<Integer> initialList){
    List<Integer> filteredList = Collections.synchronizedList(new ArrayList());
    AtomicInteger atomicInteger = new AtomicInteger(0);
    CompletableFuture[] completableFutures = new CompletableFuture[initialList.size()];
    initialList.forEach(x->{
        completableFutures[atomicInteger.getAndIncrement()] = CompletableFuture
            .runAsync(()->{
                if(makeNetworkCallAndCheck(x)){
                    filteredList.add(x);
                }
        });
    });

    CompletableFuture.allOf(completableFutures).join();
    return filteredList;
}

private Boolean makeNetworkCallAndCheck(Integer value){
    // TODO: write the logic;
    return true;
}

当你立即调用get时,你确实在破坏异步执行的好处。解决方案是在加入之前先收集所有异步作业。

public List<Integer> afterFilteringList(List<Integer> initialList){
    Map<Integer,CompletableFuture<Boolean>> jobs = initialList.stream()
        .collect(Collectors.toMap(Function.identity(), this::makeNetworkCallAndCheck));
    return initialList.stream()
        .filter(element -> jobs.get(element).join())
        .collect(Collectors.toList());
}
public CompletableFuture<Boolean> makeNetworkCallAndCheck(Integer value){
   return CompletableFuture.supplyAsync(() -> resultOfNetWorkCall(value));
}

当然,方法makeNetworkCallAndCheck也必须启动一个真正的异步操作。同步调用方法并返回 completedFuture 是不够的。我在这里提供了一个简单的示例性异步操作,但对于 I/O 操作,您可能希望提供自己的 Executor,根据您希望允许的同时连接数量身定制。

Collection.parallelStream() 是为集合执行异步操作的简单方法。您可以修改您的代码如下:

public List<Integer> afterFilteringList(List<Integer> initialList){
    List<Integer> afterFilteringList =initialList
            .parallelStream()
            .filter(this::makeNetworkCallAndCheck)
            .collect(Collectors.toList());

    return afterFilteringList;
}
public Boolean makeNetworkCallAndCheck(Integer value){
    return resultOfNetWorkCall(value);
}

您可以通过this way. And the result order is guaranteed according to 自定义您自己的执行器。

我写了下面的代码来验证我说的。

public class  DemoApplication {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool forkJoinPool = new ForkJoinPool(50);
        final List<Integer> integers = new ArrayList<>();
        for (int i = 0; i < 50; i++) {
            integers.add(i);
        }
        long before = System.currentTimeMillis();
        List<Integer> items = forkJoinPool.submit(() ->
                integers
                        .parallelStream()
                        .filter(it -> {
                            try {
                                Thread.sleep(10000);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            return true;
                        })
                        .collect(Collectors.toList()))
                .get();
        long after = System.currentTimeMillis();
        System.out.println(after - before);
    }
}

我创建了自己的 ForkJoinPool,并行完成 50 个作业需要 10019 毫秒,尽管每个作业需要 10000 毫秒。