RDD转换图,Python
RDD transformation map, Python
是否可以将Spark的map方法中的所有元素都转换为浮点数(double),除了第一个而不用for循环进行迭代?在伪代码中是这样的:
input = sc.textFile('file.csv').map(lambda line: line.split(',')) #create a rdd<list>
test = input.map(lambda line: line[0] else float(line)) #convert all elements of the list to float excepted the first one
虽然可以说这不是一个好的做法,但这是可能的。 RDD 是 objects 的同构 collection。如果您期望某种 header ,最好将其丢弃而不是一直拖到它。不过你可以尝试这样的事情:
from itertools import islice
# Dummy data
with open("/tmp/foo", "w") as fw:
fw.writelines(["foo", "1.0", "2.0", "3.0"])
def process_part(i, iter):
if i == 0:
# We could use enumerate as well
for x in islice(iter, 1):
yield x
for x in iter:
yield float(x)
(sc.textFile("foo.txt")
.mapPartitionsWithIndex(process_part)
.collect())
## ['"foo"', 1.0, 2.0, 3.0, 4.0]
如果您期望空分区,您首先计算元素:
rdd.mapPartitionsWithIndex(lambda i, iter: [(i, sum(1 for _ in iter))]).collect()
并将 0 替换为第一个 non-empty 分区的索引。
是否可以将Spark的map方法中的所有元素都转换为浮点数(double),除了第一个而不用for循环进行迭代?在伪代码中是这样的:
input = sc.textFile('file.csv').map(lambda line: line.split(',')) #create a rdd<list>
test = input.map(lambda line: line[0] else float(line)) #convert all elements of the list to float excepted the first one
虽然可以说这不是一个好的做法,但这是可能的。 RDD 是 objects 的同构 collection。如果您期望某种 header ,最好将其丢弃而不是一直拖到它。不过你可以尝试这样的事情:
from itertools import islice
# Dummy data
with open("/tmp/foo", "w") as fw:
fw.writelines(["foo", "1.0", "2.0", "3.0"])
def process_part(i, iter):
if i == 0:
# We could use enumerate as well
for x in islice(iter, 1):
yield x
for x in iter:
yield float(x)
(sc.textFile("foo.txt")
.mapPartitionsWithIndex(process_part)
.collect())
## ['"foo"', 1.0, 2.0, 3.0, 4.0]
如果您期望空分区,您首先计算元素:
rdd.mapPartitionsWithIndex(lambda i, iter: [(i, sum(1 for _ in iter))]).collect()
并将 0 替换为第一个 non-empty 分区的索引。