How to stop parallel stream once findAny match found?
I'm trying to find the first (any) member of a list that matches a given predicate, like so:
Item item = items.parallelStream()
.map(i -> i.doSomethingExpensive())
.filter(predicate)
.findAny()
.orElse(null);
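I would expect that once findAny() gets a match, it would return immediately, but that doesn't appear to be the case. Instead it seems to wait for the map method to finish on most of the elements before returning. How can I return the first result immediately and cancel the other parallel streams? Is there a better way to do this than using streams, such as CompletableFuture?
Here's a simple example to show the behavior: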
private static void log(String msg) {
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS");
System.out.println(sdf.format(new Date()) + " " + msg);
}
Random random = new Random();
List<Integer> nums = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
Optional<Integer> num = nums.parallelStream()
.map(n -> {
long delay = Math.abs(random.nextLong()) % 10000;
log("Waiting on " + n + " for " + delay + " ms");
try { Thread.sleep(delay); }
catch (InterruptedException e) { System.err.println("Interruption error"); }
return n * n;
})
.filter(n -> n < 30)
.peek(n -> log("Found match: " + n))
.findAny();
log("First match: " + num);
Log output:
14:52:27.061 Waiting on 9 for 2271 ms
14:52:27.061 Waiting on 2 for 1124 ms
14:52:27.061 Waiting on 13 for 547 ms
14:52:27.061 Waiting on 4 for 517 ms
14:52:27.061 Waiting on 1 for 1210 ms
14:52:27.061 Waiting on 6 for 2646 ms
14:52:27.061 Waiting on 0 for 4393 ms
14:52:27.061 Waiting on 12 for 5520 ms
14:52:27.581 Found match: 16
14:52:27.582 Waiting on 3 for 5365 ms
14:52:28.188 Found match: 4
14:52:28.275 Found match: 1
14:52:31.457 Found match: 0
14:52:32.950 Found match: 9
14:52:32.951 First match: Optional[0]
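Once a match is found (in this case 16), findAny() does not return immediately, but instead blocks until the remaining threads finish. In this case, the caller waits an extra 5 seconds before returning after a match has already been found.
You can use this code to illustrate how parallelStream works: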
final List<String> list = Arrays.asList("first", "second", "third", "4th", "5th", "7th", "8th", "9th", "10th", "11th", "12th", "13th");
String result = list.parallelStream()
.map(s -> {
System.out.println("map: " + s);
return s;
})
.filter(s -> {
System.out.println("filter: " + s);
return s.equals("8th");
})
.findFirst()
.orElse(null);
System.out.println("result=" + result);
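There are two options to achieve what you are looking for, i.e. to stop the expensive operation with a filter:
- Don't use streams at all; use a simple for or enhanced for loop
- Filter first, then map with the expensive operation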
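The first option can be sketched as follows. This is only a minimal illustration, not code from the answer: the class FirstMatchLoop, the method findFirstMatch and the counter expensiveCalls are made-up names, and squaring a number stands in for the expensive operation. Note that the loop is sequential, so it trades parallelism for the guarantee that nothing runs after the first match.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

class FirstMatchLoop {
    // Counts how often the simulated expensive operation ran (illustration only).
    static int expensiveCalls = 0;

    // Maps one element at a time and returns as soon as the predicate accepts a result.
    static <T, R> Optional<R> findFirstMatch(List<T> items, Function<T, R> expensive, Predicate<R> predicate) {
        for (T item : items) {
            R mapped = expensive.apply(item);
            if (predicate.test(mapped)) {
                return Optional.of(mapped); // later elements are never processed
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<Integer> nums = Arrays.asList(9, 7, 2, 13, 4);
        Optional<Integer> match = findFirstMatch(nums, n -> { expensiveCalls++; return n * n; }, sq -> sq < 30);
        // 81 and 49 are rejected, 4 is accepted, so only three expensive calls happen.
        System.out.println(match + " after " + expensiveCalls + " expensive calls");
    }
}
```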
Instead it seems to wait for the map method to finish on most of the elements before returning.
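This is not correct.
When speaking of the elements that are already being processed, it will wait for the completion of all of them, because the Stream API allows concurrent processing of data structures that are not intrinsically thread safe. It must ensure that all potential concurrent access has finished before returning from the terminal operation.
When talking about the entire stream, it's simply not fair to test a stream of only 14 elements on an 8-core machine. Of course, at least 8 concurrent operations will be started; that is what it is all about. You are adding fuel to the flames by using findFirst() instead of findAny(), as that doesn't mean returning the first element found in processing order, but the first element in encounter order, i.e. exactly zero in your example. So threads processing chunks other than the first can't assume that their result is the correct answer, and are even more willing to help process other candidates than with findAny().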
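The difference between encounter order and processing order can be seen in a small self-contained sketch. The class and method names are made up for this illustration, and a cheap modulo test replaces the expensive mapping.

```java
import java.util.OptionalInt;
import java.util.stream.IntStream;

class EncounterOrderDemo {
    // findFirst() has to return the first match in encounter order, even when run in parallel.
    static OptionalInt firstMatch() {
        return IntStream.range(0, 1_000).parallel().filter(n -> n % 7 == 3).findFirst();
    }

    // findAny() is free to return whichever match a worker thread produces first.
    static OptionalInt anyMatch() {
        return IntStream.range(0, 1_000).parallel().filter(n -> n % 7 == 3).findAny();
    }

    public static void main(String[] args) {
        System.out.println("findFirst: " + firstMatch()); // always OptionalInt[3]
        System.out.println("findAny:   " + anyMatch());   // some n with n % 7 == 3
    }
}
```

Because findFirst() may only answer once everything before a candidate has been ruled out, a match found in a later chunk cannot end the operation by itself.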
When you use
List<Integer> nums = IntStream.range(0, 200).boxed().collect(Collectors.toList());
Optional<Integer> num = nums.parallelStream()
.map(n -> {
long delay = ThreadLocalRandom.current().nextInt(10_000);
log("Waiting on " + n + " for " + delay + " ms");
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(delay));
return n * n;
})
.filter(n -> n < 40_000)
.peek(n -> log("Found match: " + n))
.findAny();
log("First match: " + num);
you will get a similar number of tasks running to completion, despite the far bigger number of stream elements.
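One way to check this claim on your own machine is to count how many map() calls are actually started. The following is a rough sketch under assumptions of mine: the class StartedTaskCounter and the method countStarted are hypothetical, the delay is fixed at 50 ms, and every element matches. The exact counts depend on the number of cores and on scheduling, so no particular output is promised.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class StartedTaskCounter {
    // Runs findAny() over `size` elements and returns how many map() calls were started.
    static int countStarted(int size) {
        AtomicInteger started = new AtomicInteger();
        List<Integer> nums = IntStream.range(0, size).boxed().collect(Collectors.toList());
        Optional<Integer> any = nums.parallelStream()
            .map(n -> {
                started.incrementAndGet();
                LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(50)); // simulated work
                return n * n;
            })
            .filter(sq -> sq >= 0) // every element matches
            .findAny();
        return any.isPresent() ? started.get() : -1;
    }

    public static void main(String[] args) {
        System.out.println("14 elements:  " + countStarted(14) + " map() calls started");
        System.out.println("200 elements: " + countStarted(200) + " map() calls started");
    }
}
```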
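Note that CompletableFuture also doesn't support interruption, so the only built-in feature for returning any result and canceling the other jobs that comes to my mind is the old ExecutorService.invokeAny.
To build the mapping and filtering feature for it, we can use the following helper function: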
static <T,R> Callable<R> mapAndfilter(T t, Function<T,R> f, Predicate<? super R> p) {
return () -> {
R r = f.apply(t);
if(!p.test(r)) throw new NoSuchElementException();
return r;
};
}
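Unfortunately, there is only the option of completing with a value or exceptionally, therefore we have to use an exception for non-matching elements.
Then we can use it like this: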
ExecutorService es = ForkJoinPool.commonPool();
Integer result = es.invokeAny(IntStream.range(0, 100)
.mapToObj(i -> mapAndfilter(i,
n -> {
long delay = ThreadLocalRandom.current().nextInt(10_000);
log("Waiting on " + n + " for " + delay + " ms");
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(delay));
return n * n;
},
n -> n < 10_000))
.collect(Collectors.toList()));
log("result: "+result);
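It will not only cancel the pending tasks, it will also return without waiting for them to finish.
Of course, this implies that the source data the jobs operate on must be either immutable or thread safe.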
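For completeness, here is a self-contained variant of the same idea. It is a sketch under assumptions of mine rather than the code above: it uses a dedicated fixed-size pool instead of the common pool, wraps the result in an Optional, and treats "every task failed" as "no match". The class InvokeAnyDemo and its findAny method are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

class InvokeAnyDemo {
    // Maps and filters each item in its own task; returns the first result that passes the predicate.
    static <T, R> Optional<R> findAny(List<T> items, Function<T, R> f, Predicate<? super R> p, int threads) {
        if (items.isEmpty()) {
            return Optional.empty(); // invokeAny rejects an empty task list
        }
        ExecutorService es = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<R>> tasks = items.stream()
                .map(t -> (Callable<R>) () -> {
                    R r = f.apply(t);
                    if (!p.test(r)) throw new NoSuchElementException(); // non-match = failed task
                    return r;
                })
                .collect(Collectors.toList());
            return Optional.ofNullable(es.invokeAny(tasks));
        } catch (ExecutionException e) {
            return Optional.empty(); // no task succeeded; this sketch treats that as "no match"
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return Optional.empty();
        } finally {
            es.shutdownNow(); // do not leave idle or still-running workers behind
        }
    }

    public static void main(String[] args) {
        List<Integer> nums = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
        Optional<Integer> match = findAny(nums, n -> n * n, sq -> sq == 49, 4);
        System.out.println(match); // only 7 * 7 matches, so this prints Optional[49]
    }
}
```

Swallowing the ExecutionException also hides genuine failures of the mapping function, so real code would probably want to inspect the cause.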
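There are a few things at play here. The first is that parallelStream() uses the common ForkJoinPool by default, which makes the calling thread participate as well. This means that if one of the slow tasks is currently running on the calling thread, it has to finish before the caller gets control back.
You can see this by modifying the code slightly to log the thread names, and to log when a wait has finished: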
private static void log(String msg) {
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS");
System.out.println(sdf.format(new Date()) + " [" + Thread.currentThread().getName() + "] " + " " + msg);
}
public static void main(String[] args) {
Random random = new Random();
List<Integer> nums = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
Optional<Integer> num = nums.parallelStream()
.map(n -> {
long delay = Math.abs(random.nextLong()) % 10000;
log("Waiting on " + n + " for " + delay + " ms");
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
System.err.println("Interruption error");
}
log("finished waiting");
return n * n;
})
.filter(n -> n < 30)
.peek(n -> log("Found match: " + n))
.findAny();
log("First match: " + num);
}
Sample output:
13:56:52.954 [main] Waiting on 9 for 9936 ms
13:56:52.956 [ForkJoinPool.commonPool-worker-1] Waiting on 4 for 7436 ms
13:56:52.970 [ForkJoinPool.commonPool-worker-2] Waiting on 1 for 6523 ms
13:56:52.983 [ForkJoinPool.commonPool-worker-3] Waiting on 6 for 7488 ms
13:56:59.494 [ForkJoinPool.commonPool-worker-2] finished waiting
13:56:59.496 [ForkJoinPool.commonPool-worker-2] Found match: 1
13:57:00.392 [ForkJoinPool.commonPool-worker-1] finished waiting
13:57:00.392 [ForkJoinPool.commonPool-worker-1] Found match: 16
13:57:00.471 [ForkJoinPool.commonPool-worker-3] finished waiting
13:57:02.892 [main] finished waiting
13:57:02.894 [main] First match: Optional[1]
As you can see, 2 matches are found, but the main thread is still busy, so it cannot return the match yet.
This does not always explain all cases, though:
13:58:52.116 [main] Waiting on 9 for 5256 ms
13:58:52.143 [ForkJoinPool.commonPool-worker-1] Waiting on 4 for 4220 ms
13:58:52.148 [ForkJoinPool.commonPool-worker-2] Waiting on 1 for 2136 ms
13:58:52.158 [ForkJoinPool.commonPool-worker-3] Waiting on 6 for 7262 ms
13:58:54.294 [ForkJoinPool.commonPool-worker-2] finished waiting
13:58:54.295 [ForkJoinPool.commonPool-worker-2] Found match: 1
13:58:56.364 [ForkJoinPool.commonPool-worker-1] finished waiting
13:58:56.364 [ForkJoinPool.commonPool-worker-1] Found match: 16
13:58:57.399 [main] finished waiting
13:58:59.422 [ForkJoinPool.commonPool-worker-3] finished waiting
13:58:59.424 [main] First match: Optional[1]
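This might be explained by the way the fork-join pool merges the results. It seems some improvements are possible.
As an alternative, you could indeed do this using CompletableFuture: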
// you should probably also pass your own executor to supplyAsync()
List<CompletableFuture<Integer>> futures = nums.stream().map(n -> CompletableFuture.supplyAsync(() -> {
long delay = Math.abs(random.nextLong()) % 10000;
log("Waiting on " + n + " for " + delay + " ms");
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
System.err.println("Interruption error");
}
log("finished waiting");
return n * n;
})).collect(Collectors.toList());
CompletableFuture<Integer> result = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(unused -> futures.stream().map(CompletableFuture::join).filter(n -> n < 30).findAny().orElse(null));
// shortcircuiting
futures.forEach(f -> f.thenAccept(r -> {
if (r < 30) {
log("Found match: " + r);
result.complete(r);
}
}));
// cancelling remaining tasks
result.whenComplete((r, t) -> futures.forEach(f -> f.cancel(true)));
log("First match: " + result.join());
Output:
14:57:39.815 [ForkJoinPool.commonPool-worker-1] Waiting on 0 for 7964 ms
14:57:39.815 [ForkJoinPool.commonPool-worker-3] Waiting on 2 for 5743 ms
14:57:39.817 [ForkJoinPool.commonPool-worker-2] Waiting on 1 for 9179 ms
14:57:45.562 [ForkJoinPool.commonPool-worker-3] finished waiting
14:57:45.563 [ForkJoinPool.commonPool-worker-3] Found match: 4
14:57:45.564 [ForkJoinPool.commonPool-worker-3] Waiting on 3 for 7320 ms
14:57:45.566 [main] First match: 4
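Note that cancel(true) does not actually cancel the ongoing tasks (no interruption will occur, for example), but it prevents further tasks from being run (you can even see that it might not be immediate, since worker 3 still started to execute the next one).
You should also use your own executor, with an appropriate size based on whether the work is more CPU or I/O intensive. As you can see, the default uses the common pool and thus does not use all cores.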
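Passing your own executor only requires the two-argument overload of supplyAsync. A minimal sketch, with a hypothetical class CustomExecutorDemo and a trivial squaring task in place of the real work, might look like this. Sizing the pool to the number of available processors is just a common rule of thumb for CPU-bound work, and I/O-bound work usually tolerates a larger pool.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class CustomExecutorDemo {
    // Squares 0..count-1 on a dedicated pool of `threads` threads and returns the results in order.
    static List<Integer> squareAll(int count, int threads) {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            List<CompletableFuture<Integer>> futures = IntStream.range(0, count)
                .mapToObj(n -> CompletableFuture.supplyAsync(() -> n * n, executor)) // runs on our pool
                .collect(Collectors.toList());
            return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
        } finally {
            executor.shutdown(); // all futures have been joined at this point
        }
    }

    public static void main(String[] args) {
        int threads = Runtime.getRuntime().availableProcessors(); // rule of thumb for CPU-bound work
        System.out.println(squareAll(5, threads)); // [0, 1, 4, 9, 16]
    }
}
```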
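allOf() is mainly needed in case no match is found. If you can guarantee that there is at least one match, you could simply use a new CompletableFuture() instead.
Finally, as a simple approach I repeated the filter check, but it's easy to move that logic inside the main logic, return null or a marker, and then test for that in both places.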
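Another way to avoid repeating the check, different from the marker approach just mentioned, is to count the futures that are still outstanding and complete the result with an empty Optional once the last one finishes without a match. The helper below is a hypothetical sketch (the names FirstMatchingFuture and firstMatching are mine); it treats failed futures and null results as non-matches and does no cancellation.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

class FirstMatchingFuture {
    // Completes with the first matching value, or with Optional.empty() once all futures are done.
    static <T> CompletableFuture<Optional<T>> firstMatching(
            List<CompletableFuture<T>> futures, Predicate<? super T> predicate) {
        CompletableFuture<Optional<T>> result = new CompletableFuture<>();
        if (futures.isEmpty()) {
            result.complete(Optional.empty());
            return result;
        }
        AtomicInteger remaining = new AtomicInteger(futures.size());
        for (CompletableFuture<T> f : futures) {
            f.whenComplete((value, error) -> {
                if (error == null && value != null && predicate.test(value)) {
                    result.complete(Optional.of(value)); // only the first complete() call has an effect
                }
                if (remaining.decrementAndGet() == 0) {
                    result.complete(Optional.empty()); // no-op if a match already completed the result
                }
            });
        }
        return result;
    }

    public static void main(String[] args) {
        List<CompletableFuture<Integer>> futures = Arrays.asList(
            CompletableFuture.supplyAsync(() -> 50),
            CompletableFuture.supplyAsync(() -> 4),
            CompletableFuture.supplyAsync(() -> 60));
        System.out.println(firstMatching(futures, n -> n < 30).join()); // Optional[4], the only match
    }
}
```

A match always completes the result before its own callback decrements the counter, so the empty completion cannot win over an existing match.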
See also How to make a future that gets completed when any of the given CompletableFutures is completed with a result that matches a certain predicate?