如何重新排序 CompletableFutures 流?
How to reorder Stream of CompletableFutures?
我处理 CompletableFutures 流。这些需要不同的时间来完成。那些需要更长的块流处理而其他人可能已经完成(我知道并行流)
因此我想重新排序 Stream 中的项目(例如使用缓冲区)以提前完成 Futures。
例如,如果一个 getUser 调用花费很长时间,此代码将阻止流处理
public static Boolean isValid(User user) { ... }
emails.stream()
// not using ::
// getUser() returns CompletableFuture<User>
.map( e -> getUser(e))
// this line blocks Stream processing
.filter( userF -> isValid( userF.get()) )
.map( f -> f.thenApply(User::getName))
我想要类似的东西
emails.stream()
.map( e -> getUser(e))
// this moves Futures into a bounded buffer
// and puts those finished first
// like CompletionService [1]
// and returns a Stream again
.collect(FutureReorderer.collector())
// this is not the same Stream but
// the one created by FutureReorderer.collector()
.filter( userF -> isValid( userF.get()) )
.map( f -> f.thenApply(User::getName))
[1] 例如 CompletionService https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorCompletionService.html returns 在调用 take() 时完成任务,否则阻塞。但是 CompletionService 不接受期货,是否需要做 cs.sumbit( () -> f.get() ) ?
我该怎么做?
[编辑]
- 更改示例以包含 filter()
- 已添加评论
- 添加了完成服务link
拥有更多上下文肯定有助于定制答案 - 我感觉问题出在其他地方,可以用更简单的方法解决。
但是,如果您的问题是如何在开始时以某种方式保留已完成的期货,那么选择很少:
使用自定义 Comparator
对 Stream
进行排序:
.sorted(Comparator.comparing(f -> !f.isDone()))
请记住 isDone
returns 不仅在未来成功完成时为真。
将期货存储在 PriorityQueue
PriorityQueue<CompletableFuture<String>> queue
= new PriorityQueue<>(Comparator.comparing(f -> !f.isDone()));
轮询元素时,队列将根据元素提供的顺序返回元素。
这是实际操作:
PriorityQueue<CompletableFuture<String>> queue
= new PriorityQueue<>(Comparator.comparing(f -> !f.isDone()));
queue.add(CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e) { }
return "42";
}));
queue.add(CompletableFuture.completedFuture("completed"));
queue.poll(); // "completed"
queue.poll(); // still going on
重要的是要记住,如果您确实想将 PriorityQueue
转换为 Stream
,您不能仅使用 stream()
来完成此操作 - 这不会保留优先顺序。
这是正确的方法:
Stream.generate(queue::poll).limit(queue.size())
我假设 OP 中的要求是并发执行 getUser
并按完成顺序处理结果 Futures。这是 ExecutorCompletionService
的解决方案:
final CompletionService<User> ecs = new ExecutorCompletionService<>(executor);
emails.stream().map(e -> ecs.submit(() -> getUser(e).get()))
.collect(Collectors.collectingAndThen(Collectors.toList(), fs -> fs.stream())) // collect the future list for concurrent execution
.map(f -> {
try {
return ecs.take().get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
})
.filter(u -> isValid(u)).map(User::getName)... //TODO;
或者:
final BlockingQueue<Future<User>> queue = new ArrayBlockingQueue<>(emails.size());
final CompletionService<User> ecs = new ExecutorCompletionService<>(executor, queue);
emails.stream().forEach(e -> ecs.submit(() -> getUser(e).get()));
IntStream.range(0, emails.size())
.mapToObj(i -> {
try {
return queue.poll().get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
})
.filter(u -> isValid(u)).map(User::getName);
简单但不直接。
我处理 CompletableFutures 流。这些需要不同的时间来完成。那些需要更长的块流处理而其他人可能已经完成(我知道并行流)
因此我想重新排序 Stream 中的项目(例如使用缓冲区)以提前完成 Futures。
例如,如果一个 getUser 调用花费很长时间,此代码将阻止流处理
public static Boolean isValid(User user) { ... }
emails.stream()
// not using ::
// getUser() returns CompletableFuture<User>
.map( e -> getUser(e))
// this line blocks Stream processing
.filter( userF -> isValid( userF.get()) )
.map( f -> f.thenApply(User::getName))
我想要类似的东西
emails.stream()
.map( e -> getUser(e))
// this moves Futures into a bounded buffer
// and puts those finished first
// like CompletionService [1]
// and returns a Stream again
.collect(FutureReorderer.collector())
// this is not the same Stream but
// the one created by FutureReorderer.collector()
.filter( userF -> isValid( userF.get()) )
.map( f -> f.thenApply(User::getName))
[1] 例如 CompletionService https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorCompletionService.html returns 在调用 take() 时完成任务,否则阻塞。但是 CompletionService 不接受期货,是否需要做 cs.sumbit( () -> f.get() ) ?
我该怎么做?
[编辑]
- 更改示例以包含 filter()
- 已添加评论
- 添加了完成服务link
拥有更多上下文肯定有助于定制答案 - 我感觉问题出在其他地方,可以用更简单的方法解决。
但是,如果您的问题是如何在开始时以某种方式保留已完成的期货,那么选择很少:
使用自定义 Comparator
对 Stream
进行排序:
.sorted(Comparator.comparing(f -> !f.isDone()))
请记住 isDone
returns 不仅在未来成功完成时为真。
将期货存储在 PriorityQueue
PriorityQueue<CompletableFuture<String>> queue
= new PriorityQueue<>(Comparator.comparing(f -> !f.isDone()));
轮询元素时,队列将根据元素提供的顺序返回元素。
这是实际操作:
PriorityQueue<CompletableFuture<String>> queue
= new PriorityQueue<>(Comparator.comparing(f -> !f.isDone()));
queue.add(CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e) { }
return "42";
}));
queue.add(CompletableFuture.completedFuture("completed"));
queue.poll(); // "completed"
queue.poll(); // still going on
重要的是要记住,如果您确实想将 PriorityQueue
转换为 Stream
,您不能仅使用 stream()
来完成此操作 - 这不会保留优先顺序。
这是正确的方法:
Stream.generate(queue::poll).limit(queue.size())
我假设 OP 中的要求是并发执行 getUser
并按完成顺序处理结果 Futures。这是 ExecutorCompletionService
的解决方案:
final CompletionService<User> ecs = new ExecutorCompletionService<>(executor);
emails.stream().map(e -> ecs.submit(() -> getUser(e).get()))
.collect(Collectors.collectingAndThen(Collectors.toList(), fs -> fs.stream())) // collect the future list for concurrent execution
.map(f -> {
try {
return ecs.take().get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
})
.filter(u -> isValid(u)).map(User::getName)... //TODO;
或者:
final BlockingQueue<Future<User>> queue = new ArrayBlockingQueue<>(emails.size());
final CompletionService<User> ecs = new ExecutorCompletionService<>(executor, queue);
emails.stream().forEach(e -> ecs.submit(() -> getUser(e).get()));
IntStream.range(0, emails.size())
.mapToObj(i -> {
try {
return queue.poll().get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
})
.filter(u -> isValid(u)).map(User::getName);
简单但不直接。