Beam:CombinePerKey(max) 挂在数据流作业中

Beam: CombinePerKey(max) hang in dataflow job

我正在尝试通过 pub\sub 方式从 GCS 加载数据,并通过 userid 获取用户最高等级。以下代码在 DirectRunner 中运行良好,但作业在数据流中的 CombinePerKey(max) 中挂起。

这是代码

class ParseAndFilterFn(beam.DoFn):
    def process(self, element):
        text_line = element.strip()
        data = {}
        try:
            data = json.loads(text_line.decode('utf-8'))
            if 'user_id' in data and data['user_id'] and 'level' in data and data['level']:
                yield {
                    'user': data['user_id'],
                    'level': data['level'],
                    'ts': data['ts']
                }

def str2timestamp(t, fmt="%Y-%m-%dT%H:%M:%S.%fZ"):
    return time.mktime(datetime.strptime(t, fmt).timetuple())

class FormatFieldValueFn(beam.DoFn):
    def process(self, element):
        yield {
            "field": element[0],
            "value": element[1]
        }

...

        raw_event = (
                    p
                    | "Read Sub Message" >> beam.io.ReadFromPubSub(topic=args.topic)
                    | "Convert Message to JSON" >> beam.Map(lambda message: json.loads(message))
                    | "Extract File Name" >> beam.ParDo(ExtractFileNameFn())
                    | "Read File from GCS" >> beam.io.ReadAllFromText()
            )

        filtered_events = (
            raw_event
            | "ParseAndFilterFn" >> beam.ParDo(ParseAndFilterFn())
        )

        raw_events = (
            filtered_events
            | "AddEventTimestamps" >> beam.Map(lambda elem: beam.window.TimestampedValue(elem, str2timestamp(elem['ts'])))
        )

        window_events = (
            raw_events
            | "UseFixedWindow" >> beam.WindowInto(beam.window.FixedWindows(5 * 60))
        )

        user_max_level = (
            window_events
            | 'Group By User ID' >> beam.Map(lambda elem: (elem['user'], elem['level']))
            | 'Compute Max Level Per User' >> beam.CombinePerKey(max)
        )

        (user_max_level
         | "FormatFieldValueFn" >> beam.ParDo(FormatFieldValueFn())
        )

        p.run().wait_until_finish()        

然后我把一个新的zip文件放到GCS,然后数据流的管道是运行,但是挂在Compute Max Level Per User

有什么我遗漏的吗?

问题的根源可能与 Combine 转换上的水印和延迟有关(您可以阅读概念摘要 here)。水印可能是一个问题的原因是因为您使用 beam.Map 手动设置元素的时间戳,即使从 PubSub 源读取时已经设置了水印,因为它是一个设置自己的无界源时间戳。

ReadFromPubSub transform 有一个标记为 timestamp_attribute 的参数,这是在 PubSub 中使用属性时间戳的预期方式。如果将此参数设置为 ts,则 ReadFromPubSub 应发出时间戳已设置为 ts 的元素,并且水印也应适当设置。

如果这不起作用,您还可以查看其他内容。仔细检查时间戳是否设置正确是一个很好的初始步骤(将 ReadFromPubSub 生成的元素的时间戳与 ts 的值进行比较)。另一种可能性是在 windows 上设置触发器可能会有所帮助。例如,processing time trigger 可能能够防止 windows 永远等待水印赶上,尽管根据您的管道的需要它可能不合适。作为额外说明,您在上面截屏的指标有时对于 python 流式传输不可靠,因此如果您需要进行细粒度调试,您可以通过制作您可以通读的转换输出日志来获得更好的运气。