是否可以在两个 PCollections 上的 apache beam 中执行 zip 操作?
Is it possible to do a zip operation in apache beam on two PCollections?
我有一个 PCollection[str]
,我想生成随机对。
来自 Apache Spark,我的策略是:
- 复制原始PCollection
- 随机洗牌
- 用原始 PCollection 压缩它
但是我似乎找不到压缩 2 个 PCollections 的方法...
如何应用 ParDo transform to both PCollections that attach keys to elements and running the two PCollections through a CoGroupByKey 转换?
请注意,Beam 不保证 PCollection 中元素的顺序,因此输出元素可能会在任何步骤后重新排序,但对于您的用例来说这应该没问题,因为您只需要一些随机顺序。
这很有趣,但不是很常见的用例,因为正如@chamikara 所说,Dataflow 中没有顺序保证。但是,我考虑过实施一个解决方案,在该解决方案中,您可以打乱输入 PCollection,然后根据 state 对连续元素进行配对。我发现了一些注意事项,但我认为无论如何都值得分享。
首先,我使用了 Python SDK,但 Dataflow Runner 尚不支持有状态的 DoFn。它可以与 Direct Runner 一起使用,但:1) 它不可扩展,并且 2) 没有多线程就很难打乱记录。当然,后者的一个简单解决方案是将已经洗牌的 PCollection 提供给管道(我们可以使用不同的作业来预处理数据)。否则,我们可以将此示例改编为 Java SDK。
现在,我决定尝试洗牌并将其与单个管道配对。我真的不知道这是否有帮助或使事情变得更复杂,但可以找到代码 here。
简而言之,有状态的 DoFn 查看缓冲区,如果缓冲区为空,则放入当前元素。否则,它从缓冲区中弹出前一个元素并输出一个元组 (previous_element, current_element):
class PairRecordsFn(beam.DoFn):
"""Pairs two consecutive elements after shuffle"""
BUFFER = BagStateSpec('buffer', PickleCoder())
def process(self, element, buffer=beam.DoFn.StateParam(BUFFER)):
try:
previous_element = list(buffer.read())[0]
except:
previous_element = []
unused_key, value = element
if previous_element:
yield (previous_element, value)
buffer.clear()
else:
buffer.add(value)
管道根据使用有状态 DoFn 的需要将键添加到输入元素。这里会有一个折衷,因为您可以使用 beam.Map(lambda x: (1, x))
为所有元素分配相同的键。这不会很好地并行化,但这不是问题,因为无论如何我们都在使用 Direct Runner(如果使用 Java SDK,请记住这一点)。但是,它不会洗牌记录。相反,如果我们洗牌到大量键,我们将得到更多无法配对的 "orphaned" 元素(因为每个键都保留状态,我们随机分配它们,我们可以有一个奇数每个键的记录数):
pairs = (p
| 'Create Events' >> beam.Create(data)
| 'Add Keys' >> beam.Map(lambda x: (randint(1,4), x))
| 'Pair Records' >> beam.ParDo(PairRecordsFn())
| 'Check Results' >> beam.ParDo(LogFn()))
在我的例子中,我得到了类似的东西:
INFO:root:('one', 'three')
INFO:root:('two', 'five')
INFO:root:('zero', 'six')
INFO:root:('four', 'seven')
INFO:root:('ten', 'twelve')
INFO:root:('nine', 'thirteen')
INFO:root:('eight', 'fourteen')
INFO:root:('eleven', 'sixteen')
...
编辑:我想到了另一种使用 Sample.FixedSizeGlobally
组合器的方法。好处是它可以更好地打乱数据,但您需要先验地知道元素的数量(否则我们需要对数据进行初始传递)并且它似乎 return 所有元素在一起。简而言之,我对同一个 PCollection 进行了两次初始化,但应用了不同的洗牌顺序并在有状态的 DoFn 中分配索引。这将保证索引在同一 PCollection 中的元素之间是唯一的(即使不能保证顺序)。在我的例子中,两个 PCollection 都对 [0, 31] 范围内的每个键都有一个记录。 CoGroupByKey 转换将在同一索引上加入两个 PCollections,从而具有随机的元素对:
pc1 = (p
| 'Create Events 1' >> beam.Create(data)
| 'Sample 1' >> combine.Sample.FixedSizeGlobally(NUM_ELEMENTS)
| 'Split Sample 1' >> beam.ParDo(SplitFn())
| 'Add Dummy Key 1' >> beam.Map(lambda x: (1, x))
| 'Assign Index 1' >> beam.ParDo(IndexAssigningStatefulDoFn()))
pc2 = (p
| 'Create Events 2' >> beam.Create(data)
| 'Sample 2' >> combine.Sample.FixedSizeGlobally(NUM_ELEMENTS)
| 'Split Sample 2' >> beam.ParDo(SplitFn())
| 'Add Dummy Key 2' >> beam.Map(lambda x: (2, x))
| 'Assign Index 2' >> beam.ParDo(IndexAssigningStatefulDoFn()))
zipped = ((pc1, pc2)
| 'Zip Shuffled PCollections' >> beam.CoGroupByKey()
| 'Drop Index' >> beam.Map(lambda (x, y):y)
| 'Check Results' >> beam.ParDo(LogFn()))
完整代码here
结果:
INFO:root:(['ten'], ['nineteen'])
INFO:root:(['twenty-three'], ['seven'])
INFO:root:(['twenty-five'], ['twenty'])
INFO:root:(['twelve'], ['twenty-one'])
INFO:root:(['twenty-six'], ['twenty-five'])
INFO:root:(['zero'], ['twenty-three'])
...
我有一个 PCollection[str]
,我想生成随机对。
来自 Apache Spark,我的策略是:
- 复制原始PCollection
- 随机洗牌
- 用原始 PCollection 压缩它
但是我似乎找不到压缩 2 个 PCollections 的方法...
如何应用 ParDo transform to both PCollections that attach keys to elements and running the two PCollections through a CoGroupByKey 转换?
请注意,Beam 不保证 PCollection 中元素的顺序,因此输出元素可能会在任何步骤后重新排序,但对于您的用例来说这应该没问题,因为您只需要一些随机顺序。
这很有趣,但不是很常见的用例,因为正如@chamikara 所说,Dataflow 中没有顺序保证。但是,我考虑过实施一个解决方案,在该解决方案中,您可以打乱输入 PCollection,然后根据 state 对连续元素进行配对。我发现了一些注意事项,但我认为无论如何都值得分享。
首先,我使用了 Python SDK,但 Dataflow Runner 尚不支持有状态的 DoFn。它可以与 Direct Runner 一起使用,但:1) 它不可扩展,并且 2) 没有多线程就很难打乱记录。当然,后者的一个简单解决方案是将已经洗牌的 PCollection 提供给管道(我们可以使用不同的作业来预处理数据)。否则,我们可以将此示例改编为 Java SDK。
现在,我决定尝试洗牌并将其与单个管道配对。我真的不知道这是否有帮助或使事情变得更复杂,但可以找到代码 here。
简而言之,有状态的 DoFn 查看缓冲区,如果缓冲区为空,则放入当前元素。否则,它从缓冲区中弹出前一个元素并输出一个元组 (previous_element, current_element):
class PairRecordsFn(beam.DoFn):
"""Pairs two consecutive elements after shuffle"""
BUFFER = BagStateSpec('buffer', PickleCoder())
def process(self, element, buffer=beam.DoFn.StateParam(BUFFER)):
try:
previous_element = list(buffer.read())[0]
except:
previous_element = []
unused_key, value = element
if previous_element:
yield (previous_element, value)
buffer.clear()
else:
buffer.add(value)
管道根据使用有状态 DoFn 的需要将键添加到输入元素。这里会有一个折衷,因为您可以使用 beam.Map(lambda x: (1, x))
为所有元素分配相同的键。这不会很好地并行化,但这不是问题,因为无论如何我们都在使用 Direct Runner(如果使用 Java SDK,请记住这一点)。但是,它不会洗牌记录。相反,如果我们洗牌到大量键,我们将得到更多无法配对的 "orphaned" 元素(因为每个键都保留状态,我们随机分配它们,我们可以有一个奇数每个键的记录数):
pairs = (p
| 'Create Events' >> beam.Create(data)
| 'Add Keys' >> beam.Map(lambda x: (randint(1,4), x))
| 'Pair Records' >> beam.ParDo(PairRecordsFn())
| 'Check Results' >> beam.ParDo(LogFn()))
在我的例子中,我得到了类似的东西:
INFO:root:('one', 'three')
INFO:root:('two', 'five')
INFO:root:('zero', 'six')
INFO:root:('four', 'seven')
INFO:root:('ten', 'twelve')
INFO:root:('nine', 'thirteen')
INFO:root:('eight', 'fourteen')
INFO:root:('eleven', 'sixteen')
...
编辑:我想到了另一种使用 Sample.FixedSizeGlobally
组合器的方法。好处是它可以更好地打乱数据,但您需要先验地知道元素的数量(否则我们需要对数据进行初始传递)并且它似乎 return 所有元素在一起。简而言之,我对同一个 PCollection 进行了两次初始化,但应用了不同的洗牌顺序并在有状态的 DoFn 中分配索引。这将保证索引在同一 PCollection 中的元素之间是唯一的(即使不能保证顺序)。在我的例子中,两个 PCollection 都对 [0, 31] 范围内的每个键都有一个记录。 CoGroupByKey 转换将在同一索引上加入两个 PCollections,从而具有随机的元素对:
pc1 = (p
| 'Create Events 1' >> beam.Create(data)
| 'Sample 1' >> combine.Sample.FixedSizeGlobally(NUM_ELEMENTS)
| 'Split Sample 1' >> beam.ParDo(SplitFn())
| 'Add Dummy Key 1' >> beam.Map(lambda x: (1, x))
| 'Assign Index 1' >> beam.ParDo(IndexAssigningStatefulDoFn()))
pc2 = (p
| 'Create Events 2' >> beam.Create(data)
| 'Sample 2' >> combine.Sample.FixedSizeGlobally(NUM_ELEMENTS)
| 'Split Sample 2' >> beam.ParDo(SplitFn())
| 'Add Dummy Key 2' >> beam.Map(lambda x: (2, x))
| 'Assign Index 2' >> beam.ParDo(IndexAssigningStatefulDoFn()))
zipped = ((pc1, pc2)
| 'Zip Shuffled PCollections' >> beam.CoGroupByKey()
| 'Drop Index' >> beam.Map(lambda (x, y):y)
| 'Check Results' >> beam.ParDo(LogFn()))
完整代码here
结果:
INFO:root:(['ten'], ['nineteen'])
INFO:root:(['twenty-three'], ['seven'])
INFO:root:(['twenty-five'], ['twenty'])
INFO:root:(['twelve'], ['twenty-one'])
INFO:root:(['twenty-six'], ['twenty-five'])
INFO:root:(['zero'], ['twenty-three'])
...