合并 Java 个流

Conflating Java streams

我有很大的版本化文档流,按文档 ID 和版本排序。

例如Av1、Av2、Bv1、Cv1、Cv2

我必须将其转换为另一个 Stream,其记录按文档 ID 聚合。

A[v1,v2],B[v1],C[v1,V2]

可以不使用 Collectors.groupBy() 来完成吗?我不想使用 groupBy() 因为它会在对它们进行分组之前将流中的所有项目加载到内存中。理论上,不需要将整个流加载到内存中,因为它是有序的。

这是我想出的解决方案:

    Stream<Document> stream = Stream.of(
            new Document("A", "v1"),
            new Document("A", "v2"),
            new Document("B", "v1"),
            new Document("C", "v1"),
            new Document("C", "v2")
    );

    Iterator<Document> iterator = stream.iterator();
    Stream<GroupedDocument> result = Stream.generate(new Supplier<GroupedDocument>() {

        Document lastDoc = null;
        @Override
        public GroupedDocument get() {
            try {
                Document doc = Optional.ofNullable(lastDoc).orElseGet(iterator::next);

                String id = doc.getId();
                GroupedDocument gd = new GroupedDocument(doc.getId());
                gd.getVersions().add(doc.getVersion());

                if (!iterator.hasNext()) {
                    return null;
                }

                while (iterator.hasNext() && (doc = iterator.next()).getId().equals(id)) {
                    gd.getVersions().add(doc.getVersion());
                }
                lastDoc = doc;
                return gd;
            } catch (NoSuchElementException ex) {
                return null;
            }
        }
    });

这里是 DocumentGroupedDocument 类:

class Document {
    private String id;
    private String version;

    public Document(String id, String version) {
        this.id = id;
        this.version = version;
    }

    public String getId() {
        return id;
    }

    public String getVersion() {
        return version;
    }
}

class GroupedDocument {
    private String id;
    private List<String> versions;

    public GroupedDocument(String id) {
        this.id = id;
        versions = new ArrayList<>();
    }

    public String getId() {
        return id;
    }

    public List<String> getVersions() {
        return versions;
    }

    @Override
    public String toString() {
        return "GroupedDocument{" +
                "id='" + id + '\'' +
                ", versions=" + versions +
                '}';
    }
}

请注意,生成的流是无限流。在所有组之后会有无限数量的 nulls。您可以使用 Java 9 中的 takeWhile 获取所有不为 null 的元素,或者查看此 post.

Map<String, Stream<String>> 会帮助您解决您的需要吗?

A - v1, v2
B - v1
C - v1, v2

 String[] docs = { "Av1", "Av2", "Bv1", "Cv1", "Cv2"};
 Map<String, Stream<String>> map = Stream.<String>of(docs).
         map(s ->s.substring(0, 1)).distinct().                      //leave only A B C
            collect(Collectors.toMap( s1 -> s1,                      //A B C as keys
                                      s1 ->Stream.<String>of(docs).  //value is filtered stream of docs
                                        filter(s2 -> s1.substring(0, 1).
                                          equals(s2.substring(0, 1)) ).
                                            map(s3 -> s3.substring(1, s3.length())) //trim A B C
                                     ));        

您可以为此使用 groupRuns in the StreamEx library

class Document {
    public String id;
    public int version;
    public Document(String id, int version) {
        this.id = id;
        this.version = version;
    }
    public String toString() {
        return "Document{"+id+version+ "}";
    }
}

public class MyClass {
    private static List<Document> docs = asList(
        new Document("A", 1),
        new Document("A", 2),
        new Document("B", 1),
        new Document("C", 1),
        new Document("C", 2)
    );

    public static void main(String args[]) {
        StreamEx<List<Document>> groups = StreamEx.of(docs).groupRuns((l, r) -> l.id.equals(r.id));
        for (List<Document> grp: groups.collect(toList())) {
            out.println(grp);
        }
    }
}

输出:

[Document{A1}, Document{A2}]
[Document{B1}]
[Document{C1}, Document{C2}]

我无法验证这不会消耗整个流,但我无法想象为什么它需要给出 groupRuns 的目的。