Wiki XML 解析器 - org.apache.spark.SparkException: Task not serializable(任务不可序列化)
Wiki xml parser - org.apache.spark.SparkException: Task not serializable
我是 Scala 和 Spark 的新手,正在尝试一些教程,下面这段代码来自《Advanced Analytics with Spark》。按理说以下代码应该可以正常运行:
import com.cloudera.datascience.common.XmlInputFormat
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io._
// Location of the Wikipedia XML input to load.
val path = "/home/petr/Downloads/wiki/wiki"
// Tell XmlInputFormat which start and end tags to use
// (presumably yielding one record per <page> element).
val conf = new Configuration()
conf.set(XmlInputFormat.START_TAG_KEY, "<page>")
conf.set(XmlInputFormat.END_TAG_KEY, "</page>")
// Read the input as (LongWritable, Text) pairs through the new Hadoop API.
val kvs = sc.newAPIHadoopFile(
  path,
  classOf[XmlInputFormat],
  classOf[LongWritable],
  classOf[Text],
  conf)
// Keep only the value of each pair, converted to a plain String.
val rawXmls = kvs.map(_._2.toString)
import edu.umd.cloud9.collection.wikipedia.language._
import edu.umd.cloud9.collection.wikipedia._
/** Parses one Wikipedia page XML fragment into a (title, content) pair.
  *
  * @param xml the XML text handed to WikipediaPage.readPage
  * @return None when the parsed page reports isEmpty, otherwise Some((title, content))
  */
def wikiXmlToPlainText(xml: String): Option[(String, String)] = {
  val wikiPage = new EnglishWikipediaPage()
  // readPage fills the freshly created page object from the XML text.
  WikipediaPage.readPage(wikiPage, xml)
  if (!wikiPage.isEmpty) Some((wikiPage.getTitle, wikiPage.getContent))
  else None
}
// Convert every raw page with wikiXmlToPlainText; None results contribute no elements.
val plainText = rawXmls.flatMap(wikiXmlToPlainText)
但它给出了
scala> val plainText = rawXmls.flatMap(wikiXmlToPlainText)
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1622)
at org.apache.spark.rdd.RDD.flatMap(RDD.scala:295)
...
我在本地运行 Spark v1.3.0(只加载了大约 21MB 的 wiki 文章,仅用于测试)。
https://stackoverflow.com/search?q=org.apache.spark.SparkException%3A+Task+not+serializable 上的所有搜索结果都没有给我任何线索……
谢谢。
我首先想到的猜测是:你的所有代码都包含在定义了 SparkContext 的那个对象中。Spark 为了把 wikiXmlToPlainText 函数传递给各个节点,会尝试序列化这个对象。请尝试另外创建一个只包含 wikiXmlToPlainText 这一个函数的对象。
尝试
import com.cloudera.datascience.common.XmlInputFormat
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io._
// Location of the Wikipedia XML dump to load.
val path = "/home/terrapin/Downloads/enwiki-20150304-pages-articles1.xml-p000000010p000010000"
// Tell XmlInputFormat which start and end tags to use
// (presumably yielding one record per <page> element).
val conf = new Configuration()
conf.set(XmlInputFormat.START_TAG_KEY, "<page>")
conf.set(XmlInputFormat.END_TAG_KEY, "</page>")
// Read the input as (LongWritable, Text) pairs through the new Hadoop API.
val kvs = sc.newAPIHadoopFile(
  path,
  classOf[XmlInputFormat],
  classOf[LongWritable],
  classOf[Text],
  conf)
