从 PCollection<TableRow> 转换为 PCollection<KV<K,V>>
Convert from PCollection<TableRow> to PCollection<KV<K,V>>
我正在尝试从 BigQuery 中的 2 个表中提取数据,然后通过 CoGroupByKey 将其加入。
虽然 BigQuery 的输出是 PCollection<TableRow>
,但 CoGroupByKey
需要 PCollection<KV<K,V>>
。
如何从 PCollection<TableRow>
转换为 PCollection<KV<K,V>>
?
CoGroupByKey
需要知道 CoGroup
的哪个键 - 这是 KV<K, V>
中的 K
,而 V
是关联的值此集合中的此键。将多个集合组合在一起的结果将为每个键提供每个集合中具有该键的所有值。
因此,您需要将两个 PCollection<TableRow>
转换为 PCollection<KV<YourKey, TableRow>>
,其中 YourKey
是您要加入它们的密钥类型,例如在您的情况下,可能是 String
、Integer
或其他。
进行转换的最佳转换可能是 WithKeys
。例如。这是一个代码示例,将 PCollection<TableRow>
转换为 PCollection<KV<String, TableRow>>
,由 String
:
类型的假设 userId
字段键入
PCollection<TableRow> rows = ...;
PCollection<KV<String, TableRow>> rowsKeyedByUser = rows
.apply(WithKeys.of(new SerializableFunction<TableRow, String>() {
@Override
public String apply(TableRow row) {
return (String)row.get("userId");
}
}));
我正在尝试从 BigQuery 中的 2 个表中提取数据,然后通过 CoGroupByKey 将其加入。
虽然 BigQuery 的输出是 PCollection<TableRow>
,但 CoGroupByKey
需要 PCollection<KV<K,V>>
。
如何从 PCollection<TableRow>
转换为 PCollection<KV<K,V>>
?
CoGroupByKey
需要知道 CoGroup
的哪个键 - 这是 KV<K, V>
中的 K
,而 V
是关联的值此集合中的此键。将多个集合组合在一起的结果将为每个键提供每个集合中具有该键的所有值。
因此,您需要将两个 PCollection<TableRow>
转换为 PCollection<KV<YourKey, TableRow>>
,其中 YourKey
是您要加入它们的密钥类型,例如在您的情况下,可能是 String
、Integer
或其他。
进行转换的最佳转换可能是 WithKeys
。例如。这是一个代码示例,将 PCollection<TableRow>
转换为 PCollection<KV<String, TableRow>>
,由 String
:
userId
字段键入
PCollection<TableRow> rows = ...;
PCollection<KV<String, TableRow>> rowsKeyedByUser = rows
.apply(WithKeys.of(new SerializableFunction<TableRow, String>() {
@Override
public String apply(TableRow row) {
return (String)row.get("userId");
}
}));