pyspark:'PipelinedRDD' 对象不可迭代
pyspark: 'PipelinedRDD' object is not iterable
我收到此错误,但我不知道为什么。
基本上我从这段代码中出错:
a = data.mapPartitions(helper(locations))
其中数据是一个 RDD,我的助手定义为:
def helper(iterator, locations):
for x in iterator:
c = locations[x]
yield c
(位置只是一个数据点数组)
我看不出问题出在哪里,但我也不是 pyspark 的佼佼者所以有人可以告诉我为什么我得到 'PipelinedRDD' object is not iterable from this code?
RDD可以使用map和lambda函数进行迭代。我已经使用以下方法迭代了流水线 RDD
lines1 = sc.textFile("\..\file1.csv")
lines2 = sc.textFile("\..\file2.csv")
pairs1 = lines1.map(lambda s: (int(s), 'file1'))
pairs2 = lines2.map(lambda s: (int(s), 'file2'))
pair_result = pairs1.union(pairs2)
pair_result.reduceByKey(lambda a, b: a + ','+ b)
result = pair.map(lambda l: tuple(l[:1]) + tuple(l[1].split(',')))
result_ll = [list(elem) for elem in result]
===> result_ll = [list(elem) for elem in result]
TypeError: 'PipelinedRDD' object is not iterable
我用 map 函数代替了迭代
result_ll = result.map( lambda elem: list(elem))
希望这有助于相应地修改您的代码
我更喜欢另一个问题中的答案 link :
Can not access Pipelined Rdd in pyspark
您不能遍历 RDD,您需要先调用一个操作将数据返回给驱动程序。
快速示例:
`>>> test = sc.parallelize([1,2,3])
>>> for i in test:
... print i
...
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
TypeError: 'RDD' object is not iterable`
但例如您可以使用“.collect()”
`>>> for i in test.collect():
... print i
1
2
3`
我收到此错误,但我不知道为什么。 基本上我从这段代码中出错:
a = data.mapPartitions(helper(locations))
其中数据是一个 RDD,我的助手定义为:
def helper(iterator, locations):
for x in iterator:
c = locations[x]
yield c
(位置只是一个数据点数组) 我看不出问题出在哪里,但我也不是 pyspark 的佼佼者所以有人可以告诉我为什么我得到 'PipelinedRDD' object is not iterable from this code?
RDD可以使用map和lambda函数进行迭代。我已经使用以下方法迭代了流水线 RDD
lines1 = sc.textFile("\..\file1.csv")
lines2 = sc.textFile("\..\file2.csv")
pairs1 = lines1.map(lambda s: (int(s), 'file1'))
pairs2 = lines2.map(lambda s: (int(s), 'file2'))
pair_result = pairs1.union(pairs2)
pair_result.reduceByKey(lambda a, b: a + ','+ b)
result = pair.map(lambda l: tuple(l[:1]) + tuple(l[1].split(',')))
result_ll = [list(elem) for elem in result]
===> result_ll = [list(elem) for elem in result]
TypeError: 'PipelinedRDD' object is not iterable
我用 map 函数代替了迭代
result_ll = result.map( lambda elem: list(elem))
希望这有助于相应地修改您的代码
我更喜欢另一个问题中的答案 link : Can not access Pipelined Rdd in pyspark
您不能遍历 RDD,您需要先调用一个操作将数据返回给驱动程序。 快速示例:
`>>> test = sc.parallelize([1,2,3])
>>> for i in test:
... print i
...
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
TypeError: 'RDD' object is not iterable`
但例如您可以使用“.collect()”
`>>> for i in test.collect():
... print i
1
2
3`