Beam:CombinePerKey(max) 挂在数据流作业中
Beam: CombinePerKey(max) hang in dataflow job
我正在尝试通过 pub\sub
方式从 GCS
加载数据,并通过 userid
获取用户最高等级。以下代码在 DirectRunner
中运行良好,但作业在数据流中的 CombinePerKey(max)
中挂起。
这是代码
class ParseAndFilterFn(beam.DoFn):
def process(self, element):
text_line = element.strip()
data = {}
try:
data = json.loads(text_line.decode('utf-8'))
if 'user_id' in data and data['user_id'] and 'level' in data and data['level']:
yield {
'user': data['user_id'],
'level': data['level'],
'ts': data['ts']
}
def str2timestamp(t, fmt="%Y-%m-%dT%H:%M:%S.%fZ"):
return time.mktime(datetime.strptime(t, fmt).timetuple())
class FormatFieldValueFn(beam.DoFn):
def process(self, element):
yield {
"field": element[0],
"value": element[1]
}
...
raw_event = (
p
| "Read Sub Message" >> beam.io.ReadFromPubSub(topic=args.topic)
| "Convert Message to JSON" >> beam.Map(lambda message: json.loads(message))
| "Extract File Name" >> beam.ParDo(ExtractFileNameFn())
| "Read File from GCS" >> beam.io.ReadAllFromText()
)
filtered_events = (
raw_event
| "ParseAndFilterFn" >> beam.ParDo(ParseAndFilterFn())
)
raw_events = (
filtered_events
| "AddEventTimestamps" >> beam.Map(lambda elem: beam.window.TimestampedValue(elem, str2timestamp(elem['ts'])))
)
window_events = (
raw_events
| "UseFixedWindow" >> beam.WindowInto(beam.window.FixedWindows(5 * 60))
)
user_max_level = (
window_events
| 'Group By User ID' >> beam.Map(lambda elem: (elem['user'], elem['level']))
| 'Compute Max Level Per User' >> beam.CombinePerKey(max)
)
(user_max_level
| "FormatFieldValueFn" >> beam.ParDo(FormatFieldValueFn())
)
p.run().wait_until_finish()
然后我把一个新的zip文件放到GCS
,然后数据流的管道是运行,但是挂在Compute Max Level Per User
有什么我遗漏的吗?
问题的根源可能与 Combine 转换上的水印和延迟有关(您可以阅读概念摘要 here)。水印可能是一个问题的原因是因为您使用 beam.Map
手动设置元素的时间戳,即使从 PubSub 源读取时已经设置了水印,因为它是一个设置自己的无界源时间戳。
ReadFromPubSub
transform 有一个标记为 timestamp_attribute
的参数,这是在 PubSub 中使用属性时间戳的预期方式。如果将此参数设置为 ts
,则 ReadFromPubSub
应发出时间戳已设置为 ts
的元素,并且水印也应适当设置。
如果这不起作用,您还可以查看其他内容。仔细检查时间戳是否设置正确是一个很好的初始步骤(将 ReadFromPubSub
生成的元素的时间戳与 ts
的值进行比较)。另一种可能性是在 windows 上设置触发器可能会有所帮助。例如,processing time trigger 可能能够防止 windows 永远等待水印赶上,尽管根据您的管道的需要它可能不合适。作为额外说明,您在上面截屏的指标有时对于 python 流式传输不可靠,因此如果您需要进行细粒度调试,您可以通过制作您可以通读的转换输出日志来获得更好的运气。
我正在尝试通过 pub\sub
方式从 GCS
加载数据,并通过 userid
获取用户最高等级。以下代码在 DirectRunner
中运行良好,但作业在数据流中的 CombinePerKey(max)
中挂起。
这是代码
class ParseAndFilterFn(beam.DoFn):
def process(self, element):
text_line = element.strip()
data = {}
try:
data = json.loads(text_line.decode('utf-8'))
if 'user_id' in data and data['user_id'] and 'level' in data and data['level']:
yield {
'user': data['user_id'],
'level': data['level'],
'ts': data['ts']
}
def str2timestamp(t, fmt="%Y-%m-%dT%H:%M:%S.%fZ"):
return time.mktime(datetime.strptime(t, fmt).timetuple())
class FormatFieldValueFn(beam.DoFn):
def process(self, element):
yield {
"field": element[0],
"value": element[1]
}
...
raw_event = (
p
| "Read Sub Message" >> beam.io.ReadFromPubSub(topic=args.topic)
| "Convert Message to JSON" >> beam.Map(lambda message: json.loads(message))
| "Extract File Name" >> beam.ParDo(ExtractFileNameFn())
| "Read File from GCS" >> beam.io.ReadAllFromText()
)
filtered_events = (
raw_event
| "ParseAndFilterFn" >> beam.ParDo(ParseAndFilterFn())
)
raw_events = (
filtered_events
| "AddEventTimestamps" >> beam.Map(lambda elem: beam.window.TimestampedValue(elem, str2timestamp(elem['ts'])))
)
window_events = (
raw_events
| "UseFixedWindow" >> beam.WindowInto(beam.window.FixedWindows(5 * 60))
)
user_max_level = (
window_events
| 'Group By User ID' >> beam.Map(lambda elem: (elem['user'], elem['level']))
| 'Compute Max Level Per User' >> beam.CombinePerKey(max)
)
(user_max_level
| "FormatFieldValueFn" >> beam.ParDo(FormatFieldValueFn())
)
p.run().wait_until_finish()
然后我把一个新的zip文件放到GCS
,然后数据流的管道是运行,但是挂在Compute Max Level Per User
有什么我遗漏的吗?
问题的根源可能与 Combine 转换上的水印和延迟有关(您可以阅读概念摘要 here)。水印可能是一个问题的原因是因为您使用 beam.Map
手动设置元素的时间戳,即使从 PubSub 源读取时已经设置了水印,因为它是一个设置自己的无界源时间戳。
ReadFromPubSub
transform 有一个标记为 timestamp_attribute
的参数,这是在 PubSub 中使用属性时间戳的预期方式。如果将此参数设置为 ts
,则 ReadFromPubSub
应发出时间戳已设置为 ts
的元素,并且水印也应适当设置。
如果这不起作用,您还可以查看其他内容。仔细检查时间戳是否设置正确是一个很好的初始步骤(将 ReadFromPubSub
生成的元素的时间戳与 ts
的值进行比较)。另一种可能性是在 windows 上设置触发器可能会有所帮助。例如,processing time trigger 可能能够防止 windows 永远等待水印赶上,尽管根据您的管道的需要它可能不合适。作为额外说明,您在上面截屏的指标有时对于 python 流式传输不可靠,因此如果您需要进行细粒度调试,您可以通过制作您可以通读的转换输出日志来获得更好的运气。