// Keep only the value of each pair, converted to a plain String.
val rawXmls = kvs.map(_._2.toString)
import edu.umd.cloud9.collection.wikipedia.language._
import edu.umd.cloud9.collection.wikipedia._
// Parse each raw page inside the function literal itself: pages that report
// isEmpty yield None (no output element), all others yield (title, content).
val plainText = rawXmls.flatMap { xml =>
  val wikiPage = new EnglishWikipediaPage()
  WikipediaPage.readPage(wikiPage, xml)
  if (!wikiPage.isEmpty) Some((wikiPage.getTitle, wikiPage.getContent))
  else None
}
我是 Scala 和 Spark 的新手,正在尝试一些教程,下面这段代码来自《Advanced Analytics with Spark》。按理说以下代码应该可以正常运行:
import com.cloudera.datascience.common.XmlInputFormat
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io._
// Location of the Wikipedia XML input to load.
val path = "/home/petr/Downloads/wiki/wiki"
// Tell XmlInputFormat which start and end tags to use
// (presumably yielding one record per <page> element).
val conf = new Configuration()
conf.set(XmlInputFormat.START_TAG_KEY, "<page>")
conf.set(XmlInputFormat.END_TAG_KEY, "</page>")
// Read the input as (LongWritable, Text) pairs through the new Hadoop API.
val kvs = sc.newAPIHadoopFile(
  path,
  classOf[XmlInputFormat],
  classOf[LongWritable],
  classOf[Text],
  conf)
// Keep only the value of each pair, converted to a plain String.
val rawXmls = kvs.map(_._2.toString)
import edu.umd.cloud9.collection.wikipedia.language._
import edu.umd.cloud9.collection.wikipedia._
/** Parses one Wikipedia page XML fragment into a (title, content) pair.
  *
  * @param xml the XML text handed to WikipediaPage.readPage
  * @return None when the parsed page reports isEmpty, otherwise Some((title, content))
  */
def wikiXmlToPlainText(xml: String): Option[(String, String)] = {
  val wikiPage = new EnglishWikipediaPage()
  // readPage fills the freshly created page object from the XML text.
  WikipediaPage.readPage(wikiPage, xml)
  if (!wikiPage.isEmpty) Some((wikiPage.getTitle, wikiPage.getContent))
  else None
}
// Convert every raw page with wikiXmlToPlainText; None results contribute no elements.
val plainText = rawXmls.flatMap(wikiXmlToPlainText)
但它给出了
scala> val plainText = rawXmls.flatMap(wikiXmlToPlainText)
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1622)
at org.apache.spark.rdd.RDD.flatMap(RDD.scala:295)
...
我在本地运行 Spark v1.3.0(只加载了大约 21MB 的 wiki 文章,仅用于测试)。
https://stackoverflow.com/search?q=org.apache.spark.SparkException%3A+Task+not+serializable 上的所有搜索结果都没有给我任何线索……
谢谢。
我首先想到的猜测是:你的所有代码都包含在定义了 SparkContext 的那个对象中。Spark 为了把 wikiXmlToPlainText 函数传递给各个节点,会尝试序列化这个对象。请尝试另外创建一个只包含 wikiXmlToPlainText 这一个函数的对象。
尝试
import com.cloudera.datascience.common.XmlInputFormat
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io._
// Location of the Wikipedia XML dump to load.
val path = "/home/terrapin/Downloads/enwiki-20150304-pages-articles1.xml-p000000010p000010000"
// Tell XmlInputFormat which start and end tags to use
// (presumably yielding one record per <page> element).
val conf = new Configuration()
conf.set(XmlInputFormat.START_TAG_KEY, "<page>")
conf.set(XmlInputFormat.END_TAG_KEY, "</page>")
// Read the input as (LongWritable, Text) pairs through the new Hadoop API.
val kvs = sc.newAPIHadoopFile(
  path,
  classOf[XmlInputFormat],
  classOf[LongWritable],
  classOf[Text],
  conf)
// Keep only the value of each pair, converted to a plain String.
val rawXmls = kvs.map(_._2.toString)
import edu.umd.cloud9.collection.wikipedia.language._
import edu.umd.cloud9.collection.wikipedia._
// Parse each raw page inside the function literal itself: pages that report
// isEmpty yield None (no output element), all others yield (title, content).
val plainText = rawXmls.flatMap { xml =>
  val wikiPage = new EnglishWikipediaPage()
  WikipediaPage.readPage(wikiPage, xml)
  if (!wikiPage.isEmpty) Some((wikiPage.getTitle, wikiPage.getContent))
  else None
}