使用数据流对对象进行 GroupByKey?
GroupByKey on an object using Dataflow?
我正在通过 apache-beam 编写一个简单的 python 管道来汇总用户投票。
在输入中,我有这样的逗号分隔行:
pollA,answerB
pollA,answerC
pollB,answerA
pollB,answerB
pollC,answerE
pollA,answerB
接下来,我使用 ParDo 函数将每一行转换成这样的对象:
输出:
{
pollId: pollA,
answerId: answerB,
votes: 1
}
函数:
class Split(beam.DoFn):
def process(self, element):
pollId, answerId = element.split(",")
return [{
'pollId': pollId,
'answerId': answerId,
'votes': 1
}]
现在,假设我有 3 个 answerB,我想按 answerId 将它们分组并计算它们以输出类似的内容:
{
pollId: pollA,
answerId: answerB,
votes: 3
}
我是 python 和 apache-beam 的新手,所以非常感谢您的帮助:)
一个答案是意识到您的每条记录都可以描述为:
- 键:
pollId + answerId
- 值:
1 // The vote
如果你因此有一个 PCollection 是 Key/Value 这种形式的对,你可以对那个集合执行 CombinePerKey(sum)
,这将聚合所有具有相同键的项目,将它们的值相加,给你一个新的 PCollection,由新的 Key/Value 对组成,其中它们的值是具有相同 pollId
和 answerId
.
的所有记录的总和
例如,请参阅 CombinePerKey Python 文档了解此函数的用法。
我正在通过 apache-beam 编写一个简单的 python 管道来汇总用户投票。
在输入中,我有这样的逗号分隔行:
pollA,answerB
pollA,answerC
pollB,answerA
pollB,answerB
pollC,answerE
pollA,answerB
接下来,我使用 ParDo 函数将每一行转换成这样的对象:
输出:
{
pollId: pollA,
answerId: answerB,
votes: 1
}
函数:
class Split(beam.DoFn):
def process(self, element):
pollId, answerId = element.split(",")
return [{
'pollId': pollId,
'answerId': answerId,
'votes': 1
}]
现在,假设我有 3 个 answerB,我想按 answerId 将它们分组并计算它们以输出类似的内容:
{
pollId: pollA,
answerId: answerB,
votes: 3
}
我是 python 和 apache-beam 的新手,所以非常感谢您的帮助:)
一个答案是意识到您的每条记录都可以描述为:
- 键:
pollId + answerId
- 值:
1 // The vote
如果你因此有一个 PCollection 是 Key/Value 这种形式的对,你可以对那个集合执行 CombinePerKey(sum)
,这将聚合所有具有相同键的项目,将它们的值相加,给你一个新的 PCollection,由新的 Key/Value 对组成,其中它们的值是具有相同 pollId
和 answerId
.
例如,请参阅 CombinePerKey Python 文档了解此函数的用法。