使用 Python 的 Apache Beam:如何计算会话 window 中的最小值,并将其应用于所有相关的 PCollections

Apache Beam with Python : How to compute the minimum in a session window, and apply it to all related PCollections

我正在使用 Apache Beam 的 Python SDK 来处理代表流分析命中的字典。多亏了会话 windows,这些点击才得以汇总。我的 DataFlow 真正需要做的就是应用这些会话 windows,并为所有相关点击分配一个会话 ID。

作为会话 ID,我发现我会使用第一次点击的时间戳(结合每个用户的 cookie ID)。这是我的管道:

msgs = p | 'Read' >> beam.io.gcp.pubsub.ReadFromPubSub(
    topic='projects/{PROJECT_ID}/topics/{SOURCE_TOPIC}'.format(
        PROJECT_ID=PROJECT_ID, SOURCE_TOPIC=SOURCE_TOPIC),
    id_label='hit_id',
    timestamp_attribute='time')

hits = msgs | 'Parse' >> beam.Map(my_parser)

windowed_hits = hits | 'Windowing' >> beam.WindowInto(beam.window.Sessions(1 * 60))

visit_id = (windowed_hits | 'ExtractTimestamp' >> beam.Map(my_extracter)
    | 'GetMinimum' >> beam.CombineGlobally(my_min).without_defaults())

windowed_hits | 'SetVisitId' >>
    beam.Map(set_visit_id, visit_id=beam.pvalue.AsSingleton(visit_id))

my_parser 正在应用 literal_eval 将字符串转换为字典。 my_extracter 正在从命中中取出时间戳。 set_visit_id 只是接受一个参数并将其分配给键 visit_id.

这似乎不起作用。调试时,我的 visit_id 分支似乎正常工作,它在计算最小值之前等待会话结束。但是当用作辅助输入时,我只得到一个 pvalue.EmptySideInput。我怎样才能得到我想要的结果,为什么我的代码 return 是一个空的边输入?

编辑: 我用 AsIter 替换了 AsSingleton,以了解这里出了什么问题。我得到的是一个 _FilteringIterable :

所以我猜问题是这个_target_window,但我不明白为什么它的范围从 TS + 60 到 TS + 120。可能是因为 WindowedValue 的时间戳?这似乎是可能的,因为 _target_window 的边界似乎源自其舍入值。

我最终通过丢弃任何 Combine 并将其替换为 GroupByKey 来完成我想做的事情。

def my_parser(msg):
    result = literal_eval(msg)
    return result

def set_key(hit):
    return (hit['cid'], hit)

def set_vid2(keyed_hits):
    k, hits = keyed_hits
    visit_id = min([h['time'] for h in hits])
    for h in hits:
        h['visit_id'] = visit_id
    return hits

def unpack_list(l):
    for d in l:
        yield d

msgs = p | 'Read' >> beam.io.gcp.pubsub.ReadFromPubSub(
    topic='projects/{PROJECT_ID}/topics/{SOURCE_TOPIC}'.format(
        PROJECT_ID=PROJECT_ID, SOURCE_TOPIC=SOURCE_TOPIC),
    id_label='hit_id',
    timestamp_attribute='time')

hits = msgs | 'Parse' >> beam.Map(my_parser)

keyed_hits = hits | 'SetKey' >> beam.Map(set_key)

windowed_hits = (keyed_hits | 'Windowing' >> beam.WindowInto(beam.window.Sessions(1 * 60))
    | 'Grouping' >> beam.GroupByKey())

clean_hits = windowed_hits | 'ComputeMin' >> beam.Map(set_vid2)

clean_hits | 'Unpack' >> beam.FlatMap(unpack_list)

在 GroupByKey 之后,我有一个包含命中列表的 PCollection(按 cookie ID + 会话 windows 分组)。然后,一旦访问 ID 被计算并设置为每次点击,我将我的点击列表的 PCollection 转换为点击的 PCollection,其中 unpack_list.

我不确定这是正确的方法,也不确定它是否对性能有任何影响。