为什么在过滤器中使用集合会导致 "org.apache.spark.SparkException: Task not serializable"?

Why does using a set in filter cause "org.apache.spark.SparkException: Task not serializable"?

我正在尝试根据列表中这些对象的字段来过滤 RDD 中的对象集合。

我尝试的方法与此处相同: Filter based on another RDD in Spark

val namesToFilterOn = sc.textFile("/names_to_filter_on.txt").collect.toSet

val usersRDD = userContext.loadUsers("/user.parquet")

这个有效:

usersRDD.filter(user =>  Set("Pete","John" ).contains( user.firstName )).first

当我尝试时

usersRDD.filter(user => namesToFilterOn.contains( user.firstName )).first

我收到这个错误

org.apache.spark.SparkException: Task not serializable
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext

我尝试这个时遇到同样的错误

val shortTestList = Set("Pete","John" )

usersRDD.filter(user => shortTestList .contains( user.firstName )).first

为什么在这些过滤语句中指定一组 names/String 时会出现此错误?

据我所知这应该可行,我没有在过滤器语句中的任何位置指定 SparkContext。那么为什么会出错呢?以及如何不出错?

我使用的Spark版本是1.5.2。

我也试过先播一组人名。

val namesToFilterOnBC = sc.broadcast(namesToFilterOn)
usersRDD.filter(user => namesToFilterOnBC.value.contains( user.firstName )).first

这又会导致同样的错误

org.apache.spark.SparkException: Task not serializable
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext

原因是 val namesToFilterOn = sc.textFile("/names_to_filter_on.txt").collect.toSet 属于一个包含不可序列化值的对象,因此出现错误。

user => namesToFilterOn.contains( user.firstName )转换成字节格式通过网络发送给执行器时,Spark检查是否有任何对不可序列化对象的引用,SparkContext就在其中。

似乎 Spark 找到了一个您引用不可序列化的 SparkContext 的地方并引发了异常。

一个解决方案是在 Scala 中将 val namesToFilterOn = sc.textFile("/names_to_filter_on.txt").collect.toSetval shortTestList = Set("Pete","John" ) 包装为 object 的单独方法。你也可以在闭包里面使用其他val shortTestList(如Job aborted due to stage failure: Task not serializable) or broadcast中所述。

您可能会发现文档 SIP-21 - Spores 对案例非常有用。

询问了 userContext 的开发人员并通过不显式实例化 userContext 而仅通过导入其功能解决了这个问题。

import userContext._
sc.loadUsers("/user.parquet")
usersRDD.filter(user => namesToFilterOn.contains( user.firstName )).first

而不是

val userContext = new UserContext(sc)
userContext.loadUsers("/user.parquet")
usersRDD.filter(user => namesToFilterOn.contains( user.firstName )).first