在 Spark 应用程序中保存 RDD 的元素
Saving elements of RDD in Spark application
我是 运行 集群上的 spark 应用程序。我想对 RDD 中的每个元素执行一些操作并将每个元素保存到文本文件中。
我在 myRDD
上呼叫 foreach
myRDD.foreach(process)
def process(elements):
// some operation that extracts the strings
// and converts to myList
myList = ...
with open("somefile.txt", "a+") as myfile:
print "----SAVED----"
myfile.writelines(myList)
但是,我找不到 somefile.txt,即使我确实发现打印语句没有任何 errors/warning。 somefile.txt 保存在哪里?如果我的方法不正确,如何保存RDD的单个元素?
好的,所以这是有问题的原因是 RDD 不一定在单个节点上。当您调用 foreach
时,您的 RDD 分布在您的节点上。您需要使用 collect
或 use the built in file writer, but this won't modify it.
将 RDD 收集到驱动程序节点
collect
可能更好,但它也是瓶颈,因为现在所有数据都被收集到单个节点(驱动程序节点)。
编辑:我将用一些代码回答您的其他问题...
def process(element):
#process element to a list
return myList
def writeList(myList):
with open('somefile.txt', 'a+') as f:
f.writelines(myList)
#in main
myListRDD = myRDD.map(process)
myListRDD.collect().foreach(writeList)
那应该在高效的同时做你想做的事。因为我们为列表处理到一个新的 RDD,所以我们能够并行进行所有处理,所以唯一的线性操作是文件写入,它需要在单个节点上发生以实现数据一致性。
myRDD.map(convertToList).saveAsTextFile(<hdfs output path>)
使用这种方法,您将能够扩展您的应用程序,如果您必须将所有数据传输到驱动程序中,那么您将保证输出数据足够小以适合驱动程序内存,否则您将开始拥有烦恼。
如果您要求所有数据仅以一个文件结尾,那么(这种方法与将所有输出传输到驱动程序有类似的问题,不可扩展):
myRDD.map(generateList).coalesce(1).saveAsTextFile(<hdfs output path>)
如果您需要在将列表存储到文件之前将其转换为字符串,则:
myRDD.map(generateList).map(listToString).saveAsTextFile(<hdfs output path>)
显然,您可以将列表转换为第一个映射中的字符串并节省额外的步骤。
我是 运行 集群上的 spark 应用程序。我想对 RDD 中的每个元素执行一些操作并将每个元素保存到文本文件中。
我在 myRDD
上呼叫foreach
myRDD.foreach(process)
def process(elements):
// some operation that extracts the strings
// and converts to myList
myList = ...
with open("somefile.txt", "a+") as myfile:
print "----SAVED----"
myfile.writelines(myList)
但是,我找不到 somefile.txt,即使我确实发现打印语句没有任何 errors/warning。 somefile.txt 保存在哪里?如果我的方法不正确,如何保存RDD的单个元素?
好的,所以这是有问题的原因是 RDD 不一定在单个节点上。当您调用 foreach
时,您的 RDD 分布在您的节点上。您需要使用 collect
或 use the built in file writer, but this won't modify it.
collect
可能更好,但它也是瓶颈,因为现在所有数据都被收集到单个节点(驱动程序节点)。
编辑:我将用一些代码回答您的其他问题...
def process(element):
#process element to a list
return myList
def writeList(myList):
with open('somefile.txt', 'a+') as f:
f.writelines(myList)
#in main
myListRDD = myRDD.map(process)
myListRDD.collect().foreach(writeList)
那应该在高效的同时做你想做的事。因为我们为列表处理到一个新的 RDD,所以我们能够并行进行所有处理,所以唯一的线性操作是文件写入,它需要在单个节点上发生以实现数据一致性。
myRDD.map(convertToList).saveAsTextFile(<hdfs output path>)
使用这种方法,您将能够扩展您的应用程序,如果您必须将所有数据传输到驱动程序中,那么您将保证输出数据足够小以适合驱动程序内存,否则您将开始拥有烦恼。
如果您要求所有数据仅以一个文件结尾,那么(这种方法与将所有输出传输到驱动程序有类似的问题,不可扩展):
myRDD.map(generateList).coalesce(1).saveAsTextFile(<hdfs output path>)
如果您需要在将列表存储到文件之前将其转换为字符串,则:
myRDD.map(generateList).map(listToString).saveAsTextFile(<hdfs output path>)
显然,您可以将列表转换为第一个映射中的字符串并节省额外的步骤。