使用循环分支 Apache Beam 管道
Branching Apache Beam pipelines with loops
我正在尝试执行去规范化操作,我需要使用以下逻辑重新组织 table:
| itemid | class | value |
+--------+-------+-------+
| 1 | A | 0.2 | | itemid | value A | value B | value C |
| 1 | B | 10.3 | ==> +--------+---------+---------+---------+
| 2 | A | 3.0 | ==> | 1 | 0.2 | 10.3 | |
| 2 | B | 0.2 | ==> | 2 | 3.0 | 0.2 | |
| 3 | A | 0.0 | | 3 | 0.0 | 1.2 | 5.4 |
| 3 | B | 1.2 |
| 3 | C | 5.4 |
我的方法是执行一个 for 循环以按 class
进行过滤,因为我知道 类 先验列表,然后加入生成的 pcollections。
高级代码:
CLASSES = ["A", "B", "C"]
tables = [
(
data
| "Filter by Language" >> beam.Filter(lambda elem: elem["class"]==c)
| "Add id as key" >> beam.Map(lambda elem: (elem["itemid"], elem))
)
for cin CLASSES
]
和连接:
_ = (
tables
| "Flatten" >> beam.Flatten()
| "Join Collections" >> beam.GroupByKey()
| "Remove key" >> beam.MapTuple(lambda _, val: val)
| "Merge dicts" >> beam.ParDo(mergeDicts())
| "Write to GCS" >> beam.io.WriteToText(output_file)
)
with(根据 Peter Kim 的建议进行编辑):
class mergeDicts(beam.DoFn):
process(self, elements):
result = {}
for dictionary in elements:
if len(dictionary)>0:
result["itemid"] = dictionary["itemid"]
result["value {}".format(dictionary["class"])] = dictionary["value"]
yield result
我的问题是,当管道在 Apache Beam 计算引擎中执行时,我获得了由列表的最后一个元素过滤的相同 pcollections,在本例中为 C。
[已添加] 对于所有调用的分支,Apache Beam 引擎似乎采用最终状态的迭代变量,这意味着迭代列表的最后一个元素。
我显然采用了错误的方法,但哪种方法应该是执行此操作的最佳方法?
根据您显示的结果 table,我假设您希望输出如下所示:
{'itemid': '1', 'value B': 10.3, 'value A': 0.2}
{'itemid': '2', 'value B': 0.2, 'value A': 3.0}
{'itemid': '3', 'value B': 1.2, 'value A': 0.0, 'value C': 5.4}
您的 mergeDicts 正在覆盖值,因为字典每个键只能保存一个值。将 mergeDicts 更新为类似这样的内容以指定键:
class mergeDicts(beam.DoFn):
process(self, elements):
result = {}
for dictionary in elements:
if len(dictionary)>0:
result["itemid"] = dictionary["itemid"]
result["value {}".format(dictionary["class"])] = dictionary["value"]
yield result
我在这里发布一个我自己找到的解决方案,但我没有检查它是否为正确答案,因为我想更好地理解Beam引擎的执行逻辑。
为了根据条件获得单独的 pcollection,我没有在循环中按项目过滤 table,而是使用了 beam.Partition
class。通过直接应用文档中的代码示例,我将 pcollection 分成多个 tables,准备加入。
这样就避免了这个问题,但是我不清楚为什么 for 循环没有像我预期的那样工作。
您 运行 遇到的是关于 closures, loops, and Python scoping 的令人惊讶的陷阱。您可以通过分配变量而不是将其从闭包中拉出来解决这个问题。例如
tables = [
(
data
# Pass it as a side input to Filter.
| "Filter by Language" >> beam.Filter(lambda elem, cls: elem["class"], c)
| "Add id as key" >> beam.Map(lambda elem: (elem["itemid"], elem))
)
for c in CLASSES
]
或
tables = [
(
data
# Explicitly capture it as a default value in the lambda.
| "Filter by Language" >> beam.Filter(lambda elem, cls=c: elem["class"])
| "Add id as key" >> beam.Map(lambda elem: (elem["itemid"], elem))
)
for c in CLASSES
]
分区在这里也很有效,既可以避免这种陷阱,也可以表达您的意图。
我正在尝试执行去规范化操作,我需要使用以下逻辑重新组织 table:
| itemid | class | value |
+--------+-------+-------+
| 1 | A | 0.2 | | itemid | value A | value B | value C |
| 1 | B | 10.3 | ==> +--------+---------+---------+---------+
| 2 | A | 3.0 | ==> | 1 | 0.2 | 10.3 | |
| 2 | B | 0.2 | ==> | 2 | 3.0 | 0.2 | |
| 3 | A | 0.0 | | 3 | 0.0 | 1.2 | 5.4 |
| 3 | B | 1.2 |
| 3 | C | 5.4 |
我的方法是执行一个 for 循环以按 class
进行过滤,因为我知道 类 先验列表,然后加入生成的 pcollections。
高级代码:
CLASSES = ["A", "B", "C"]
tables = [
(
data
| "Filter by Language" >> beam.Filter(lambda elem: elem["class"]==c)
| "Add id as key" >> beam.Map(lambda elem: (elem["itemid"], elem))
)
for cin CLASSES
]
和连接:
_ = (
tables
| "Flatten" >> beam.Flatten()
| "Join Collections" >> beam.GroupByKey()
| "Remove key" >> beam.MapTuple(lambda _, val: val)
| "Merge dicts" >> beam.ParDo(mergeDicts())
| "Write to GCS" >> beam.io.WriteToText(output_file)
)
with(根据 Peter Kim 的建议进行编辑):
class mergeDicts(beam.DoFn):
process(self, elements):
result = {}
for dictionary in elements:
if len(dictionary)>0:
result["itemid"] = dictionary["itemid"]
result["value {}".format(dictionary["class"])] = dictionary["value"]
yield result
我的问题是,当管道在 Apache Beam 计算引擎中执行时,我获得了由列表的最后一个元素过滤的相同 pcollections,在本例中为 C。
[已添加] 对于所有调用的分支,Apache Beam 引擎似乎采用最终状态的迭代变量,这意味着迭代列表的最后一个元素。
我显然采用了错误的方法,但哪种方法应该是执行此操作的最佳方法?
根据您显示的结果 table,我假设您希望输出如下所示:
{'itemid': '1', 'value B': 10.3, 'value A': 0.2}
{'itemid': '2', 'value B': 0.2, 'value A': 3.0}
{'itemid': '3', 'value B': 1.2, 'value A': 0.0, 'value C': 5.4}
您的 mergeDicts 正在覆盖值,因为字典每个键只能保存一个值。将 mergeDicts 更新为类似这样的内容以指定键:
class mergeDicts(beam.DoFn):
process(self, elements):
result = {}
for dictionary in elements:
if len(dictionary)>0:
result["itemid"] = dictionary["itemid"]
result["value {}".format(dictionary["class"])] = dictionary["value"]
yield result
我在这里发布一个我自己找到的解决方案,但我没有检查它是否为正确答案,因为我想更好地理解Beam引擎的执行逻辑。
为了根据条件获得单独的 pcollection,我没有在循环中按项目过滤 table,而是使用了 beam.Partition
class。通过直接应用文档中的代码示例,我将 pcollection 分成多个 tables,准备加入。
这样就避免了这个问题,但是我不清楚为什么 for 循环没有像我预期的那样工作。
您 运行 遇到的是关于 closures, loops, and Python scoping 的令人惊讶的陷阱。您可以通过分配变量而不是将其从闭包中拉出来解决这个问题。例如
tables = [
(
data
# Pass it as a side input to Filter.
| "Filter by Language" >> beam.Filter(lambda elem, cls: elem["class"], c)
| "Add id as key" >> beam.Map(lambda elem: (elem["itemid"], elem))
)
for c in CLASSES
]
或
tables = [
(
data
# Explicitly capture it as a default value in the lambda.
| "Filter by Language" >> beam.Filter(lambda elem, cls=c: elem["class"])
| "Add id as key" >> beam.Map(lambda elem: (elem["itemid"], elem))
)
for c in CLASSES
]
分区在这里也很有效,既可以避免这种陷阱,也可以表达您的意图。