"Task not serializable" Spark 代码错误

"Task not serializable" error in Spark code

我从以下代码中收到 Task not serializable 错误。

   var inputRDD = sc.textFile(inputPath).flatMap(line => line.split(":")).map{word => (word)} 
   var i = 1
   var characters:Array[String] = new Array[String](3)
   characters = Array("a","i","m")    
   for (i <- 1 to 4){
     inputRDD.foreach { word =>
          sc.broadcast(replacement)
          val result = word.replaceAll(replacement, "b")
          println(word,result)
          replacement = characters(i)        
      }
     }

我在网上找不到任何帮助。我需要一些帮助。 谢谢

此代码尝试在 RDD 的闭包中使用 SparkContext (sc)。 sparkContext 不可序列化,并不意味着在其使用中被序列化。

我不清楚这段代码试图实现什么,但需要对其进行更改以从任何闭包中删除 sparkContext 的使用。

@maasg 已经回答了你的问题。但是代码错了,我想你需要一些帮助才能让它工作。不知道你有没有看完http://spark.apache.org/docs/latest/programming-guide.html

你的错误:

    第一行的
  • map{word => (word)} 什么都不做。它只是将每个单词映射到自身。
  • 第二行的i没有用到。 for 循环声明了一个新变量。您不需要预先声明循环变量。
  • characters初始值未使用,立即替换。为什么不从最终值开始呢?那么所有 var 都可以是 valvals 更易于使用,因为它们不会更改值。
  • foreach 中,您尝试为 replacement 创建一个未声明的广播变量。 sc.broadcast 必须在驱动程序中 运行 的代码中调用(foreach 的主体将在执行程序中 运行 )。 sc.broadcast returns 一个广播变量,然后您可以在执行程序上 运行 的代码中使用它。相反,你只是从地面返回的变量。
  • broadcast 仅对大值(如 >10kB)很重要。暂时不用担心。
  • 您尝试从 1 到 4 对 3 元素数组 (characters) 进行索引。有效索引是从 0 到 2。
  • foreach 不能用于修改 RDD。 RDD 是不可变的。您需要使用 map 来从原始 RDD 创建一个新的修改后的 RDD。

我猜你想做的是加载一个文件,用冒号分割并将三个字母(aim)替换为 b。方法如下:

def replaceLetters(filename: String,
                   patterns: Seq[String],
                   replacement: String): RDD[String] = {
  val words = sc.textFile(filename).flatMap(_.split(":"))
  words.map { word =>
    patterns.foldLeft(word) {
      (word, pattern) => word.replaceAll(pattern, replacement)
    }
  }
}

// Call our function, print the results.
replaceLetters("my_file", "a,i,m".split(","), "b").collect.foreach(println(_))