如何比较 python 中同一个 PCollection 中两个键的所有值?

How to compare all the values of two keys with in same PCollection in python?

我是 Apache 新手 Beam/dataflow。我正在阅读 Apache Beam 中的 BigQuery table,我想按两个不同的列进行分组,并比较两个不同键的所有值。我创建了一个包含两个不同列(ID、日期)的元组作为键。以下是 table

中的示例数据
  ID         Date        P_id    position
  "abc"    2019-08-01   "rt56"      5
  "abc"    2019-08-01   "rt57"      6
  "abc"    2019-08-01   "rt58"      7
  "abc"    2019-08-02   "rt56"      2 
  "abc"    2019-08-02   "rt57"      4
  "abc"    2019-08-02   "rt58"      7

现在我想比较 P_ids 对 ("abc", 2019-08-01) 和 ("abc", 2019-08-02) 的位置,看看如果任何 P_id 位置发生变化,则在 table "status" 中添加另一列为 True。所以我的新 table 应该像下面这样

我正在尝试使用以下代码

  ID         Date        P_id    position  Status
  "abc"    2019-08-01   "rt56"      5       False (as this is first date)
  "abc"    2019-08-01   "rt57"      6
  "abc"    2019-08-01   "rt58"      7
  "abc"    2019-08-02   "rt56"      2       True
  "abc"    2019-08-02   "rt57"      4
  "abc"    2019-08-02   "rt58"      7
(
p 
| "get_key_tuple" >> beam.ParDo(lambda element: tuple(element["Id"], element["Date]))
| "group_by" >> beam.GroupByKey()
| "compare_and_add_status" >> beam.ParDo(compare_pos)
)

但我不知道我应该如何处理函数 compare_pos()

考虑到我有一个非常大的 table 和很多 ID,如果我能得到一些关于如何有效地比较位置并创建一个新列来了解状态的想法将非常有帮助。

Beam 的 GroupByKey 采用 2 元组的 PCollection 和 returns PCollection,其中每个元素都是键的 2 元组和与该键关联的所有值的(无序)可迭代对象。例如,如果您的原始集合包含元素

(k1, v1)
(k1, v2)
(k1, v3)
(k2, v4)

GroupByKey 的结果将是一个包含

等元素的 PCollection
(k1, [v1, v3, v2])
(k2, [v4])

在你的例子中,你的键和值本身就是元组。因此,您可以获取原始集合并应用 Map(lambda elt: ((elt['Id'], elt['Date']), (elt['P_id'], elt['position']))),这将为您提供包含元素

的 PCollection
  ("abc", 2019-08-01),   ("rt56", 5)
  ("abc", 2019-08-01),   ("rt57", 6)
  ("abc", 2019-08-01),   ("rt58", 7)
  ("abc", 2019-08-02),   ("rt56", 2)
  ("abc", 2019-08-02),   ("rt57", 4)
  ("abc", 2019-08-02),   ("rt58", 7)

应用 GroupByKey 后将变为

  ("abc", 2019-08-01),   [("rt56", 5), ("rt57", 6), ("rt58", 7)]
  ("abc", 2019-08-02),   [("rt56", 2), ("rt57", 4), ("rt58", 7)]

此时您的 compare_pos 函数可以检查与给定 ID, Date 对对应的所有 P_id, position 元组,并执行任何需要的逻辑来发出需要更改的内容(使用对应的键)。

我可能对 OP 的理解有误,但如果@robertwb 的建议不起作用,请尝试按以下分组:

| "Create k, v tuple" >> beam.Map(
                    lambda elem: ((elem["P_id"], elem["ID"]), [elem["Date"], elem["position"]]))
| "Group by key" >> beam.GroupByKey()

这将输出以下结构:

(('rt56', 'abc'), [['2019-08-01', 5], ['2019-08-02', 2]])
(('rt57', 'abc'), [['2019-08-01', 6], ['2019-08-02', 4]])
(('rt58', 'abc'), [['2019-08-01', 7], ['2019-08-02', 7]])

这应该允许您单独比较生成的 PCollection 中的每个元素,而不是交叉比较 PCollection 中的元素。如果我是正确的,这应该更适合 Beam 的执行模型。

这是基于我的假设,即您想检查给定 P_id 的头寸在两个日期之间是否发生了变化。