在 pyspark 中广播大型阵列(~ 8GB)

Broadcast large array in pyspark (~ 8GB)

在 Pyspark 中,我正在尝试广播一个大小约为 8GB 的​​大型 numpy 数组。但它因错误 "OverflowError: cannot serialize a string larger than 4GiB" 而失败。我有 15g 的执行程序内存和 25g 的驱动程序内存。我试过使用默认和 kyro 序列化程序。两者都不起作用并显示相同的错误。 谁能建议如何消除此错误以及处理大型广播变量的最有效方法?

PySpark 不使用 Java 侧面序列化进行广播,因此使用 Kryo 或任何其他序列化设置无济于事。这只是 version 4.

之前 pickle 协议的限制

理论上应该可以调整 PySpark 代码以使用 Python 3.4+ 中特定版本的协议,但总的来说我不认为这样做值得。通常在 PySpark 中广播大变量,因为它不在执行者之间共享。

如果您真的需要这个,最简单的解决方案就是将数组拆分为多个大小小于 4GB 的块。它不会使 PySpark 广播更有效率,但应该可以解决您的问题。

offset = ...
a_huge_array = np.array(...)

a_huge_array_block_1 = sc.broadcast(a_huge_array[0:offset])
a_huge_array_block_2 = sc.broadcast(a_huge_array[offset:2*offset])
...

更聪明的处理方法是使用本地 file-system 而不是变量来分发文件,并通过 memory-mapping. You can for example use flat files or Memory-Mapped SQLite.

访问这些文件

这不是PySpark的问题,这是Spark实现的限制。

Spark使用scala数组来存储广播元素,由于Scala的最大Integer是2*10^9,所以字符串总字节数是2*2*10^9 = 4GB,可以查看Spark代码。