"Task not serializable" Spark 代码错误
"Task not serializable" error in Spark code
我从以下代码中收到 Task not serializable
错误。
var inputRDD = sc.textFile(inputPath).flatMap(line => line.split(":")).map{word => (word)}
var i = 1
var characters:Array[String] = new Array[String](3)
characters = Array("a","i","m")
for (i <- 1 to 4){
inputRDD.foreach { word =>
sc.broadcast(replacement)
val result = word.replaceAll(replacement, "b")
println(word,result)
replacement = characters(i)
}
}
我在网上找不到任何帮助。我需要一些帮助。
谢谢
此代码尝试在 RDD 的闭包中使用 SparkContext (sc
)。 sparkContext
不可序列化,并不意味着在其使用中被序列化。
我不清楚这段代码试图实现什么,但需要对其进行更改以从任何闭包中删除 sparkContext
的使用。
@maasg 已经回答了你的问题。但是代码错了,我想你需要一些帮助才能让它工作。不知道你有没有看完http://spark.apache.org/docs/latest/programming-guide.html。
你的错误:
第一行的 map{word => (word)}
什么都不做。它只是将每个单词映射到自身。
- 第二行的
i
没有用到。 for
循环声明了一个新变量。您不需要预先声明循环变量。
characters
初始值未使用,立即替换。为什么不从最终值开始呢?那么所有 var
都可以是 val
。 val
s 更易于使用,因为它们不会更改值。
- 在
foreach
中,您尝试为 replacement
创建一个未声明的广播变量。 sc.broadcast
必须在驱动程序中 运行 的代码中调用(foreach
的主体将在执行程序中 运行 )。 sc.broadcast
returns 一个广播变量,然后您可以在执行程序上 运行 的代码中使用它。相反,你只是从地面返回的变量。
broadcast
仅对大值(如 >10kB)很重要。暂时不用担心。
- 您尝试从 1 到 4 对 3 元素数组 (
characters
) 进行索引。有效索引是从 0 到 2。
foreach
不能用于修改 RDD。 RDD 是不可变的。您需要使用 map
来从原始 RDD 创建一个新的修改后的 RDD。
我猜你想做的是加载一个文件,用冒号分割并将三个字母(a
、i
、m
)替换为 b
。方法如下:
def replaceLetters(filename: String,
patterns: Seq[String],
replacement: String): RDD[String] = {
val words = sc.textFile(filename).flatMap(_.split(":"))
words.map { word =>
patterns.foldLeft(word) {
(word, pattern) => word.replaceAll(pattern, replacement)
}
}
}
// Call our function, print the results.
replaceLetters("my_file", "a,i,m".split(","), "b").collect.foreach(println(_))
我从以下代码中收到 Task not serializable
错误。
var inputRDD = sc.textFile(inputPath).flatMap(line => line.split(":")).map{word => (word)}
var i = 1
var characters:Array[String] = new Array[String](3)
characters = Array("a","i","m")
for (i <- 1 to 4){
inputRDD.foreach { word =>
sc.broadcast(replacement)
val result = word.replaceAll(replacement, "b")
println(word,result)
replacement = characters(i)
}
}
我在网上找不到任何帮助。我需要一些帮助。 谢谢
此代码尝试在 RDD 的闭包中使用 SparkContext (sc
)。 sparkContext
不可序列化,并不意味着在其使用中被序列化。
我不清楚这段代码试图实现什么,但需要对其进行更改以从任何闭包中删除 sparkContext
的使用。
@maasg 已经回答了你的问题。但是代码错了,我想你需要一些帮助才能让它工作。不知道你有没有看完http://spark.apache.org/docs/latest/programming-guide.html。
你的错误:
-
第一行的
map{word => (word)}
什么都不做。它只是将每个单词映射到自身。- 第二行的
i
没有用到。for
循环声明了一个新变量。您不需要预先声明循环变量。 characters
初始值未使用,立即替换。为什么不从最终值开始呢?那么所有var
都可以是val
。val
s 更易于使用,因为它们不会更改值。- 在
foreach
中,您尝试为replacement
创建一个未声明的广播变量。sc.broadcast
必须在驱动程序中 运行 的代码中调用(foreach
的主体将在执行程序中 运行 )。sc.broadcast
returns 一个广播变量,然后您可以在执行程序上 运行 的代码中使用它。相反,你只是从地面返回的变量。 broadcast
仅对大值(如 >10kB)很重要。暂时不用担心。- 您尝试从 1 到 4 对 3 元素数组 (
characters
) 进行索引。有效索引是从 0 到 2。 foreach
不能用于修改 RDD。 RDD 是不可变的。您需要使用map
来从原始 RDD 创建一个新的修改后的 RDD。
我猜你想做的是加载一个文件,用冒号分割并将三个字母(a
、i
、m
)替换为 b
。方法如下:
def replaceLetters(filename: String,
patterns: Seq[String],
replacement: String): RDD[String] = {
val words = sc.textFile(filename).flatMap(_.split(":"))
words.map { word =>
patterns.foldLeft(word) {
(word, pattern) => word.replaceAll(pattern, replacement)
}
}
}
// Call our function, print the results.
replaceLetters("my_file", "a,i,m".split(","), "b").collect.foreach(println(_))