在 Apache Beam 中加入行
Joining rows in Apache Beam
我无法理解 Apache Beam 中的联接(例如 http://www.waitingforcode.com/apache-beam/joins-apache-beam/read)是否可以联接整行。
例如:
我有 2 个 CSV 格式的数据集,其中第一行是第 header 列。
第一个:
a,b,c,d
1,2,3,4
5,6,7,8
1,2,5,4
第二个:
c,d,e,f
3,4,9,10
我想在 c 和 d 列上左联接,这样我最终得到:
a,b,c,d,e,f
1,2,3,4,9,10
5,6,7,8,,
1,2,5,4,,
然而,Apache Beam 上的所有文档似乎都说 PCollection objects 在加入时需要是 KV<K, V>
类型,所以我将我的 PCollection objects 分解为 collection of KV<String, String>
objects(其中键是列 header,值是行值)。但是在那种情况下(您只有一个带有值的键)我看不到如何维护行格式。 KV(c,7) "know" 怎么会认为 KV(a,5) 来自同一行? Join 是指这种事情吗?
到目前为止我的代码:
PCollection<KV<String, String>> flightOutput = ...;
PCollection<KV<String, String>> arrivalWeatherDataForJoin = ...;
PCollection<KV<String, KV<String, String>>> output = Join.leftOuterJoin(flightOutput, arrivalWeatherDataForJoin, "");
是的,Join
是实用程序 class 来帮助像您这样的联接。它是 CoGropByKey
的包装器,参见 the corresponding section in the docs. The implementation of it is pretty short. Its tests 可能也有有用的示例。
您遇到的问题可能是由您选择密钥的方式引起的。
Join
库中的 KeyT
int KV<KeyT,V1>
表示您用来匹配记录的键,它包含所有连接字段。所以在你的情况下你可能需要分配这样的键(伪代码):
pCollection1:
Key Value
(3,4) (1,2,3,4)
(7,8) (5,6,7,8)
(5,4) (1,2,5,4)
pCollection2:
Key Value
(3,4) (3,4,9,10)
连接的结果看起来像这样(伪代码):
joinResultPCollection:
Key Value
(3,4) (1,2,3,4),(3,4,9,10)
(7,8) (5,6,7,8),nullValue
(5,4) (1,2,5,4),nullValue
因此,您可能需要在连接后添加另一个转换,才能真正将左侧和右侧合并为一个组合行。
因为您有 CSV,您可能可以使用像 "3,4"
这样的实际字符串作为键(和值)。或者您可以使用 Lists<>
或您的自定义行类型。
例如,这正是 Beam SQL Join implementation 所做的。
我无法理解 Apache Beam 中的联接(例如 http://www.waitingforcode.com/apache-beam/joins-apache-beam/read)是否可以联接整行。
例如:
我有 2 个 CSV 格式的数据集,其中第一行是第 header 列。
第一个:
a,b,c,d
1,2,3,4
5,6,7,8
1,2,5,4
第二个:
c,d,e,f
3,4,9,10
我想在 c 和 d 列上左联接,这样我最终得到:
a,b,c,d,e,f
1,2,3,4,9,10
5,6,7,8,,
1,2,5,4,,
然而,Apache Beam 上的所有文档似乎都说 PCollection objects 在加入时需要是 KV<K, V>
类型,所以我将我的 PCollection objects 分解为 collection of KV<String, String>
objects(其中键是列 header,值是行值)。但是在那种情况下(您只有一个带有值的键)我看不到如何维护行格式。 KV(c,7) "know" 怎么会认为 KV(a,5) 来自同一行? Join 是指这种事情吗?
到目前为止我的代码:
PCollection<KV<String, String>> flightOutput = ...;
PCollection<KV<String, String>> arrivalWeatherDataForJoin = ...;
PCollection<KV<String, KV<String, String>>> output = Join.leftOuterJoin(flightOutput, arrivalWeatherDataForJoin, "");
是的,Join
是实用程序 class 来帮助像您这样的联接。它是 CoGropByKey
的包装器,参见 the corresponding section in the docs. The implementation of it is pretty short. Its tests 可能也有有用的示例。
您遇到的问题可能是由您选择密钥的方式引起的。
Join
库中的 KeyT
int KV<KeyT,V1>
表示您用来匹配记录的键,它包含所有连接字段。所以在你的情况下你可能需要分配这样的键(伪代码):
pCollection1:
Key Value
(3,4) (1,2,3,4)
(7,8) (5,6,7,8)
(5,4) (1,2,5,4)
pCollection2:
Key Value
(3,4) (3,4,9,10)
连接的结果看起来像这样(伪代码):
joinResultPCollection:
Key Value
(3,4) (1,2,3,4),(3,4,9,10)
(7,8) (5,6,7,8),nullValue
(5,4) (1,2,5,4),nullValue
因此,您可能需要在连接后添加另一个转换,才能真正将左侧和右侧合并为一个组合行。
因为您有 CSV,您可能可以使用像 "3,4"
这样的实际字符串作为键(和值)。或者您可以使用 Lists<>
或您的自定义行类型。
例如,这正是 Beam SQL Join implementation 所做的。