为什么在过滤器中使用集合会导致 "org.apache.spark.SparkException: Task not serializable"?
Why does using a set in filter cause "org.apache.spark.SparkException: Task not serializable"?
我正在尝试根据列表中这些对象的字段来过滤 RDD 中的对象集合。
我尝试的方法与此处相同:
Filter based on another RDD in Spark
val namesToFilterOn = sc.textFile("/names_to_filter_on.txt").collect.toSet
val usersRDD = userContext.loadUsers("/user.parquet")
这个有效:
usersRDD.filter(user => Set("Pete","John" ).contains( user.firstName )).first
当我尝试时
usersRDD.filter(user => namesToFilterOn.contains( user.firstName )).first
我收到这个错误
org.apache.spark.SparkException: Task not serializable
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
我尝试这个时遇到同样的错误
val shortTestList = Set("Pete","John" )
usersRDD.filter(user => shortTestList .contains( user.firstName )).first
为什么在这些过滤语句中指定一组 names/String 时会出现此错误?
据我所知这应该可行,我没有在过滤器语句中的任何位置指定 SparkContext。那么为什么会出错呢?以及如何不出错?
我使用的Spark版本是1.5.2。
我也试过先播一组人名。
val namesToFilterOnBC = sc.broadcast(namesToFilterOn)
usersRDD.filter(user => namesToFilterOnBC.value.contains( user.firstName )).first
这又会导致同样的错误
org.apache.spark.SparkException: Task not serializable
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
原因是 val namesToFilterOn = sc.textFile("/names_to_filter_on.txt").collect.toSet
属于一个包含不可序列化值的对象,因此出现错误。
当user => namesToFilterOn.contains( user.firstName )
转换成字节格式通过网络发送给执行器时,Spark检查是否有任何对不可序列化对象的引用,SparkContext就在其中。
似乎 Spark 找到了一个您引用不可序列化的 SparkContext 的地方并引发了异常。
一个解决方案是在 Scala 中将 val namesToFilterOn = sc.textFile("/names_to_filter_on.txt").collect.toSet
或 val shortTestList = Set("Pete","John" )
包装为 object
的单独方法。你也可以在闭包里面使用其他val shortTestList
(如Job aborted due to stage failure: Task not serializable) or broadcast中所述。
您可能会发现文档 SIP-21 - Spores 对案例非常有用。
询问了 userContext 的开发人员并通过不显式实例化 userContext 而仅通过导入其功能解决了这个问题。
import userContext._
sc.loadUsers("/user.parquet")
usersRDD.filter(user => namesToFilterOn.contains( user.firstName )).first
而不是
val userContext = new UserContext(sc)
userContext.loadUsers("/user.parquet")
usersRDD.filter(user => namesToFilterOn.contains( user.firstName )).first
我正在尝试根据列表中这些对象的字段来过滤 RDD 中的对象集合。
我尝试的方法与此处相同: Filter based on another RDD in Spark
val namesToFilterOn = sc.textFile("/names_to_filter_on.txt").collect.toSet
val usersRDD = userContext.loadUsers("/user.parquet")
这个有效:
usersRDD.filter(user => Set("Pete","John" ).contains( user.firstName )).first
当我尝试时
usersRDD.filter(user => namesToFilterOn.contains( user.firstName )).first
我收到这个错误
org.apache.spark.SparkException: Task not serializable
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
我尝试这个时遇到同样的错误
val shortTestList = Set("Pete","John" )
usersRDD.filter(user => shortTestList .contains( user.firstName )).first
为什么在这些过滤语句中指定一组 names/String 时会出现此错误?
据我所知这应该可行,我没有在过滤器语句中的任何位置指定 SparkContext。那么为什么会出错呢?以及如何不出错?
我使用的Spark版本是1.5.2。
我也试过先播一组人名。
val namesToFilterOnBC = sc.broadcast(namesToFilterOn)
usersRDD.filter(user => namesToFilterOnBC.value.contains( user.firstName )).first
这又会导致同样的错误
org.apache.spark.SparkException: Task not serializable
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
原因是 val namesToFilterOn = sc.textFile("/names_to_filter_on.txt").collect.toSet
属于一个包含不可序列化值的对象,因此出现错误。
当user => namesToFilterOn.contains( user.firstName )
转换成字节格式通过网络发送给执行器时,Spark检查是否有任何对不可序列化对象的引用,SparkContext就在其中。
似乎 Spark 找到了一个您引用不可序列化的 SparkContext 的地方并引发了异常。
一个解决方案是在 Scala 中将 val namesToFilterOn = sc.textFile("/names_to_filter_on.txt").collect.toSet
或 val shortTestList = Set("Pete","John" )
包装为 object
的单独方法。你也可以在闭包里面使用其他val shortTestList
(如Job aborted due to stage failure: Task not serializable) or broadcast中所述。
您可能会发现文档 SIP-21 - Spores 对案例非常有用。
询问了 userContext 的开发人员并通过不显式实例化 userContext 而仅通过导入其功能解决了这个问题。
import userContext._
sc.loadUsers("/user.parquet")
usersRDD.filter(user => namesToFilterOn.contains( user.firstName )).first
而不是
val userContext = new UserContext(sc)
userContext.loadUsers("/user.parquet")
usersRDD.filter(user => namesToFilterOn.contains( user.firstName )).first