避免在 Apache Spark 中使用 Java 数据结构以避免复制数据

Avoid the use of Java data structures in Apache Spark to avoid copying the data

我有一个 MySQL 数据库,其中一个 table 包含大约 1 亿条记录(~25GB,~5 列)。使用 Apache Spark,我通过 JDBC 连接器提取此数据并将其存储在 DataFrame 中。 从这里开始,我对数据进行了一些预处理(例如替换 NULL 值),因此我绝对需要遍历每条记录。 然后我想执行降维和特征选择(例如使用 PCA),执行聚类(例如 K-Means),然后在新数据上测试模型。

我已经在 Spark 的 Java API 中实现了这个,但是它太慢了(对我来说)因为我做了很多数据从 DataFrame 复制到 java.util.Vector 和 java.util.List(以便能够遍历所有记录并进行预处理),然后返回到 DataFrame(因为 Spark 中的 PCA 需要 DataFrame 作为输入)。

我曾尝试将信息从数据库中提取到 org.apache.spark.sql.Column 中,但找不到迭代它的方法。 我还尝试通过使用 org.apache.spark.mllib.linalg.{DenseVector, SparseVector} 来避免使用 Java 数据结构(例如 List 和 Vector),但也无法使其正常工作。 最后,我还考虑过使用 JavaRDD(通过从 DataFrame 和自定义模式创建它),但无法完全解决。

经过冗长的描述,我的问题是:有没有一种方法可以完成第一段中提到的所有步骤,而无需将所有数据复制到 Java 数据结构中? 也许我尝试过的选项之一实际上可以工作,但我似乎无法找到如何工作,因为关于 Spark 的文档和文献有点稀缺。

从你问题的措辞来看,似乎对Spark处理的阶段有些混淆。

首先,我们通过指定输入和转换告诉 Spark 要做什么。此时,唯一已知的是 (a) 各个处理阶段的分区数量和 (b) 数据模式。 org.apache.spark.sql.Column 在此阶段用于标识与列关联的元数据。但是,它不包含任何数据。事实上,现阶段根本没有数据。

其次,我们告诉 Spark 在 dataframe/dataset 上执行一个动作。这就是处理的开始。输入被读取并流经各种转换并进入最终的操作操作,无论是 collectsave 还是其他。

所以,这就解释了为什么你不能 "extract information from the database into" Column

至于你的问题的核心,如果没有看到你的代码并且确切地知道你想要完成的是什么,很难发表评论,但是可以肯定地说,在类型之间进行大量迁移是一个坏主意。

以下几个问题可能有助于引导您获得更好的结果:

  • 为什么不能通过直接在Row实例上操作来执行所需的数据转换?

  • 将一些转换代码包装成 UDF 或 UDAF 会方便吗?

希望对您有所帮助。