Using parallel stream to return fastest supplied value
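I have a set of suppliers that all supply the same result, but at different (and varying) speeds.
I want an elegant way to start the suppliers at the same time and, as soon as one of them has produced a value, return it (discarding the other results).
I've tried using parallel streams and Stream.findAny() for this, but it always seems to block until all results have been produced.
Here's a unit test demonstrating my problem: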
import org.junit.Test;
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import java.util.stream.Stream;
import static org.junit.Assert.*;
public class RaceTest {
    @Test
    public void testRace() {
        // Set up suppliers
        Set<Supplier<String>> suppliers = Collections.newSetFromMap(new ConcurrentHashMap<>());
        suppliers.add(() -> "fast"); // This supplier returns immediately
        suppliers.add(() -> {
            try {
                Thread.sleep(10_000);
                return "slow";
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }); // This supplier takes 10 seconds to produce a value
        Stream<Supplier<String>> stream = suppliers.parallelStream();
        assertTrue(stream.isParallel()); // Stream can work in parallel
        long start = System.currentTimeMillis();
        Optional<String> winner = stream
                .map(Supplier::get)
                .findAny();
        long duration = System.currentTimeMillis() - start;
        assertTrue(winner.isPresent()); // Some value was produced
        assertEquals("fast", winner.get()); // The value is "fast"
        assertTrue(duration < 9_000); // The whole process took less than 9 seconds
    }
}
The result is that the last assertion fails, because the whole test takes about 10 seconds to complete.
What am I doing wrong?
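The Stream API is not suited to this kind of thing, because it makes no guarantee about when tasks complete; as your test shows, findAny() on a parallel stream can still wait for elements that are already being processed before it returns. A better solution is to use CompletableFuture: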
long start = System.currentTimeMillis();
String winner = CompletableFuture
.anyOf(suppliers.stream().map(CompletableFuture::supplyAsync)
.toArray(CompletableFuture[]::new)).join().toString();
long duration = System.currentTimeMillis() - start;
assertEquals("fast", winner); // The value is "fast"
assertTrue(duration < 9_000); // The whole process took less than 9 seconds
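Because anyOf returns a CompletableFuture<Object>, the snippet above needs toString() to get a String back, and anyOf also completes with the first *completion*, even if that completion is an exception. A minimal typed sketch, assuming a hypothetical helper named firstOf (not part of the JDK), lets every supplier try to complete one shared CompletableFuture<T>; only the first complete() call has any effect, and failures are ignored unless every supplier fails.

```java
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class FirstOf {

    // Hypothetical helper: runs every supplier on the common pool and completes
    // with the first value produced without an exception.
    // Assumes a non-empty collection (otherwise the result never completes).
    static <T> CompletableFuture<T> firstOf(Collection<Supplier<T>> suppliers) {
        CompletableFuture<T> result = new CompletableFuture<>();
        AtomicInteger failuresLeft = new AtomicInteger(suppliers.size());
        for (Supplier<T> s : suppliers) {
            CompletableFuture.supplyAsync(s).whenComplete((value, error) -> {
                if (error == null) {
                    result.complete(value); // only the first complete() call has an effect
                } else if (failuresLeft.decrementAndGet() == 0) {
                    result.completeExceptionally(error); // every supplier failed
                }
            });
        }
        return result;
    }

    public static void main(String[] args) {
        List<Supplier<String>> suppliers = List.of(
                () -> "fast",
                () -> {
                    try {
                        Thread.sleep(2_000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    return "slow";
                });
        long start = System.currentTimeMillis();
        String winner = firstOf(suppliers).join(); // already a String: no cast or toString()
        long duration = System.currentTimeMillis() - start;
        System.out.println(winner + " after " + duration + " ms");
    }
}
```

As with the anyOf version, the losing suppliers keep running in the background; the common pool's threads are daemon threads, so they do not keep the JVM alive.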
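Note that if the parallelism of the common ForkJoinPool is insufficient, the suppliers may still not all be started in parallel. To work around this, you can create your own pool with the desired level of parallelism: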
long start = System.currentTimeMillis();
ForkJoinPool fjp = new ForkJoinPool(suppliers.size());
String winner = CompletableFuture
.anyOf(suppliers.stream().map(s -> CompletableFuture.supplyAsync(s, fjp))
.toArray(CompletableFuture[]::new)).join().toString();
long duration = System.currentTimeMillis() - start;
assertEquals("fast", winner); // The value is "fast"
assertTrue(duration < 9_000); // The whole process took less than 9 seconds
fjp.shutdownNow();
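The snippet above only reaches fjp.shutdownNow() if join() returns normally. A minimal sketch, assuming a fixed thread pool in place of the ForkJoinPool (a substitution made for illustration) and a hypothetical race method, moves the shutdown into a finally block. For a ThreadPoolExecutor, shutdownNow() interrupts the threads that are still running, so the slow supplier is woken out of Thread.sleep instead of holding a thread for ten seconds.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

final class DedicatedPoolRace {

    // Hypothetical helper: one thread per supplier, so every supplier starts immediately.
    // Assumes a non-empty list (newFixedThreadPool(0) throws IllegalArgumentException).
    static String race(List<Supplier<String>> suppliers) {
        ExecutorService executor = Executors.newFixedThreadPool(suppliers.size());
        try {
            return (String) CompletableFuture.anyOf(suppliers.stream()
                            .map(s -> CompletableFuture.supplyAsync(s, executor))
                            .toArray(CompletableFuture[]::new))
                    .join();
        } finally {
            // Runs even if join() throws; interrupts threads still running the losers.
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicBoolean slowInterrupted = new AtomicBoolean();
        List<Supplier<String>> suppliers = List.of(
                () -> "fast",
                () -> {
                    try {
                        Thread.sleep(10_000);
                        return "slow";
                    } catch (InterruptedException e) {
                        slowInterrupted.set(true); // shutdownNow() reached this thread
                        throw new RuntimeException(e);
                    }
                });
        System.out.println(race(suppliers));
        Thread.sleep(500); // give the interrupted thread a moment to record it
        System.out.println("slow supplier interrupted: " + slowInterrupted.get());
    }
}
```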
The code you are currently using is nondeterministic. Quoting the Javadoc of findAny():
The behavior of this operation is explicitly nondeterministic; it is free to select any element in the stream.
You could use a CompletionService and submit all the tasks to it. Then CompletionService.take() will return the Future of the first task that completes.
long start = System.currentTimeMillis();
ExecutorService executor = Executors.newFixedThreadPool(suppliers.size());
CompletionService<String> completionService = new ExecutorCompletionService<>(executor);
suppliers.forEach(s -> completionService.submit(() -> s.get()));
String winner = completionService.take().get();
long duration = System.currentTimeMillis() - start;
assertEquals("fast", winner); // The value is "fast"
assertTrue(duration < 9_000); // The whole process took less than 9 seconds
executor.shutdownNow(); // interrupt the slow task and release the pool's threads
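Note that take() returns the first task to *finish*, and a task that threw an exception also counts as finished, so take().get() would rethrow that failure instead of waiting for a healthy supplier. A minimal sketch, assuming a hypothetical firstSuccessful helper, keeps taking completed futures until one succeeds and then cancels the rest with cancel(true), which interrupts them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Supplier;

final class CompletionServiceRace {

    // Hypothetical helper: returns the first value produced without an exception.
    // Assumes a non-empty list (newFixedThreadPool(0) throws IllegalArgumentException).
    static <T> T firstSuccessful(List<Supplier<T>> suppliers)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(suppliers.size());
        CompletionService<T> completionService = new ExecutorCompletionService<>(executor);
        List<Future<T>> futures = new ArrayList<>();
        try {
            for (Supplier<T> s : suppliers) {
                futures.add(completionService.submit(s::get));
            }
            ExecutionException lastFailure = null;
            for (int i = 0; i < futures.size(); i++) {
                try {
                    return completionService.take().get(); // next task to finish
                } catch (ExecutionException e) {
                    lastFailure = e; // this supplier threw; wait for the next one
                }
            }
            throw lastFailure; // every supplier failed
        } finally {
            futures.forEach(f -> f.cancel(true)); // no-op for futures that already finished
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<Supplier<String>> suppliers = List.of(
                () -> { throw new IllegalStateException("broken supplier"); },
                () -> "fast",
                () -> {
                    try {
                        Thread.sleep(10_000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    return "slow";
                });
        System.out.println(firstSuccessful(suppliers));
    }
}
```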
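In this case you are better off using Callable instead of Supplier (the same functional shape, except that call() may throw checked exceptions, so the try/catch around Thread.sleep is no longer needed) and the good old concurrency API that has been around since Java 5: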
Set<Callable<String>> suppliers = new HashSet<>();
suppliers.add(() -> "fast"); // This supplier returns immediately
suppliers.add(() -> {
    Thread.sleep(10_000); // call() may throw InterruptedException directly
    return "slow";
});
ExecutorService es = Executors.newCachedThreadPool();
try {
    String result = es.invokeAny(suppliers);
    System.out.println(result);
} catch (InterruptedException | ExecutionException ex) {
    Logger.getLogger(MyClass.class.getName()).log(Level.SEVERE, null, ex);
}
es.shutdown();
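If the suppliers already exist as Supplier instances, they can be adapted to Callable with a method reference rather than rewritten. A minimal sketch, assuming a hypothetical fastest helper and an arbitrarily chosen timeout, also uses the timed overload invokeAny(tasks, timeout, unit), which throws TimeoutException if no task completes successfully in time, and shuts the pool down in a finally block.

```java
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import java.util.stream.Collectors;

final class InvokeAnyRace {

    // Hypothetical helper: races existing Suppliers through invokeAny with a deadline.
    static <T> T fastest(Collection<Supplier<T>> suppliers, long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        // Supplier::get and Callable::call have the same shape, so s::get adapts one to the other.
        List<Callable<T>> tasks = suppliers.stream()
                .map(s -> (Callable<T>) s::get)
                .collect(Collectors.toList());
        ExecutorService es = Executors.newCachedThreadPool();
        try {
            // Returns the result of a task that completed successfully;
            // tasks that have not completed are cancelled.
            return es.invokeAny(tasks, timeout, unit);
        } finally {
            es.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        List<Supplier<String>> suppliers = List.of(
                () -> "fast",
                () -> {
                    try {
                        Thread.sleep(10_000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    return "slow";
                });
        System.out.println(fastest(suppliers, 5, TimeUnit.SECONDS));
    }
}
```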
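Note how the whole "run all and return the fastest" turns into a single method call...
It also has the benefit of cancelling/interrupting all pending operations as soon as one result is available, so the slow operation won't actually wait the full ten seconds here (well, in most cases, since the timing is not deterministic).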
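One way to observe the cancellation is to call shutdown(), which interrupts nothing by itself, and measure how long awaitTermination takes. If the slow task were still sleeping, termination would take about ten seconds; in the JDK's AbstractExecutorService implementation, which newCachedThreadPool() uses, invokeAny cancels unfinished tasks with cancel(true), so the pool terminates almost immediately. This is a sketch under that assumption, with a hypothetical method name; depending on timing, the slow task may also never have been started at all, which gives the same outcome.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class InvokeAnyCancellation {

    // Hypothetical helper: returns how many milliseconds the pool needed to terminate
    // after invokeAny returned.
    static long millisUntilTerminated() throws Exception {
        List<Callable<String>> tasks = List.of(
                () -> "fast",
                () -> {
                    Thread.sleep(10_000); // throws InterruptedException when cancelled
                    return "slow";
                });
        ExecutorService es = Executors.newCachedThreadPool();
        String result = es.invokeAny(tasks);
        long start = System.nanoTime();
        es.shutdown(); // only stops accepting new tasks; does not interrupt anything
        if (!es.awaitTermination(20, TimeUnit.SECONDS)) {
            throw new IllegalStateException("pool did not terminate");
        }
        System.out.println("winner: " + result);
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) throws Exception {
        System.out.println("terminated after " + millisUntilTerminated() + " ms");
    }
}
```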