如何从任务中创建 RDD?
How to create RDD from within Task?
通常在从列表创建 RDD 时,您可以只使用 SparkContext.parallelize
方法,但您不能在任务中使用 spark 上下文,因为它不可序列化。我需要从任务中的字符串列表创建 RDD。有办法吗?
我已经尝试在任务中创建一个新的 SparkContext,但它给我一个关于在同一个 JVM 中不支持多个 spark 上下文的错误,我需要设置 spark.driver.allowMultipleContexts = true
。根据 Apache 用户组,that setting however does not yet seem to be supported
就我而言,这是不可能的,这几乎不是序列化或对多个 Spark 上下文的支持的问题。一个基本的限制是核心 Spark 架构。由于 Spark 上下文由驱动程序维护,而任务在工作人员上执行,因此从任务内部创建 RDD 需要将更改从工作人员推送到驱动程序。我并不是说这在技术上是不可能的,但整个想法似乎相当麻烦。
从任务内部创建 Spark 上下文看起来更糟。首先,这意味着上下文是在工人身上创建的,出于所有实际目的,工人之间不会相互交流。每个工作人员都将获得自己的上下文,该上下文只能对给定工作人员可访问的数据进行操作。最后,保留 worker 状态绝对不是契约的一部分,因此在任务中创建的任何上下文都应该在任务完成后简单地进行垃圾回收。
如果无法使用多个作业来处理问题,您可以尝试使用 mapPartitions
,如下所示:
val rdd = sc.parallelize(1 to 100)
val tmp = rdd.mapPartitions(iter => {
val results = Map(
"odd" -> scala.collection.mutable.ArrayBuffer.empty[Int],
"even" -> scala.collection.mutable.ArrayBuffer.empty[Int]
)
for(i <- iter) {
if (i % 2 != 0) results("odd") += i
else results("even") += i
}
Iterator(results)
})
val odd = tmp.flatMap(_("odd"))
val even = tmp.flatMap(_("even"))
通常在从列表创建 RDD 时,您可以只使用 SparkContext.parallelize
方法,但您不能在任务中使用 spark 上下文,因为它不可序列化。我需要从任务中的字符串列表创建 RDD。有办法吗?
我已经尝试在任务中创建一个新的 SparkContext,但它给我一个关于在同一个 JVM 中不支持多个 spark 上下文的错误,我需要设置 spark.driver.allowMultipleContexts = true
。根据 Apache 用户组,that setting however does not yet seem to be supported
就我而言,这是不可能的,这几乎不是序列化或对多个 Spark 上下文的支持的问题。一个基本的限制是核心 Spark 架构。由于 Spark 上下文由驱动程序维护,而任务在工作人员上执行,因此从任务内部创建 RDD 需要将更改从工作人员推送到驱动程序。我并不是说这在技术上是不可能的,但整个想法似乎相当麻烦。
从任务内部创建 Spark 上下文看起来更糟。首先,这意味着上下文是在工人身上创建的,出于所有实际目的,工人之间不会相互交流。每个工作人员都将获得自己的上下文,该上下文只能对给定工作人员可访问的数据进行操作。最后,保留 worker 状态绝对不是契约的一部分,因此在任务中创建的任何上下文都应该在任务完成后简单地进行垃圾回收。
如果无法使用多个作业来处理问题,您可以尝试使用 mapPartitions
,如下所示:
val rdd = sc.parallelize(1 to 100)
val tmp = rdd.mapPartitions(iter => {
val results = Map(
"odd" -> scala.collection.mutable.ArrayBuffer.empty[Int],
"even" -> scala.collection.mutable.ArrayBuffer.empty[Int]
)
for(i <- iter) {
if (i % 2 != 0) results("odd") += i
else results("even") += i
}
Iterator(results)
})
val odd = tmp.flatMap(_("odd"))
val even = tmp.flatMap(_("even"))