yarn 模式下的 Spark RDD map 不允许访问变量?
Spark RDD map in yarn mode does not allow access to variables?
我在 mapr 集群上安装了全新的 spark 1.2.1,在测试它时我发现它在本地模式下运行良好,但在 yarn 模式下它似乎无法访问变量,如果播出。准确的说是下面的测试代码
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object JustSpark extends App {
val conf = new org.apache.spark.SparkConf().setAppName("SimpleApplication")
val sc = new SparkContext(conf)
val a = List(1,3,4,5,6)
val b = List("a","b","c")
val bBC= sc.broadcast(b)
val data = sc.parallelize(a)
val transform = data map ( t => { "hi" })
transform.take(3) foreach (println _)
val transformx2 = data map ( t => { bBC.value.size })
transformx2.take(3) foreach (println _)
//val transform2 = data map ( t => { b.size })
//transform2.take(3) foreach (println _)
}
在本地模式下工作,但在 yarn 下失败。更准确地说,transform2
和 transformx2
这两种方法都失败了,如果 --master local[8]
,它们都有效。
我正在用 sbt 编译它并用提交工具发送
/opt/mapr/spark/spark-1.2.1/bin/spark-submit --class JustSpark --master yarn target/scala-2.10/simulator_2.10-1.0.jar
知道发生了什么事吗?失败消息只是声称在它应该访问变量的地方有一个 java 空指针异常。还有其他方法可以在 RDD 映射中传递变量吗?
我认为罪魁祸首是
val transform2 = data map ( t => { b.size })
特别是访问局部变量 b 。您实际上可能会在日志文件中看到 java.io.NotSerializableException .
应该发生什么:Spark 将尝试序列化任何引用的对象。这意味着在这种情况下 entire JustSpark class - 因为它的一个成员被引用了。
为什么会失败? 您的 class 不是可序列化的。 因此 Spark 无法通过网络发送它。特别是您有对 SparkContext 的引用 - 它不扩展 Serializable
class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient {
所以 - 你的第一个代码 - 只广播变量值 - 是正确的方法。
我会做出一个很好的猜测:这是因为您正在使用 App
。有关详细信息,请参阅 https://issues.apache.org/jira/browse/SPARK-4170。改为编写 main()
方法。
这是广播的原始示例,来自 spark 源,更改为使用列表而不是数组:
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object MultiBroadcastTest {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("Multi-Broadcast Test")
val sc = new SparkContext(sparkConf)
val slices = if (args.length > 0) args(0).toInt else 2
val num = if (args.length > 1) args(1).toInt else 1000000
val arr1 = (1 to num).toList
val arr2 = (1 to num).toList
val barr1 = sc.broadcast(arr1)
val barr2 = sc.broadcast(arr2)
val observedSizes: RDD[(Int, Int)] = sc.parallelize(1 to 10, slices).map { _ =>
(barr1.value.size, barr2.value.size)
}
observedSizes.collect().foreach(i => println(i))
sc.stop()
}}
我在我的环境中编译它并且它有效。
那么有什么区别呢?
有问题的示例使用 extends App
,而原始示例是普通单例。
所以我将代码降级为 "doIt()" 函数
object JustDoSpark extends App{
def doIt() {
...
}
doIt()
你猜怎么着。成功了。
问题确实与序列化有关,但方式不同。将代码放在对象的主体中似乎会导致问题。
我在 mapr 集群上安装了全新的 spark 1.2.1,在测试它时我发现它在本地模式下运行良好,但在 yarn 模式下它似乎无法访问变量,如果播出。准确的说是下面的测试代码
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object JustSpark extends App {
val conf = new org.apache.spark.SparkConf().setAppName("SimpleApplication")
val sc = new SparkContext(conf)
val a = List(1,3,4,5,6)
val b = List("a","b","c")
val bBC= sc.broadcast(b)
val data = sc.parallelize(a)
val transform = data map ( t => { "hi" })
transform.take(3) foreach (println _)
val transformx2 = data map ( t => { bBC.value.size })
transformx2.take(3) foreach (println _)
//val transform2 = data map ( t => { b.size })
//transform2.take(3) foreach (println _)
}
在本地模式下工作,但在 yarn 下失败。更准确地说,transform2
和 transformx2
这两种方法都失败了,如果 --master local[8]
,它们都有效。
我正在用 sbt 编译它并用提交工具发送
/opt/mapr/spark/spark-1.2.1/bin/spark-submit --class JustSpark --master yarn target/scala-2.10/simulator_2.10-1.0.jar
知道发生了什么事吗?失败消息只是声称在它应该访问变量的地方有一个 java 空指针异常。还有其他方法可以在 RDD 映射中传递变量吗?
我认为罪魁祸首是
val transform2 = data map ( t => { b.size })
特别是访问局部变量 b 。您实际上可能会在日志文件中看到 java.io.NotSerializableException .
应该发生什么:Spark 将尝试序列化任何引用的对象。这意味着在这种情况下 entire JustSpark class - 因为它的一个成员被引用了。
为什么会失败? 您的 class 不是可序列化的。 因此 Spark 无法通过网络发送它。特别是您有对 SparkContext 的引用 - 它不扩展 Serializable
class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient {
所以 - 你的第一个代码 - 只广播变量值 - 是正确的方法。
我会做出一个很好的猜测:这是因为您正在使用 App
。有关详细信息,请参阅 https://issues.apache.org/jira/browse/SPARK-4170。改为编写 main()
方法。
这是广播的原始示例,来自 spark 源,更改为使用列表而不是数组:
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object MultiBroadcastTest {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("Multi-Broadcast Test")
val sc = new SparkContext(sparkConf)
val slices = if (args.length > 0) args(0).toInt else 2
val num = if (args.length > 1) args(1).toInt else 1000000
val arr1 = (1 to num).toList
val arr2 = (1 to num).toList
val barr1 = sc.broadcast(arr1)
val barr2 = sc.broadcast(arr2)
val observedSizes: RDD[(Int, Int)] = sc.parallelize(1 to 10, slices).map { _ =>
(barr1.value.size, barr2.value.size)
}
observedSizes.collect().foreach(i => println(i))
sc.stop()
}}
我在我的环境中编译它并且它有效。
那么有什么区别呢?
有问题的示例使用 extends App
,而原始示例是普通单例。
所以我将代码降级为 "doIt()" 函数
object JustDoSpark extends App{
def doIt() {
...
}
doIt()
你猜怎么着。成功了。
问题确实与序列化有关,但方式不同。将代码放在对象的主体中似乎会导致问题。