上传到 google 云存储时,输出数据以随机顺序出现

Output data appears in a random order when uploaded to google cloud storage

我一直在使用 google-dataflow-sdk 将 CSV 文件上传到 google 云存储。 当我将文件上传到 google 云项目时,我的数据以随机顺序出现在云上的文件中。 csv 上的每一行都是正确的,但行到处都是。

csv 的 header )即attribute, attribute, attribute) 一直在另一行,永远不会在应该在的顶部。再次强调,每一列的数据都很好,只是随机排列的行。

这是最初读取数据的代码:

PCollection<String> csvData = pipeline.apply(TextIO.Read.named("ReadItems")
                                             .from(filename));

这是写入 google 云项目的代码:

csvData.apply(TextIO.Write.named("WriteToCloud")
                          .to("gs://dbm-poc/"+partnerId+"/"+dateOfReport+modifiedFileName)
                          .withSuffix(".csv"));

感谢您的帮助。

首先,要修复您的 header,请使用:

public static TextIO.Write.Bound<String> withHeader(@Nullable String header)

https://cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/dataflow/sdk/io/TextIO.Write#withHeader-java.lang.String-

例如:

...
TextIO.Write.withHeader("<header>").apply(..)
...

其次,Dataflow 目前不支持 ordered/sorted 写入 Sinks。这很可能是由于其 distributed/paralell 架构。如果您真的愿意,可以编写自己的自定义 Sink。有关详细信息,请参阅类似问题 here

虽然我同意 Graham Polley 提供的答案是正确的,但我设法找到了一种更简单的方法来让数据以有序的方式写入。

我改为使用 google 云存储库将我需要的文件存储到云中,如下所示:

public static String writeFile(byte[] content, String filename, String partnerId, String dateOfReport) {
    Storage storage = StorageOptions.defaultInstance().service();
    BlobId blobId = BlobId.of("dbm-poc", partnerId + "/" + dateOfReport + "-" + filename + ".csv");
    BlobInfo blobInfo = BlobInfo.builder(blobId).contentType("binary/octet-stream").build();
    storage.create(blobInfo, content);

    return filename;
}

public static byte[] readFile(String filename) throws IOException {
    return Files.readAllBytes(Paths.get(filename));
}

结合使用这两种方法,我不仅能够将文件上传到我想要的存储桶而不会丢失任何内容排序,而且我还能够更改上传文件的格式从文本到 binary/octet-stream 文件,这意味着可以访问和下载它。

这种方法似乎也不再需要通过管道来上传数据。