使用 Spark 过滤大型数据集中的列
Filter columns in large dataset with Spark
我有一个数据集,它有 1,000,000 行乘以大约 390,000 列。字段都是二进制的,不是0就是1,数据很稀疏
我一直在使用 Spark 来处理这些数据。我当前的任务是过滤数据——我只想要已预选的 1000 列中的数据。这是我用来完成此任务的当前代码:
val result = bigdata.map(_.zipWithIndex.filter{case (value, index) => selectedColumns.contains(index)})
bigdata
只是一个 RDD[Array[Int]]
但是,此代码需要相当长的时间才能 运行。我确信有一种更有效的方法来过滤我的数据集,而不涉及单独进入和过滤每一行。将我的数据加载到 DataFrame 中,并通过 DataFrame API 对其进行操作会使事情变得 faster/easier 吗?我应该研究基于列存储的数据库吗?
加速执行的最简单方法是将其与 partitionBy 并行化:
bigdata.partitionBy(new HashPartitioner(numPartitions)).foreachPartition(...)
foreachPartition 接收一个迭代器,您可以在其上进行映射和过滤。
numPartitions 是一个 val,您可以使用所需的并行分区数量进行设置。
您可以从提高过滤器效率开始着手。请注意:
- 您的
RDD
包含 Array[Int]
。这意味着您可以在 O(1) 时间内访问每一行的第 n 个元素
- #selectedColumns << #columns
考虑到这两个事实,很明显,遍历每一行的所有元素是没有意义的,更不用说 contains
调用了。相反,您可以简单地 map
而不是 selectedColumns
// Optional if selectedColumns are not ordered
val orderedSelectedColumns = selectedColumns.toList.sorted.toArray
rdd.map(row => selectedColumns.map(row))
比较时间复杂度:
zipWithIndex
+ filter
(假设当 contains
为 O(1) 时的最佳情况)- O(#rows * # columns)
map
- O(#rows * #selectedColumns)
我有一个数据集,它有 1,000,000 行乘以大约 390,000 列。字段都是二进制的,不是0就是1,数据很稀疏
我一直在使用 Spark 来处理这些数据。我当前的任务是过滤数据——我只想要已预选的 1000 列中的数据。这是我用来完成此任务的当前代码:
val result = bigdata.map(_.zipWithIndex.filter{case (value, index) => selectedColumns.contains(index)})
bigdata
只是一个 RDD[Array[Int]]
但是,此代码需要相当长的时间才能 运行。我确信有一种更有效的方法来过滤我的数据集,而不涉及单独进入和过滤每一行。将我的数据加载到 DataFrame 中,并通过 DataFrame API 对其进行操作会使事情变得 faster/easier 吗?我应该研究基于列存储的数据库吗?
加速执行的最简单方法是将其与 partitionBy 并行化:
bigdata.partitionBy(new HashPartitioner(numPartitions)).foreachPartition(...)
foreachPartition 接收一个迭代器,您可以在其上进行映射和过滤。
numPartitions 是一个 val,您可以使用所需的并行分区数量进行设置。
您可以从提高过滤器效率开始着手。请注意:
- 您的
RDD
包含Array[Int]
。这意味着您可以在 O(1) 时间内访问每一行的第 n 个元素 - #selectedColumns << #columns
考虑到这两个事实,很明显,遍历每一行的所有元素是没有意义的,更不用说 contains
调用了。相反,您可以简单地 map
而不是 selectedColumns
// Optional if selectedColumns are not ordered
val orderedSelectedColumns = selectedColumns.toList.sorted.toArray
rdd.map(row => selectedColumns.map(row))
比较时间复杂度:
zipWithIndex
+filter
(假设当contains
为 O(1) 时的最佳情况)- O(#rows * # columns)map
- O(#rows * #selectedColumns)