Groupby apache beam java 中 json 字符串行中存在的现有属性

Groupby existing attribute present in json string line in apache beam java

我正在从 GCS 读取 json 文件,我必须将数据加载到不同的 BigQuery table。这些文件可能有同一客户的多条记录,时间戳不同。我必须为每个客户挑选最新的。我计划实现如下

  1. 读取文件
  2. 按客户 ID 分组
  3. 应用DoFn比较每组记录的时间戳,只有最新的一个
  4. 扁平化,转换为 table 行插入 BQ。

但我无法继续第 1 步。我看到 GroupByKey.create() 但无法使其使用客户 ID 作为键。

我正在使用 JAVA 实施。任何建议都会有很大帮助。谢谢。

GroupByKey 之前,您需要将数据集设置为键值对。如果您已经展示了您的一些代码,那将会很好,但是在您了解不多的情况下,您会执行以下操作:

PCollection<JsonObject> objects = p.apply(FileIO.read(....)).apply(FormatData...)

// Once we have the data in JsonObjects, we key by customer ID:
PCollection<KV<String, Iterable<JsonObject>>> groupedData = 
     objects.apply(MapElements.via(elm -> KV.of(elm.getString("customerId"), elm)))
            .apply(GroupByKey.create())

完成后,您可以检查时间戳并丢弃所有最新的机器人。

请注意,您需要设置编码器等 - 如果您遇到困难,我们可以迭代。

作为提示/技巧,您可以考虑this example of a Json Coder