Apache Spark: "SparkException: Task not serializable" in spark-shell for a manually constructed RDD
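I have the following code to detect the most-used top-level domains from events. I use it to get the data through Spark SQL.
The function itself has been tested and works fine. I am using Amazon EMR and spark-shell. Almost immediately after Spark sends the tasks to the nodes, I get a long stack trace that ends with "SparkException: Task not serializable" and nothing more specific. What is going on here?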
import scala.io.Source

// Download the public suffix list and drop comment lines and blank lines
val suffixesStr =
  Source.fromURL("https://publicsuffix.org/list/public_suffix_list.dat").mkString
val suffList =
  suffixesStr.lines.filter(line => !line.startsWith("//") && line.trim() != "")
val suffListRDD = sc.parallelize(suffList.toList).collect()

// Keep the last two labels of a domain, or the last three if it ends in a multi-part suffix
val cleanDomain = (domain: String) => {
  var secLevelSuffix =
    suffListRDD.find(suffix => domain.endsWith("."+suffix) && suffix.contains("."))
  var regex = """[^.]+\.[^.]+$""".r
  if (!secLevelSuffix.isEmpty){
    regex = """[^.]+\.[^.]+\.[^.]+$""".r
  }
  var cleanDomain = regex.findFirstMatchIn(domain).map(_ group 0)
  cleanDomain.getOrElse("")
}

// Extract the host from a URL, strip a leading www prefix, and normalise it
val getDomain = (url: String) => {
  val domain = """(?i)^(?:(?:https?):\/\/)?(?:(?:www|www1|www2|www3)\.)?([^:?#/\s]+)""".r.findFirstMatchIn(url).map(_ group 1)
  var res = domain.getOrElse("")
  res = res.toLowerCase()
  if (res.contains("google.com")){
    res = res.replace("google.com.br", "google.com")
  }else{
    res = cleanDomain(res)
  }
  res
}

// Register the function as a UDF and count events per domain
sqlContext.udf.register("getDomain", getDomain)
val domains = sqlContext.sql("SELECT count(*) c, domain from (SELECT getDomain(page_url) as domain FROM events) t group by domain order by c desc")
domains.take(20).foreach(println)
When you define an RDD programmatically like this, remember to mark anything that will not be copied to the worker nodes as @transient.
In your case:
@transient val suffixesStr = ...
@transient val suffList = ...
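To see why the annotation helps, here is a minimal sketch that reproduces the effect with plain Java serialization and no Spark at all. It rests on the assumption that spark-shell turns top-level vals into fields of a generated wrapper object, so a function that reads one of them carries the whole wrapper along. The names `TransientSketch`, `LeakySession`, `FixedSession`, `hasSuffix` and `trySerialize` are hypothetical stand-ins, and the two-entry suffix list is made-up sample data.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object TransientSketch {

  // Hypothetical stand-in for the wrapper object that holds the shell's vals.
  final class LeakySession(raw: String) extends Serializable {
    // A filtered Iterator is not Serializable, and here it is an ordinary field.
    val suffList: Iterator[String] =
      raw.split("\n").iterator.filter(l => !l.startsWith("//") && l.trim.nonEmpty)
    val suffixes: Array[String] = suffList.toArray
    // Reads a field, so the function keeps a reference to the whole session.
    val hasSuffix: String => Boolean =
      domain => suffixes.exists(s => domain.endsWith("." + s))
  }

  // Same session, but the iterator is excluded from serialization.
  final class FixedSession(raw: String) extends Serializable {
    // @transient fields are skipped when the object is written out
    // (and would be null again after deserialization).
    @transient val suffList: Iterator[String] =
      raw.split("\n").iterator.filter(l => !l.startsWith("//") && l.trim.nonEmpty)
    val suffixes: Array[String] = suffList.toArray
    val hasSuffix: String => Boolean =
      domain => suffixes.exists(s => domain.endsWith("." + s))
  }

  // Returns true if plain Java serialization accepts the object.
  def trySerialize(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val raw = "// comment\ncom\n\nco.uk" // made-up sample data
    println("leaky session serializes: " + trySerialize(new LeakySession(raw).hasSuffix))
    println("fixed session serializes: " + trySerialize(new FixedSession(raw).hasSuffix))
  }
}
```

Under these assumptions the first check should fail and the second should succeed: the function itself is the same in both classes, and only the non-serializable field it drags along makes the difference. In the question's code the analogous fields are `suffixesStr` and `suffList`, which are only needed on the driver to build the array.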