Internal changes for limit and unordered stream
This basically came up while I was trying to answer another question. Consider this code:
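import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class Main {
    public static void main(String[] args) {
        AtomicInteger i = new AtomicInteger(0);
        AtomicInteger count = new AtomicInteger(0);
        IntStream.generate(() -> i.incrementAndGet())
                .parallel()
                .peek(x -> count.incrementAndGet()) // counts every element produced upstream of limit
                .limit(5)
                .forEach(System.out::println);
        System.out.println("count = " + count);
    }
}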
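I understand that IntStream#generate is an unordered infinite stream, and that for it to finish there has to be a short-circuiting operation (limit in this case). I also understand that the Supplier may be called as many times as the Stream implementation sees fit before that limit is reached.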
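A minimal sketch of that contract, assuming nothing beyond the public Stream API: it counts how often the generator runs for a sequential and a parallel `generate(...).limit(5)`. The class name `GenerateLimitDemo` and the `run` helper are made up for this illustration, and `toArray()` is used instead of `peek`/`forEach` only so the result can be returned.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class GenerateLimitDemo {
    // Returns {elements produced, generator calls} for generate(...).limit(limit).
    static int[] run(boolean parallel, int limit) {
        AtomicInteger calls = new AtomicInteger();
        IntStream stream = IntStream.generate(calls::incrementAndGet);
        if (parallel) {
            stream = stream.parallel();
        }
        // limit() guarantees the element count; it does not bound the generator calls.
        int produced = stream.limit(limit).toArray().length;
        return new int[] {produced, calls.get()};
    }

    public static void main(String[] args) {
        int[] seq = run(false, 5);
        int[] par = run(true, 5);
        System.out.println("sequential: produced=" + seq[0] + ", calls=" + seq[1]);
        System.out.println("parallel:   produced=" + par[0] + ", calls=" + par[1]);
    }
}
```

The sequential run should report 5 calls, because the pipeline checks for cancellation before each element. For the parallel run, only "produced=5" is guaranteed; the number of calls depends on the JDK version and the machine.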
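Running this under Java 8 always prints count = 512 (perhaps not on every machine, but it does on mine).
By contrast, running it under Java 10 rarely gives more than 5. So my question is: what changed internally so that the short-circuiting happens so much earlier? (I tried to answer this myself by getting the sources and trying to diff them...)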
The change happened between Java 9 beta 103 and Java 9 beta 120 (JDK-8154387).
The responsible class is StreamSpliterators.UnorderedSliceSpliterator.OfInt, or rather its superclass StreamSpliterators.UnorderedSliceSpliterator.
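The class name points at the *unordered* case, which fits the question's observation that IntStream#generate is unordered. A small sketch, using only public API, that checks the ORDERED characteristic of the source spliterator of `generate` against that of `iterate`, an ordered infinite source. The class and method names are made up for this illustration.

```java
import java.util.Spliterator;
import java.util.stream.IntStream;

public class OrderedCheck {
    // Does the source spliterator of IntStream.generate report ORDERED?
    static boolean generateIsOrdered() {
        return IntStream.generate(() -> 1).spliterator().hasCharacteristics(Spliterator.ORDERED);
    }

    // Same check for an ordered infinite source, for comparison.
    static boolean iterateIsOrdered() {
        return IntStream.iterate(0, x -> x + 1).spliterator().hasCharacteristics(Spliterator.ORDERED);
    }

    public static void main(String[] args) {
        System.out.println("generate ORDERED: " + generateIsOrdered()); // false
        System.out.println("iterate ORDERED:  " + iterateIsOrdered());  // true
    }
}
```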
The old version of the class looked like this:
abstract static class UnorderedSliceSpliterator<T, T_SPLITR extends Spliterator<T>> {
    static final int CHUNK_SIZE = 1 << 7;
    // The spliterator to slice
    protected final T_SPLITR s;
    protected final boolean unlimited;
    private final long skipThreshold;
    private final AtomicLong permits;
    UnorderedSliceSpliterator(T_SPLITR s, long skip, long limit) {
        this.s = s;
        this.unlimited = limit < 0;
        this.skipThreshold = limit >= 0 ? limit : 0;
        this.permits = new AtomicLong(limit >= 0 ? skip + limit : skip);
    }
    UnorderedSliceSpliterator(T_SPLITR s,
                              UnorderedSliceSpliterator<T, T_SPLITR> parent) {
        this.s = s;
        this.unlimited = parent.unlimited;
        this.permits = parent.permits;
        this.skipThreshold = parent.skipThreshold;
    }
    …
    @Override
    public void forEachRemaining(Consumer<? super T> action) {
        Objects.requireNonNull(action);
        ArrayBuffer.OfRef<T> sb = null;
        PermitStatus permitStatus;
        while ((permitStatus = permitStatus()) != PermitStatus.NO_MORE) {
            if (permitStatus == PermitStatus.MAYBE_MORE) {
                // Optimistically traverse elements up to a threshold of CHUNK_SIZE
                if (sb == null)
                    sb = new ArrayBuffer.OfRef<>(CHUNK_SIZE);
                else
                    sb.reset();
                long permitsRequested = 0;
                do { } while (s.tryAdvance(sb) && ++permitsRequested < CHUNK_SIZE);
                if (permitsRequested == 0)
                    return;
                sb.forEach(action, acquirePermits(permitsRequested));
            }
            else {
                // Must be UNLIMITED; let 'er rip
                s.forEachRemaining(action);
                return;
            }
        }
    }
}
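As we can see, it tries to buffer up to CHUNK_SIZE = 1 << 7 (that is, 128) elements in each spliterator, which can add up to "number of CPU cores" × 128 elements in total. On a four-core machine, for example, that is 4 × 128 = 512, which would match the count observed under Java 8.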
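To make the "cores × 128" arithmetic concrete, here is a hypothetical model, not the JDK code. Plain threads stand in for the fork/join workers (one per core is an assumption), they share one generator and one permit counter starting at the limit, and each one optimistically buffers a full chunk before asking for permits. The real class also handles skip and the unlimited case, which this sketch leaves out. `ChunkedLimitModel`, `run` and `acquirePermits` are names made up for this illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class ChunkedLimitModel {
    // Hypothetical model: `threads` workers share one generator and one permit
    // counter that starts at `limit`. Returns {emitted, generatorCalls}.
    static int[] run(int threads, int chunkSize, long limit) {
        AtomicInteger generated = new AtomicInteger();
        AtomicInteger emitted = new AtomicInteger();
        AtomicLong permits = new AtomicLong(limit);
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                int[] buffer = new int[chunkSize];
                while (permits.get() > 0) {
                    // Optimistically pull a whole chunk before asking for permits.
                    for (int n = 0; n < chunkSize; n++) {
                        buffer[n] = generated.incrementAndGet();
                    }
                    // Only the granted part of the buffer is passed on; the rest is dropped.
                    long granted = acquirePermits(permits, chunkSize);
                    emitted.addAndGet((int) granted);
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return new int[] {emitted.get(), generated.get()};
    }

    // Takes up to `requested` permits without letting the counter drop below zero.
    static long acquirePermits(AtomicLong permits, long requested) {
        while (true) {
            long remaining = permits.get();
            if (remaining == 0) {
                return 0;
            }
            long grab = Math.min(remaining, requested);
            if (permits.compareAndSet(remaining, remaining - grab)) {
                return grab;
            }
        }
    }

    public static void main(String[] args) {
        int[] big = run(4, 128, 5); // fixed chunk of 128, like CHUNK_SIZE in the old class
        int[] small = run(4, 1, 5); // chunk of 1, for comparison
        System.out.println("chunk 128: emitted=" + big[0] + ", generator calls=" + big[1]);
        System.out.println("chunk 1:   emitted=" + small[0] + ", generator calls=" + small[1]);
    }
}
```

In this model, both runs emit exactly 5 elements. With a chunk of 128, each worker buffers at most once, so the generator runs a multiple of 128 times, up to 4 × 128 = 512. With a chunk of 1, it runs at most 5 + 4 = 9 times.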
By contrast, the new version looks like this:
abstract static class UnorderedSliceSpliterator<T, T_SPLITR extends Spliterator<T>> {
    static final int CHUNK_SIZE = 1 << 7;
    // The spliterator to slice
    protected final T_SPLITR s;
    protected final boolean unlimited;
    protected final int chunkSize;
    private final long skipThreshold;
    private final AtomicLong permits;
    UnorderedSliceSpliterator(T_SPLITR s, long skip, long limit) {
        this.s = s;
        this.unlimited = limit < 0;
        this.skipThreshold = limit >= 0 ? limit : 0;
        this.chunkSize = limit >= 0 ? (int)Math.min(CHUNK_SIZE,
            ((skip + limit) / AbstractTask.LEAF_TARGET) + 1) : CHUNK_SIZE;
        this.permits = new AtomicLong(limit >= 0 ? skip + limit : skip);
    }
    UnorderedSliceSpliterator(T_SPLITR s,
                              UnorderedSliceSpliterator<T, T_SPLITR> parent) {
        this.s = s;
        this.unlimited = parent.unlimited;
        this.permits = parent.permits;
        this.skipThreshold = parent.skipThreshold;
        this.chunkSize = parent.chunkSize;
    }
    …
    @Override
    public void forEachRemaining(Consumer<? super T> action) {
        Objects.requireNonNull(action);
        ArrayBuffer.OfRef<T> sb = null;
        PermitStatus permitStatus;
        while ((permitStatus = permitStatus()) != PermitStatus.NO_MORE) {
            if (permitStatus == PermitStatus.MAYBE_MORE) {
                // Optimistically traverse elements up to a threshold of chunkSize
                if (sb == null)
                    sb = new ArrayBuffer.OfRef<>(chunkSize);
                else
                    sb.reset();
                long permitsRequested = 0;
                do { } while (s.tryAdvance(sb) && ++permitsRequested < chunkSize);
                if (permitsRequested == 0)
                    return;
                sb.forEach(action, acquirePermits(permitsRequested));
            }
            else {
                // Must be UNLIMITED; let 'er rip
                s.forEachRemaining(action);
                return;
            }
        }
    }
}
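So now there is an instance field chunkSize. When there is a defined limit and the expression ((skip + limit) / AbstractTask.LEAF_TARGET) + 1 evaluates to something smaller than CHUNK_SIZE, that smaller value is used instead. Consequently, chunkSize is much smaller when the limit is small. In your case, with a limit of 5, the chunk size will be 1, as long as AbstractTask.LEAF_TARGET (four times the common pool parallelism) is greater than 5.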
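The chunk size expression can be evaluated on its own. The sketch below copies the formula from the new constructor, but takes the leaf target as a parameter because AbstractTask.LEAF_TARGET is JDK-internal. Its value is assumed here to be `ForkJoinPool.getCommonPoolParallelism() << 2`, and the class name is made up. With a leaf target of 8, limit(5) gives 5 / 8 + 1 = 1. With a leaf target of 4 (common pool parallelism 1), it gives 5 / 4 + 1 = 2.

```java
import java.util.concurrent.ForkJoinPool;

public class ChunkSizeFormula {
    static final int CHUNK_SIZE = 1 << 7;

    // Same expression as the new constructor; leafTarget stands in for
    // the JDK-internal AbstractTask.LEAF_TARGET.
    static int chunkSize(long skip, long limit, int leafTarget) {
        return limit >= 0
                ? (int) Math.min(CHUNK_SIZE, ((skip + limit) / leafTarget) + 1)
                : CHUNK_SIZE;
    }

    public static void main(String[] args) {
        // Assumption: LEAF_TARGET is four times the common pool parallelism.
        int leafTarget = ForkJoinPool.getCommonPoolParallelism() << 2;
        System.out.println("leafTarget   = " + leafTarget);
        System.out.println("limit(5)     -> " + chunkSize(0, 5, leafTarget));
        System.out.println("limit(1000)  -> " + chunkSize(0, 1000, leafTarget));
        System.out.println("no limit     -> " + chunkSize(0, -1, leafTarget));
    }
}
```

Large limits and the unlimited case still end up at the old value of 128. Only small limits shrink the buffer.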