我如何在 Apache Beam 中像 Spark 一样实现 zipWithIndex?

How can I implement zipWithIndex like Spark in Apache Beam?

Pcollection<String> p1 = {"a","b","c"}

PCollection< KV<Integer,String> > p2 = p1.apply("some operation ") 
//{(1,"a"),(2,"b"),(3,"c")}

我需要让它像 Apache Spark 这样的大文件可扩展,这样它的工作方式如下:

sc.textFile("./filename").zipWithIndex

我的目标是通过以可扩展的方式分配行号来保持大文件中行之间的顺序。

如何通过 Apache Beam 获取结果?

一些相关帖子:

没有执行此操作的内置方法。 PCollections 在 Beam 中是无序的,可能是无界的,并且在多个 worker 上并行处理。 PCollection 来自具有已知顺序的源这一事实无法在 Beam 模型中直接观察到。我认为更简单的方法是在 Beam 管道中使用文件之前对其进行预处理。

(复制my response from user@beam.apache.org)

这很有趣。所以如果我理解你的算法,它会是这样的(伪代码):

A = ReadWithShardedLineNumbers(myFile) : output K<ShardOffset+LocalLineNumber>, V<Data>
B = A.ExtractShardOffsetKeys() : output K<ShardOffset>, V<LocalLineNumber>
C = B.PerKeySum() : output K<ShardOffset>, V<ShardTotalLines>
D = C.GlobalSortAndPrefixSum() : output K<ShardOffset> V<ShardLineNumberOffset>
E = [A,D].JoinAndCalculateGlobalLineNumbers() : output V<GlobalLineNumber+Data>

这有几个假设:

  1. ReadWithShardedLineNumbers: Sources可以输出他们的分片偏移量,偏移量是全局排序的
  2. GlobalSortAndPrefixSum: 所有读分片的总和可以放入内存进行总排序

假设 #2 不会对所有数据大小都成立,并且会因运行器而异,具体取决于读取分片的粒度。但对于一些实际的文件大小子集来说,这似乎是可行的。

此外,我相信上面的伪代码可以在 Beam 中表示,不需要 SplittableDoFn。