在过滤器函数中使用 CompletableFuture
Using CompletableFuture within Filter Function
我有一个用例,我想根据对元素执行的网络调用过滤掉列表中的几个元素。为此,我使用了流、过滤器和 Completable Future。目标是进行异步执行,以便操作变得高效。下面提到了伪代码。
public List<Integer> afterFilteringList(List<Integer> initialList){
List<Integer> afterFilteringList =initialList.stream().filter(element -> {
boolean valid = true;
try{
valid = makeNetworkCallAndCheck().get();
} catch (Exception e) {
}
return valid;
}).collect(Collectors.toList());
return afterFilteringList;
}
public CompletableFuture<Boolean> makeNetworkCallAndCheck(Integer value){
return CompletableFuture.completedFuture(resultOfNetWorkCall(value);
}
我在这里遇到的问题是,我自己是否以异步方式执行此操作?(因为我在过滤器中使用 'get' 函数,它会阻止执行并使其仅按顺序执行)或者在 Java 8.
中使用 Completable Future 和 Filters 以异步方式执行此操作是否有更好的方法
如果使用get()
,则不会是Async
get()
:如有必要,等待此未来完成,然后returns其结果。
如果你想异步处理所有的请求。您可以使用 CompletetableFuture.allOf()
public List<Integer> filterList(List<Integer> initialList){
List<Integer> filteredList = Collections.synchronizedList(new ArrayList());
AtomicInteger atomicInteger = new AtomicInteger(0);
CompletableFuture[] completableFutures = new CompletableFuture[initialList.size()];
initialList.forEach(x->{
completableFutures[atomicInteger.getAndIncrement()] = CompletableFuture
.runAsync(()->{
if(makeNetworkCallAndCheck(x)){
filteredList.add(x);
}
});
});
CompletableFuture.allOf(completableFutures).join();
return filteredList;
}
private Boolean makeNetworkCallAndCheck(Integer value){
// TODO: write the logic;
return true;
}
当你立即调用get
时,你确实在破坏异步执行的好处。解决方案是在加入之前先收集所有异步作业。
public List<Integer> afterFilteringList(List<Integer> initialList){
Map<Integer,CompletableFuture<Boolean>> jobs = initialList.stream()
.collect(Collectors.toMap(Function.identity(), this::makeNetworkCallAndCheck));
return initialList.stream()
.filter(element -> jobs.get(element).join())
.collect(Collectors.toList());
}
public CompletableFuture<Boolean> makeNetworkCallAndCheck(Integer value){
return CompletableFuture.supplyAsync(() -> resultOfNetWorkCall(value));
}
当然,方法makeNetworkCallAndCheck
也必须启动一个真正的异步操作。同步调用方法并返回 completedFuture
是不够的。我在这里提供了一个简单的示例性异步操作,但对于 I/O 操作,您可能希望提供自己的 Executor
,根据您希望允许的同时连接数量身定制。
Collection.parallelStream()
是为集合执行异步操作的简单方法。您可以修改您的代码如下:
public List<Integer> afterFilteringList(List<Integer> initialList){
List<Integer> afterFilteringList =initialList
.parallelStream()
.filter(this::makeNetworkCallAndCheck)
.collect(Collectors.toList());
return afterFilteringList;
}
public Boolean makeNetworkCallAndCheck(Integer value){
return resultOfNetWorkCall(value);
}
您可以通过this way. And the result order is guaranteed according to 自定义您自己的执行器。
我写了下面的代码来验证我说的。
public class DemoApplication {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ForkJoinPool forkJoinPool = new ForkJoinPool(50);
final List<Integer> integers = new ArrayList<>();
for (int i = 0; i < 50; i++) {
integers.add(i);
}
long before = System.currentTimeMillis();
List<Integer> items = forkJoinPool.submit(() ->
integers
.parallelStream()
.filter(it -> {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return true;
})
.collect(Collectors.toList()))
.get();
long after = System.currentTimeMillis();
System.out.println(after - before);
}
}
我创建了自己的 ForkJoinPool
,并行完成 50 个作业需要 10019 毫秒,尽管每个作业需要 10000 毫秒。
我有一个用例,我想根据对元素执行的网络调用过滤掉列表中的几个元素。为此,我使用了流、过滤器和 Completable Future。目标是进行异步执行,以便操作变得高效。下面提到了伪代码。
public List<Integer> afterFilteringList(List<Integer> initialList){
List<Integer> afterFilteringList =initialList.stream().filter(element -> {
boolean valid = true;
try{
valid = makeNetworkCallAndCheck().get();
} catch (Exception e) {
}
return valid;
}).collect(Collectors.toList());
return afterFilteringList;
}
public CompletableFuture<Boolean> makeNetworkCallAndCheck(Integer value){
return CompletableFuture.completedFuture(resultOfNetWorkCall(value);
}
我在这里遇到的问题是,我自己是否以异步方式执行此操作?(因为我在过滤器中使用 'get' 函数,它会阻止执行并使其仅按顺序执行)或者在 Java 8.
中使用 Completable Future 和 Filters 以异步方式执行此操作是否有更好的方法如果使用get()
,则不会是Async
get()
:如有必要,等待此未来完成,然后returns其结果。
如果你想异步处理所有的请求。您可以使用 CompletetableFuture.allOf()
public List<Integer> filterList(List<Integer> initialList){
List<Integer> filteredList = Collections.synchronizedList(new ArrayList());
AtomicInteger atomicInteger = new AtomicInteger(0);
CompletableFuture[] completableFutures = new CompletableFuture[initialList.size()];
initialList.forEach(x->{
completableFutures[atomicInteger.getAndIncrement()] = CompletableFuture
.runAsync(()->{
if(makeNetworkCallAndCheck(x)){
filteredList.add(x);
}
});
});
CompletableFuture.allOf(completableFutures).join();
return filteredList;
}
private Boolean makeNetworkCallAndCheck(Integer value){
// TODO: write the logic;
return true;
}
当你立即调用get
时,你确实在破坏异步执行的好处。解决方案是在加入之前先收集所有异步作业。
public List<Integer> afterFilteringList(List<Integer> initialList){
Map<Integer,CompletableFuture<Boolean>> jobs = initialList.stream()
.collect(Collectors.toMap(Function.identity(), this::makeNetworkCallAndCheck));
return initialList.stream()
.filter(element -> jobs.get(element).join())
.collect(Collectors.toList());
}
public CompletableFuture<Boolean> makeNetworkCallAndCheck(Integer value){
return CompletableFuture.supplyAsync(() -> resultOfNetWorkCall(value));
}
当然,方法makeNetworkCallAndCheck
也必须启动一个真正的异步操作。同步调用方法并返回 completedFuture
是不够的。我在这里提供了一个简单的示例性异步操作,但对于 I/O 操作,您可能希望提供自己的 Executor
,根据您希望允许的同时连接数量身定制。
Collection.parallelStream()
是为集合执行异步操作的简单方法。您可以修改您的代码如下:
public List<Integer> afterFilteringList(List<Integer> initialList){
List<Integer> afterFilteringList =initialList
.parallelStream()
.filter(this::makeNetworkCallAndCheck)
.collect(Collectors.toList());
return afterFilteringList;
}
public Boolean makeNetworkCallAndCheck(Integer value){
return resultOfNetWorkCall(value);
}
您可以通过this way. And the result order is guaranteed according to
我写了下面的代码来验证我说的。
public class DemoApplication {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ForkJoinPool forkJoinPool = new ForkJoinPool(50);
final List<Integer> integers = new ArrayList<>();
for (int i = 0; i < 50; i++) {
integers.add(i);
}
long before = System.currentTimeMillis();
List<Integer> items = forkJoinPool.submit(() ->
integers
.parallelStream()
.filter(it -> {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return true;
})
.collect(Collectors.toList()))
.get();
long after = System.currentTimeMillis();
System.out.println(after - before);
}
}
我创建了自己的 ForkJoinPool
,并行完成 50 个作业需要 10019 毫秒,尽管每个作业需要 10000 毫秒。