在 Spark 中嵌套并行化?什么是正确的方法?

Nesting parallelizations in Spark? What's the right approach?

嵌套并行化?

假设我正在尝试在 Spark 中执行相当于 "nested for loops" 的操作。类似于常规语言,假设我在内部循环中有一个例程以 the Pi Average Spark example does (see Estimating Pi)

的方式估计 Pi
i = 1000; j = 10^6; counter = 0.0;

for ( int i =0; i < iLimit; i++)
    for ( int j=0; j < jLimit ; j++)
        counter += PiEstimator();

estimateOfAllAverages = counter / i;

我可以在 Spark 中嵌套并行化调用吗?我正在尝试,但还没有解决问题。很乐意 post 错误和代码,但我想我问的是一个更概念性的问题,即这是否是 Spark 中的正确方法。

我已经可以并行化单个 Spark Example / Pi Estimate,现在我想执行 1000 次以查看它是否收敛于 Pi。 (这涉及到我们正在尝试解决的一个更大的问题,如果需要更接近 MVCE 的东西,我很乐意添加)

底线问题我只需要有人直接回答:这是使用嵌套并行化调用的正确方法吗?如果不是请具体指点,谢谢!这是我认为正确方法的伪代码方法:

// use accumulator to keep track of each Pi Estimate result

sparkContext.parallelize(arrayOf1000, slices).map{ Function call

     sparkContext.parallelize(arrayOf10^6, slices).map{
            // do the 10^6 thing here and update accumulator with each result
    }
}

// take average of accumulator to see if all 1000 Pi estimates converge on Pi

背景:我 , after some waffling I decided to post a new question with a different characterization. I also tried to ask this on the Spark User maillist 但那里也没有骰子。在此先感谢您的帮助。

这甚至是不可能的,因为 SparkContext 不可序列化。如果你想要一个嵌套的 for 循环,那么你最好的选择是使用 cartesian

val nestedForRDD = rdd1.cartesian(rdd2)
nestedForRDD.map((rdd1TypeVal, rdd2TypeVal) => {
  //Do your inner-nested evaluation code here
})

请记住,就像双 for 循环一样,这是以大小为代价的。

没有。你不能。

SparkContext 只能从 spark Driver 节点访问。内部 parallelization() 调用将尝试从无权访问 SparkContext 的工作节点执行 SparkContext。

在 Pi 的示例中,在嵌套的 for 循环中,您可以通过对过程进行单个循环 i * j 次并对所有循环求和然后在最后除以 j 来获得相同的答案。如果您有要在外循环中应用的步骤,请在循环内执行它们,但通过为每个内循环组分配特定键来创建不同的组。不知道你想在外循环中做什么样的事情,这里很难举个例子。

对于仅求平均以提高收敛性的简单情况,它相对容易。不要做嵌套循环,只需用 i * j 个元素创建一个 rdd,然后将函数应用于每个元素。

这可能看起来像(使用 pySpark ): ( f 是你想要应用的任何函数,记住它会传递 RDD 中的每个元素,所以即使你不在你的函数中使用它,也要用输入定义你的 f )

x = RandomRDDs.uniformRDD(sc, i*j)
function_values = x.map(f)

from operator import add   
sum_of_values = function_values.reduce(add)
averaged_value = sum_of_values/j (if you are only averaging over the outer loop)

如果您想在外循环中执行操作,我会分配一个索引 (zipWIthIndex),然后使用索引模 j 创建一个键。然后每个不同的键将是一个虚拟的内部循环循环,您可以使用 aggregateByKey、foldByKey 或 reduceByKey 等运算符仅对这些记录执行操作。如果将不同的密钥分发到不同的分区,这可能会对性能造成一些影响。

另一种方法是将 rdd 重新分区到 j 个分区,然后使用 foreachPartition 函数对每个分区应用一个函数。

第三种选择是运行内部循环并行j次,将结果连接到一个分布式文件中,然后在将其读入Spark后进行外部循环操作。