并行流是否以线程安全的方式处理上游迭代器?

Do parallel streams treat upstream iterators in a thread safe way?

今天我使用的是在地图之后执行 parallel() 操作的流;底层源是一个迭代器,它不是线程安全的,类似于 BufferedReader.lines 实现。

我原本以为会在创建的线程上调用trySplit,然而;我观察到对迭代器的访问来自多个线程。

举例来说,以下愚蠢的迭代器实现仅设置了足够的元素以引起拆分,并且还跟踪访问 hasNext 方法的唯一线程。

class SillyIterator implements Iterator<String> {

    private final ArrayDeque<String> src =
        IntStream.range(1, 10000)
            .mapToObj(Integer::toString)
            .collect(toCollection(ArrayDeque::new));
    private Map<String, String> ts = new ConcurrentHashMap<>();
    public Set<String> threads() { return ts.keySet(); }
    private String nextRecord = null;

    @Override
    public boolean hasNext() {
        var n = Thread.currentThread().getName();
        ts.put(n, n);
        if (nextRecord != null) {
            return true;
        } else {
            nextRecord = src.poll();
            return nextRecord != null;
        }
    }
    @Override
    public String next() {
        if (nextRecord != null || hasNext()) {
            var rec = nextRecord;
            nextRecord = null;
            return rec;
        }
        throw new NoSuchElementException();
    }

}

使用它创建流如下:

var iter = new SillyIterator();
StreamSupport
    .stream(Spliterators.spliteratorUnknownSize(
        iter, Spliterator.ORDERED | Spliterator.NONNULL
    ), false)
    .map(n -> "value = " + n)
    .parallel()
    .collect(toList());

System.out.println(iter.threads());

这在我的系统上输出了两个 fork join 线程以及主线程,这让我有点害怕。

[ForkJoinPool.commonPool-worker-1, ForkJoinPool.commonPool-worker-2, main]

线程安全并不一定意味着只能被一个线程访问。重要的方面是没有并发访问,即没有多个线程同时访问。如果不同线程的访问是临时排序的,并且这种排序也确保了必要的内存可见性,这是​​调用者的责任,它仍然是线程安全的用法。

The Spliterator documentation 说:

Despite their obvious utility in parallel algorithms, spliterators are not expected to be thread-safe; instead, implementations of parallel algorithms using spliterators should ensure that the spliterator is only used by one thread at a time. This is generally easy to attain via serial thread-confinement, which often is a natural consequence of typical parallel algorithms that work by recursive decomposition.

拆分器不需要在其整个生命周期内都被限制在同一个线程中,但调用方应该有一个明确的交接,确保旧线程在新线程开始使用它之前停止使用它。

但重要的是,拆分器不需要是线程安全的,因此,拆分器包装的迭代器也不需要是线程安全的。

请注意,典型的行为是在开始遍历之前拆分和移交,但是由于普通的Iterator不支持拆分,因此包装拆分器必须迭代和缓冲元素来实现拆分。因此,IteratorStream 实现的角度尚未开始遍历时,经历了不同线程的遍历(但一次一个)。


就是说,BufferedReaderlines() 实现是一个你不应该效仿的坏例子。由于它以单个 readLine() 调用为中心,因此很自然地直接实现 Spliterator 而不是实现更复杂的 Iterator 并通过 spliteratorUnknownSize(…).[=27 包装它=]

由于您的示例同样以单个 poll() 调用为中心,因此直接实现 Spliterator 也很简单:

class SillySpliterator extends Spliterators.AbstractSpliterator<String> {
    private final ArrayDeque<String> src = IntStream.range(1, 10000)
        .mapToObj(Integer::toString).collect(toCollection(ArrayDeque::new));

    SillySpliterator() {
        super(Long.MAX_VALUE, ORDERED | NONNULL);
    }

    @Override
    public boolean tryAdvance(Consumer<? super String> action) {
        String nextRecord = src.poll();
        if(nextRecord == null) return false;
        action.accept(nextRecord);
        return true;
    }
}

根据您的实际情况,您还可以将实际的双端队列大小传递给构造函数并提供 SIZED 特性。

那么,你可以像这样使用它

var result = StreamSupport.stream(new SillySpliterator(), true)
    .map(n -> "value = " + n)
    .collect(toList());