Groupby apache beam java 中 json 字符串行中存在的现有属性
Groupby existing attribute present in json string line in apache beam java
我正在从 GCS 读取 json 文件,我必须将数据加载到不同的 BigQuery table。这些文件可能有同一客户的多条记录,时间戳不同。我必须为每个客户挑选最新的。我计划实现如下
- 读取文件
- 按客户 ID 分组
- 应用DoFn比较每组记录的时间戳,只有最新的一个
- 扁平化,转换为 table 行插入 BQ。
但我无法继续第 1 步。我看到 GroupByKey.create()
但无法使其使用客户 ID 作为键。
我正在使用 JAVA 实施。任何建议都会有很大帮助。谢谢。
在 GroupByKey
之前,您需要将数据集设置为键值对。如果您已经展示了您的一些代码,那将会很好,但是在您了解不多的情况下,您会执行以下操作:
PCollection<JsonObject> objects = p.apply(FileIO.read(....)).apply(FormatData...)
// Once we have the data in JsonObjects, we key by customer ID:
PCollection<KV<String, Iterable<JsonObject>>> groupedData =
objects.apply(MapElements.via(elm -> KV.of(elm.getString("customerId"), elm)))
.apply(GroupByKey.create())
完成后,您可以检查时间戳并丢弃所有最新的机器人。
请注意,您需要设置编码器等 - 如果您遇到困难,我们可以迭代。
作为提示/技巧,您可以考虑this example of a Json Coder。
我正在从 GCS 读取 json 文件,我必须将数据加载到不同的 BigQuery table。这些文件可能有同一客户的多条记录,时间戳不同。我必须为每个客户挑选最新的。我计划实现如下
- 读取文件
- 按客户 ID 分组
- 应用DoFn比较每组记录的时间戳,只有最新的一个
- 扁平化,转换为 table 行插入 BQ。
但我无法继续第 1 步。我看到 GroupByKey.create()
但无法使其使用客户 ID 作为键。
我正在使用 JAVA 实施。任何建议都会有很大帮助。谢谢。
在 GroupByKey
之前,您需要将数据集设置为键值对。如果您已经展示了您的一些代码,那将会很好,但是在您了解不多的情况下,您会执行以下操作:
PCollection<JsonObject> objects = p.apply(FileIO.read(....)).apply(FormatData...)
// Once we have the data in JsonObjects, we key by customer ID:
PCollection<KV<String, Iterable<JsonObject>>> groupedData =
objects.apply(MapElements.via(elm -> KV.of(elm.getString("customerId"), elm)))
.apply(GroupByKey.create())
完成后,您可以检查时间戳并丢弃所有最新的机器人。
请注意,您需要设置编码器等 - 如果您遇到困难,我们可以迭代。
作为提示/技巧,您可以考虑this example of a Json Coder。