并行流是否以线程安全的方式处理上游迭代器?
Do parallel streams treat upstream iterators in a thread safe way?
今天我使用的是在地图之后执行 parallel()
操作的流;底层源是一个迭代器,它不是线程安全的,类似于 BufferedReader.lines 实现。
我原本以为会在创建的线程上调用trySplit,然而;我观察到对迭代器的访问来自多个线程。
举例来说,以下愚蠢的迭代器实现仅设置了足够的元素以引起拆分,并且还跟踪访问 hasNext
方法的唯一线程。
class SillyIterator implements Iterator<String> {
private final ArrayDeque<String> src =
IntStream.range(1, 10000)
.mapToObj(Integer::toString)
.collect(toCollection(ArrayDeque::new));
private Map<String, String> ts = new ConcurrentHashMap<>();
public Set<String> threads() { return ts.keySet(); }
private String nextRecord = null;
@Override
public boolean hasNext() {
var n = Thread.currentThread().getName();
ts.put(n, n);
if (nextRecord != null) {
return true;
} else {
nextRecord = src.poll();
return nextRecord != null;
}
}
@Override
public String next() {
if (nextRecord != null || hasNext()) {
var rec = nextRecord;
nextRecord = null;
return rec;
}
throw new NoSuchElementException();
}
}
使用它创建流如下:
var iter = new SillyIterator();
StreamSupport
.stream(Spliterators.spliteratorUnknownSize(
iter, Spliterator.ORDERED | Spliterator.NONNULL
), false)
.map(n -> "value = " + n)
.parallel()
.collect(toList());
System.out.println(iter.threads());
这在我的系统上输出了两个 fork join 线程以及主线程,这让我有点害怕。
[ForkJoinPool.commonPool-worker-1, ForkJoinPool.commonPool-worker-2, main]
线程安全并不一定意味着只能被一个线程访问。重要的方面是没有并发访问,即没有多个线程同时访问。如果不同线程的访问是临时排序的,并且这种排序也确保了必要的内存可见性,这是调用者的责任,它仍然是线程安全的用法。
The Spliterator
documentation 说:
Despite their obvious utility in parallel algorithms, spliterators are not expected to be thread-safe; instead, implementations of parallel algorithms using spliterators should ensure that the spliterator is only used by one thread at a time. This is generally easy to attain via serial thread-confinement, which often is a natural consequence of typical parallel algorithms that work by recursive decomposition.
拆分器不需要在其整个生命周期内都被限制在同一个线程中,但调用方应该有一个明确的交接,确保旧线程在新线程开始使用它之前停止使用它。
但重要的是,拆分器不需要是线程安全的,因此,拆分器包装的迭代器也不需要是线程安全的。
请注意,典型的行为是在开始遍历之前拆分和移交,但是由于普通的Iterator
不支持拆分,因此包装拆分器必须迭代和缓冲元素来实现拆分。因此,Iterator
在 Stream
实现的角度尚未开始遍历时,经历了不同线程的遍历(但一次一个)。
就是说,BufferedReader
的 lines()
实现是一个你不应该效仿的坏例子。由于它以单个 readLine()
调用为中心,因此很自然地直接实现 Spliterator
而不是实现更复杂的 Iterator
并通过 spliteratorUnknownSize(…)
.[=27 包装它=]
由于您的示例同样以单个 poll()
调用为中心,因此直接实现 Spliterator
也很简单:
class SillySpliterator extends Spliterators.AbstractSpliterator<String> {
private final ArrayDeque<String> src = IntStream.range(1, 10000)
.mapToObj(Integer::toString).collect(toCollection(ArrayDeque::new));
SillySpliterator() {
super(Long.MAX_VALUE, ORDERED | NONNULL);
}
@Override
public boolean tryAdvance(Consumer<? super String> action) {
String nextRecord = src.poll();
if(nextRecord == null) return false;
action.accept(nextRecord);
return true;
}
}
根据您的实际情况,您还可以将实际的双端队列大小传递给构造函数并提供 SIZED
特性。
那么,你可以像这样使用它
var result = StreamSupport.stream(new SillySpliterator(), true)
.map(n -> "value = " + n)
.collect(toList());
今天我使用的是在地图之后执行 parallel()
操作的流;底层源是一个迭代器,它不是线程安全的,类似于 BufferedReader.lines 实现。
我原本以为会在创建的线程上调用trySplit,然而;我观察到对迭代器的访问来自多个线程。
举例来说,以下愚蠢的迭代器实现仅设置了足够的元素以引起拆分,并且还跟踪访问 hasNext
方法的唯一线程。
class SillyIterator implements Iterator<String> {
private final ArrayDeque<String> src =
IntStream.range(1, 10000)
.mapToObj(Integer::toString)
.collect(toCollection(ArrayDeque::new));
private Map<String, String> ts = new ConcurrentHashMap<>();
public Set<String> threads() { return ts.keySet(); }
private String nextRecord = null;
@Override
public boolean hasNext() {
var n = Thread.currentThread().getName();
ts.put(n, n);
if (nextRecord != null) {
return true;
} else {
nextRecord = src.poll();
return nextRecord != null;
}
}
@Override
public String next() {
if (nextRecord != null || hasNext()) {
var rec = nextRecord;
nextRecord = null;
return rec;
}
throw new NoSuchElementException();
}
}
使用它创建流如下:
var iter = new SillyIterator();
StreamSupport
.stream(Spliterators.spliteratorUnknownSize(
iter, Spliterator.ORDERED | Spliterator.NONNULL
), false)
.map(n -> "value = " + n)
.parallel()
.collect(toList());
System.out.println(iter.threads());
这在我的系统上输出了两个 fork join 线程以及主线程,这让我有点害怕。
[ForkJoinPool.commonPool-worker-1, ForkJoinPool.commonPool-worker-2, main]
线程安全并不一定意味着只能被一个线程访问。重要的方面是没有并发访问,即没有多个线程同时访问。如果不同线程的访问是临时排序的,并且这种排序也确保了必要的内存可见性,这是调用者的责任,它仍然是线程安全的用法。
The Spliterator
documentation 说:
Despite their obvious utility in parallel algorithms, spliterators are not expected to be thread-safe; instead, implementations of parallel algorithms using spliterators should ensure that the spliterator is only used by one thread at a time. This is generally easy to attain via serial thread-confinement, which often is a natural consequence of typical parallel algorithms that work by recursive decomposition.
拆分器不需要在其整个生命周期内都被限制在同一个线程中,但调用方应该有一个明确的交接,确保旧线程在新线程开始使用它之前停止使用它。
但重要的是,拆分器不需要是线程安全的,因此,拆分器包装的迭代器也不需要是线程安全的。
请注意,典型的行为是在开始遍历之前拆分和移交,但是由于普通的Iterator
不支持拆分,因此包装拆分器必须迭代和缓冲元素来实现拆分。因此,Iterator
在 Stream
实现的角度尚未开始遍历时,经历了不同线程的遍历(但一次一个)。
就是说,BufferedReader
的 lines()
实现是一个你不应该效仿的坏例子。由于它以单个 readLine()
调用为中心,因此很自然地直接实现 Spliterator
而不是实现更复杂的 Iterator
并通过 spliteratorUnknownSize(…)
.[=27 包装它=]
由于您的示例同样以单个 poll()
调用为中心,因此直接实现 Spliterator
也很简单:
class SillySpliterator extends Spliterators.AbstractSpliterator<String> {
private final ArrayDeque<String> src = IntStream.range(1, 10000)
.mapToObj(Integer::toString).collect(toCollection(ArrayDeque::new));
SillySpliterator() {
super(Long.MAX_VALUE, ORDERED | NONNULL);
}
@Override
public boolean tryAdvance(Consumer<? super String> action) {
String nextRecord = src.poll();
if(nextRecord == null) return false;
action.accept(nextRecord);
return true;
}
}
根据您的实际情况,您还可以将实际的双端队列大小传递给构造函数并提供 SIZED
特性。
那么,你可以像这样使用它
var result = StreamSupport.stream(new SillySpliterator(), true)
.map(n -> "value = " + n)
.collect(toList());