将 pcollection 的每一行拆分为多个 pcollection?
Split each row of a pcollection into multiple pcollections?
在我进行了一些处理和按键分组之后,我得到了如下所示的数据集。我现在需要对每一行数据进行一些处理以获得下面的输出。我试过 flatmap 它真的很慢,因为“值”列表的长度可以任意长。我想我可以将每一行拆分为单独的 pcollections,并行处理,然后将它们拼合在一起。如何将每一行拆分为不同的 pcollection?如果这不可行,是否有其他方法可以加快计算速度?
输入
key, value
1 [A, B, B, B]
2 [A, B, B, B]
3 [A, B, B, B]
4 [A, B, B, B]
5 [A, B, B, B]
输出:
key, value
1 (A, 0)
1 (B, 1)
1 (B, 2)
1 (B, 3)
2 (A, 0)
2 (B, 1)
2 (B, 2)
2 (B, 3)
...
在使用 Apache Beam 模型时,一个常见的误解是并行化方案是由 PCollection 定义的(可以理解,因为这是 Parallel Collection 的缩写)。实际上,每个 PCollection[1] 中的每个键 都定义了并行化。换句话说,Beam 模型并行处理键,但顺序处理单个键中的值。
您遇到的问题通常称为热键。当过多的值与单个键配对时会发生这种情况,从而限制了并行性。
要将数据操作到预期的输出,您将必须编辑现有管道以发出值,而不是所有元素都转到一个键。这有点困难,因为在您的示例中您希望输出带有元素的索引。如果是这种情况,那么无论你怎么切割,你都必须将某处的所有值合并到内存中的一个键才能得到正确的索引。
如果您不像上面的示例那样关心获取特定索引,请查看以下代码。此代码将每个元素分配给每个键内的随机分区。这有助于将每个键的元素数量分解为易于管理的数量。
data = [
(k, c) for k in range(1, 6) for c in ('A', 'B', 'B', 'B')
]
p = beam.Pipeline()
elems = p | beam.Create(data)
num_buckets = 4
class Preprocess(beam.DoFn):
def process(self, el):
key = str(el[0])
partition = random.randint(0, num_buckets)
yield (key, partition), el
class Postprocess(beam.DoFn):
def process(self, el):
(key, partition), values = el
index = 0
for el in values:
yield key, (el[1], partition, index)
index += 1
out = (elems | beam.ParDo(Preprocess())
| beam.GroupByKey()
| beam.ParDo(Postprocess()))
输入
key,value
1 A
1 B
1 B
1 B
2 A
2 B
2 B
2 B
3 A
3 B
...
潜在产出
key,(value,partition,index)
1 ('A', 1, 0)
1 ('B', 1, 1)
1 ('B', 2, 0)
1 ('B', 3, 0)
2 ('A', 3, 0)
2 ('B', 3, 1)
2 ('B', 1, 0)
2 ('B', 1, 1)
3 ('A', 3, 0)
3 ('B', 2, 0)
...
[1] 使用流式传输时,它是按键定义的 window
在我进行了一些处理和按键分组之后,我得到了如下所示的数据集。我现在需要对每一行数据进行一些处理以获得下面的输出。我试过 flatmap 它真的很慢,因为“值”列表的长度可以任意长。我想我可以将每一行拆分为单独的 pcollections,并行处理,然后将它们拼合在一起。如何将每一行拆分为不同的 pcollection?如果这不可行,是否有其他方法可以加快计算速度?
输入
key, value
1 [A, B, B, B]
2 [A, B, B, B]
3 [A, B, B, B]
4 [A, B, B, B]
5 [A, B, B, B]
输出:
key, value
1 (A, 0)
1 (B, 1)
1 (B, 2)
1 (B, 3)
2 (A, 0)
2 (B, 1)
2 (B, 2)
2 (B, 3)
...
在使用 Apache Beam 模型时,一个常见的误解是并行化方案是由 PCollection 定义的(可以理解,因为这是 Parallel Collection 的缩写)。实际上,每个 PCollection[1] 中的每个键 都定义了并行化。换句话说,Beam 模型并行处理键,但顺序处理单个键中的值。
您遇到的问题通常称为热键。当过多的值与单个键配对时会发生这种情况,从而限制了并行性。
要将数据操作到预期的输出,您将必须编辑现有管道以发出值,而不是所有元素都转到一个键。这有点困难,因为在您的示例中您希望输出带有元素的索引。如果是这种情况,那么无论你怎么切割,你都必须将某处的所有值合并到内存中的一个键才能得到正确的索引。
如果您不像上面的示例那样关心获取特定索引,请查看以下代码。此代码将每个元素分配给每个键内的随机分区。这有助于将每个键的元素数量分解为易于管理的数量。
data = [
(k, c) for k in range(1, 6) for c in ('A', 'B', 'B', 'B')
]
p = beam.Pipeline()
elems = p | beam.Create(data)
num_buckets = 4
class Preprocess(beam.DoFn):
def process(self, el):
key = str(el[0])
partition = random.randint(0, num_buckets)
yield (key, partition), el
class Postprocess(beam.DoFn):
def process(self, el):
(key, partition), values = el
index = 0
for el in values:
yield key, (el[1], partition, index)
index += 1
out = (elems | beam.ParDo(Preprocess())
| beam.GroupByKey()
| beam.ParDo(Postprocess()))
输入
key,value
1 A
1 B
1 B
1 B
2 A
2 B
2 B
2 B
3 A
3 B
...
潜在产出
key,(value,partition,index)
1 ('A', 1, 0)
1 ('B', 1, 1)
1 ('B', 2, 0)
1 ('B', 3, 0)
2 ('A', 3, 0)
2 ('B', 3, 1)
2 ('B', 1, 0)
2 ('B', 1, 1)
3 ('A', 3, 0)
3 ('B', 2, 0)
...
[1] 使用流式传输时,它是按键定义的 window