Spark DataFrame 过滤:保留属于列表的元素
Spark DataFrame filtering: retain element belonging to a list
我在 Zeppelin 笔记本上使用 Spark 1.5.1 和 Scala。
- 我有一个 DataFrame,其中有一个名为 userID 的列,类型为 Long。
- 我总共有大约 400 万行和 200,000 个唯一用户 ID。
- 我还有一个要排除的 50,000 个用户 ID 的列表。
- 我可以轻松构建要保留的用户 ID 列表。
删除属于要排除的用户的所有行的最佳方法是什么?
问同一个问题的另一种方法是:保留属于用户的行的最佳方法是什么?
我看到了 并应用了它的解决方案(见下面的代码),但是执行速度很慢,知道我在我的本地机器上是 运行 SPARK 1.5.1,我有16GB 的体面 RAM 内存和初始 DataFrame 适合内存。
这是我正在应用的代码:
import org.apache.spark.sql.functions.lit
val finalDataFrame = initialDataFrame.where($"userID".in(listOfUsersToKeep.map(lit(_)):_*))
在上面的代码中:
- initialDataFrame 有 3885068 行,每行有 5 列,其中一列称为 userID,它包含 Long 值。
- listOfUsersToKeep 是一个数组 [Long],它包含 150,000 个 Long 用户 ID。
我想知道是否有比我正在使用的更有效的解决方案。
谢谢
您可以使用 join
:
val usersToKeep = sc.parallelize(
listOfUsersToKeep.map(Tuple1(_))).toDF("userID_")
val finalDataFrame = usersToKeep
.join(initialDataFrame, $"userID" === $"userID_")
.drop("userID_")
或者一个广播变量和一个 UDF:
import org.apache.spark.sql.functions.udf
val usersToKeepBD = sc.broadcast(listOfUsersToKeep.toSet)
val checkUser = udf((id: Long) => usersToKeepBD.value.contains(id))
val finalDataFrame = initialDataFrame.where(checkUser($"userID"))
广播DataFrame应该也是可能的:
import org.apache.spark.sql.functions.broadcast
initialDataFrame.join(broadcast(usersToKeep), $"userID" === $"userID_")
我在 Zeppelin 笔记本上使用 Spark 1.5.1 和 Scala。
- 我有一个 DataFrame,其中有一个名为 userID 的列,类型为 Long。
- 我总共有大约 400 万行和 200,000 个唯一用户 ID。
- 我还有一个要排除的 50,000 个用户 ID 的列表。
- 我可以轻松构建要保留的用户 ID 列表。
删除属于要排除的用户的所有行的最佳方法是什么?
问同一个问题的另一种方法是:保留属于用户的行的最佳方法是什么?
我看到了
这是我正在应用的代码:
import org.apache.spark.sql.functions.lit
val finalDataFrame = initialDataFrame.where($"userID".in(listOfUsersToKeep.map(lit(_)):_*))
在上面的代码中:
- initialDataFrame 有 3885068 行,每行有 5 列,其中一列称为 userID,它包含 Long 值。
- listOfUsersToKeep 是一个数组 [Long],它包含 150,000 个 Long 用户 ID。
我想知道是否有比我正在使用的更有效的解决方案。
谢谢
您可以使用 join
:
val usersToKeep = sc.parallelize(
listOfUsersToKeep.map(Tuple1(_))).toDF("userID_")
val finalDataFrame = usersToKeep
.join(initialDataFrame, $"userID" === $"userID_")
.drop("userID_")
或者一个广播变量和一个 UDF:
import org.apache.spark.sql.functions.udf
val usersToKeepBD = sc.broadcast(listOfUsersToKeep.toSet)
val checkUser = udf((id: Long) => usersToKeepBD.value.contains(id))
val finalDataFrame = initialDataFrame.where(checkUser($"userID"))
广播DataFrame应该也是可能的:
import org.apache.spark.sql.functions.broadcast
initialDataFrame.join(broadcast(usersToKeep), $"userID" === $"userID_")