How to stop parallel stream once findAny match found?

I'm trying to find the first (any) member of a list that matches a given predicate, like so:

Item item = items.parallelStream()
  .map(i -> i.doSomethingExpensive())
  .filter(predicate)
  .findAny()
  .orElse(null);

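I would expect that once findAny() gets a match, it would return immediately, but that doesn't appear to be the case. Instead it seems to wait for the map method to finish on most of the elements before returning. How can I return immediately with the first result and cancel the other parallel tasks? Is there a better way to do this than using streams, such as CompletableFuture?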

Here's a simple example to show the behavior:

private static void log(String msg) {
    SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS");
    System.out.println(sdf.format(new Date()) + " " + msg);
}

Random random = new Random();
List<Integer> nums = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
Optional<Integer> num = nums.parallelStream()
  .map(n -> {
    long delay = Math.abs(random.nextLong()) % 10000;
    log("Waiting on " + n + " for " + delay + " ms");
    try { Thread.sleep(delay); }
    catch (InterruptedException e) { System.err.println("Interruption error"); }
    return n * n;
  })
  .filter(n -> n < 30)
  .peek(n -> log("Found match: " + n))
  .findAny();

log("First match: " + num);

Log output:

14:52:27.061 Waiting on 9 for 2271 ms
14:52:27.061 Waiting on 2 for 1124 ms
14:52:27.061 Waiting on 13 for 547 ms
14:52:27.061 Waiting on 4 for 517 ms
14:52:27.061 Waiting on 1 for 1210 ms
14:52:27.061 Waiting on 6 for 2646 ms
14:52:27.061 Waiting on 0 for 4393 ms
14:52:27.061 Waiting on 12 for 5520 ms
14:52:27.581 Found match: 16
14:52:27.582 Waiting on 3 for 5365 ms
14:52:28.188 Found match: 4
14:52:28.275 Found match: 1
14:52:31.457 Found match: 0
14:52:32.950 Found match: 9
14:52:32.951 First match: Optional[0]

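Once a match is found (in this case 16), findAny() does not return immediately, but instead blocks until the remaining threads finish. In this case, the caller waits an extra 5 seconds before returning, even though a match has already been found.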

You can use this code to illustrate how parallelStream works:

final List<String> list = Arrays.asList("first", "second", "third", "4th", "5th", "7th", "8th", "9th", "10th", "11th", "12th", "13th");

    String result = list.parallelStream()
                        .map(s -> {
                            System.out.println("map: " + s);
                            return s;
                        })
                        .filter(s -> {
                            System.out.println("filter: " + s);
                            return s.equals("8th");
                        })
                        .findFirst()
                        .orElse(null);

    System.out.println("result=" + result);

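There are two options to achieve what you're looking for, that is, to stop running the expensive operation once the filter has matched:

  1. Don't use streams at all; use a simple for or enhanced for loop
  2. Filter first, then map with the expensive operation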
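As a rough sketch of option 1, a plain loop maps one element at a time and stops at the first match. The class and method names (SequentialFirstMatch, firstMatch) are made up for illustration, and the sketch assumes the mapping never returns null. It gives up parallelism entirely, so it only pays off when an early element is likely to match.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

class SequentialFirstMatch {
    // Hypothetical helper: applies the expensive mapping one element at a time
    // and returns as soon as a mapped value passes the predicate.
    // Assumes the mapping function never returns null.
    static <T, R> Optional<R> firstMatch(List<T> items,
                                         Function<T, R> expensive,
                                         Predicate<? super R> predicate) {
        for (T item : items) {
            R mapped = expensive.apply(item);
            if (predicate.test(mapped)) {
                return Optional.of(mapped); // later elements are never mapped
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<Integer> nums = Arrays.asList(7, 8, 2, 9, 1);
        Optional<Integer> result = firstMatch(nums, n -> n * n, n -> n < 30);
        System.out.println(result); // Optional[4]
    }
}
```

Option 2 only applies when the predicate can be evaluated on the unmapped element, which is not the case for a filter like n * n < 30 written against the mapped value.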

Instead it seems to wait for the map method to finish on most of the elements before returning.

That's not correct.

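When speaking of the elements which are already being processed, it will wait for all of them to complete, because the Stream API allows concurrent processing of data structures which are not intrinsically thread safe. It must ensure that all potential concurrent access has finished before returning from the terminal operation.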

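When talking about the entire stream, it's simply not fair to test a stream of only 14 elements on an 8 core machine. Of course, at least 8 concurrent operations will be started; that's all there is to it. You are adding fuel to the flames by using findFirst() instead of findAny(), as that doesn't mean returning the first element found in processing order, but the first element in encounter order, which happens to be zero in your example. So threads processing chunks other than the first can't assume that their result is the correct answer and are even more willing to help process other candidates than with findAny().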
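To make the encounter-order point concrete, here is a small sketch (the class name EncounterOrderDemo and the predicate are made up for illustration). On an ordered parallel stream, findFirst() has to return the first match in encounter order, while findAny() is free to return any match, so its result can differ between runs.

```java
import java.util.Optional;
import java.util.stream.IntStream;

class EncounterOrderDemo {
    // findFirst() must return the first match in encounter order,
    // even when another thread finds a later match sooner.
    static Optional<Integer> first() {
        return IntStream.range(0, 1_000).boxed().parallel()
                .filter(n -> n % 7 == 3)
                .findFirst();
    }

    // findAny() may return whichever match a thread happens to find.
    static Optional<Integer> any() {
        return IntStream.range(0, 1_000).boxed().parallel()
                .filter(n -> n % 7 == 3)
                .findAny();
    }

    public static void main(String[] args) {
        System.out.println("findFirst: " + first()); // always Optional[3]
        System.out.println("findAny:   " + any());   // some n with n % 7 == 3
    }
}
```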

When you use

List<Integer> nums = IntStream.range(0, 200).boxed().collect(Collectors.toList());
Optional<Integer> num = nums.parallelStream()
        .map(n -> {
            long delay = ThreadLocalRandom.current().nextInt(10_000);
            log("Waiting on " + n + " for " + delay + " ms");
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(delay));
            return n * n;
        })
        .filter(n -> n < 40_000)
        .peek(n -> log("Found match: " + n))
        .findAny();

log("First match: " + num);

you will get a similar number of tasks running to completion, despite the far larger number of stream elements.

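Note that CompletableFuture also doesn't support interruption, so the only built-in feature that comes to my mind for returning any result and canceling the other jobs is the old ExecutorService.invokeAny.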

To build the mapping and filtering feature for it, we can use the following helper function:

static <T,R> Callable<R> mapAndfilter(T t, Function<T,R> f, Predicate<? super R> p) {
    return () -> {
        R r = f.apply(t);
        if(!p.test(r)) throw new NoSuchElementException();
        return r;
    };
}

Unfortunately, a task can only complete with a value or exceptionally, so we have to use an exception for non-matching elements.

Then we can use it like this:

ExecutorService es = ForkJoinPool.commonPool();
Integer result = es.invokeAny(IntStream.range(0, 100)
    .mapToObj(i -> mapAndfilter(i,
        n -> {
            long delay = ThreadLocalRandom.current().nextInt(10_000);
            log("Waiting on " + n + " for " + delay + " ms");
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(delay));
            return n * n;
        },
        n -> n < 10_000))
    .collect(Collectors.toList()));

log("result: "+result);

Not only will it cancel the pending tasks, it will also return without waiting for them to finish.

Of course, this implies that the source data, which the jobs operate upon, must be either immutable or thread safe.
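For reference, here is a self-contained variant of the invokeAny approach. It is only a sketch: it swaps the common pool for a dedicated fixed-size pool (my assumption, so that the pool can be shut down afterwards), and the names InvokeAnyDemo, findAny and slowSquare are made up. An ExecutionException from invokeAny means that no task completed successfully, which here is treated as "no match".

```java
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

class InvokeAnyDemo {
    // Hypothetical helper: maps every item on its own task and returns the
    // first mapped value that passes the predicate. Assumes a non-empty list
    // (invokeAny rejects an empty one) and a mapping that never returns null.
    static <T, R> Optional<R> findAny(List<T> items, Function<T, R> f,
                                      Predicate<? super R> p, int threads) {
        ExecutorService es = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<R>> tasks = items.stream()
                    .map(t -> (Callable<R>) () -> {
                        R r = f.apply(t);
                        if (!p.test(r)) throw new NoSuchElementException();
                        return r;
                    })
                    .collect(Collectors.toList());
            return Optional.of(es.invokeAny(tasks));
        } catch (ExecutionException e) {
            return Optional.empty(); // no task succeeded, so nothing matched
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return Optional.empty();
        } finally {
            es.shutdownNow(); // interrupts tasks that are still running
        }
    }

    // Stand-in for the expensive operation; sleeps, then squares.
    static int slowSquare(int n) {
        try {
            Thread.sleep(100L * n);
        } catch (InterruptedException e) {
            throw new IllegalStateException("cancelled", e);
        }
        return n * n;
    }

    public static void main(String[] args) {
        List<Integer> nums = Arrays.asList(9, 8, 7, 6, 5, 4, 3, 2, 1);
        Optional<Integer> result = findAny(nums, InvokeAnyDemo::slowSquare, n -> n < 30, 4);
        System.out.println("result: " + result);
    }
}
```

With a ThreadPoolExecutor, the remaining tasks are cancelled with interruption, so a task that is blocked in Thread.sleep ends early instead of running to completion.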

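There are a few things at play here. The first is that parallelStream() uses the common ForkJoinPool by default, which makes the calling thread participate as well. This means that if one of the slow tasks is currently running on the calling thread, it has to finish before the caller gets control back.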

You can see this by modifying the code a little to log the thread names, and to log when each wait finishes:

private static void log(String msg) {
    SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS");
    System.out.println(sdf.format(new Date()) + " [" + Thread.currentThread().getName() + "] " + " " + msg);
}

public static void main(String[] args) {
    Random random = new Random();
    List<Integer> nums = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
    Optional<Integer> num = nums.parallelStream()
            .map(n -> {
                long delay = Math.abs(random.nextLong()) % 10000;
                log("Waiting on " + n + " for " + delay + " ms");
                try {
                    Thread.sleep(delay);
                } catch (InterruptedException e) {
                    System.err.println("Interruption error");
                }
                log("finished waiting");
                return n * n;
            })
            .filter(n -> n < 30)
            .peek(n -> log("Found match: " + n))
            .findAny();

    log("First match: " + num);
}

Sample output:

13:56:52.954 [main]  Waiting on 9 for 9936 ms
13:56:52.956 [ForkJoinPool.commonPool-worker-1]  Waiting on 4 for 7436 ms
13:56:52.970 [ForkJoinPool.commonPool-worker-2]  Waiting on 1 for 6523 ms
13:56:52.983 [ForkJoinPool.commonPool-worker-3]  Waiting on 6 for 7488 ms
13:56:59.494 [ForkJoinPool.commonPool-worker-2]  finished waiting
13:56:59.496 [ForkJoinPool.commonPool-worker-2]  Found match: 1
13:57:00.392 [ForkJoinPool.commonPool-worker-1]  finished waiting
13:57:00.392 [ForkJoinPool.commonPool-worker-1]  Found match: 16
13:57:00.471 [ForkJoinPool.commonPool-worker-3]  finished waiting
13:57:02.892 [main]  finished waiting
13:57:02.894 [main]  First match: Optional[1]

As you can see, 2 matches are found, but the main thread is still busy, so it cannot return the match yet.

This does not always explain all cases though:

13:58:52.116 [main]  Waiting on 9 for 5256 ms
13:58:52.143 [ForkJoinPool.commonPool-worker-1]  Waiting on 4 for 4220 ms
13:58:52.148 [ForkJoinPool.commonPool-worker-2]  Waiting on 1 for 2136 ms
13:58:52.158 [ForkJoinPool.commonPool-worker-3]  Waiting on 6 for 7262 ms
13:58:54.294 [ForkJoinPool.commonPool-worker-2]  finished waiting
13:58:54.295 [ForkJoinPool.commonPool-worker-2]  Found match: 1
13:58:56.364 [ForkJoinPool.commonPool-worker-1]  finished waiting
13:58:56.364 [ForkJoinPool.commonPool-worker-1]  Found match: 16
13:58:57.399 [main]  finished waiting
13:58:59.422 [ForkJoinPool.commonPool-worker-3]  finished waiting
13:58:59.424 [main]  First match: Optional[1]

This might be explained by the way the fork-join pool merges the results. It seems some improvements are possible.

As an alternative, you could indeed do this using CompletableFuture:

// you should probably also pass your own executor to supplyAsync()
List<CompletableFuture<Integer>> futures = nums.stream().map(n -> CompletableFuture.supplyAsync(() -> {
    long delay = Math.abs(random.nextLong()) % 10000;
    log("Waiting on " + n + " for " + delay + " ms");
    try {
        Thread.sleep(delay);
    } catch (InterruptedException e) {
        System.err.println("Interruption error");
    }
    log("finished waiting");
    return n * n;
})).collect(Collectors.toList());
CompletableFuture<Integer> result = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
        .thenApply(unused -> futures.stream().map(CompletableFuture::join).filter(n -> n < 30).findAny().orElse(null));
// shortcircuiting
futures.forEach(f -> f.thenAccept(r -> {
    if (r < 30) {
        log("Found match: " + r);
        result.complete(r);
    }
}));
// cancelling remaining tasks
result.whenComplete((r, t) -> futures.forEach(f -> f.cancel(true)));

log("First match: " + result.join());

Output:

14:57:39.815 [ForkJoinPool.commonPool-worker-1]  Waiting on 0 for 7964 ms
14:57:39.815 [ForkJoinPool.commonPool-worker-3]  Waiting on 2 for 5743 ms
14:57:39.817 [ForkJoinPool.commonPool-worker-2]  Waiting on 1 for 9179 ms
14:57:45.562 [ForkJoinPool.commonPool-worker-3]  finished waiting
14:57:45.563 [ForkJoinPool.commonPool-worker-3]  Found match: 4
14:57:45.564 [ForkJoinPool.commonPool-worker-3]  Waiting on 3 for 7320 ms
14:57:45.566 [main]  First match: 4

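Note that cancel(true) does not actually cancel the ongoing tasks (no interruption will occur, for example), but it prevents further tasks from being run (you can even see that it might not take effect immediately, since worker 3 still started to execute the next one).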
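The lack of interruption can be checked with a small experiment. This is only a sketch with made-up names (CancelDoesNotInterrupt, taskWasInterrupted): it starts a task, cancels the future with cancel(true) while the task is sleeping, and reports whether the task ever saw an InterruptedException.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

class CancelDoesNotInterrupt {
    // Returns true if the running task observed an interrupt after cancel(true).
    static boolean taskWasInterrupted() {
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);

        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            started.countDown();
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                interrupted.set(true);
            } finally {
                finished.countDown();
            }
        });

        try {
            started.await();     // make sure the task body is already running
            future.cancel(true); // marks the future as cancelled, nothing more
            finished.await();    // the task body still runs to the end
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return interrupted.get();
    }

    public static void main(String[] args) {
        System.out.println("task interrupted: " + taskWasInterrupted()); // false
    }
}
```

This matches the CompletableFuture.cancel documentation, which states that the mayInterruptIfRunning argument has no effect in this implementation.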

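You should also use your own executor, sized appropriately depending on whether the work is more CPU or I/O intensive. As you can see, the default uses the common pool, and thus it does not use all cores.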
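Passing your own executor is just a matter of using the two-argument supplyAsync overload. A minimal sketch follows; the pool size of 16 is an arbitrary assumption for I/O-bound work, and the names OwnExecutorDemo and threadNamesUsed are made up. The method returns the names of the threads that ran the tasks, so you can see they come from the dedicated pool rather than from ForkJoinPool.commonPool.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class OwnExecutorDemo {
    // Runs taskCount trivial tasks on a dedicated pool and collects the names
    // of the threads that executed them.
    static Set<String> threadNamesUsed(int poolSize, int taskCount) {
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        try {
            List<CompletableFuture<String>> futures = IntStream.range(0, taskCount)
                    .mapToObj(i -> CompletableFuture.supplyAsync(
                            () -> Thread.currentThread().getName(), executor))
                    .collect(Collectors.toList());
            return futures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toSet());
        } finally {
            executor.shutdown(); // the tasks have completed by now
        }
    }

    public static void main(String[] args) {
        System.out.println("common pool parallelism: "
                + ForkJoinPool.commonPool().getParallelism());
        // 16 is an assumed size for I/O-bound work; tune it for your workload.
        System.out.println("threads used: " + threadNamesUsed(16, 50));
    }
}
```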

The allOf() is mainly needed in case no match is found. If you can guarantee that there is at least one match, you could simply use a new CompletableFuture<Integer>() instead.

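Finally, as a simple approach I repeated the filter check, but it's easy to move that logic inside the main logic, return null or a marker, and then test for it in both places.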
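A sketch of that marker variant, using an empty Optional as the marker so that the predicate runs only once, inside the task. The names MarkerDemo, squareIfBelow and firstSquareBelow are made up, and the expensive work is reduced to squaring a number. It runs on the common pool and assumes the tasks do not throw.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class MarkerDemo {
    // The task applies the filter itself; an empty Optional marks "no match".
    static CompletableFuture<Optional<Integer>> squareIfBelow(int n, int limit) {
        return CompletableFuture.supplyAsync(() -> {
            int square = n * n;
            return square < limit ? Optional.of(square) : Optional.<Integer>empty();
        });
    }

    static Optional<Integer> firstSquareBelow(List<Integer> nums, int limit) {
        List<CompletableFuture<Optional<Integer>>> futures = nums.stream()
                .map(n -> squareIfBelow(n, limit))
                .collect(Collectors.toList());

        // Fallback once every task is done: look for a present marker, else empty.
        CompletableFuture<Optional<Integer>> result =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                        .thenApply(unused -> futures.stream()
                                .map(CompletableFuture::join)
                                .filter(Optional::isPresent)
                                .findAny()
                                .orElse(Optional.empty()));

        // Short-circuit: the first task that produced a match completes result.
        futures.forEach(f -> f.thenAccept(r -> {
            if (r.isPresent()) {
                result.complete(r);
            }
        }));

        return result.join();
    }

    public static void main(String[] args) {
        System.out.println(firstSquareBelow(Arrays.asList(9, 8, 2, 7), 30)); // Optional[4]
        System.out.println(firstSquareBelow(Arrays.asList(9, 8, 7), 30));    // Optional.empty
    }
}
```

The marker is still tested in two places (the fallback and the short-circuit), but the predicate itself is written only once.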

See also How to make a future that gets completed when any of the given CompletableFutures is completed with a result that matches a certain predicate?