RDD创建和变量绑定

RDD creation and variable binding

我有一个非常简单的代码:

def fun(x, n):
    return (x, n)

rdds = []
for i in range(2):
    rdd = sc.parallelize(range(5*i, 5*(i+1)))
    rdd = rdd.map(lambda x: fun(x, i))
    rdds.append(rdd)

a = sc.union(rdds)
print a.collect()

我原以为输出如下:

[(0, 0), (1, 0), (2, 0), (3, 0), (4, 0), (5, 1), (6, 1), (7, 1), (8, 1), (9, 1)]

但是,输出如下:

[(0, 1), (1, 1), (2, 1), (3, 1), (4, 1), (5, 1), (6, 1), (7, 1), (8, 1), (9, 1)]

至少可以说,这令人困惑。

看来,由于RDD的惰性评估,用于创建RDD的i的值是调用collect()时它所承载的值,即1(来自for 循环的最后 运行。

现在,元组的两个元素都派生自 i

但看起来,对于元组的第一个元素,i 的值是 0 和 1,而对于元组的第二个元素,i 的值是 2。

有人可以解释一下发生了什么吗?

谢谢。

sc.parallelize() 是一个将立即执行的动作。因此 i 的两个值,即 01 都将被使用。

但在 rdd.map() 的情况下,只有 i 的最后一个值将在您稍后调用 collect() 时使用。

rdd = sc.parallelize(range(5*i, 5*(i+1)))
rdd = rdd.map(lambda x: fun(x, i))

这里rdd.map不会对rdd进行变换,只会创建DAG(Directed Acyclic Graph),即lambda函数不会作用于rdd的元素。

当您调用 collect() 时,将调用 lambda 函数,但此时 i 的值为 1。如果您在调用 collect 之前重新分配 i=10,则 i 将被使用。

只是改变

rdd = rdd.map(lambda x: fun(x, i))

rdd = rdd.map(lambda x, i=i: (x, i))

那只是Python,看看这个

https://docs.python.org/2.7/tutorial/controlflow.html#default-argument-values