Is there a way to group records by fields, performing calculations on other fields, over a period of time in a Dataflow pipeline?
I'm trying to create a streaming pipeline with Dataflow that reads messages from a Pub/Sub topic and writes the grouped results to a BigQuery table. I don't want to use any template; for now I just want to create the pipeline in a Python 3 script executed from a Google VM instance, performing a transformation process on the data arriving from Pub/Sub (the structure of the messages is what the table expects). During this process, I want to group by fields "A" and "B", and compute the total number of occurrences, the sum of field "C", and the average of field "D".
The messages published to the Pub/Sub topic look like this:
{"A":"Alpha", "B":"V1", "C":3, "D":12}
{"A":"Alpha", "B":"V1", "C":5, "D":14}
{"A":"Alpha", "B":"V1", "C":3, "D":22}
{"A":"Beta", "B":"V1", "C":2, "D":6}
{"A":"Beta", "B":"V1", "C":7, "D":19}
{"A":"Beta", "B":"V2", "C":3, "D":10}
{"A":"Beta", "B":"V2", "C":5, "D":12}
The output for these records should look like this:
{"A-B":"AlphaV1", "Occurs":3, "sum_C":11, "avg_D":16}
{"A-B":"BetaV1", "Occurs":2, "sum_C":9, "avg_D":12.5}
{"A-B":"BetaV2", "Occurs":2, "sum_C":8, "avg_D":11}
How can I define a function in Apache Beam that carries out this process?
Thanks!
You can do all of this with a simple GroupByKey and a custom aggregation. There is one big question you'll need to think through on your own: how do you want to window your data?

You need to window your data, because the runner needs to figure out when to stop waiting for more data on the same key. Happy to chat more about windowing if you get stuck on it.
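If it helps, here is a minimal sketch of applying a window, where `inputs` stands in for your Pub/Sub PCollection; the fixed 60-second window is just an illustrative assumption, and sliding or session windows are applied the same way:

import apache_beam as beam
from apache_beam import window

# Assign each element to a fixed (tumbling) 60-second window, so that
# GroupByKey can emit one result per key each time a window closes.
windowed = inputs | beam.WindowInto(window.FixedWindows(60))
# Alternatives: window.SlidingWindows(60, 10) or window.Sessions(600).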
Here is how you could perform the aggregations, where we simply "assume" the windowing:
def compute_keys(elm):
    # Build the grouping key by concatenating fields A and B (e.g. 'AlphaV1').
    key = '%s%s' % (elm.get('A'), elm.get('B'))
    return (key, elm)

def perform_aggregations_per_key(key_values):
    key, values = key_values
    values = list(values)  # This will load all values for a single key into memory!
    sum_C = sum(v['C'] for v in values)
    avg_D = sum(v['D'] for v in values) / len(values)
    occurs = len(values)
    return {'A-B': key,
            'Occurs': occurs,
            'sum_C': sum_C,
            'avg_D': avg_D}
my_inputs = (p | ReadFromPubSub(.....))

windowed_inputs = (my_inputs
                   | beam.WindowInto(....))  # You need to window your stream

result = (windowed_inputs
          | beam.Map(compute_keys)
          | beam.GroupByKey()
          | beam.Map(perform_aggregations_per_key))
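For context, here is a self-contained sketch of how the pieces could fit together end to end, using the two functions defined above. The topic name, window size, and JSON-parsing step are assumptions for illustration, since the ReadFromPubSub and WindowInto arguments are elided in the snippet above:

import json

import apache_beam as beam
from apache_beam import window
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

options = PipelineOptions()
options.view_as(StandardOptions).streaming = True  # Pub/Sub sources require streaming mode

with beam.Pipeline(options=options) as p:
    result = (p
              | ReadFromPubSub(topic='projects/my-project/topics/my-topic')  # hypothetical topic
              | beam.Map(json.loads)  # Pub/Sub delivers bytes; parse each message into a dict
              | beam.WindowInto(window.FixedWindows(60))  # assumed 60-second fixed windows
              | beam.Map(compute_keys)
              | beam.GroupByKey()
              | beam.Map(perform_aggregations_per_key))
    # A beam.io.WriteToBigQuery(...) step would follow here to land the results in BigQuery.

One design note: if a single key can accumulate many values per window, a beam.CombinePerKey with a custom CombineFn would compute the same count/sum/average incrementally, without materializing the whole list the way perform_aggregations_per_key does.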