合并 Java 个流
Conflating Java streams
我有很大的版本化文档流,按文档 ID 和版本排序。
例如Av1、Av2、Bv1、Cv1、Cv2
我必须将其转换为另一个 Stream,其记录按文档 ID 聚合。
A[v1,v2],B[v1],C[v1,V2]
可以不使用 Collectors.groupBy()
来完成吗?我不想使用 groupBy()
因为它会在对它们进行分组之前将流中的所有项目加载到内存中。理论上,不需要将整个流加载到内存中,因为它是有序的。
这是我想出的解决方案:
Stream<Document> stream = Stream.of(
new Document("A", "v1"),
new Document("A", "v2"),
new Document("B", "v1"),
new Document("C", "v1"),
new Document("C", "v2")
);
Iterator<Document> iterator = stream.iterator();
Stream<GroupedDocument> result = Stream.generate(new Supplier<GroupedDocument>() {
Document lastDoc = null;
@Override
public GroupedDocument get() {
try {
Document doc = Optional.ofNullable(lastDoc).orElseGet(iterator::next);
String id = doc.getId();
GroupedDocument gd = new GroupedDocument(doc.getId());
gd.getVersions().add(doc.getVersion());
if (!iterator.hasNext()) {
return null;
}
while (iterator.hasNext() && (doc = iterator.next()).getId().equals(id)) {
gd.getVersions().add(doc.getVersion());
}
lastDoc = doc;
return gd;
} catch (NoSuchElementException ex) {
return null;
}
}
});
这里是 Document
和 GroupedDocument
类:
class Document {
private String id;
private String version;
public Document(String id, String version) {
this.id = id;
this.version = version;
}
public String getId() {
return id;
}
public String getVersion() {
return version;
}
}
class GroupedDocument {
private String id;
private List<String> versions;
public GroupedDocument(String id) {
this.id = id;
versions = new ArrayList<>();
}
public String getId() {
return id;
}
public List<String> getVersions() {
return versions;
}
@Override
public String toString() {
return "GroupedDocument{" +
"id='" + id + '\'' +
", versions=" + versions +
'}';
}
}
请注意,生成的流是无限流。在所有组之后会有无限数量的 null
s。您可以使用 Java 9 中的 takeWhile
获取所有不为 null 的元素,或者查看此 post.
Map<String, Stream<String>>
会帮助您解决您的需要吗?
A - v1, v2
B - v1
C - v1, v2
String[] docs = { "Av1", "Av2", "Bv1", "Cv1", "Cv2"};
Map<String, Stream<String>> map = Stream.<String>of(docs).
map(s ->s.substring(0, 1)).distinct(). //leave only A B C
collect(Collectors.toMap( s1 -> s1, //A B C as keys
s1 ->Stream.<String>of(docs). //value is filtered stream of docs
filter(s2 -> s1.substring(0, 1).
equals(s2.substring(0, 1)) ).
map(s3 -> s3.substring(1, s3.length())) //trim A B C
));
您可以为此使用 groupRuns
in the StreamEx library:
class Document {
public String id;
public int version;
public Document(String id, int version) {
this.id = id;
this.version = version;
}
public String toString() {
return "Document{"+id+version+ "}";
}
}
public class MyClass {
private static List<Document> docs = asList(
new Document("A", 1),
new Document("A", 2),
new Document("B", 1),
new Document("C", 1),
new Document("C", 2)
);
public static void main(String args[]) {
StreamEx<List<Document>> groups = StreamEx.of(docs).groupRuns((l, r) -> l.id.equals(r.id));
for (List<Document> grp: groups.collect(toList())) {
out.println(grp);
}
}
}
输出:
[Document{A1}, Document{A2}]
[Document{B1}]
[Document{C1}, Document{C2}]
我无法验证这不会消耗整个流,但我无法想象为什么它需要给出 groupRuns
的目的。
我有很大的版本化文档流,按文档 ID 和版本排序。
例如Av1、Av2、Bv1、Cv1、Cv2
我必须将其转换为另一个 Stream,其记录按文档 ID 聚合。
A[v1,v2],B[v1],C[v1,V2]
可以不使用 Collectors.groupBy()
来完成吗?我不想使用 groupBy()
因为它会在对它们进行分组之前将流中的所有项目加载到内存中。理论上,不需要将整个流加载到内存中,因为它是有序的。
这是我想出的解决方案:
Stream<Document> stream = Stream.of(
new Document("A", "v1"),
new Document("A", "v2"),
new Document("B", "v1"),
new Document("C", "v1"),
new Document("C", "v2")
);
Iterator<Document> iterator = stream.iterator();
Stream<GroupedDocument> result = Stream.generate(new Supplier<GroupedDocument>() {
Document lastDoc = null;
@Override
public GroupedDocument get() {
try {
Document doc = Optional.ofNullable(lastDoc).orElseGet(iterator::next);
String id = doc.getId();
GroupedDocument gd = new GroupedDocument(doc.getId());
gd.getVersions().add(doc.getVersion());
if (!iterator.hasNext()) {
return null;
}
while (iterator.hasNext() && (doc = iterator.next()).getId().equals(id)) {
gd.getVersions().add(doc.getVersion());
}
lastDoc = doc;
return gd;
} catch (NoSuchElementException ex) {
return null;
}
}
});
这里是 Document
和 GroupedDocument
类:
class Document {
private String id;
private String version;
public Document(String id, String version) {
this.id = id;
this.version = version;
}
public String getId() {
return id;
}
public String getVersion() {
return version;
}
}
class GroupedDocument {
private String id;
private List<String> versions;
public GroupedDocument(String id) {
this.id = id;
versions = new ArrayList<>();
}
public String getId() {
return id;
}
public List<String> getVersions() {
return versions;
}
@Override
public String toString() {
return "GroupedDocument{" +
"id='" + id + '\'' +
", versions=" + versions +
'}';
}
}
请注意,生成的流是无限流。在所有组之后会有无限数量的 null
s。您可以使用 Java 9 中的 takeWhile
获取所有不为 null 的元素,或者查看此 post.
Map<String, Stream<String>>
会帮助您解决您的需要吗?
A - v1, v2
B - v1
C - v1, v2
String[] docs = { "Av1", "Av2", "Bv1", "Cv1", "Cv2"};
Map<String, Stream<String>> map = Stream.<String>of(docs).
map(s ->s.substring(0, 1)).distinct(). //leave only A B C
collect(Collectors.toMap( s1 -> s1, //A B C as keys
s1 ->Stream.<String>of(docs). //value is filtered stream of docs
filter(s2 -> s1.substring(0, 1).
equals(s2.substring(0, 1)) ).
map(s3 -> s3.substring(1, s3.length())) //trim A B C
));
您可以为此使用 groupRuns
in the StreamEx library:
class Document {
public String id;
public int version;
public Document(String id, int version) {
this.id = id;
this.version = version;
}
public String toString() {
return "Document{"+id+version+ "}";
}
}
public class MyClass {
private static List<Document> docs = asList(
new Document("A", 1),
new Document("A", 2),
new Document("B", 1),
new Document("C", 1),
new Document("C", 2)
);
public static void main(String args[]) {
StreamEx<List<Document>> groups = StreamEx.of(docs).groupRuns((l, r) -> l.id.equals(r.id));
for (List<Document> grp: groups.collect(toList())) {
out.println(grp);
}
}
}
输出:
[Document{A1}, Document{A2}]
[Document{B1}]
[Document{C1}, Document{C2}]
我无法验证这不会消耗整个流,但我无法想象为什么它需要给出 groupRuns
的目的。