数据流中 python zip 函数的等效项是什么?
what's the equivalent of the python zip function in dataflow?
我正在使用 python apache_beam 版本的数据流。我有大约 300 个文件,每个文件有 400 万个条目。整个东西大约5Gb,存储在一个gs bucket上。
我可以通过读取每个文件轻松生成数组 {x_1, ... x_n}
的 PCollection,但我现在需要执行的操作类似于 python zip 函数:我想要一个范围从 0 的 PCollection到 n-1,其中每个元素 i
包含文件中所有 x_i
的数组。我尝试对每个元素 yield
ing (i, element)
然后 运行ning GroupByKey,但这太慢且效率低下(由于内存,它根本不会在本地 运行约束,在云上花了 24 小时,而我确信如果我愿意,我至少可以加载所有数据集)。
如何重构管道以干净地执行此操作?
正如jkff在上面的评论中指出的那样,代码确实是正确的,并且该过程是对tensorflow算法进行编程的推荐方式。应用于每个元素的 DoFn 是瓶颈。
我正在使用 python apache_beam 版本的数据流。我有大约 300 个文件,每个文件有 400 万个条目。整个东西大约5Gb,存储在一个gs bucket上。
我可以通过读取每个文件轻松生成数组 {x_1, ... x_n}
的 PCollection,但我现在需要执行的操作类似于 python zip 函数:我想要一个范围从 0 的 PCollection到 n-1,其中每个元素 i
包含文件中所有 x_i
的数组。我尝试对每个元素 yield
ing (i, element)
然后 运行ning GroupByKey,但这太慢且效率低下(由于内存,它根本不会在本地 运行约束,在云上花了 24 小时,而我确信如果我愿意,我至少可以加载所有数据集)。
如何重构管道以干净地执行此操作?
正如jkff在上面的评论中指出的那样,代码确实是正确的,并且该过程是对tensorflow算法进行编程的推荐方式。应用于每个元素的 DoFn 是瓶颈。