分区一个Java8个流
Partition a Java 8 Stream
如何在Java 8 Stream上实现"partition"操作?我所说的分区是指,将一个流分成给定大小的子流。它将以某种方式与 Guava Iterators.partition() 方法相同,只是希望分区是延迟计算的流而不是列表的。
正如 Jon Skeet 在他的 中所展示的那样,似乎不可能使分区惰性化。对于非惰性分区,我已经有了这个代码:
public static <T> Stream<Stream<T>> partition(Stream<T> source, int size) {
final Iterator<T> it = source.iterator();
final Iterator<Stream<T>> partIt = Iterators.transform(Iterators.partition(it, size), List::stream);
final Iterable<Stream<T>> iterable = () -> partIt;
return StreamSupport.stream(iterable.spliterator(), false);
}
无法将任意源流划分为固定大小的批次,因为这会搞砸并行处理。并行处理时,您可能不知道拆分后第一个子任务中有多少元素,因此在第一个子任务完全处理之前,您无法为下一个子任务创建分区。
然而,可以从随机访问创建分区流 List
。此类功能可用,例如,在我的 StreamEx
库中:
List<Type> input = Arrays.asList(...);
Stream<List<Type>> stream = StreamEx.ofSubLists(input, partitionSize);
或者如果你真的想要流的流:
Stream<Stream<Type>> stream = StreamEx.ofSubLists(input, partitionSize).map(List::stream);
如果不想依赖第三方库,可以手动实现这样的ofSubLists
方法:
public static <T> Stream<List<T>> ofSubLists(List<T> source, int length) {
if (length <= 0)
throw new IllegalArgumentException("length = " + length);
int size = source.size();
if (size <= 0)
return Stream.empty();
int fullChunks = (size - 1) / length;
return IntStream.range(0, fullChunks + 1).mapToObj(
n -> source.subList(n * length, n == fullChunks ? size : (n + 1) * length));
}
这个实现看起来有点长,但它考虑了一些特殊情况,例如 close-to-MAX_VALUE 列表大小。
如果你想要无序流的并行友好解决方案(所以你不关心哪些流元素将在单个批次中组合),你可以像这样使用收集器(感谢@sibnick 的灵感):
public static <T, A, R> Collector<T, ?, R> unorderedBatches(int batchSize,
Collector<List<T>, A, R> downstream) {
class Acc {
List<T> cur = new ArrayList<>();
A acc = downstream.supplier().get();
}
BiConsumer<Acc, T> accumulator = (acc, t) -> {
acc.cur.add(t);
if(acc.cur.size() == batchSize) {
downstream.accumulator().accept(acc.acc, acc.cur);
acc.cur = new ArrayList<>();
}
};
return Collector.of(Acc::new, accumulator,
(acc1, acc2) -> {
acc1.acc = downstream.combiner().apply(acc1.acc, acc2.acc);
for(T t : acc2.cur) accumulator.accept(acc1, t);
return acc1;
}, acc -> {
if(!acc.cur.isEmpty())
downstream.accumulator().accept(acc.acc, acc.cur);
return downstream.finisher().apply(acc.acc);
}, Collector.Characteristics.UNORDERED);
}
用法示例:
List<List<Integer>> list = IntStream.range(0,20)
.boxed().parallel()
.collect(unorderedBatches(3, Collectors.toList()));
结果:
[[2, 3, 4], [7, 8, 9], [0, 1, 5], [12, 13, 14], [17, 18, 19], [10, 11, 15], [6, 16]]
这样的收集器是完全线程安全的,并为顺序流生成有序的批次。
如果要对每个批次应用中间转换,可以使用以下版本:
public static <T, AA, A, B, R> Collector<T, ?, R> unorderedBatches(int batchSize,
Collector<T, AA, B> batchCollector,
Collector<B, A, R> downstream) {
return unorderedBatches(batchSize,
Collectors.mapping(list -> list.stream().collect(batchCollector), downstream));
}
例如,通过这种方式,您可以即时对每批中的数字求和:
List<Integer> list = IntStream.range(0,20)
.boxed().parallel()
.collect(unorderedBatches(3, Collectors.summingInt(Integer::intValue),
Collectors.toList()));
我认为内部有某种破解是可能的:
为批次创建实用程序 class:
public static class ConcurrentBatch {
private AtomicLong id = new AtomicLong();
private int batchSize;
public ConcurrentBatch(int batchSize) {
this.batchSize = batchSize;
}
public long next() {
return (id.getAndIncrement()) / batchSize;
}
public int getBatchSize() {
return batchSize;
}
}
和方法:
public static <T> void applyConcurrentBatchToStream(Consumer<List<T>> batchFunc, Stream<T> stream, int batchSize){
ConcurrentBatch batch = new ConcurrentBatch(batchSize);
//hack java map: extends and override computeIfAbsent
Supplier<ConcurrentMap<Long, List<T>>> mapFactory = () -> new ConcurrentHashMap<Long, List<T>>() {
@Override
public List<T> computeIfAbsent(Long key, Function<? super Long, ? extends List<T>> mappingFunction) {
List<T> rs = super.computeIfAbsent(key, mappingFunction);
//apply batchFunc to old lists, when new batch list is created
if(rs.isEmpty()){
for(Entry<Long, List<T>> e : entrySet()) {
List<T> batchList = e.getValue();
//todo: need to improve
synchronized (batchList) {
if (batchList.size() == batch.getBatchSize()){
batchFunc.accept(batchList);
remove(e.getKey());
batchList.clear();
}
}
}
}
return rs;
}
};
stream.map(s -> new AbstractMap.SimpleEntry<>(batch.next(), s))
.collect(groupingByConcurrent(AbstractMap.SimpleEntry::getKey, mapFactory, mapping(AbstractMap.SimpleEntry::getValue, toList())))
.entrySet()
.stream()
//map contains only unprocessed lists (size<batchSize)
.forEach(e -> batchFunc.accept(e.getValue()));
}
如果您想按顺序使用 Stream,则可以对 Stream 进行分区(以及执行相关功能,例如 windowing - 我认为这就是您在这种情况下真正想要的)。
两个支持标准流分区的库是 cyclops-react (I am the author) and jOOλ,其中 cyclops-react 扩展了(以添加窗口化等功能)。
cyclops-streams有一组静态函数StreamUtils用于对JavaStreams进行操作,还有splitAt、headAndTail、splitBy、partition等一系列函数用于分区。
要将一个 Stream window 转换为大小为 30 的嵌套 Stream 的 Stream,您可以使用 window 方法。
就 OP 而言,在 Streaming 术语中,将一个 Stream 拆分为多个给定大小的 Stream 是一个 Windowing 操作(而不是 Partitioning 操作)。
Stream<Streamable<Integer>> streamOfStreams = StreamUtils.window(stream,30);
有一个名为 ReactiveSeq that extends jool.Seq 的 Stream 扩展 class 并添加了 Windowing 功能,这可能会使代码更简洁。
ReactiveSeq<Integer> seq;
ReactiveSeq<ListX<Integer>> streamOfLists = seq.grouped(30);
正如 Tagir 在上面指出的那样,这不适合并行流。如果您想 window 或批处理您希望以多线程方式执行的流。 cyclops-react 中的 LazyFutureStream 可能会有用(窗口化在待办事项列表中,但现在可以使用普通的旧批处理)。
在这种情况下,数据将从执行 Stream 的多个线程传递到 Multi-Producer/Single-Consumer 无等待队列,并且来自该队列的顺序数据可以 windowed 在分发给线程之前再次。
Stream<List<Data>> batched = new LazyReact().range(0,1000)
.grouped(30)
.map(this::process);
的快速解决方案
IntStream.range(0, Integer.MAX_VALUE).split(size).forEach(s -> N.println(s.toArray()));
免责声明:本人是AbacusUtil的开发者
我找到的最优雅最纯粹的java8个解决这个问题的方法:
public static <T> List<List<T>> partition(final List<T> list, int batchSize) {
return IntStream.range(0, getNumberOfPartitions(list, batchSize))
.mapToObj(i -> list.subList(i * batchSize, Math.min((i + 1) * batchSize, list.size())))
.collect(toList());
}
//
private static <T> int getNumberOfPartitions(List<T> list, int batchSize) {
return (list.size() + batchSize- 1) / batchSize;
}
这是一个纯粹的 Java 解决方案,它是惰性求值的,而不是使用 List。
public static <T> Stream<List<T>> partition(Stream<T> stream, int batchSize){
List<List<T>> currentBatch = new ArrayList<List<T>>(); //just to make it mutable
currentBatch.add(new ArrayList<T>(batchSize));
return Stream.concat(stream
.sequential()
.map(new Function<T, List<T>>(){
public List<T> apply(T t){
currentBatch.get(0).add(t);
return currentBatch.get(0).size() == batchSize ? currentBatch.set(0,new ArrayList<>(batchSize)): null;
}
}), Stream.generate(()->currentBatch.get(0).isEmpty()?null:currentBatch.get(0))
.limit(1)
).filter(Objects::nonNull);
}
方法 returns Stream<List<T>>
灵活。您可以通过 partition(something, 10).map(List::stream)
.
轻松将其转换为 Stream<Stream<T>>
我找到了一个优雅的解决方案:Iterable parts = Iterables.partition(stream::iterator, size)
这是一个纯粹的 Java 8 解决方案 - 顺序和并行:
public <T> Collection<List<T>> chunk(Collection<T> collection, int chunkSize) {
final AtomicInteger index = new AtomicInteger();
return collection.stream()
.map(v -> new SimpleImmutableEntry<>(index.getAndIncrement() / chunkSize, v))
// LinkedHashMap is used here just to preserve order
.collect(groupingBy(Entry::getKey, LinkedHashMap::new, mapping(Entry::getValue, toList())))
.values();
}
public <T> Collection<List<T>> chunkParallel(Collection<T> collection, int chunkSize) {
final AtomicInteger index = new AtomicInteger();
return collection.parallelStream()
.map(v -> new SimpleImmutableEntry<>(index.getAndIncrement() / chunkSize, v))
// So far it is parallel processing ordering cannot be preserved,
// but we have to make it thread safe - using e.g. ConcurrentHashMap
.collect(groupingBy(Entry::getKey, ConcurrentHashMap::new, mapping(Entry::getValue, toList())))
.values();
}
这是一种高效的方式
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.List;
public final class Partition<T> extends AbstractList<List<T>> {
private final List<T> list;
private final int chunkSize;
public Partition(List<T> list, int chunkSize) {
this.list = new ArrayList<>(list);
this.chunkSize = chunkSize;
}
public static <T> Partition<T> ofSize(List<T> list, int chunkSize) {
return new Partition<>(list, chunkSize);
}
@Override
public List<T> get(int index) {
int start = index * chunkSize;
int end = Math.min(start + chunkSize, list.size());
if (start > end) {
throw new IndexOutOfBoundsException("Index " + index + " is out of the list range <0," + (size() - 1) + ">");
}
return new ArrayList<>(list.subList(start, end));
}
@Override
public int size() {
return (int) Math.ceil((double) list.size() / (double) chunkSize);
}
}
用法
Partition<String> partition = Partition.ofSize(paCustomerCodes, chunkSize);
for (List<String> strings : partition) {
}
如何在Java 8 Stream上实现"partition"操作?我所说的分区是指,将一个流分成给定大小的子流。它将以某种方式与 Guava Iterators.partition() 方法相同,只是希望分区是延迟计算的流而不是列表的。
正如 Jon Skeet 在他的
public static <T> Stream<Stream<T>> partition(Stream<T> source, int size) {
final Iterator<T> it = source.iterator();
final Iterator<Stream<T>> partIt = Iterators.transform(Iterators.partition(it, size), List::stream);
final Iterable<Stream<T>> iterable = () -> partIt;
return StreamSupport.stream(iterable.spliterator(), false);
}
无法将任意源流划分为固定大小的批次,因为这会搞砸并行处理。并行处理时,您可能不知道拆分后第一个子任务中有多少元素,因此在第一个子任务完全处理之前,您无法为下一个子任务创建分区。
然而,可以从随机访问创建分区流 List
。此类功能可用,例如,在我的 StreamEx
库中:
List<Type> input = Arrays.asList(...);
Stream<List<Type>> stream = StreamEx.ofSubLists(input, partitionSize);
或者如果你真的想要流的流:
Stream<Stream<Type>> stream = StreamEx.ofSubLists(input, partitionSize).map(List::stream);
如果不想依赖第三方库,可以手动实现这样的ofSubLists
方法:
public static <T> Stream<List<T>> ofSubLists(List<T> source, int length) {
if (length <= 0)
throw new IllegalArgumentException("length = " + length);
int size = source.size();
if (size <= 0)
return Stream.empty();
int fullChunks = (size - 1) / length;
return IntStream.range(0, fullChunks + 1).mapToObj(
n -> source.subList(n * length, n == fullChunks ? size : (n + 1) * length));
}
这个实现看起来有点长,但它考虑了一些特殊情况,例如 close-to-MAX_VALUE 列表大小。
如果你想要无序流的并行友好解决方案(所以你不关心哪些流元素将在单个批次中组合),你可以像这样使用收集器(感谢@sibnick 的灵感):
public static <T, A, R> Collector<T, ?, R> unorderedBatches(int batchSize,
Collector<List<T>, A, R> downstream) {
class Acc {
List<T> cur = new ArrayList<>();
A acc = downstream.supplier().get();
}
BiConsumer<Acc, T> accumulator = (acc, t) -> {
acc.cur.add(t);
if(acc.cur.size() == batchSize) {
downstream.accumulator().accept(acc.acc, acc.cur);
acc.cur = new ArrayList<>();
}
};
return Collector.of(Acc::new, accumulator,
(acc1, acc2) -> {
acc1.acc = downstream.combiner().apply(acc1.acc, acc2.acc);
for(T t : acc2.cur) accumulator.accept(acc1, t);
return acc1;
}, acc -> {
if(!acc.cur.isEmpty())
downstream.accumulator().accept(acc.acc, acc.cur);
return downstream.finisher().apply(acc.acc);
}, Collector.Characteristics.UNORDERED);
}
用法示例:
List<List<Integer>> list = IntStream.range(0,20)
.boxed().parallel()
.collect(unorderedBatches(3, Collectors.toList()));
结果:
[[2, 3, 4], [7, 8, 9], [0, 1, 5], [12, 13, 14], [17, 18, 19], [10, 11, 15], [6, 16]]
这样的收集器是完全线程安全的,并为顺序流生成有序的批次。
如果要对每个批次应用中间转换,可以使用以下版本:
public static <T, AA, A, B, R> Collector<T, ?, R> unorderedBatches(int batchSize,
Collector<T, AA, B> batchCollector,
Collector<B, A, R> downstream) {
return unorderedBatches(batchSize,
Collectors.mapping(list -> list.stream().collect(batchCollector), downstream));
}
例如,通过这种方式,您可以即时对每批中的数字求和:
List<Integer> list = IntStream.range(0,20)
.boxed().parallel()
.collect(unorderedBatches(3, Collectors.summingInt(Integer::intValue),
Collectors.toList()));
我认为内部有某种破解是可能的:
为批次创建实用程序 class:
public static class ConcurrentBatch {
private AtomicLong id = new AtomicLong();
private int batchSize;
public ConcurrentBatch(int batchSize) {
this.batchSize = batchSize;
}
public long next() {
return (id.getAndIncrement()) / batchSize;
}
public int getBatchSize() {
return batchSize;
}
}
和方法:
public static <T> void applyConcurrentBatchToStream(Consumer<List<T>> batchFunc, Stream<T> stream, int batchSize){
ConcurrentBatch batch = new ConcurrentBatch(batchSize);
//hack java map: extends and override computeIfAbsent
Supplier<ConcurrentMap<Long, List<T>>> mapFactory = () -> new ConcurrentHashMap<Long, List<T>>() {
@Override
public List<T> computeIfAbsent(Long key, Function<? super Long, ? extends List<T>> mappingFunction) {
List<T> rs = super.computeIfAbsent(key, mappingFunction);
//apply batchFunc to old lists, when new batch list is created
if(rs.isEmpty()){
for(Entry<Long, List<T>> e : entrySet()) {
List<T> batchList = e.getValue();
//todo: need to improve
synchronized (batchList) {
if (batchList.size() == batch.getBatchSize()){
batchFunc.accept(batchList);
remove(e.getKey());
batchList.clear();
}
}
}
}
return rs;
}
};
stream.map(s -> new AbstractMap.SimpleEntry<>(batch.next(), s))
.collect(groupingByConcurrent(AbstractMap.SimpleEntry::getKey, mapFactory, mapping(AbstractMap.SimpleEntry::getValue, toList())))
.entrySet()
.stream()
//map contains only unprocessed lists (size<batchSize)
.forEach(e -> batchFunc.accept(e.getValue()));
}
如果您想按顺序使用 Stream,则可以对 Stream 进行分区(以及执行相关功能,例如 windowing - 我认为这就是您在这种情况下真正想要的)。 两个支持标准流分区的库是 cyclops-react (I am the author) and jOOλ,其中 cyclops-react 扩展了(以添加窗口化等功能)。
cyclops-streams有一组静态函数StreamUtils用于对JavaStreams进行操作,还有splitAt、headAndTail、splitBy、partition等一系列函数用于分区。
要将一个 Stream window 转换为大小为 30 的嵌套 Stream 的 Stream,您可以使用 window 方法。
就 OP 而言,在 Streaming 术语中,将一个 Stream 拆分为多个给定大小的 Stream 是一个 Windowing 操作(而不是 Partitioning 操作)。
Stream<Streamable<Integer>> streamOfStreams = StreamUtils.window(stream,30);
有一个名为 ReactiveSeq that extends jool.Seq 的 Stream 扩展 class 并添加了 Windowing 功能,这可能会使代码更简洁。
ReactiveSeq<Integer> seq;
ReactiveSeq<ListX<Integer>> streamOfLists = seq.grouped(30);
正如 Tagir 在上面指出的那样,这不适合并行流。如果您想 window 或批处理您希望以多线程方式执行的流。 cyclops-react 中的 LazyFutureStream 可能会有用(窗口化在待办事项列表中,但现在可以使用普通的旧批处理)。
在这种情况下,数据将从执行 Stream 的多个线程传递到 Multi-Producer/Single-Consumer 无等待队列,并且来自该队列的顺序数据可以 windowed 在分发给线程之前再次。
Stream<List<Data>> batched = new LazyReact().range(0,1000)
.grouped(30)
.map(this::process);
IntStream.range(0, Integer.MAX_VALUE).split(size).forEach(s -> N.println(s.toArray()));
免责声明:本人是AbacusUtil的开发者
我找到的最优雅最纯粹的java8个解决这个问题的方法:
public static <T> List<List<T>> partition(final List<T> list, int batchSize) {
return IntStream.range(0, getNumberOfPartitions(list, batchSize))
.mapToObj(i -> list.subList(i * batchSize, Math.min((i + 1) * batchSize, list.size())))
.collect(toList());
}
//
private static <T> int getNumberOfPartitions(List<T> list, int batchSize) {
return (list.size() + batchSize- 1) / batchSize;
}
这是一个纯粹的 Java 解决方案,它是惰性求值的,而不是使用 List。
public static <T> Stream<List<T>> partition(Stream<T> stream, int batchSize){
List<List<T>> currentBatch = new ArrayList<List<T>>(); //just to make it mutable
currentBatch.add(new ArrayList<T>(batchSize));
return Stream.concat(stream
.sequential()
.map(new Function<T, List<T>>(){
public List<T> apply(T t){
currentBatch.get(0).add(t);
return currentBatch.get(0).size() == batchSize ? currentBatch.set(0,new ArrayList<>(batchSize)): null;
}
}), Stream.generate(()->currentBatch.get(0).isEmpty()?null:currentBatch.get(0))
.limit(1)
).filter(Objects::nonNull);
}
方法 returns Stream<List<T>>
灵活。您可以通过 partition(something, 10).map(List::stream)
.
Stream<Stream<T>>
我找到了一个优雅的解决方案:Iterable parts = Iterables.partition(stream::iterator, size)
这是一个纯粹的 Java 8 解决方案 - 顺序和并行:
public <T> Collection<List<T>> chunk(Collection<T> collection, int chunkSize) {
final AtomicInteger index = new AtomicInteger();
return collection.stream()
.map(v -> new SimpleImmutableEntry<>(index.getAndIncrement() / chunkSize, v))
// LinkedHashMap is used here just to preserve order
.collect(groupingBy(Entry::getKey, LinkedHashMap::new, mapping(Entry::getValue, toList())))
.values();
}
public <T> Collection<List<T>> chunkParallel(Collection<T> collection, int chunkSize) {
final AtomicInteger index = new AtomicInteger();
return collection.parallelStream()
.map(v -> new SimpleImmutableEntry<>(index.getAndIncrement() / chunkSize, v))
// So far it is parallel processing ordering cannot be preserved,
// but we have to make it thread safe - using e.g. ConcurrentHashMap
.collect(groupingBy(Entry::getKey, ConcurrentHashMap::new, mapping(Entry::getValue, toList())))
.values();
}
这是一种高效的方式
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.List;
public final class Partition<T> extends AbstractList<List<T>> {
private final List<T> list;
private final int chunkSize;
public Partition(List<T> list, int chunkSize) {
this.list = new ArrayList<>(list);
this.chunkSize = chunkSize;
}
public static <T> Partition<T> ofSize(List<T> list, int chunkSize) {
return new Partition<>(list, chunkSize);
}
@Override
public List<T> get(int index) {
int start = index * chunkSize;
int end = Math.min(start + chunkSize, list.size());
if (start > end) {
throw new IndexOutOfBoundsException("Index " + index + " is out of the list range <0," + (size() - 1) + ">");
}
return new ArrayList<>(list.subList(start, end));
}
@Override
public int size() {
return (int) Math.ceil((double) list.size() / (double) chunkSize);
}
}
用法
Partition<String> partition = Partition.ofSize(paCustomerCodes, chunkSize);
for (List<String> strings : partition) {
}